[jboss-cvs] JBoss Messaging SVN: r7598 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 21 11:28:57 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-07-21 11:28:56 -0400 (Tue, 21 Jul 2009)
New Revision: 7598

Modified:
   trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   trunk/src/main/org/jboss/messaging/core/server/LargeServerMessage.java
   trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1496 - link between large messages

Modified: trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java	2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/journal/EncodingSupport.java	2009-07-21 15:28:56 UTC (rev 7598)
@@ -26,7 +26,7 @@
 
 /**
  * 
- * This class provides encoding support for the Journal.
+ * This interface provides encoding support for the Journal.
  * 
  * 
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-07-21 15:28:56 UTC (rev 7598)
@@ -890,9 +890,9 @@
       try
       {
 
-         JournalRecord posFiles = records.get(id);
+         JournalRecord jrnRecord = records.get(id);
 
-         if (posFiles == null)
+         if (jrnRecord == null)
          {
             if (!(compactor != null && compactor.lookupRecord(id)))
             {
@@ -915,13 +915,13 @@
 
             // record== null here could only mean there is a compactor, and computing the delete should be done after
             // compacting is done
-            if (posFiles == null)
+            if (jrnRecord == null)
             {
                compactor.addCommandUpdate(id, usedFile, size);
             }
             else
             {
-               posFiles.addUpdateFile(usedFile, size);
+               jrnRecord.addUpdateFile(usedFile, size);
             }
          }
          finally

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java	2009-07-21 15:28:56 UTC (rev 7598)
@@ -55,6 +55,8 @@
 
    private final JournalStorageManager storageManager;
 
+   private LargeServerMessage linkMessage;
+
    // We should only use the NIO implementation on the Journal
    private SequentialFile file;
 
@@ -81,6 +83,7 @@
                                      final long newID)
    {
       super(copy);
+      this.linkMessage = copy;
       storageManager = copy.storageManager;
       file = fileCopy;
       complete = true;
@@ -153,10 +156,9 @@
       {
          throw new RuntimeException(e.getMessage(), e);
       }
-      // FIXME: The file could be bigger than MAX_INT
-      return (int)bodySize;
+      return (int)Math.min(bodySize, Integer.MAX_VALUE);
    }
-   
+
    public synchronized long getLargeBodySize()
    {
       try
@@ -190,6 +192,22 @@
       decodeProperties(buffer);
    }
 
+   /**
+    * @return the complete
+    */
+   public boolean isComplete()
+   {
+      return complete;
+   }
+
+   /**
+    * @param complete the complete to set
+    */
+   public void setComplete(boolean complete)
+   {
+      this.complete = complete;
+   }
+
    @Override
    public int decrementRefCount()
    {
@@ -197,19 +215,27 @@
 
       if (currentRefCount == 0)
       {
-         if (isTrace)
+         if (linkMessage != null)
          {
-            log.trace("Deleting file " + file + " as the usage was complete");
+            // This file is linked to another message, deleting the reference where it belongs on this case
+            linkMessage.decrementRefCount();
          }
+         else
+         {
+            if (isTrace)
+            {
+               log.trace("Deleting file " + file + " as the usage was complete");
+            }
 
-         try
-         {
-            deleteFile();
+            try
+            {
+               deleteFile();
+            }
+            catch (Exception e)
+            {
+               log.error(e.getMessage(), e);
+            }
          }
-         catch (Exception e)
-         {
-            log.error(e.getMessage(), e);
-         }
       }
 
       return currentRefCount;
@@ -221,12 +247,10 @@
       return true;
    }
 
-   public synchronized void deleteFile() throws MessagingException
+   public synchronized void deleteFile() throws Exception
    {
-      if (file != null)
-      {
-         storageManager.deleteFile(file);
-      }
+      validateFile();
+      storageManager.deleteFile(file);
    }
 
    // We cache this
@@ -270,33 +294,24 @@
       }
    }
 
-   // TODO: Optimise this per https://jira.jboss.org/jira/browse/JBMESSAGING-1496
    @Override
    public synchronized ServerMessage copy(final long newID) throws Exception
    {
-      SequentialFile newfile = storageManager.createFileForLargeMessage(newID, complete);
-
-      file.open();
-      newfile.open();
-
-      file.position(0);
-      newfile.position(0);
-
-      ByteBuffer buffer = ByteBuffer.allocate(100 * 1024);
-
-      for (long i = 0; i < file.size();)
+      incrementRefCount();
+      
+      long idToUse = messageID;
+      
+      if (linkMessage != null)
       {
-         buffer.rewind();
-         file.read(buffer);
-         newfile.write(buffer, false);
-         i += buffer.limit();
+         idToUse = linkMessage.getMessageID();
       }
 
-      file.close();
-      newfile.close();
+      SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, true);
 
