[hornetq-commits] JBoss hornetq SVN: r11929 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710: src/main/org/hornetq/core/server and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 22 08:37:06 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-22 08:37:04 -0500 (Thu, 22 Dec 2011)
New Revision: 11929

Added:
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
Modified:
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
Log:
backporting JBPAPP-7809 into JBPAPP-7710

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-22 13:32:53 UTC (rev 11928)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-22 13:37:04 UTC (rev 11929)
@@ -1680,23 +1680,25 @@
 
       if (largeMessage.containsProperty(Message.HDR_ORIG_MESSAGE_ID))
       {
+         // for compatibility: cope with old behaviour, copying the old file to avoid message loss
          long originalMessageID = largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
-
-         LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
-
-         if (originalMessage == null)
+         
+         SequentialFile currentFile = createFileForLargeMessage(largeMessage.getMessageID(), true);
+         
+         if (!currentFile.exists())
          {
-            // this could happen if the message was deleted but the file still exists as the file still being used
-            originalMessage = createLargeMessage();
-            originalMessage.setDurable(true);
-            originalMessage.setMessageID(originalMessageID);
-            messages.put(originalMessageID, originalMessage);
+            SequentialFile linkedFile = createFileForLargeMessage(originalMessageID, true);
+            if (linkedFile.exists())
+            {
+               linkedFile.copyTo(currentFile);
+               linkedFile.close();
+            }
          }
+         
+         currentFile.close();
+      }
 
-         originalMessage.incrementDelayDeletionCount();
 
-         largeMessage.setLinkedMessage(originalMessage);
-      }
       return largeMessage;
    }
 

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-12-22 13:32:53 UTC (rev 11928)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2011-12-22 13:37:04 UTC (rev 11929)
@@ -48,8 +48,6 @@
    // Attributes ----------------------------------------------------
 
    private final JournalStorageManager storageManager;
-
-   private LargeServerMessage linkMessage;
    
    private boolean paged;
 
