[hornetq-commits] JBoss hornetq SVN: r9399 - trunk/tests/src/org/hornetq/tests/integration/client.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jul 13 12:22:26 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-07-13 12:22:26 -0400 (Tue, 13 Jul 2010)
New Revision: 9399

Modified:
   trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Adding a new test

Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-07-11 11:45:37 UTC (rev 9398)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2010-07-13 16:22:26 UTC (rev 9399)
@@ -360,7 +360,155 @@
       }
 
    }
+   
+   
+   /**
+    * - Make a destination in page mode
+    * - Add stuff to a transaction
+    * - Consume the entire destination (not in page mode any more)
+    * - Add stuff to a transaction again
+    * - Check order
+    * 
+    *  Test under discussion at : http://community.jboss.org/thread/154061?tstart=0
+    * 
+    */
+   public void disabled_testDepageDuringTransaction2() throws Exception
+   {
+      boolean IS_DURABLE_MESSAGE = true;
+      clearData();
 
+      Configuration config = createDefaultConfig();
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024; // 1k
+
+      try
+      {
+         ClientSessionFactory sf = createInVMFactory();
+
+         sf.setBlockOnNonDurableSend(true);
+         sf.setBlockOnDurableSend(true);
+         sf.setBlockOnAcknowledge(true);
+         
+         byte[] body = new byte[messageSize];
+
+         ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
+
+         ClientProducer producerTransacted = sessionTransacted.createProducer(PagingTest.ADDRESS);
+         
+         ClientMessage firstMessage = sessionTransacted.createMessage(IS_DURABLE_MESSAGE);
+         firstMessage.getBodyBuffer().writeBytes(body);
+         firstMessage.putIntProperty(new SimpleString("id"), 0);
+         
+         producerTransacted.send(firstMessage);
+
+         ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+
+         ClientMessage message = null;
+
+         int numberOfMessages = 0;
+         while (true)
+         {
+            message = session.createMessage(IS_DURABLE_MESSAGE);
+            message.getBodyBuffer().writeBytes(body);
+
+            // Stop sending message as soon as we start paging
+            if (server.getPostOffice().getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging())
+            {
+               break;
+            }
+            numberOfMessages++;
+
+            producer.send(message);
+         }
+
+         Assert.assertTrue(server.getPostOffice().getPagingManager().getPageStore(PagingTest.ADDRESS).isPaging());
+
+         session.start();
+
+         for (int i = 1; i < 10; i++)
+         {
+            message = session.createMessage(true);
+            message.getBodyBuffer().writeBytes(body);
+            message.putIntProperty(new SimpleString("id"), i);
+
+            // Consume messages to force an eventual out of order delivery
+            if (i == 5)
+            {
+               ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+               for (int j = 0; j < numberOfMessages; j++)
+               {
+                  ClientMessage msg = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+                  msg.acknowledge();
+                  Assert.assertNotNull(msg);
+               }
+
+               Assert.assertNull(consumer.receiveImmediate());
+               consumer.close();
+            }
+
+            Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
+            Assert.assertNotNull(messageID);
+            Assert.assertEquals(messageID.intValue(), i);
+
+            producerTransacted.send(message);
+         }
+
+         ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+         Assert.assertNull(consumer.receiveImmediate());
+
+         sessionTransacted.commit();
+
+         sessionTransacted.close();
+
+         for (int i = 0; i < 10; i++)
+         {
+            message = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+            Assert.assertNotNull(message);
+
+            Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
+
+            Assert.assertNotNull(messageID);
+            Assert.assertEquals("message received out of order", messageID.intValue(), i);
+
+            message.acknowledge();
+         }
+
+         Assert.assertNull(consumer.receiveImmediate());
+
+         consumer.close();
+
+         session.close();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
+  
+
    public void testPageOnSchedulingNoRestart() throws Exception
    {
       internalTestPageOnScheduling(false);



More information about the hornetq-commits mailing list