-      JournalLargeServerMessage newMessage = new JournalLargeServerMessage(this, newfile, newID);
+      file.open();
 
+      JournalLargeServerMessage newMessage = new JournalLargeServerMessage(linkMessage == null ? this : (JournalLargeServerMessage)linkMessage, newfile, newID);
+
       return newMessage;
    }
 
@@ -331,6 +346,43 @@
       }
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.LargeServerMessage#getLinkedMessage()
+    */
+   public LargeServerMessage getLinkedMessage()
+   {
+      return linkMessage;
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.LargeServerMessage#setLinkedMessage(org.jboss.messaging.core.server.LargeServerMessage)
+    */
+   public void setLinkedMessage(LargeServerMessage message)
+   {
+      if (file != null)
+      {
+         // Sanity check.. it shouldn't happen
+         throw new IllegalStateException("LargeMessage file was already set");
+      }
+
+      this.linkMessage = message;
+
+      file = storageManager.createFileForLargeMessage(message.getMessageID(), true);
+      try
+      {
+         file.open();
+         this.bodySize = file.size();
+         file.close();
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException("could not setup linked file", e);
+      }
+      finally
+      {
+      }
+   }
+
    // Inner classes -------------------------------------------------
 
 }

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-07-21 15:28:56 UTC (rev 7598)
@@ -107,7 +107,7 @@
    public static final int SIZE_FIELDS = SIZE_INT + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
 
    // Message journal record types
-
+   
    public static final byte ADD_LARGE_MESSAGE = 30;
 
    public static final byte ADD_MESSAGE = 31;
@@ -344,7 +344,7 @@
          throw new MessagingException(MessagingException.ILLEGAL_STATE, "MessageId was not assigned to Message");
       }
 
-      if (message instanceof LargeServerMessage)
+      if (message.isLargeMessage())
       {
          messageJournal.appendAddRecordTransactional(txID,
                                                      message.getMessageID(),
@@ -357,7 +357,7 @@
       }
 
    }
