[jboss-cvs] JBoss Messaging SVN: r7598 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 21 11:28:57 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-07-21 11:28:56 -0400 (Tue, 21 Jul 2009)
New Revision: 7598
Modified:
trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
trunk/src/main/org/jboss/messaging/core/server/LargeServerMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1496 - link between large messages
Modified: trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java 2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java 2009-07-21 15:28:56 UTC (rev 7598)
@@ -26,7 +26,7 @@
/**
*
- * This class provides encoding support for the Journal.
+ * This interface provides encoding support for the Journal.
*
*
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-21 15:28:56 UTC (rev 7598)
@@ -890,9 +890,9 @@
try
{
- JournalRecord posFiles = records.get(id);
+ JournalRecord jrnRecord = records.get(id);
- if (posFiles == null)
+ if (jrnRecord == null)
{
if (!(compactor != null && compactor.lookupRecord(id)))
{
@@ -915,13 +915,13 @@
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
- if (posFiles == null)
+ if (jrnRecord == null)
{
compactor.addCommandUpdate(id, usedFile, size);
}
else
{
- posFiles.addUpdateFile(usedFile, size);
+ jrnRecord.addUpdateFile(usedFile, size);
}
}
finally
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-07-21 15:28:56 UTC (rev 7598)
@@ -55,6 +55,8 @@
private final JournalStorageManager storageManager;
+ private LargeServerMessage linkMessage;
+
// We should only use the NIO implementation on the Journal
private SequentialFile file;
@@ -81,6 +83,7 @@
final long newID)
{
super(copy);
+ this.linkMessage = copy;
storageManager = copy.storageManager;
file = fileCopy;
complete = true;
@@ -153,10 +156,9 @@
{
throw new RuntimeException(e.getMessage(), e);
}
- // FIXME: The file could be bigger than MAX_INT
- return (int)bodySize;
+ return (int)Math.min(bodySize, Integer.MAX_VALUE);
}
-
+
public synchronized long getLargeBodySize()
{
try
@@ -190,6 +192,22 @@
decodeProperties(buffer);
}
+ /**
+ * @return the current value of the complete flag of this large message
+ */
+ public boolean isComplete()
+ {
+ return complete;
+ }
+
+ /**
+ * @param complete the new value of the complete flag
+ */
+ public void setComplete(boolean complete)
+ {
+ this.complete = complete;
+ }
+
@Override
public int decrementRefCount()
{
@@ -197,19 +215,27 @@
if (currentRefCount == 0)
{
- if (isTrace)
+ if (linkMessage != null)
{
- log.trace("Deleting file " + file + " as the usage was complete");
+ // The file belongs to the linked message, so decrement that message's reference count instead of deleting the file here
+ linkMessage.decrementRefCount();
}
+ else
+ {
+ if (isTrace)
+ {
+ log.trace("Deleting file " + file + " as the usage was complete");
+ }
- try
- {
- deleteFile();
+ try
+ {
+ deleteFile();
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage(), e);
+ }
}
- catch (Exception e)
- {
- log.error(e.getMessage(), e);
- }
}
return currentRefCount;
@@ -221,12 +247,10 @@
return true;
}
- public synchronized void deleteFile() throws MessagingException
+ public synchronized void deleteFile() throws Exception
{
- if (file != null)
- {
- storageManager.deleteFile(file);
- }
+ validateFile();
+ storageManager.deleteFile(file);
}
// We cache this
@@ -270,33 +294,24 @@
}
}
- // TODO: Optimise this per https://jira.jboss.org/jira/browse/JBMESSAGING-1496
@Override
public synchronized ServerMessage copy(final long newID) throws Exception
{
- SequentialFile newfile = storageManager.createFileForLargeMessage(newID, complete);
-
- file.open();
- newfile.open();
-
- file.position(0);
- newfile.position(0);
-
- ByteBuffer buffer = ByteBuffer.allocate(100 * 1024);
-
- for (long i = 0; i < file.size();)
+ incrementRefCount();
+
+ long idToUse = messageID;
+
+ if (linkMessage != null)
{
- buffer.rewind();
- file.read(buffer);
- newfile.write(buffer, false);
- i += buffer.limit();
+ idToUse = linkMessage.getMessageID();
}
- file.close();
- newfile.close();
+ SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, true);
- JournalLargeServerMessage newMessage = new JournalLargeServerMessage(this, newfile, newID);
+ file.open();
+ JournalLargeServerMessage newMessage = new JournalLargeServerMessage(linkMessage == null ? this : (JournalLargeServerMessage)linkMessage, newfile, newID);
+
return newMessage;
}
@@ -331,6 +346,43 @@
}
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.LargeServerMessage#getLinkedMessage()
+ */
+ public LargeServerMessage getLinkedMessage()
+ {
+ return linkMessage;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.LargeServerMessage#setLinkedMessage(org.jboss.messaging.core.server.LargeServerMessage)
+ */
+ public void setLinkedMessage(LargeServerMessage message)
+ {
+ if (file != null)
+ {
+ // Sanity check.. it shouldn't happen
+ throw new IllegalStateException("LargeMessage file was already set");
+ }
+
+ this.linkMessage = message;
+
+ file = storageManager.createFileForLargeMessage(message.getMessageID(), true);
+ try
+ {
+ file.open();
+ this.bodySize = file.size();
+ file.close();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("could not setup linked file", e);
+ }
+ finally
+ {
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-07-21 15:28:56 UTC (rev 7598)
@@ -107,7 +107,7 @@
public static final int SIZE_FIELDS = SIZE_INT + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
// Message journal record types
-
+
public static final byte ADD_LARGE_MESSAGE = 30;
public static final byte ADD_MESSAGE = 31;
@@ -344,7 +344,7 @@
throw new MessagingException(MessagingException.ILLEGAL_STATE, "MessageId was not assigned to Message");
}
- if (message instanceof LargeServerMessage)
+ if (message.isLargeMessage())
{
messageJournal.appendAddRecordTransactional(txID,
message.getMessageID(),
@@ -357,7 +357,7 @@
}
}
-
+
public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
{
if (pageTransaction.getRecordID() != 0)
@@ -505,6 +505,29 @@
LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
messageEncoding.decode(buff);
+
+ Long originalMessageID = (Long)largeMessage.getProperties().getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+
+ // Link this message to the original message so that it uses the original message's file
+ if (originalMessageID != null)
+ {
+ LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
+
+ if (originalMessage == null)
+ {
+ // This can happen if the original message was deleted but its file still exists because it is still in use
+ originalMessage = createLargeMessage();
+ originalMessage.setMessageID(originalMessageID);
+ originalMessage.setComplete(true);
+ messages.put(originalMessageID, originalMessage);
+ }
+
+ originalMessage.incrementRefCount();
+
+ largeMessage.setLinkedMessage(originalMessage);
+ largeMessage.setComplete(true);
+ }
+
messages.put(record.id, largeMessage);
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-07-21 15:28:56 UTC (rev 7598)
@@ -103,6 +103,38 @@
return true;
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.LargeServerMessage#getLinkedMessage()
+ */
+ public LargeServerMessage getLinkedMessage()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.LargeServerMessage#setLinkedMessage(org.jboss.messaging.core.server.LargeServerMessage)
+ */
+ public void setLinkedMessage(LargeServerMessage message)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.LargeServerMessage#isComplete()
+ */
+ public boolean isComplete()
+ {
+ // nothing to be done on null persistence
+ return true;
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.server.LargeServerMessage#setComplete(boolean)
+ */
+ public void setComplete(boolean isComplete)
+ {
+ // nothing to be done on null persistence
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/LargeServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/LargeServerMessage.java 2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/server/LargeServerMessage.java 2009-07-21 15:28:56 UTC (rev 7598)
@@ -34,6 +34,12 @@
public interface LargeServerMessage extends ServerMessage
{
void addBytes(byte[] bytes) throws Exception;
+
+ /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we specify a link between the messages */
+ void setLinkedMessage(LargeServerMessage message);
+
+ /** Returns the message this one was linked to when it was copied (e.g. ExpiryQueue) without copying the file, or null if there is no link */
+ LargeServerMessage getLinkedMessage();
/** Close the files if opened */
void releaseResources();
@@ -42,5 +48,9 @@
void complete() throws Exception;
+ void setComplete(boolean isComplete);
+
+ boolean isComplete();
+
void deleteFile() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-21 15:28:56 UTC (rev 7598)
@@ -141,7 +141,9 @@
// We cache the consumers here since we don't want to include the redistributor
private final Set<Consumer> consumers = new HashSet<Consumer>();
+
private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
+
private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
public QueueImpl(final long persistenceID,
@@ -342,7 +344,7 @@
{
return;
}
-
+
try
{
lock.acquire();
@@ -359,7 +361,7 @@
{
return;
}
-
+
lock.release();
}
@@ -453,7 +455,7 @@
}
}
}
-
+
return removed;
}
@@ -521,7 +523,7 @@
return new Iterator<MessageReference>()
{
private final Iterator<MessageReference> iterator = messageReferences.iterator();
-
+
public boolean hasNext()
{
return iterator.hasNext();
@@ -1181,10 +1183,18 @@
ServerMessage copy = message.copy(newMessageId);
- SimpleString originalQueue = copy.getDestination();
- copy.putStringProperty(HDR_ORIGINAL_DESTINATION, originalQueue);
- copy.putLongProperty(HDR_ORIG_MESSAGE_ID, message.getMessageID());
-
+ if (ref.getMessage().getProperty(HDR_ORIG_MESSAGE_ID) != null)
+ {
+ copy.putStringProperty(HDR_ORIGINAL_DESTINATION, (SimpleString)ref.getMessage()
+ .getProperty(HDR_ORIGINAL_DESTINATION));
+ copy.putLongProperty(HDR_ORIG_MESSAGE_ID, (Long)ref.getMessage().getProperty(HDR_ORIG_MESSAGE_ID));
+ }
+ else
+ {
+ SimpleString originalQueue = copy.getDestination();
+ copy.putStringProperty(HDR_ORIGINAL_DESTINATION, originalQueue);
+ copy.putLongProperty(HDR_ORIG_MESSAGE_ID, message.getMessageID());
+ }
// reset expiry
copy.setExpiration(0);
if (expiry)
@@ -1290,7 +1300,7 @@
}
Consumer consumer;
-
+
MessageReference reference;
Iterator<MessageReference> iterator = null;
@@ -1301,10 +1311,10 @@
while (true)
{
- consumer = distributionPolicy.getNextConsumer();
-
- iterator = iterators.get(consumer);
-
+ consumer = distributionPolicy.getNextConsumer();
+
+ iterator = iterators.get(consumer);
+
if (iterator == null)
{
reference = messageReferences.peekFirst();
@@ -1327,7 +1337,7 @@
}
}
}
-
+
if (reference == null)
{
nullReferences.add(consumer);
@@ -1358,7 +1368,7 @@
continue;
}
}
-
+
HandleStatus status = handle(reference, consumer);
if (status == HandleStatus.HANDLED)
@@ -1471,7 +1481,7 @@
{
return HandleStatus.BUSY;
}
-
+
HandleStatus status;
boolean filterRejected = false;
@@ -1482,7 +1492,7 @@
{
Consumer consumer = distributionPolicy.getNextConsumer();
consumerCount++;
-
+
final SimpleString groupId = (SimpleString)reference.getMessage().getProperty(MessageImpl.HDR_GROUP_ID);
if (groupId != null)
@@ -1531,15 +1541,15 @@
}
}
}
-
+
if (status == HandleStatus.NO_MATCH)
{
promptDelivery = true;
}
-
+
return status;
}
-
+
private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer)
{
@@ -1550,10 +1560,8 @@
}
catch (Throwable t)
{
- log.warn("removing consumer which did not handle a message, consumer=" +
- consumer +
- ", message=" +
- reference, t);
+ log.warn("removing consumer which did not handle a message, consumer=" + consumer + ", message=" + reference,
+ t);
// If the consumer throws an exception we remove the consumer
try
@@ -1572,9 +1580,9 @@
throw new IllegalStateException("ClientConsumer.handle() should never return null");
}
- return status;
+ return status;
}
-
+
private void removeExpiringReference(final MessageReference ref) throws Exception
{
if (ref.getMessage().getExpiration() > 0)
@@ -1651,7 +1659,7 @@
}
private synchronized void initPagingStore(SimpleString destination)
- {
+ {
// PagingManager would be null only on testcases
if (pagingStore == null && pagingManager != null)
{
@@ -1667,7 +1675,7 @@
}
}
- private synchronized void startDepaging()
+ private synchronized void startDepaging()
{
if (pagingStore != null)
{
@@ -1685,7 +1693,7 @@
}
}
}
-
+
// Inner classes
// --------------------------------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java 2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java 2009-07-21 15:28:56 UTC (rev 7598)
@@ -76,6 +76,376 @@
// Public --------------------------------------------------------
+ public void testDLALargeMessage() throws Exception
+ {
+ final int messageSize = 50000;
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+ session.createQueue(ADDRESS, ADDRESS.concat("-2"), true);
+
+ SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
+
+ AddressSettings addressSettings = new AddressSettings();
+
+ addressSettings.setDeadLetterAddress(ADDRESS_DLA);
+ addressSettings.setMaxDeliveryAttempts(1);
+
+ server.getAddressSettingsRepository().addMatch("*", addressSettings);
+
+ session.createQueue(ADDRESS_DLA, ADDRESS_DLA, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS_DLA);
+
+ ClientConsumer consumerRollback = session.createConsumer(ADDRESS);
+ ClientMessage msg1 = consumerRollback.receive(1000);
+ assertNotNull(msg1);
+ msg1.acknowledge();
+ session.rollback();
+ consumerRollback.close();
+
+ msg1 = consumer.receive(10000);
+
+ assertNotNull(msg1);
+
+ for (int i = 0; i < messageSize; i++)
+ {
+ assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+ }
+
+ session.close();
+ server.stop();
+
+ server = createServer(true);
+
+ server.start();
+
+ sf = createInVMFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS_DLA);
+
+ msg1 = consumer.receive(10000);
+
+ assertNotNull(msg1);
+
+ for (int i = 0; i < messageSize; i++)
+ {
+ assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+ }
+
+ msg1.acknowledge();
+
+ session.commit();
+
+ validateNoFilesOnLargeDir(1);
+
+ consumer = session.createConsumer(ADDRESS.concat("-2"));
+
+ msg1 = consumer.receive(10000);
+
+ assertNotNull(msg1);
+
+ for (int i = 0; i < messageSize; i++)
+ {
+ assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+ }
+
+ msg1.acknowledge();
+
+ session.commit();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testDLAOnExpiry() throws Exception
+ {
+ final int messageSize = 50000;
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
+ SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
+
+ AddressSettings addressSettings = new AddressSettings();
+
+ addressSettings.setDeadLetterAddress(ADDRESS_DLA);
+ addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
+ addressSettings.setMaxDeliveryAttempts(1);
+
+ server.getAddressSettingsRepository().addMatch("*", addressSettings);
+
+ session.createQueue(ADDRESS_DLA, ADDRESS_DLA, true);
+ session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+ clientFile.setExpiration(System.currentTimeMillis());
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumerExpired = session.createConsumer(ADDRESS);
+ // Consume from the queue to trigger expiry sooner than waiting for the reaper thread
+ assertNull(consumerExpired.receive(1000));
+ consumerExpired.close();
+
+ ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
+
+ ClientMessage msg1 = consumerExpiry.receive(5000);
+ assertNotNull(msg1);
+ msg1.acknowledge();
+
+ session.rollback();
+
+ for (int j = 0; j < messageSize; j++)
+ {
+ assertEquals(getSamplebyte(j), msg1.getBody().readByte());
+ }
+
+ consumerExpiry.close();
+
+ for (int i = 0; i < 10; i++)
+ {
+
+ consumerExpiry = session.createConsumer(ADDRESS_DLA);
+
+ msg1 = consumerExpiry.receive(5000);
+ assertNotNull(msg1);
+ msg1.acknowledge();
+
+ session.rollback();
+
+ for (int j = 0; j < messageSize; j++)
+ {
+ assertEquals(getSamplebyte(j), msg1.getBody().readByte());
+ }
+
+ consumerExpiry.close();
+ }
+
+ session.close();
+ server.stop();
+
+ server = createServer(true);
+
+ server.start();
+
+ sf = createInVMFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ consumerExpiry = session.createConsumer(ADDRESS_DLA);
+
+ msg1 = consumerExpiry.receive(5000);
+ assertNotNull(msg1);
+ msg1.acknowledge();
+
+ for (int i = 0; i < messageSize; i++)
+ {
+ assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+ }
+
+ session.commit();
+
+ consumerExpiry.close();
+
+ session.commit();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
+ public void testExpiryLargeMessage() throws Exception
+ {
+ final int messageSize = 50000;
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true);
+
+ server.start();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
+
+ AddressSettings addressSettings = new AddressSettings();
+
+ addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
+
+ server.getAddressSettingsRepository().addMatch("*", addressSettings);
+
+ session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ clientFile.setExpiration(System.currentTimeMillis());
+
+ producer.send(clientFile);
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS_EXPIRY);
+
+ // Creating a consumer just to make the expiry process go faster and not have to wait for the reaper
+ ClientConsumer consumer2 = session.createConsumer(ADDRESS);
+ assertNull(consumer2.receive(1000));
+
+ ClientMessage msg1 = consumer.receive(50000);
+
+ assertNotNull(msg1);
+
+ for (int i = 0; i < messageSize; i++)
+ {
+ assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+ }
+
+ session.close();
+ server.stop();
+
+ server = createServer(true);
+
+ server.start();
+
+ sf = createInVMFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ consumer = session.createConsumer(ADDRESS_EXPIRY);
+
+ msg1 = consumer.receive(10000);
+
+ assertNotNull(msg1);
+
+ for (int i = 0; i < messageSize; i++)
+ {
+ assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+ }
+
+ msg1.acknowledge();
+
+ session.commit();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
public void testResendSmallStreamMessage() throws Exception
{
internalTestResendMessage(50000);
@@ -1017,7 +1387,7 @@
}
}
}
-
+
public void testSendStreamingSingleMessage() throws Exception
{
ClientSession session = null;
@@ -1069,7 +1439,7 @@
// }
session.commit();
-
+
assertGlobalSize(server);
assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
@@ -1344,7 +1714,6 @@
assertEquals(0l, server.getPostOffice().getPagingManager().getTotalMemory());
}
-
// Inner classes -------------------------------------------------
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java 2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java 2009-07-21 15:28:56 UTC (rev 7598)
@@ -607,14 +607,14 @@
/**
* Deleting a file in the large-messages directory is an asynchronous process. We need to keep checking for a while in case the file hasn't been deleted yet
*/
- protected void validateNoFilesOnLargeDir() throws Exception
+ protected void validateNoFilesOnLargeDir(int expect) throws Exception
{
File largeMessagesFileDir = new File(getLargeMessagesDir());
// Deleting the file is async... we keep looking for a period of the time until the file is really gone
for (int i = 0; i < 100; i++)
{
- if (largeMessagesFileDir.listFiles().length > 0)
+ if (largeMessagesFileDir.listFiles().length != expect)
{
Thread.sleep(10);
}
@@ -624,9 +624,17 @@
}
}
- assertEquals(0, largeMessagesFileDir.listFiles().length);
+ assertEquals(expect, largeMessagesFileDir.listFiles().length);
}
+ /**
+ * Deleting a file in the large-messages directory is an asynchronous process. We need to keep checking for a while in case the file hasn't been deleted yet
+ */
+ protected void validateNoFilesOnLargeDir() throws Exception
+ {
+ validateNoFilesOnLargeDir(0);
+ }
+
protected OutputStream createFakeOutputStream() throws Exception
{
More information about the jboss-cvs-commits
mailing list