@@ -78,7 +76,6 @@
    private LargeServerMessageImpl(final LargeServerMessageImpl copy, TypedProperties properties, final SequentialFile fileCopy, final long newID)
    {
       super(copy, properties);
-      linkMessage = copy;
       storageManager = copy.storageManager;
       file = fileCopy;
       bodySize = copy.bodySize;
@@ -155,18 +152,28 @@
    public synchronized void incrementDelayDeletionCount()
    {
       delayDeletionCount.incrementAndGet();
+      try
+      {
+         incrementRefCount();
+      }
+      catch (Exception e)
+      {
+         log.warn(e.getMessage(), e);
+      }
    }
 
    public synchronized void decrementDelayDeletionCount() throws Exception
    {
       int count = delayDeletionCount.decrementAndGet();
+      
+      decrementRefCount();
 
       if (count == 0)
       {
          checkDelete();
       }
    }
-
+   
    @Override
    public BodyEncoder getBodyEncoder() throws HornetQException
    {
@@ -178,27 +185,19 @@
    {
       if (getRefCount() <= 0)
       {
-         if (linkMessage != null)
+         if (LargeServerMessageImpl.isTrace)
          {
-            // This file is linked to another message, deleting the reference where it belongs on this case
-            linkMessage.decrementDelayDeletionCount();
+            LargeServerMessageImpl.log.trace("Deleting file " + file + " as the usage was complete");
          }
-         else
-         {
-            if (LargeServerMessageImpl.isTrace)
-            {
-               LargeServerMessageImpl.log.trace("Deleting file " + file + " as the usage was complete");
-            }
 
-            try
-            {
-               deleteFile();
-            }
-            catch (Exception e)
-            {
-               LargeServerMessageImpl.log.error(e.getMessage(), e);
-            }
+         try
+         {
+            deleteFile();
          }
+         catch (Exception e)
+         {
+            LargeServerMessageImpl.log.error(e.getMessage(), e);
+         }
       }
    }
 
@@ -289,15 +288,9 @@
    {
       long idToUse = messageID;
 
-      if (linkMessage != null)
-      {
-         idToUse = linkMessage.getMessageID();
-      }
-
       SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
 
-      ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
-                                                                               : (LargeServerMessageImpl)linkMessage,
+      ServerMessage newMessage = new LargeServerMessageImpl(this,
                                                             properties,
                                                             newfile,
                                                             messageID);
@@ -308,65 +301,43 @@
    @Override
    public synchronized ServerMessage copy(final long newID)
    {
-      if (!paged)
+      try
       {
-         incrementDelayDeletionCount();
-   
-         long idToUse = messageID;
-   
-         if (linkMessage != null)
-         {
-            idToUse = linkMessage.getMessageID();
-         }
-   
-         SequentialFile newfile = storageManager.createFileForLargeMessage(idToUse, durable);
-   
-         ServerMessage newMessage = new LargeServerMessageImpl(linkMessage == null ? this
-                                                                                  : (LargeServerMessageImpl)linkMessage,
-                                                               properties,
-                                                               newfile,
-                                                               newID);
+         validateFile();
+         
+         SequentialFile file = this.file;
+         
+         SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
+         
+         file.copyTo(newFile);
+         
+         LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
+         
          return newMessage;
       }
-      else
+      catch (Exception e)
       {
-         try
-         {
-            validateFile();
-            
-            SequentialFile file = this.file;
-            
-            SequentialFile newFile = storageManager.createFileForLargeMessage(newID, durable);
-            
-            file.copyTo(newFile);
-            
-            LargeServerMessageImpl newMessage = new LargeServerMessageImpl(this, properties, newFile, newID);
-            
-            newMessage.linkMessage = null;
-            
-            newMessage.setPaged();
-            
-            return newMessage;
-         }
-         catch (Exception e)
-         {
-            log.warn("Error on copying large message this for DLA or Expiry", e);
-            return null;
-         }
+         log.warn("Error on copying large message " + this + " for DLA or Expiry", e);
+         return null;
       }
+      finally
+      {
+         releaseResources();
+      }
    }
 
-   public SequentialFile getFile()
+   public SequentialFile getFile() throws Exception
    {
+      validateFile();
       return file;
    }
 
    @Override
    public String toString()
    {
-      return "ServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() + 
+      return "LargeServerMessage[messageID=" + messageID + ",priority=" + this.getPriority() + 
       ",expiration=[" + (this.getExpiration() != 0 ? new java.util.Date(this.getExpiration()) : "null") + "]" +
-      ", durable=" + durable + ", address=" + getAddress()  + ",properties=" + properties.toString() + "]";
+      ", durable=" + durable + ", address=" + getAddress()  + ",properties=" + properties.toString() + "]@" + System.identityHashCode(this);
    }
 
 
@@ -396,7 +367,7 @@
 
             file = storageManager.createFileForLargeMessage(getMessageID(), durable);
 
-            file.open();
+            openFile();
             
             bodySize = file.size();
          }
@@ -407,32 +378,27 @@
          throw new HornetQException(HornetQException.INTERNAL_ERROR, e.getMessage(), e);
       }
    }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
-    */
-   public void setLinkedMessage(final LargeServerMessage message)
+   
+   protected void openFile() throws Exception
    {
-      if (file != null)
+     if (file == null)
+     {
+        validateFile();
+     }
+     else
+      if (!file.isOpen())
       {
-         // Sanity check.. it shouldn't happen
-         throw new IllegalStateException("LargeMessage file was already set");
-      }
-
-      linkMessage = message;
-
-      file = storageManager.createFileForLargeMessage(message.getMessageID(), durable);
-      try
-      {
          file.open();
-         bodySize = file.size();
-         file.close();
       }
-      catch (Exception e)
-      {
-         throw new RuntimeException("could not setup linked file", e);
-      }
    }
+   
+   protected void closeFile() throws Exception
+   {
+     if (file != null && file.isOpen())
+     {
+        file.close();
+     }
+   }
 
    // Inner classes -------------------------------------------------
 
@@ -444,6 +410,10 @@
       {
          try
          {
+            if (cFile != null && cFile.isOpen())
+            {
+               cFile.close();
+            }
             cFile = file.copy();
             cFile.open();
          }

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java	2011-12-22 13:32:53 UTC (rev 11928)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/LargeServerMessage.java	2011-12-22 13:37:04 UTC (rev 11929)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.server;
 
+
 /**
  * A LargeMessage
  *
@@ -26,9 +27,6 @@
 {
    void addBytes(byte[] bytes) throws Exception;
 
-   /** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we specify a link between the messages */
-   void setLinkedMessage(LargeServerMessage message);
-
    boolean isFileExists() throws Exception;
    
    /**

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-12-22 13:32:53 UTC (rev 11928)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-12-22 13:37:04 UTC (rev 11929)
@@ -930,7 +930,7 @@
                int localChunkLen = 0;
 
                localChunkLen = (int)Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
-
+               
                HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(localChunkLen);
 
                context.encode(bodyBuffer, localChunkLen);

Added: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java	                        (rev 0)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/ExpiryLargeMessageTest.java	2011-12-22 13:37:04 UTC (rev 11929)
@@ -0,0 +1,415 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import java.io.File;
+
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * This test sends large messages in page-mode, lets them expire, then moves them to the DLQ, and they should still be received fine
+ *
+ * @author clebertsuconic
+ *
+ *
+ */
+public class ExpiryLargeMessageTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+   final SimpleString EXPIRY = new SimpleString("my-expiry");
+
+   final SimpleString DLQ = new SimpleString("my-DLQ");
+
+   final SimpleString MY_QUEUE = new SimpleString("MY-QUEUE");
+
+   final int messageSize = 10 * 1024;
+
+   // it has to be an even number
+   final int numberOfMessages = 50;
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testExpiryMessagesThenDLQ() throws Exception
+   {
+      HornetQServer server = createServer(true);
+
+      server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+      AddressSettings setting = new AddressSettings();
+      setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      setting.setMaxDeliveryAttempts(5);
+      setting.setMaxSizeBytes(50 * 1024);
+      setting.setPageSizeBytes(10 * 1024);
+      setting.setExpiryAddress(EXPIRY);
+      setting.setDeadLetterAddress(DLQ);
+      server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+      setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      setting.setMaxDeliveryAttempts(5);
+      setting.setMaxSizeBytes(50 * 1024);
+      setting.setPageSizeBytes(10 * 1024);
+      setting.setDeadLetterAddress(DLQ);
+      server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+      server.start();
+
+      try
+      {
+
+         server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+         server.createQueue(DLQ, DLQ, null, true, false);
+
+         server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+         ServerLocator locator = createInVMNonHALocator();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(true, true, 0);
+
+         byte bufferSample[] = new byte[messageSize];
+
+         for (int i = 0; i < bufferSample.length; i++)
+         {
+            bufferSample[i] = getSamplebyte(i);
+         }
+
+         ClientProducer producer = session.createProducer(MY_QUEUE);
+
+         long timeToExpiry = System.currentTimeMillis() + 1000;
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createMessage(true);
+
+            message.putIntProperty("count", i);
+
+            // Alternate: even counts are regular messages, odd counts are large messages
+            if (i % 2 == 0)
+            {
+               message.putBooleanProperty("tst-large", false);
+               message.getBodyBuffer().writeBytes(bufferSample);
+            }
+            else
+            {
+               message.putBooleanProperty("tst-large", true);
+               message.setBodyInputStream(createFakeLargeStream(messageSize));
+            }
+
+            message.setExpiration(timeToExpiry);
+
+            producer.send(message);
+         }
+
+         server.stop();
+         server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, true, 0);
+
+         Thread.sleep(1500);
+
+         ClientConsumer cons = session.createConsumer(MY_QUEUE);
+         assertNull(cons.receive(1000));
+
+         session.close();
+
+         session = sf.createSession(false, false);
+
+         cons = session.createConsumer(EXPIRY);
+         session.start();
+
+         // Consume half of the messages to make sure all the messages are paging (on the second try)
+         for (int i = 0; i < numberOfMessages / 2; i++)
+         {
+            ClientMessage msg = cons.receive(5000);
+            assertNotNull(msg);
+            msg.acknowledge();
+         }
+
+         session.commit();
+
+         cons.close();
+
+         for (int rep = 0; rep < 6; rep++)
+         {
+            cons = session.createConsumer(EXPIRY);
+            session.start();
+
+            System.out.println("Trying " + rep);
+            for (int i = 0; i < numberOfMessages / 2; i++)
+            {
+               ClientMessage message = cons.receive(5000);
+               assertNotNull(message);
+
+               if (i % 10 == 0)
+               {
+                  System.out.println("Received " + i);
+               }
+
+               for (int location = 0; location < messageSize; location++)
+               {
+                  assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+               }
+               message.acknowledge();
+            }
+
+            session.rollback();
+
+            cons.close();
+
+            session.close();
+            sf.close();
+
+            if (rep == 0)
+            {
+               // restart the server at the first try
+               server.stop();
+               server.start();
+            }
+
+            sf = locator.createSessionFactory();
+            session = sf.createSession(false, false);
+            session.start();
+         }
+
+         cons = session.createConsumer(EXPIRY);
+         session.start();
+         assertNull(cons.receiveImmediate());
+
+         cons.close();
+
+         session.close();
+         sf.close();
+
+         for (int rep = 0; rep < 2; rep++)
+         {
+            sf = locator.createSessionFactory();
+
+            session = sf.createSession(false, false);
+
+            cons = session.createConsumer(DLQ);
+
+            session.start();
+
+            for (int i = 0; i < numberOfMessages / 2; i++)
+            {
+               ClientMessage message = cons.receive(5000);
+               assertNotNull(message);
+
+               if (i % 10 == 0)
+               {
+                  System.out.println("Received " + i);
+               }
+
+               for (int location = 0; location < messageSize; location++)
+               {
+                  assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+               }
+               message.acknowledge();
+            }
+            if (rep == 0)
+            {
+               session.rollback();
+               session.close();
+               sf.close();
+               server.stop();
+               server.start();
+            }
+         }
+
+         session.commit();
+
+         assertNull(cons.receiveImmediate());
+
+         session.close();
+         sf.close();
+         locator.close();
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
+   /** 
+    * Tests if the system would still cope with old data where the LargeMessage was linked to its previous copy
+    * @throws Exception
+    */
+   public void testCompatilityWithLinks() throws Exception
+   {
+      HornetQServer server = createServer(true);
+
+      server.getConfiguration().setMessageExpiryScanPeriod(600000);
+
+      AddressSettings setting = new AddressSettings();
+      setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      setting.setMaxDeliveryAttempts(5);
+      setting.setMaxSizeBytes(-1);
+      setting.setPageSizeBytes(10 * 1024);
+      setting.setExpiryAddress(EXPIRY);
+      setting.setDeadLetterAddress(DLQ);
+      server.getAddressSettingsRepository().addMatch(MY_QUEUE.toString(), setting);
+
+      setting.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
+      setting.setMaxDeliveryAttempts(5);
+      setting.setMaxSizeBytes(-1);
+      setting.setPageSizeBytes(10 * 1024);
+      setting.setDeadLetterAddress(DLQ);
+      server.getAddressSettingsRepository().addMatch(EXPIRY.toString(), setting);
+
+      server.start();
+
+      try
+      {
+
+         server.createQueue(EXPIRY, EXPIRY, null, true, false);
+
+         server.createQueue(DLQ, DLQ, null, true, false);
+
+         server.createQueue(MY_QUEUE, MY_QUEUE, null, true, false);
+
+         ServerLocator locator = createInVMNonHALocator();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(true, true, 0);
+
+         byte bufferSample[] = new byte[messageSize];
+
+         for (int i = 0; i < bufferSample.length; i++)
+         {
+            bufferSample[i] = getSamplebyte(i);
+         }
+
+         ClientProducer producer = session.createProducer(MY_QUEUE);
+
+         long timeToExpiry = System.currentTimeMillis() + 1000;
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createMessage(true);
+
+            message.putIntProperty("count", i);
+
+            // Everything is going to be a large message
+            message.putBooleanProperty("tst-large", true);
+            message.setBodyInputStream(createFakeLargeStream(messageSize));
+
+            message.setExpiration(timeToExpiry);
+
+            producer.send(message);
+         }
+
+         server.stop();
+         server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, true, 0);
+         session.start();
+
+         Thread.sleep(1500);
+
+         ClientConsumer cons = session.createConsumer(MY_QUEUE);
+         assertNull(cons.receive(1000));
+
+         session.close();
+
+         session = sf.createSession(false, false);
+
+         cons = session.createConsumer(EXPIRY);
+         session.start();
+
+         ClientMessage msg = cons.receive(5000);
+         assertNotNull(msg);
+         msg.acknowledge();
+         session.rollback();
+
+         server.stop();
+
+         // rename the file, simulating old behaviour
+         long messageID = msg.getMessageID();
+         long oldID = msg.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
+
+         File largeMessagesFileDir = new File(getLargeMessagesDir());
+         File oldFile = new File(largeMessagesFileDir, oldID + ".msg");
+         File currentFile = new File(largeMessagesFileDir, messageID + ".msg");
+         currentFile.renameTo(oldFile);
+
+         server.start();
+
+         sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, true, 0);
+         session.start();
+
+         cons = session.createConsumer(EXPIRY);
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = cons.receive(5000);
+            assertNotNull(message);
+
+            if (i % 10 == 0)
+            {
+               System.out.println("Received " + i);
+            }
+
+            for (int location = 0; location < messageSize; location++)
+            {
+               assertEquals(getSamplebyte((long)location), message.getBodyBuffer().readByte());
+            }
+            message.acknowledge();
+         }
+
+         session.commit();
+
+         session.close();
+         sf.close();
+         locator.close();
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}



More information about the hornetq-commits mailing list