-
+   
    public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
    {
       if (pageTransaction.getRecordID() != 0)
@@ -505,6 +505,29 @@
                LargeMessageEncoding messageEncoding = new LargeMessageEncoding(largeMessage);
 
                messageEncoding.decode(buff);
+               
+               Long originalMessageID = (Long)largeMessage.getProperties().getProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+               
+               // Using the linked file by the original file
+               if (originalMessageID != null)
+               {
+                  LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
+                  
+                  if (originalMessage == null)
+                  {
+                     // this could happen if the message was deleted but the file still exists as the file still being used
+                      originalMessage = createLargeMessage();
+                     originalMessage.setMessageID(originalMessageID);
+                     originalMessage.setComplete(true);
+                     messages.put(originalMessageID, originalMessage);
+                  }
+                  
+                  originalMessage.incrementRefCount();
+                  
+                  largeMessage.setLinkedMessage(originalMessage);
+                  largeMessage.setComplete(true);
+               }
+               
 
                messages.put(record.id, largeMessage);
 

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-07-21 15:28:56 UTC (rev 7598)
@@ -103,6 +103,38 @@
       return true;
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.LargeServerMessage#getLinkedMessage()
+    */
+   public LargeServerMessage getLinkedMessage()
+   {
+      return null;
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.LargeServerMessage#setLinkedMessage(org.jboss.messaging.core.server.LargeServerMessage)
+    */
+   public void setLinkedMessage(LargeServerMessage message)
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.LargeServerMessage#isComplete()
+    */
+   public boolean isComplete()
+   {
+      // nothing to be done on null persistence
+      return true;
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.server.LargeServerMessage#setComplete(boolean)
+    */
+   public void setComplete(boolean isComplete)
+   {
+      // nothing to be done on null persistence
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/server/LargeServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/LargeServerMessage.java	2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/server/LargeServerMessage.java	2009-07-21 15:28:56 UTC (rev 7598)
@@ -34,6 +34,12 @@
 public interface LargeServerMessage extends ServerMessage
 {
    void addBytes(byte[] bytes) throws Exception;
+   
+   /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we specify a link between the messages */
+   void setLinkedMessage(LargeServerMessage message);
+   
+   /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we specify a link between the messages */
+   LargeServerMessage getLinkedMessage();
 
    /** Close the files if opened */
    void releaseResources();
@@ -42,5 +48,9 @@
    
    void complete() throws Exception;
    
+   void setComplete(boolean isComplete);
+   
+   boolean isComplete();
+   
    void deleteFile() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-07-21 15:28:56 UTC (rev 7598)
@@ -141,7 +141,9 @@
    // We cache the consumers here since we don't want to include the redistributor
 
    private final Set<Consumer> consumers = new HashSet<Consumer>();
+
    private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
+
    private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
 
    public QueueImpl(final long persistenceID,
@@ -342,7 +344,7 @@
       {
          return;
       }
-      
+
       try
       {
          lock.acquire();
@@ -359,7 +361,7 @@
       {
          return;
       }
-      
+
       lock.release();
    }
 
@@ -453,7 +455,7 @@
             }
          }
       }
-      
+
       return removed;
    }
 
@@ -521,7 +523,7 @@
       return new Iterator<MessageReference>()
       {
          private final Iterator<MessageReference> iterator = messageReferences.iterator();
-         
+
          public boolean hasNext()
          {
             return iterator.hasNext();
@@ -1181,10 +1183,18 @@
 
       ServerMessage copy = message.copy(newMessageId);
 
-      SimpleString originalQueue = copy.getDestination();
-      copy.putStringProperty(HDR_ORIGINAL_DESTINATION, originalQueue);
-      copy.putLongProperty(HDR_ORIG_MESSAGE_ID, message.getMessageID());
-
+      if (ref.getMessage().getProperty(HDR_ORIG_MESSAGE_ID) != null)
+      {
+         copy.putStringProperty(HDR_ORIGINAL_DESTINATION, (SimpleString)ref.getMessage()
+                                                                           .getProperty(HDR_ORIGINAL_DESTINATION));
+         copy.putLongProperty(HDR_ORIG_MESSAGE_ID, (Long)ref.getMessage().getProperty(HDR_ORIG_MESSAGE_ID));
+      }
+      else
+      {
+         SimpleString originalQueue = copy.getDestination();
+         copy.putStringProperty(HDR_ORIGINAL_DESTINATION, originalQueue);
+         copy.putLongProperty(HDR_ORIG_MESSAGE_ID, message.getMessageID());
+      }
       // reset expiry
       copy.setExpiration(0);
       if (expiry)
@@ -1290,7 +1300,7 @@
       }
 
       Consumer consumer;
-      
+
       MessageReference reference;
 
       Iterator<MessageReference> iterator = null;
@@ -1301,10 +1311,10 @@
 
       while (true)
       {
-        consumer = distributionPolicy.getNextConsumer();
-         
-        iterator = iterators.get(consumer);
-         
+         consumer = distributionPolicy.getNextConsumer();
+
+         iterator = iterators.get(consumer);
+
          if (iterator == null)
          {
             reference = messageReferences.peekFirst();
@@ -1327,7 +1337,7 @@
                }
             }
          }
-         
+
          if (reference == null)
          {
             nullReferences.add(consumer);
@@ -1358,7 +1368,7 @@
                continue;
             }
          }
-         
+
          HandleStatus status = handle(reference, consumer);
 
          if (status == HandleStatus.HANDLED)
@@ -1471,7 +1481,7 @@
       {
          return HandleStatus.BUSY;
       }
-      
+
       HandleStatus status;
 
       boolean filterRejected = false;
@@ -1482,7 +1492,7 @@
       {
          Consumer consumer = distributionPolicy.getNextConsumer();
          consumerCount++;
-         
+
          final SimpleString groupId = (SimpleString)reference.getMessage().getProperty(MessageImpl.HDR_GROUP_ID);
 
          if (groupId != null)
@@ -1531,15 +1541,15 @@
             }
          }
       }
-      
+
       if (status == HandleStatus.NO_MATCH)
       {
          promptDelivery = true;
       }
-      
+
       return status;
    }
-   
+
    private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer)
    {
 
@@ -1550,10 +1560,8 @@
       }
       catch (Throwable t)
       {
-         log.warn("removing consumer which did not handle a message, consumer=" +
-                  consumer +
-                  ", message=" +
-                  reference, t);
+         log.warn("removing consumer which did not handle a message, consumer=" + consumer + ", message=" + reference,
+                  t);
 
          // If the consumer throws an exception we remove the consumer
          try
@@ -1572,9 +1580,9 @@
          throw new IllegalStateException("ClientConsumer.handle() should never return null");
       }
 
