[hornetq-commits] JBoss hornetq SVN: r9655 - branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Sep 8 12:21:01 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-09-08 12:21:01 -0400 (Wed, 08 Sep 2010)
New Revision: 9655

Modified:
   branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
Log:
Adding new test on paging and ordering

Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java	2010-09-08 11:30:24 UTC (rev 9654)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/paging/PagingSendTest.java	2010-09-08 16:21:01 UTC (rev 9655)
@@ -13,6 +13,10 @@
 
 package org.hornetq.tests.integration.paging;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import junit.framework.Assert;
 
 import org.hornetq.api.core.SimpleString;
@@ -58,7 +62,7 @@
 
       AddressSettings defaultSetting = new AddressSettings();
       defaultSetting.setPageSizeBytes(10 * 1024);
-      defaultSetting.setMaxSizeBytes(100 * 1024);
+      defaultSetting.setMaxSizeBytes(20 * 1024);
 
       server.getAddressSettingsRepository().addMatch("#", defaultSetting);
 
@@ -155,6 +159,115 @@
       }
    }
 
+   public void testOrderOverTX() throws Exception // Test: while a second thread sends and commits messages in batches, the consumer must receive them in send order
+   {
+      HornetQServer server = newHornetQServer();
+
+      server.start();
+
+      try
+      {
+         ClientSessionFactory sf;
+
+         if (isNetty()) // pick the transport according to isNetty()
+         {
+            sf = createNettyFactory();
+         }
+         else
+         {
+            sf = createInVMFactory();
+         }
+
+         ClientSession sessionConsumer = sf.createSession(true, true, 0); // NOTE(review): presumably auto-commit sends/acks with ack batch size 0 - confirm against createSession
+
+         sessionConsumer.createQueue(PagingSendTest.ADDRESS, PagingSendTest.ADDRESS, null, true); // NOTE(review): null is presumably "no filter" and true "durable" - confirm
+
+         final ClientSession sessionProducer = sf.createSession(false, false); // the producer thread commits this session explicitly (see the commit() calls in run())
+         final ClientProducer producer = sessionProducer.createProducer(PagingSendTest.ADDRESS);
+
+         final AtomicInteger errors = new AtomicInteger(0); // counts exceptions caught in the producer thread; asserted to be 0 at the end
+
+         final int TOTAL_MESSAGES = 10000; // number of messages sent by the producer and expected by the consumer
+         
+         // Released by the producer once it has performed its 4th commit; the main thread waits on it before receiving
+         final CountDownLatch ready = new CountDownLatch(1);
+
+         Thread tProducer = new Thread()
+         {
+            public void run()
+            {
+               try
+               {
+                  int commit = 0; // number of intermediate commits performed so far
+                  for (int i = 0; i < TOTAL_MESSAGES; i++)
+                  {
+                     ClientMessage msg = sessionProducer.createMessage(true); // NOTE(review): 'true' presumably marks the message durable - confirm
+                     msg.getBodyBuffer().writeBytes(new byte[1024]); // 1024-byte zero-filled body
+                     msg.putIntProperty("count", i); // sequence number that the consumer loop checks
+                     producer.send(msg);
+
+                     if (i % 100 == 0 && i > 0) // commit at i = 100, 200, ...; the first batch therefore holds 101 messages
+                     {
+                        sessionProducer.commit();
+                        if (commit++ > 2) // true from the 4th commit on; further countDown() calls on the released latch have no effect
+                        {
+                           ready.countDown();
+                        }
+                     }
+                  }
+                  
+                  sessionProducer.commit(); // commit whatever was sent after the last intermediate commit
+
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+                  errors.incrementAndGet(); // surfaced to the main thread through the final assertEquals(0, errors.get())
+               }
+            }
+         };
+
+         ClientConsumer consumer = sessionConsumer.createConsumer(PagingSendTest.ADDRESS);
+
+         sessionConsumer.start(); // the consumer session is started before the producer thread begins sending
+         
+         tProducer.start();
+         
+         assertTrue(ready.await(10, TimeUnit.SECONDS)); // fail if the producer has not reached its 4th commit within 10 seconds
+
+         for (int i = 0; i < TOTAL_MESSAGES; i++)
+         {
+            ClientMessage msg = consumer.receive(10000); // NOTE(review): timeout is presumably in milliseconds - confirm
+
+            Assert.assertNotNull(msg);
+            
+            System.out.println("i = " + i);
+            
+            assertEquals(i, msg.getIntProperty("count").intValue()); // messages must arrive in exactly the order they were sent
+            
+            msg.acknowledge();
+         }
+         
+         tProducer.join(); // wait for the producer thread to finish before closing the sessions
+
+         sessionConsumer.close();
+         
+         sessionProducer.close();
+         
+         assertEquals(0, errors.get()); // the producer thread must not have caught any exception
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored) // a failure while stopping the server is deliberately ignored
+         {
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------



More information about the hornetq-commits mailing list