[hornetq-commits] JBoss hornetq SVN: r11540 - trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 14 12:54:26 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-10-14 12:54:26 -0400 (Fri, 14 Oct 2011)
New Revision: 11540

Modified:
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
JBPAPP-7389 - flow control on large messages

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2011-10-14 14:36:55 UTC (rev 11539)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java	2011-10-14 16:54:26 UTC (rev 11540)
@@ -37,6 +37,7 @@
 import org.hornetq.core.server.Consumer;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.impl.ServerConsumerImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.tests.util.ServiceTestBase;
 
 /**
@@ -911,6 +912,123 @@
       internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
    }
 
+   public void testFlowControl() throws Exception
+   {
+      internalTestFlowControlOnRollback(false); // false: min large message size is Integer.MAX_VALUE, so regular messages are used
+   }
+   
+   public void testFlowControlLargeMessage() throws Exception
+   {
+      internalTestFlowControlOnRollback(true); // true: min large message size is lowered to 100 bytes
+   }
+   
+   private void internalTestFlowControlOnRollback(final boolean isLargeMessage) throws Exception
+   {
+      // Checks that the consumer buffer is refilled after every rollback, then drains the whole queue.
+      HornetQServer server = createServer(false, isNetty());
+      // NOTE(review): -1 presumably lifts the delivery-attempt limit needed by the repeated rollbacks - confirm.
+      AddressSettings settings = new AddressSettings();
+      settings.setMaxDeliveryAttempts(-1);
+      server.getAddressSettingsRepository().addMatch("#", settings);
+
+      ClientSession session = null;
+
+      try
+      {
+         final int numberOfMessages = 100;
+
+         server.start();
+
+         locator.setConsumerWindowSize(300000);
+
+         if (isLargeMessage)
+         {
+            // something to ensure we are using large messages
+            locator.setMinLargeMessageSize(100);
+         }
+         else
+         {
+            // To make sure large messages won't kick in, we set anything large
+            locator.setMinLargeMessageSize(Integer.MAX_VALUE);
+         }
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, false, false);
+
+         SimpleString ADDRESS = new SimpleString("some-queue");
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         // Send numberOfMessages messages with a 1024-byte body each, then commit the sends.
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         for (int i = 0 ; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = session.createMessage(true);
+            msg.putIntProperty("count", i);
+            msg.getBodyBuffer().writeBytes(new byte[1024]);
+            producer.send(msg);
+         }
+
+         session.commit();
+
+         ClientConsumerInternal consumer = (ClientConsumerInternal)session.createConsumer(ADDRESS);
+
+         session.start();
+         // Receive one message, acknowledge it and roll back, over and over.
+         for (int repeat = 0; repeat < 100; repeat ++)
+         {
+            System.out.println("Repeat " + repeat);
+            long timeout = System.currentTimeMillis() + 2000;
+            // At least 10 messages on the buffer
+            while (timeout > System.currentTimeMillis() && consumer.getBufferSize() < 10)
+            {
+               Thread.sleep(10);
+            }
+            assertTrue(consumer.getBufferSize() >= 10);
+            // Null-check before touching the body so a receive timeout fails as an assertion, not an NPE.
+            ClientMessage msg = consumer.receive(500);
+            assertNotNull(msg);
+            msg.getBodyBuffer().readByte();
+            msg.acknowledge();
+            session.rollback();
+         }
+
+         // Every rollback above returned its message, so all numberOfMessages must still be receivable.
+         for (int i = 0 ; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = consumer.receive(5000);
+            assertNotNull(msg);
+            System.out.println("msg " + msg);
+            msg.getBodyBuffer().readByte();
+            msg.acknowledge();
+            session.commit();
+         }
+
+      }
+      finally
+      {
+         try
+         {
+            if (session != null)
+            {
+               session.close();
+            }
+         }
+         catch (Exception ignored)
+         {
+         }
+
+         if (server.isStarted())
+         {
+            server.stop();
+         }
+      }
+   }
+
+
+
    public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
    {
 

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java	2011-10-14 14:36:55 UTC (rev 11539)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/InterruptedLargeMessageTest.java	2011-10-14 16:54:26 UTC (rev 11540)
@@ -408,6 +408,7 @@
          {
             Message clientFile = createLargeClientMessage(session, messageSize, true);
             clientFile.putIntProperty("txid", 2);
+            clientFile.putIntProperty("i", i);
             producer.send(clientFile);
          }
          session.end(xid2, XAResource.TMSUCCESS);
@@ -422,6 +423,7 @@
          
          for (int start = 0 ; start < 2; start++)
          {
+            System.out.println("Start " + start);
             
             sf = locator.createSessionFactory();
             
@@ -437,6 +439,7 @@
             session.start();
             for (int i = 0 ; i < 10; i++)
             {
+               log.info("I = " + i);
                ClientMessage msg = cons1.receive(5000);
                assertNotNull(msg);
                assertEquals(1, msg.getIntProperty("txid").intValue());

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-10-14 14:36:55 UTC (rev 11539)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/client/LargeMessageTest.java	2011-10-14 16:54:26 UTC (rev 11540)
@@ -15,6 +15,9 @@
 
 import java.io.ByteArrayOutputStream;
 import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
@@ -70,7 +73,136 @@
    {
       return false;
    }
+   
+   public void testRollbackPartiallyConsumedBuffer() throws Exception
+   {
+      for (int i = 0 ; i < 1; i++)
+      {
+         log.info("#test " + i);
+         internalTestRollbackPartiallyConsumedBuffer(false);
+         tearDown();
+         setUp();
+         // NOTE(review): the loop runs once; tearDown()/setUp() presumably reset the fixture if the bound is raised - confirm
+      }
+      
+   }
+   
+   public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
+   {
+      internalTestRollbackPartiallyConsumedBuffer(true); // true: address settings get setRedeliveryDelay(1000)
+   }
+   
+   
+   private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception
+   {
+      final int messageSize = 100 * 1024;
+      // A handler rolls back its first 20 deliveries and commits the next 20; then validateNoFilesOnLargeDir() runs.
 
+      final ClientSession session;
+
+      try
+      {
+         server = createServer(true, isNetty());
+
+         AddressSettings settings = new AddressSettings();
+         if (redeliveryDelay)
+         {
+            settings.setRedeliveryDelay(1000);
+            if (locator.isCompressLargeMessage())
+            {
+               locator.setConsumerWindowSize(0);
+            }
+         }
+         settings.setMaxDeliveryAttempts(-1);
+
+         server.getAddressSettingsRepository().addMatch("#", settings);
+
+         server.start();
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         session = sf.createSession(false, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+         for (int i = 0 ; i < 20; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+            clientFile.putIntProperty("value", i);
+
+            producer.send(clientFile);
+         }
+
+         session.commit();
+
+         session.start();
+
+         final CountDownLatch latch = new CountDownLatch(1);
+
+         final AtomicInteger errors = new AtomicInteger(0);
+
+         ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+
+         consumer.setMessageHandler(new MessageHandler()
+         {
+            int counter = 0;
+            public void onMessage(ClientMessage message)
+            {
+               message.getBodyBuffer().readByte();
+               System.out.println("message:" + message);
+               try
+               {
+                  if (counter ++ <  20)
+                  {
+                     Thread.sleep(100);
+                     System.out.println("Rollback");
+                     message.acknowledge();
+                     session.rollback();
+                  }
+                  else
+                  {
+                     message.acknowledge();
+                     session.commit();
+                  }
+                  // counter reaches 40 after 20 rolled-back plus 20 committed deliveries
+                  if (counter == 40)
+                  {
+                     latch.countDown();
+                  }
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+                  errors.incrementAndGet(); // record the failure before releasing the waiting test thread
+                  latch.countDown();
+               }
+            }
+         });
+
+         assertTrue(latch.await(40, TimeUnit.SECONDS));
+         assertEquals("Errors in the message handler", 0, errors.get());
+         consumer.close();
+
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public void testCloseConsumer() throws Exception
    {
       final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -124,7 +256,7 @@
       {
          try
          {
-            server.stop();
+            session.close();
          }
          catch (Throwable ignored)
          {
@@ -132,7 +264,7 @@
 
          try
          {
-            session.close();
+            server.stop();
          }
          catch (Throwable ignored)
          {
@@ -500,16 +632,17 @@
          ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
 
          ClientMessage msg1 = consumerExpiry.receive(5000);
+         assertTrue(msg1.isLargeMessage());
          Assert.assertNotNull(msg1);
          msg1.acknowledge();
 
-         session.rollback();
-
          for (int j = 0; j < messageSize; j++)
          {
             Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
          }
 
+         session.rollback();
+
          consumerExpiry.close();
 
          for (int i = 0; i < 10; i++)
@@ -521,13 +654,13 @@
             Assert.assertNotNull(msg1);
             msg1.acknowledge();
 
-            session.rollback();
-
             for (int j = 0; j < messageSize; j++)
             {
                Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
             }
 
+            session.rollback();
+
             consumerExpiry.close();
          }
 
@@ -638,13 +771,13 @@
          Assert.assertNotNull(msg1);
          msg1.acknowledge();
 
-         session.rollback();
-
          for (int j = 0; j < messageSize; j++)
          {
             Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
          }
 
+         session.rollback();
+
          consumerExpiry.close();
 
          for (int i = 0; i < 10; i++)
@@ -655,13 +788,13 @@
             Assert.assertNotNull(msg1);
             msg1.acknowledge();
 
-            session.rollback();
-
             for (int j = 0; j < messageSize; j++)
             {
                Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
             }
 
+            session.rollback();
+
             consumerExpiry.close();
          }
 
@@ -1892,6 +2025,7 @@
 
          ClientConsumer consumer = session.createConsumer(queue[1]);
          ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+         msg.getBodyBuffer().readByte();
          Assert.assertNull(consumer.receiveImmediate());
          Assert.assertNotNull(msg);
 



More information about the hornetq-commits mailing list