-      return status; 
+      return status;
    }
-   
+
    private void removeExpiringReference(final MessageReference ref) throws Exception
    {
       if (ref.getMessage().getExpiration() > 0)
@@ -1651,7 +1659,7 @@
    }
 
    private synchronized void initPagingStore(SimpleString destination)
-   {      
+   {
       // PagingManager would be null only on testcases
       if (pagingStore == null && pagingManager != null)
       {
@@ -1667,7 +1675,7 @@
       }
    }
 
-  private synchronized void startDepaging()
+   private synchronized void startDepaging()
    {
       if (pagingStore != null)
       {
@@ -1685,7 +1693,7 @@
          }
       }
    }
-  
+
    // Inner classes
    // --------------------------------------------------------------------------
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java	2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/LargeMessageTest.java	2009-07-21 15:28:56 UTC (rev 7598)
@@ -76,6 +76,376 @@
 
    // Public --------------------------------------------------------
 
+   public void testDLALargeMessage() throws Exception
+   {
+      final int messageSize = 50000;
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true);
+
+         server.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+         session.createQueue(ADDRESS, ADDRESS.concat("-2"), true);
+
+         SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
+
+         AddressSettings addressSettings = new AddressSettings();
+
+         addressSettings.setDeadLetterAddress(ADDRESS_DLA);
+         addressSettings.setMaxDeliveryAttempts(1);
+
+         server.getAddressSettingsRepository().addMatch("*", addressSettings);
+
+         session.createQueue(ADDRESS_DLA, ADDRESS_DLA, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+         producer.send(clientFile);
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS_DLA);
+
+         ClientConsumer consumerRollback = session.createConsumer(ADDRESS);
+         ClientMessage msg1 = consumerRollback.receive(1000);
+         assertNotNull(msg1);
+         msg1.acknowledge();
+         session.rollback();
+         consumerRollback.close();
+
+         msg1 = consumer.receive(10000);
+
+         assertNotNull(msg1);
+
+         for (int i = 0; i < messageSize; i++)
+         {
+            assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+         }
+
+         session.close();
+         server.stop();
+
+         server = createServer(true);
+
+         server.start();
+
+         sf = createInVMFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.start();
+
+         consumer = session.createConsumer(ADDRESS_DLA);
+
+         msg1 = consumer.receive(10000);
+
+         assertNotNull(msg1);
+
+         for (int i = 0; i < messageSize; i++)
+         {
+            assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+         }
+
+         msg1.acknowledge();
+
+         session.commit();
+
+         validateNoFilesOnLargeDir(1);
+
+         consumer = session.createConsumer(ADDRESS.concat("-2"));
+
+         msg1 = consumer.receive(10000);
+
+         assertNotNull(msg1);
+
+         for (int i = 0; i < messageSize; i++)
+         {
+            assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+         }
+
+         msg1.acknowledge();
+
+         session.commit();
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   public void testDLAOnExpiry() throws Exception
+   {
+      final int messageSize = 50000;
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true);
+
+         server.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         SimpleString ADDRESS_DLA = ADDRESS.concat("-dla");
+         SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
+
+         AddressSettings addressSettings = new AddressSettings();
+
+         addressSettings.setDeadLetterAddress(ADDRESS_DLA);
+         addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
+         addressSettings.setMaxDeliveryAttempts(1);
+
+         server.getAddressSettingsRepository().addMatch("*", addressSettings);
+
+         session.createQueue(ADDRESS_DLA, ADDRESS_DLA, true);
+         session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         Message clientFile = createLargeClientMessage(session, messageSize, true);
+         clientFile.setExpiration(System.currentTimeMillis());
+
+         producer.send(clientFile);
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumerExpired = session.createConsumer(ADDRESS);
+         // to kick expiry quicker than waiting reaper thread
+         assertNull(consumerExpired.receive(1000));
+         consumerExpired.close();
+
+         ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
+
+         ClientMessage msg1 = consumerExpiry.receive(5000);
+         assertNotNull(msg1);
+         msg1.acknowledge();
+
+         session.rollback();
+
+         for (int j = 0; j < messageSize; j++)
+         {
+            assertEquals(getSamplebyte(j), msg1.getBody().readByte());
+         }
+
+         consumerExpiry.close();
+
+         for (int i = 0; i < 10; i++)
+         {
+
+            consumerExpiry = session.createConsumer(ADDRESS_DLA);
+
+            msg1 = consumerExpiry.receive(5000);
+            assertNotNull(msg1);
+            msg1.acknowledge();
+
+            session.rollback();
+
+            for (int j = 0; j < messageSize; j++)
+            {
+               assertEquals(getSamplebyte(j), msg1.getBody().readByte());
+            }
+
+            consumerExpiry.close();
+         }
+
+         session.close();
+         server.stop();
+
+         server = createServer(true);
+
+         server.start();
+
+         sf = createInVMFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.start();
+
+         consumerExpiry = session.createConsumer(ADDRESS_DLA);
+
+         msg1 = consumerExpiry.receive(5000);
+         assertNotNull(msg1);
+         msg1.acknowledge();
+
+         for (int i = 0; i < messageSize; i++)
+         {
+            assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+         }
+
+         session.commit();
+
+         consumerExpiry.close();
+
+         session.commit();
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
+   public void testExpiryLargeMessage() throws Exception
+   {
+      final int messageSize = 50000;
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true);
+
+         server.start();
+
+         ClientSessionFactory sf = createInVMFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         SimpleString ADDRESS_EXPIRY = ADDRESS.concat("-expiry");
+
+         AddressSettings addressSettings = new AddressSettings();
+
+         addressSettings.setExpiryAddress(ADDRESS_EXPIRY);
+
+         server.getAddressSettingsRepository().addMatch("*", addressSettings);
+
+         session.createQueue(ADDRESS_EXPIRY, ADDRESS_EXPIRY, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+         clientFile.setExpiration(System.currentTimeMillis());
+
+         producer.send(clientFile);
+
+         session.commit();
+
+         session.start();
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS_EXPIRY);
+
+         // Creating a consumer just to make the expiry process go faster and not have to wait for the reaper
+         ClientConsumer consumer2 = session.createConsumer(ADDRESS);
+         assertNull(consumer2.receive(1000));
+
+         ClientMessage msg1 = consumer.receive(50000);
+
+         assertNotNull(msg1);
+
+         for (int i = 0; i < messageSize; i++)
+         {
+            assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+         }
+
+         session.close();
+         server.stop();
+
+         server = createServer(true);
+
+         server.start();
+
+         sf = createInVMFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.start();
+
+         consumer = session.createConsumer(ADDRESS_EXPIRY);
+
+         msg1 = consumer.receive(10000);
+
+         assertNotNull(msg1);
+
+         for (int i = 0; i < messageSize; i++)
+         {
+            assertEquals(getSamplebyte(i), msg1.getBody().readByte());
+         }
+
+         msg1.acknowledge();
+
+         session.commit();
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    public void testResendSmallStreamMessage() throws Exception
    {
       internalTestResendMessage(50000);
@@ -1017,7 +1387,7 @@
          }
       }
    }
