[hornetq-commits] JBoss hornetq SVN: r11920 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Dec 20 21:47:10 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-20 21:47:09 -0500 (Tue, 20 Dec 2011)
New Revision: 11920

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
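https://issues.jboss.org/browse/JBPAPP-7161 - improving deletion of messages after duplicate detection (the message reference count is now decremented in the duplicate-ID handling paths of PostOfficeImpl)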

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-12-21 02:13:14 UTC (rev 11919)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-12-21 02:47:09 UTC (rev 11920)
@@ -1181,6 +1181,8 @@
             {
                context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage.toString()));
             }
+            
+            message.decrementRefCount();
 
             return false;
          }
@@ -1196,6 +1198,9 @@
          cacheBridge.addToCache(bridgeDupBytes, context.getTransaction());
 
          message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+         
+         message.decrementRefCount();
+
       }
 
       byte[] duplicateIDBytes = message.getDuplicateIDBytes();
@@ -1219,6 +1224,8 @@
             {
                context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage));
             }
+            
+            message.decrementRefCount();
 
             return false;
          }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-12-21 02:13:14 UTC (rev 11919)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-12-21 02:47:09 UTC (rev 11920)
@@ -30,6 +30,7 @@
 import org.hornetq.core.client.impl.ClientConsumerInternal;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.hornetq.core.server.HornetQServer;
@@ -73,7 +74,7 @@
    {
       return false;
    }
-   
+
    /**
     * 
     */
@@ -92,34 +93,32 @@
 
    public void testRollbackPartiallyConsumedBuffer() throws Exception
    {
-      for (int i = 0 ; i < 1; i++)
+      for (int i = 0; i < 1; i++)
       {
          log.info("#test " + i);
          internalTestRollbackPartiallyConsumedBuffer(false);
          tearDown();
          setUp();
-         
+
       }
-      
+
    }
-   
+
    public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
    {
       internalTestRollbackPartiallyConsumedBuffer(true);
    }
-   
-   
+
    private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception
    {
       final int messageSize = 100 * 1024;
-      
 
       final ClientSession session;
 
       try
       {
          server = createServer(true, isNetty());
-         
+
          AddressSettings settings = new AddressSettings();
          if (redeliveryDelay)
          {
@@ -130,7 +129,7 @@
             }
          }
          settings.setMaxDeliveryAttempts(-1);
-         
+
          server.getAddressSettingsRepository().addMatch("#", settings);
 
          server.start();
@@ -143,35 +142,36 @@
 
          ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
 
-         for (int i = 0 ; i < 20; i++)
+         for (int i = 0; i < 20; i++)
          {
             Message clientFile = createLargeClientMessage(session, messageSize, true);
-            
+
             clientFile.putIntProperty("value", i);
-   
+
             producer.send(clientFile);
          }
 
          session.commit();
 
          session.start();
-         
+
          final CountDownLatch latch = new CountDownLatch(1);
-         
+
          final AtomicInteger errors = new AtomicInteger(0);
 
          ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-         
+
          consumer.setMessageHandler(new MessageHandler()
          {
             int counter = 0;
+
             public void onMessage(ClientMessage message)
             {
                message.getBodyBuffer().readByte();
                System.out.println("message:" + message);
                try
                {
-                  if (counter ++ <  20)
+                  if (counter++ < 20)
                   {
                      Thread.sleep(100);
                      System.out.println("Rollback");
@@ -183,7 +183,7 @@
                      message.acknowledge();
                      session.commit();
                   }
-                  
+
                   if (counter == 40)
                   {
                      latch.countDown();
@@ -197,7 +197,7 @@
                }
             }
          });
-         
+
          assertTrue(latch.await(40, TimeUnit.SECONDS));
 
          consumer.close();
@@ -975,6 +975,74 @@
       }
    }
 
+   public void testSentWithDuplicateID() throws Exception
+   {
+      final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+      ClientSession session = null;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         server.start();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(true, true, 0);
+
+         session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         String someDuplicateInfo = "Anything";
+
+         for (int i = 0; i < 10; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+            clientFile.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID,  someDuplicateInfo.getBytes());
+
+            producer.send(clientFile);
+         }
+         
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+         
+         session.start();
+         
+         ClientMessage msg = consumer.receive(10000);
+         
+         assertNotNull(msg);
+         
+         msg.acknowledge();
+         
+         assertNull(consumer.receiveImmediate());
+         
+         session.commit();
+         
+         validateNoFilesOnLargeDir();
+         
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    public void testResendSmallStreamMessage() throws Exception
    {
       internalTestResendMessage(50000);
@@ -1729,7 +1797,6 @@
                  100);
    }
 
-
    public void testPageOnLargeMessage() throws Exception
    {
       testPageOnLargeMessage(true, false);
@@ -2600,7 +2667,7 @@
          }
       }
    }
-   
+
    // JBPAPP-6237
    public void testPageOnLargeMessageMultipleQueues() throws Exception
    {
@@ -2754,7 +2821,6 @@
 
    }
 
-
    // JBPAPP-6237
    public void testPageOnLargeMessageMultipleQueues2() throws Exception
    {
@@ -2796,7 +2862,7 @@
          for (int i = 0; i < 100; i++)
          {
             ClientMessage message = session.createMessage(true);
-            
+
             message.putIntProperty("msgID", msgId++);
 
             message.putBooleanProperty("TestLarge", false);
@@ -2813,7 +2879,6 @@
             producer.send(message);
          }
 
-
          for (int i = 0; i < 10; i++)
          {
             ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
@@ -2830,34 +2895,34 @@
             ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
 
             session.start();
-            
-            for (int received = 0 ; received < 5; received++)
+
+            for (int received = 0; received < 5; received++)
             {
                for (int i = 0; i < 100; i++)
                {
                   ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
-   
+
                   Assert.assertNotNull(message2);
-                  
+
                   assertFalse(message2.getBooleanProperty("TestLarge"));
-   
+
                   message2.acknowledge();
-   
+
                   Assert.assertNotNull(message2);
                }
-   
+
                for (int i = 0; i < 10; i++)
                {
                   ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
-   
+
                   Assert.assertNotNull(messageLarge);
-                  
+
                   assertTrue(messageLarge.getBooleanProperty("TestLarge"));
-   
+
                   ByteArrayOutputStream bout = new ByteArrayOutputStream();
-                  
+
                   messageLarge.acknowledge();
-                  
+
                   messageLarge.saveToOutputStream(bout);
                   byte[] body = bout.toByteArray();
                   assertEquals(numberOfBytesBigMessage, body.length);
@@ -2866,7 +2931,7 @@
                      assertEquals(getSamplebyte(bi), body[bi]);
                   }
                }
-               
+
                session.rollback();
             }
 



More information about the hornetq-commits mailing list