[jboss-cvs] JBoss Messaging SVN: r5140 - in branches/Branch_Chunk_Clebert: src/main/org/jboss/messaging/core/journal/impl and 11 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Oct 17 17:34:39 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-10-17 17:34:38 -0400 (Fri, 17 Oct 2008)
New Revision: 5140
Added:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java
Modified:
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
Log:
LargeMessages paging and few other tweaks
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -35,7 +35,7 @@
*/
public interface SequentialFileFactory
{
- SequentialFile createSequentialFile(String fileName, int maxIO) throws Exception;
+ SequentialFile createSequentialFile(String fileName, int maxIO);
List<String> listFiles(String extension) throws Exception;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -67,7 +67,7 @@
// AIO using a single thread.
private ExecutorService executor;
- public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO) throws Exception
+ public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO)
{
this.journalDir = journalDir;
this.fileName = fileName;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -41,7 +41,7 @@
super(journalDir);
}
- public SequentialFile createSequentialFile(final String fileName, final int maxIO) throws Exception
+ public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
return new AIOSequentialFile(journalDir, fileName, maxIO);
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -48,7 +48,7 @@
private final String fileName;
- private File file;
+ private final File file;
private FileChannel channel;
@@ -61,6 +61,9 @@
this.journalDir = journalDir;
this.fileName = fileName;
+
+ this.file = new File(journalDir + "/" + fileName);
+
}
public int getAlignment()
@@ -78,15 +81,13 @@
return fileName;
}
- public boolean isOpen()
+ public synchronized boolean isOpen()
{
- return file != null;
+ return channel != null;
}
public synchronized void open() throws Exception
{
- file = new File(journalDir + "/" + fileName);
-
rfile = new RandomAccessFile(file, "rw");
channel = rfile.getChannel();
@@ -131,15 +132,16 @@
channel = null;
rfile = null;
-
- file = null;
}
public void delete() throws Exception
{
+ if (isOpen())
+ {
+ close();
+ }
+
file.delete();
-
- close();
}
public int read(final ByteBuffer bytes) throws Exception
Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageLargeMessage.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.paging;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * A PageLargeMessage
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 17, 2008 4:04:39 PM
+ *
+ *
+ */
+public interface PageLargeMessage extends EncodingSupport
+{
+ byte[] getBytes();
+
+ MessagingBuffer getBuffer();
+}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/PageMessage.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -36,7 +36,7 @@
*/
public interface PageMessage extends EncodingSupport
{
- ServerMessage getMessage();
+ Object getMessage();
long getTransactionID();
Added: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java (rev 0)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageLargeMessageImpl.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.paging.impl;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.paging.PageLargeMessage;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
+/**
+ * <p>A PageLargeMessageImpl</p>
+ *
+ * <p>A Paged Large Message needs to be instantiated later when the StorageManager instance is available.
+ * This class will hold the bytes until it is possible to instantiate the LargeMessage properly</p>
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Oct 17, 2008 2:46:32 PM
+ *
+ *
+ */
+public class PageLargeMessageImpl implements PageLargeMessage
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ final byte[] bytes;
+
+ /**
+ * @param bytes
+ */
+ public PageLargeMessageImpl(byte[] bytes)
+ {
+ super();
+ this.bytes = bytes;
+ }
+
+ /**
+ * @return the bytes
+ */
+ public byte[] getBytes()
+ {
+ return bytes;
+ }
+
+ public MessagingBuffer getBuffer()
+ {
+ return new ByteBufferWrapper(ByteBuffer.wrap(bytes));
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.EncodingSupport#decode(org.jboss.messaging.core.remoting.spi.MessagingBuffer)
+ */
+ public void decode(MessagingBuffer buffer)
+ {
+ throw new IllegalStateException("Not supported");
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.EncodingSupport#encode(org.jboss.messaging.core.remoting.spi.MessagingBuffer)
+ */
+ public void encode(MessagingBuffer buffer)
+ {
+ buffer.putBytes(bytes);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return bytes.length;
+ }
+
+}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -22,8 +22,10 @@
package org.jboss.messaging.core.paging.impl;
+import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.paging.PageMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.util.DataConstants;
@@ -50,7 +52,7 @@
// Public --------------------------------------------------------
- private final ServerMessage message;
+ private EncodingSupport message;
private long transactionID = -1;
@@ -87,7 +89,7 @@
this.properties = properties;
}
- public ServerMessage getMessage()
+ public Object getMessage()
{
return message;
}
@@ -107,20 +109,51 @@
public void decode(final MessagingBuffer buffer)
{
transactionID = buffer.getLong();
- message.decode(buffer);
+
+ boolean isLargeMessage = buffer.getBoolean();
+
+ if (isLargeMessage)
+ {
+ int largeMessageHeaderSize = buffer.getInt();
+
+ byte[] bytesLargeMessage = new byte[largeMessageHeaderSize];
+
+ buffer.getBytes(bytesLargeMessage);
+
+ this.message = new PageLargeMessageImpl(bytesLargeMessage);
+
+ }
+ else
+ {
+ message = new ServerMessageImpl();
+ message.decode(buffer);
+ }
+
properties.decode(buffer);
}
public void encode(final MessagingBuffer buffer)
{
buffer.putLong(transactionID);
- message.encode(buffer);
+ buffer.putBoolean(message instanceof ServerLargeMessage);
+ if (message instanceof ServerLargeMessage)
+ {
+ buffer.putInt(message.getEncodeSize());
+ message.encode(buffer);
+ }
+ else
+ {
+ message.encode(buffer);
+ }
properties.encode(buffer);
}
public int getEncodeSize()
{
- return DataConstants.SIZE_LONG + message.getEncodeSize() + properties.getEncodeSize();
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_BYTE +
+ (message instanceof ServerLargeMessage ? DataConstants.SIZE_INT : 0) +
+ message.getEncodeSize() +
+ properties.getEncodeSize();
}
// Package protected ---------------------------------------------
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -22,6 +22,15 @@
package org.jboss.messaging.core.paging.impl;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.paging.LastPageRecord;
import org.jboss.messaging.core.paging.PageMessage;
@@ -32,21 +41,13 @@
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.ServerLargeMessage;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.util.SimpleString;
import org.jboss.messaging.util.TypedProperties;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* <p>Look at the <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Paging">WIKI</a> for more information.</p>
*
@@ -95,6 +96,8 @@
private static final SimpleString SCHEDULED_DELIVERY_PROP = new SimpleString("JBM_SCHEDULED_DELIVERY_PROP");
+ private static final SimpleString MESSAGE_ID_PROP = new SimpleString("JBM_MESSAGE_ID");
+
// This is just a debug tool method.
// During debugs you could make log.trace as log.info, and change the
// variable isTrace above
@@ -210,6 +213,19 @@
for (PageMessage msg : data)
{
+ ServerMessage pagedMessage = null;
+
+ if (msg.getMessage() instanceof ServerMessage)
+ {
+ pagedMessage = (ServerMessage) msg.getMessage();
+ }
+ else
+ {
+ PageLargeMessageImpl pageLargeMessage = (PageLargeMessageImpl) msg.getMessage();
+ ServerLargeMessage message = this.storageManager.createLargeMessageStorage();
+ message.decode(pageLargeMessage.getBuffer());
+ }
+
final long transactionIdDuringPaging = msg.getTransactionID();
if (transactionIdDuringPaging >= 0)
{
@@ -236,7 +252,7 @@
}
// / Update information about transactions
- if (msg.getMessage().isDurable())
+ if (pagedMessage.isDurable())
{
pageTransactionInfo.decrement();
pageTransactionsToUpdate.add(pageTransactionInfo);
@@ -246,11 +262,11 @@
//if this is a scheduled message we add it to the queue as just that
if(scheduledDeliveryTime == null)
{
- refsToAdd.addAll(postOffice.route(msg.getMessage()));
+ refsToAdd.addAll(postOffice.route(pagedMessage));
}
else
{
- List<MessageReference> refs = postOffice.route(msg.getMessage());
+ List<MessageReference> refs = postOffice.route(pagedMessage);
for (MessageReference ref : refs)
{
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
@@ -258,13 +274,13 @@
scheduledRefsToAdd.addAll(refs);
}
- if (msg.getMessage().getDurableRefCount() != 0)
+ if (pagedMessage.getDurableRefCount() != 0)
{
- storageManager.storeMessageTransactional(depageTransactionID, msg.getMessage());
+ storageManager.storeMessageTransactional(depageTransactionID, pagedMessage);
//write the scheduled message record if needed
if(scheduledDeliveryTime != null)
{
- storageManager.storeMessageScheduledTransactional(depageTransactionID, msg.getMessage(), scheduledDeliveryTime);
+ storageManager.storeMessageScheduledTransactional(depageTransactionID, pagedMessage, scheduledDeliveryTime);
}
}
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -70,7 +70,7 @@
void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
/** Create an area that will get LargeMessage bytes on the server size*/
- ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception;
+ ServerLargeMessage createLargeMessageStorage() throws Exception;
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalServerLargeMessageImpl.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -49,16 +49,17 @@
// Attributes ----------------------------------------------------
- final SequentialFile file;
+ private final JournalStorageManager storageManager;
+ private SequentialFile file;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public JournalServerLargeMessageImpl(final long id, final SequentialFile file)
+ public JournalServerLargeMessageImpl(JournalStorageManager storageManager)
{
- super(id);
- this.file = file;
+ this.storageManager = storageManager;
}
// Public --------------------------------------------------------
@@ -68,6 +69,8 @@
*/
public synchronized void addBytes(final byte[] bytes) throws Exception
{
+ testFile();
+
if (!file.isOpen())
{
file.open();
@@ -80,8 +83,10 @@
}
@Override
- public synchronized void encodeBody(MessagingBuffer bufferOut, int start, int size)
+ public synchronized void encodeBody(final MessagingBuffer bufferOut, final int start, final int size)
{
+ testFile();
+
try
{
// This could maybe be optimized (maybe reading directly into bufferOut)
@@ -103,7 +108,6 @@
bufferOut.putBytes(bufferRead.array(), 0, bytesRead);
}
- // releaseResources();
}
catch (Exception e)
{
@@ -114,6 +118,8 @@
@Override
public synchronized int getBodySize()
{
+ testFile();
+
try
{
if (!file.isOpen())
@@ -130,14 +136,32 @@
}
}
-
+ @Override
+ public synchronized int getEncodeSize()
+ {
+ return getPropertiesEncodeSize();
+ }
+
+ @Override
+ public void encode(final MessagingBuffer buffer)
+ {
+ encodeProperties(buffer);
+ }
+
+ @Override
+ public void decode(final MessagingBuffer buffer)
+ {
+ decodeProperties(buffer);
+ }
+
+ @Override
public int decrementRefCount()
{
int currentRefCount = super.decrementRefCount();
-
+
if (currentRefCount == 0)
{
- log.info("Deleting file " + this.file + " as the usage was complete");
+ log.info("Deleting file " + file + " as the usage was complete");
try
{
@@ -148,16 +172,15 @@
log.error(e.getMessage(), e);
}
}
-
+
return currentRefCount;
}
-
public void deleteFile() throws MessagingException
{
-
+
// TODO: This should use an executor somewhere...
- // We can't take the risk of blocking the queue when lots of ServerLargeMessages are being deleted
+ // We can't take the risk of blocking the queue when lots of ServerLargeMessages are being deleted
try
{
file.delete();
@@ -187,6 +210,19 @@
// Protected -----------------------------------------------------
+ protected void testFile()
+ {
+ if (file == null)
+ {
+ if (this.messageID <= 0)
+ {
+ throw new RuntimeException("MessageID not set on LargeMessage");
+ }
+
+ file = storageManager.createFileForLargeMessage(this.getMessageID());
+ }
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -46,6 +46,7 @@
import org.jboss.messaging.core.journal.Journal;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
import org.jboss.messaging.core.journal.impl.JournalImpl;
@@ -216,10 +217,9 @@
}
/** Create an area that will get LargeMessage bytes on the server size*/
- public ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception
+ public ServerLargeMessage createLargeMessageStorage() throws Exception
{
- return new JournalServerLargeMessageImpl(messageID, largeMessagesFactory.createSequentialFile(messageID + ".msg",
- -1));
+ return new JournalServerLargeMessageImpl(this);
}
// Non transactional operations
@@ -255,7 +255,9 @@
public void storeMessageScheduled(final ServerMessage message, final long scheduledDeliveryTime) throws Exception
{
- messageJournal.appendUpdateRecord(message.getMessageID(), SET_SCHEDULED_DELIVERY_TIME, new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
+ messageJournal.appendUpdateRecord(message.getMessageID(),
+ SET_SCHEDULED_DELIVERY_TIME,
+ new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
}
// Transactional operations
@@ -322,9 +324,15 @@
messageJournal.appendDeleteRecordTransactional(txID, recordID, null);
}
- public void storeMessageScheduledTransactional(final long txID,final ServerMessage message, final long scheduledDeliveryTime) throws Exception
+ public void storeMessageScheduledTransactional(final long txID,
+ final ServerMessage message,
+ final long scheduledDeliveryTime) throws Exception
{
- messageJournal.appendUpdateRecordTransactional(txID, message.getMessageID(), SET_SCHEDULED_DELIVERY_TIME, new ScheduledDeliveryEncoding(message.getMessageID(), scheduledDeliveryTime));
+ messageJournal.appendUpdateRecordTransactional(txID,
+ message.getMessageID(),
+ SET_SCHEDULED_DELIVERY_TIME,
+ new ScheduledDeliveryEncoding(message.getMessageID(),
+ scheduledDeliveryTime));
}
public void storeDeleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -381,10 +389,10 @@
{
case ADD_LARGE_MESSAGE:
{
- ServerLargeMessage largeMessage = this.createLargeMessageStorage(record.id);
+ ServerLargeMessage largeMessage = this.createLargeMessageStorage();
LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
-
+
messageEncoding.decode(buff);
List<MessageReference> refs = postOffice.route(largeMessage);
@@ -495,7 +503,8 @@
ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
scheduledDeliveryEncoding.decode(buff);
List<MessageReference> refs = routedRefs.get(record.id);
- //for any references that have already been routed, we need to remove them from t he queue and re add them as scheduled
+ // for any references that have already been routed, we need to remove them from the queue and re add
+ // them as scheduled
for (MessageReference ref : refs)
{
ref.getQueue().removeReferenceWithID(ref.getMessage().getMessageID());
@@ -687,8 +696,7 @@
return started;
}
- // Public
- // -----------------------------------------------------------------------------------
+ // Public -----------------------------------------------------------------------------------
public Journal getMessageJournal()
{
@@ -700,9 +708,19 @@
return bindingsJournal;
}
- // Private
- // ----------------------------------------------------------------------------------
+ // Package protected ---------------------------------------------
+ /**
+ * @param messageID
+ * @return
+ */
+ SequentialFile createFileForLargeMessage(long messageID)
+ {
+ return largeMessagesFactory.createSequentialFile(messageID + ".msg", -1);
+ }
+
+ // Private ----------------------------------------------------------------------------------
+
private void loadPreparedTransactions(final PostOffice postOffice,
final Map<Long, Queue> queues,
final ResourceManager resourceManager,
@@ -795,7 +813,8 @@
ScheduledDeliveryEncoding scheduledDeliveryEncoding = new ScheduledDeliveryEncoding(record.id);
scheduledDeliveryEncoding.decode(buff);
List<MessageReference> refs = routedRefs.get(record.id);
- //for any references that have already been routed, we need to remove them from the queue and re add them as scheduled
+ // for any references that have already been routed, we need to remove them from the queue and re add
+ // them as scheduled
for (MessageReference ref : refs)
{
ref.setScheduledDeliveryTime(scheduledDeliveryEncoding.getScheduledDeliveryTime());
@@ -996,7 +1015,7 @@
*/
public void decode(final MessagingBuffer buffer)
{
- message.decodeProperties(buffer);
+ message.decode(buffer);
}
/* (non-Javadoc)
@@ -1004,7 +1023,7 @@
*/
public void encode(final MessagingBuffer buffer)
{
- message.encodeProperties(buffer);
+ message.encode(buffer);
}
/* (non-Javadoc)
@@ -1012,7 +1031,7 @@
*/
public int getEncodeSize()
{
- return message.getPropertiesEncodeSize();
+ return message.getEncodeSize();
}
}
@@ -1109,9 +1128,11 @@
super(queueID);
}
}
+
private static class ScheduledDeliveryEncoding implements EncodingSupport
{
long messageId;
+
long scheduledDeliveryTime;
private ScheduledDeliveryEncoding(long messageId, long scheduledDeliveryTime)
@@ -1145,4 +1166,5 @@
return scheduledDeliveryTime;
}
}
+
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageManager.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -150,9 +150,9 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.persistence.StorageManager#createLargeMessageStorage(long, int, int)
*/
- public ServerLargeMessage createLargeMessageStorage(final long messageID) throws Exception
+ public ServerLargeMessage createLargeMessageStorage() throws Exception
{
- return new NullStorageServerLargeMessageImpl(messageID);
+ return new NullStorageServerLargeMessageImpl();
}
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageServerLargeMessageImpl.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -51,9 +51,9 @@
// Public --------------------------------------------------------
- public NullStorageServerLargeMessageImpl(final long messageID)
+ public NullStorageServerLargeMessageImpl()
{
- super(messageID);
+ super();
}
/* (non-Javadoc)
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -1174,13 +1174,13 @@
*/
public ServerLargeMessage createLargeMessageStorage(long producerID, long messageID, byte[] header) throws Exception
{
- ServerLargeMessage largeMessage = storageManager.createLargeMessageStorage(messageID);
+ ServerLargeMessage largeMessage = storageManager.createLargeMessageStorage();
MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.wrap(header));
largeMessage.decodeProperties(headerBuffer);
- // decodeProperties will clean this, as the client didn send the ID originally
+ // client didn't send the ID originally
largeMessage.setMessageID(messageID);
ServerProducer producer = producers.get(producerID);
Modified: branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -356,7 +356,6 @@
ServerMessage message = ref.getMessage();
- // Putting back the size on pagingManager, and reverting the counters
if (message.isDurable() && queue.isDurable())
{
message.incrementDurableRefCount();
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/chunkmessage/MessageChunkTest.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -200,6 +200,11 @@
}
}
+
+ public void testPageOnLargeMessage() throws Exception
+ {
+ // TODO: Write a test with LargeMessages and paging
+ }
/**
* @param session
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -92,7 +92,7 @@
assertEquals(1, msgs.length);
- assertEqualsByteArrays(msg.getBody().array(), msgs[0].getMessage().getBody().array());
+ assertEqualsByteArrays(msg.getBody().array(), ((ServerMessage)msgs[0].getMessage()).getBody().array());
assertTrue(store.isPaging());
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -80,7 +80,7 @@
// Public --------------------------------------------------------
- public SequentialFile createSequentialFile(final String fileName, final int maxAIO) throws Exception
+ public SequentialFile createSequentialFile(final String fileName, final int maxAIO)
{
FakeSequentialFile sf = fileMap.get(fileName);
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageImplTestBase.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -119,11 +119,11 @@
for (int i = 0; i < msgs.length; i++)
{
- assertEquals(i, msgs[i].getMessage().getMessageID());
+ assertEquals(i, ((ServerMessage)msgs[i].getMessage()).getMessageID());
- assertEquals(simpleDestination, msgs[i].getMessage().getDestination());
+ assertEquals(simpleDestination, ((ServerMessage)msgs[i].getMessage()).getDestination());
- assertEqualsByteArrays(buffers.get(i).array(), msgs[i].getMessage().getBody().array());
+ assertEqualsByteArrays(buffers.get(i).array(), ((ServerMessage)msgs[i].getMessage()).getBody().array());
}
impl.delete();
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreImplTest.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -29,6 +29,7 @@
import org.jboss.messaging.core.paging.impl.PageMessageImpl;
import org.jboss.messaging.core.paging.impl.PagingStoreImpl;
import org.jboss.messaging.core.paging.impl.TestSupportPageStore;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.jboss.messaging.util.SimpleString;
@@ -201,8 +202,8 @@
for (int i = 0; i < 10; i++)
{
- assertEquals(0, msg[i].getMessage().getMessageID());
- assertEqualsByteArrays(buffers.get(i).array(), msg[i].getMessage().getBody().array());
+ assertEquals(0, ((ServerMessage)msg[i].getMessage()).getMessageID());
+ assertEqualsByteArrays(buffers.get(i).array(),((ServerMessage)msg[i].getMessage()).getBody().array());
}
}
@@ -264,8 +265,8 @@
for (int i = 0; i < 5; i++)
{
- assertEquals(0, msg[i].getMessage().getMessageID());
- assertEqualsByteArrays(buffers.get(pageNr * 5 + i).array(), msg[i].getMessage().getBody().array());
+ assertEquals(0, ((ServerMessage)msg[i].getMessage()).getMessageID());
+ assertEqualsByteArrays(buffers.get(pageNr * 5 + i).array(), ((ServerMessage)msg[i].getMessage()).getBody().array());
}
}
@@ -307,9 +308,9 @@
assertEquals(1, msgs.length);
- assertEquals(0l, msgs[0].getMessage().getMessageID());
+ assertEquals(0l, ((ServerMessage)msgs[0].getMessage()).getMessageID());
- assertEqualsByteArrays(buffers.get(0).array(), msgs[0].getMessage().getBody().array());
+ assertEqualsByteArrays(buffers.get(0).array(), ((ServerMessage)msgs[0].getMessage()).getBody().array());
assertEquals(1, storeImpl.getNumberOfPages());
Modified: branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java
===================================================================
--- branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-10-17 18:53:24 UTC (rev 5139)
+++ branches/Branch_Chunk_Clebert/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PagingStoreTestBase.java 2008-10-17 21:34:38 UTC (rev 5140)
@@ -229,15 +229,17 @@
for (PageMessage msg : msgs)
{
- msg.getMessage().getBody().rewind();
- long id = msg.getMessage().getBody().getLong();
- msg.getMessage().getBody().rewind();
+ ((ServerMessage)msg.getMessage()).getBody().rewind();
+ long id = ((ServerMessage)msg.getMessage()).getBody().getLong();
+ ((ServerMessage)msg.getMessage()).getBody().rewind();
PageMessageImpl msgWritten = buffers.remove(id);
buffers2.put(id, msg);
assertNotNull(msgWritten);
- assertEquals(msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
- assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
+ assertEquals(((ServerMessage)msg.getMessage()).getDestination(),
+ ((ServerMessage)msgWritten.getMessage()).getDestination());
+ assertEqualsByteArrays(((ServerMessage)msgWritten.getMessage()).getBody().array(),
+ ((ServerMessage)msg.getMessage()).getBody().array());
}
}
@@ -293,12 +295,14 @@
for (PageMessage msg : msgs)
{
- msg.getMessage().getBody().rewind();
- long id = msg.getMessage().getBody().getLong();
+ ((ServerMessage)msg.getMessage()).getBody().rewind();
+ long id = ((ServerMessage)msg.getMessage()).getBody().getLong();
PageMessage msgWritten = buffers2.remove(id);
assertNotNull(msgWritten);
- assertEquals(msg.getMessage().getDestination(), msgWritten.getMessage().getDestination());
- assertEqualsByteArrays(msgWritten.getMessage().getBody().array(), msg.getMessage().getBody().array());
+ assertEquals(((ServerMessage)msg.getMessage()).getDestination(),
+ ((ServerMessage)msgWritten.getMessage()).getDestination());
+ assertEqualsByteArrays(((ServerMessage)msgWritten.getMessage()).getBody().array(),
+ ((ServerMessage)msg.getMessage()).getBody().array());
}
}
@@ -307,9 +311,10 @@
lastPage.close();
assertEquals(1, lastMessages.length);
- lastMessages[0].getMessage().getBody().rewind();
- assertEquals(lastMessages[0].getMessage().getBody().getLong(), lastMessageId);
- assertEqualsByteArrays(lastMessages[0].getMessage().getBody().array(), lastMsg.getMessage().getBody().array());
+ ((ServerMessage)lastMessages[0].getMessage()).getBody().rewind();
+ assertEquals(((ServerMessage)lastMessages[0].getMessage()).getBody().getLong(), lastMessageId);
+ assertEqualsByteArrays(((ServerMessage)lastMessages[0].getMessage()).getBody().array(),
+ ((ServerMessage)lastMsg.getMessage()).getBody().array());
assertEquals(0, buffers2.size());
More information about the jboss-cvs-commits
mailing list