-   
+
    public void testSendStreamingSingleMessage() throws Exception
    {
       ClientSession session = null;
@@ -1069,7 +1439,7 @@
          // }
 
          session.commit();
-         
+
          assertGlobalSize(server);
          assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getDeliveringCount());
          assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
@@ -1344,7 +1714,6 @@
       assertEquals(0l, server.getPostOffice().getPagingManager().getTotalMemory());
    }
 
-
    // Inner classes -------------------------------------------------
 
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java	2009-07-21 14:49:14 UTC (rev 7597)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/largemessage/LargeMessageTestBase.java	2009-07-21 15:28:56 UTC (rev 7598)
@@ -607,14 +607,14 @@
    /**
     * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
     */
-   protected void validateNoFilesOnLargeDir() throws Exception
+   protected void validateNoFilesOnLargeDir(int expect) throws Exception
    {
       File largeMessagesFileDir = new File(getLargeMessagesDir());
 
       // Deleting the file is async... we keep looking for a period of the time until the file is really gone
       for (int i = 0; i < 100; i++)
       {
-         if (largeMessagesFileDir.listFiles().length > 0)
+         if (largeMessagesFileDir.listFiles().length != expect)
          {
             Thread.sleep(10);
          }
@@ -624,9 +624,17 @@
          }
       }
 
-      assertEquals(0, largeMessagesFileDir.listFiles().length);
+      assertEquals(expect, largeMessagesFileDir.listFiles().length);
    }
 
+   /**
+    * Deleting a file on LargeDire is an asynchronous process. Wee need to keep looking for a while if the file hasn't been deleted yet
+    */
+   protected void validateNoFilesOnLargeDir() throws Exception
+   {
+      validateNoFilesOnLargeDir(0);
+   }
+
    protected OutputStream createFakeOutputStream() throws Exception
    {
 




More information about the jboss-cvs-commits mailing list