[hornetq-commits] JBoss hornetq SVN: r8196 - in trunk: src/main/org/hornetq/core/persistence/impl/nullpm and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 3 13:03:05 EST 2009


Author: timfox
Date: 2009-11-03 13:03:04 -0500 (Tue, 03 Nov 2009)
New Revision: 8196

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
   trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
   trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
Log:
Some tweaks, plus some new (currently failing) tests added to LargeMessageTest

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java	2009-11-03 18:03:04 UTC (rev 8196)
@@ -236,9 +236,20 @@
       SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
       
       session.workDone();
-            
+      
+      boolean large;
+      
       if (msg.getBodyInputStream() != null || msg.getEncodeSize() >= minLargeMessageSize || msg.isLargeMessage())
       {
+         large = true;
+      }
+      else
+      {
+         large = false;
+      }
+            
+      if (large)
+      {
          sendMessageInChunks(sendBlocking, msg);
       }      
       else if (sendBlocking)
@@ -257,7 +268,17 @@
          //Note, that for a large message, the encode size only includes the properties + headers
          //Not the continuations, but this is ok since we are only interested in limiting the amount of
          //data in *memory* and continuations go straight to the disk
-         theCredits.acquireCredits(msg.getEncodeSize());
+         
+         if (large)
+         {
+            //TODO this is pretty hacky - we should define consistent meanings of encode size             
+                        
+            theCredits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
+         }
+         else
+         {         
+            theCredits.acquireCredits(msg.getEncodeSize());
+         }
       }
       catch (InterruptedException e)
       {         

Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java	2009-11-03 18:03:04 UTC (rev 8196)
@@ -153,6 +153,12 @@
    {
 
    }
+   
+   @Override
+   public synchronized int getEncodeSize()
+   {
+      return getHeadersAndPropertiesEncodeSize();
+   }
 
    public LargeMessageEncodingContext createNewContext()
    {

Modified: trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	2009-11-03 18:03:04 UTC (rev 8196)
@@ -70,7 +70,6 @@
 
    public Response propose(final Proposal proposal) throws Exception
    {
-      log.info("proposing proposal " + proposal);
       if (proposal.getClusterName() == null)
       {
          GroupBinding original = map.get(proposal.getGroupId());

Modified: trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java	2009-11-03 18:03:04 UTC (rev 8196)
@@ -101,8 +101,6 @@
 
          Notification notification = new Notification(null, NotificationType.PROPOSAL, props);
 
-         log.info("sending proposal " + proposal);
-
          managementService.sendNotification(notification);
 
          sendCondition.await(timeout, TimeUnit.MILLISECONDS);

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-03 18:03:04 UTC (rev 8196)
@@ -88,7 +88,6 @@
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerConsumer;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
@@ -166,6 +165,8 @@
    private volatile LargeServerMessage currentLargeMessage;
 
    private ServerSessionPacketHandler handler;
+   
+   private boolean closed;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -287,8 +288,6 @@
       }
    }
 
-   private boolean closed;
-
    public synchronized void close() throws Exception
    {
       if (tx != null && tx.getXid() == null)

Modified: trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java	2009-11-03 18:03:04 UTC (rev 8196)
@@ -14,6 +14,8 @@
 package org.hornetq.tests.integration.client;
 
 import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
@@ -26,6 +28,7 @@
 import org.hornetq.core.client.ClientProducer;
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.MessageHandler;
 import org.hornetq.core.client.impl.ClientConsumerInternal;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.config.Configuration;
@@ -70,6 +73,256 @@
 
    // Public --------------------------------------------------------
 
+//   public void testFlowControlWithSyncReceiveNettyZeroConsumerWindowSize() throws Exception
+//   {
+//      testFlowControlWithSyncReceive(true, 0);
+//   }
+//   
+//   public void testFlowControlWithSyncReceiveInVMZeroConsumerWindowSize() throws Exception
+//   {
+//      testFlowControlWithSyncReceive(false, 0);
+//   }
+//   
+//   public void testFlowControlWithSyncReceiveNettySmallConsumerWindowSize() throws Exception
+//   {
+//      testFlowControlWithSyncReceive(true, 1000);
+//   }
+//   
+//   public void testFlowControlWithSyncReceiveInVMSmallConsumerWindowSize() throws Exception
+//   {
+//      testFlowControlWithSyncReceive(false, 1000);
+//   }
+//   
+//   private void testFlowControlWithSyncReceive(final boolean netty, final int consumerWindowSize) throws Exception
+//   {      
+//      ClientSession session = null;
+//
+//      try
+//      {
+//         if (netty)
+//         {
+//            server = createServer(true, createDefaultConfig(true));
+//         }
+//         else
+//         {
+//            server = createServer(true);
+//         }
+//
+//         server.start();
+//
+//         ClientSessionFactory sf = createInVMFactory();
+//         
+//         sf.setConsumerWindowSize(consumerWindowSize);
+//         sf.setMinLargeMessageSize(1000);
+//
+//         int messageSize = 10000;
+//
+//         session = sf.createSession(false, true, true);
+//
+//         session.createTemporaryQueue(ADDRESS, ADDRESS);
+//
+//         ClientProducer producer = session.createProducer(ADDRESS);
+//
+//         final int numMessages = 1000;
+//
+//         for (int i = 0; i < numMessages; i++)
+//         {
+//            Message clientFile = createLargeClientMessage(session, messageSize, true);
+//
+//            producer.send(clientFile);
+//            
+//            log.info("Sent message " + i);
+//         }
+//
+//         ClientConsumer consumer = session.createConsumer(ADDRESS);
+//         
+//         session.start();
+//         
+//         for (int i = 0; i < numMessages; i++)
+//         {
+//            ClientMessage msg = consumer.receive(1000);
+//            
+//            int availBytes = msg.getBody().readableBytes();
+//            
+//            assertEquals(messageSize, availBytes);
+//            
+//            byte[] bytes = new byte[availBytes];
+//            
+//            msg.getBody().readBytes(bytes);
+//
+//            msg.acknowledge();
+//            
+//            log.info("Received message " + i);
+//         }
+//         
+//         session.close();
+//
+//         validateNoFilesOnLargeDir();
+//      }
+//      finally
+//      {
+//         try
+//         {
+//            server.stop();
+//         }
+//         catch (Throwable ignored)
+//         {
+//         }
+//
+//         try
+//         {
+//            session.close();
+//         }
+//         catch (Throwable ignored)
+//         {
+//         }
+//      }
+//   }
+//   
+//   public void testFlowControlWithListenerNettyZeroConsumerWindowSize() throws Exception
+//   {
+//      testFlowControlWithListener(true, 0);
+//   }
+//   
+//   public void testFlowControlWithListenerInVMZeroConsumerWindowSize() throws Exception
+//   {
+//      testFlowControlWithListener(false, 0);
+//   }
+//   
+//   public void testFlowControlWithListenerNettySmallConsumerWindowSize() throws Exception
+//   {
+//      testFlowControlWithListener(true, 1000);
+//   }
+//   
+//   public void testFlowControlWithListenerInVMSmallConsumerWindowSize() throws Exception
+//   {
+//      testFlowControlWithListener(false, 1000);
+//   }
+   
+   private void testFlowControlWithListener(final boolean netty, final int consumerWindowSize) throws Exception
+   {      
+      ClientSession session = null;
+
+      try
+      {
+         if (netty)
+         {
+            server = createServer(true, createDefaultConfig(true));
+         }
+         else
+         {
+            server = createServer(true);
+         }
+
+         server.start();
+
+         ClientSessionFactory sf;
+         
+         if (netty)
+         {
+            sf = createNettyFactory();
+         }
+         else
+         {
+            sf = createInVMFactory();
+         }
+         
+         sf.setConsumerWindowSize(consumerWindowSize);
+         sf.setMinLargeMessageSize(1000);
+
+         final int messageSize = 10000;
+
+         session = sf.createSession(false, true, true);
+
+         session.createTemporaryQueue(ADDRESS, ADDRESS);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         final int numMessages = 1000;
+
+         for (int i = 0; i < numMessages; i++)
+         {
+            Message clientFile = createLargeClientMessage(session, messageSize, false);
+
+            producer.send(clientFile);
+            
+            log.info("Sent message " + i);
+         }
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+         
+         class MyHandler implements MessageHandler
+         {
+            int count = 0;
+
+            final CountDownLatch latch = new CountDownLatch(1);
+
+            volatile Exception exception;
+
+            public void onMessage(ClientMessage message)
+            {
+               try
+               {
+                  log.info("got message " + count);
+                  
+                  int availBytes = message.getBody().readableBytes();
+                  
+                  assertEquals(messageSize, availBytes);
+                  
+                  byte[] bytes = new byte[availBytes];
+                  
+                  message.getBody().readBytes(bytes);
+                  
+                  message.acknowledge();
+
+                  if (++count == numMessages)
+                  {
+                     latch.countDown();
+                  }
+               }
+               catch (Exception e)
+               {
+                  log.error("Failed to handle message", e);
+
+                  this.exception = e;
+               }
+            }
+         }
+         
+         MyHandler handler = new MyHandler();
+         
+         consumer.setMessageHandler(handler);
+         
+         session.start();
+         
+         handler.latch.await(10000, TimeUnit.MILLISECONDS);
+         
+         assertNull(handler.exception);
+         
+         session.close();
+
+         validateNoFilesOnLargeDir();
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+
+         try
+         {
+            session.close();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+   }
+
    public void testCloseConsumer() throws Exception
    {
       final int messageSize = (int)(3.5 * ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -437,9 +690,9 @@
          consumerExpiry = session.createConsumer(ADDRESS_DLA);
 
          msg1 = consumerExpiry.receive(5000);
-         
+
          assertNotNull(msg1);
-         
+
          msg1.acknowledge();
 
          for (int i = 0; i < messageSize; i++)
@@ -545,7 +798,7 @@
          consumerExpiry.close();
 
          for (int i = 0; i < 10; i++)
-         { 
+         {
             consumerExpiry = session.createConsumer(ADDRESS_DLA);
 
             msg1 = consumerExpiry.receive(5000);
@@ -1054,37 +1307,121 @@
 
    public void testFilePersistenceDelayed() throws Exception
    {
-      testChunks(false, false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+      testChunks(false,
+                 false,
+                 true,
+                 false,
+                 true,
+                 false,
+                 false,
+                 false,
+                 false,
+                 1,
+                 LARGE_MESSAGE_SIZE,
+                 RECEIVE_WAIT_TIME,
+                 2000);
    }
 
    public void testFilePersistenceDelayedConsumer() throws Exception
    {
-      testChunks(false, false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+      testChunks(false,
+                 false,
+                 true,
+                 false,
+                 true,
+                 false,
+                 false,
+                 false,
+                 true,
+                 1,
+                 LARGE_MESSAGE_SIZE,
+                 RECEIVE_WAIT_TIME,
+                 2000);
    }
 
    public void testFilePersistenceDelayedXA() throws Exception
    {
-      testChunks(true, false, true, false, true, false, false, false, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+      testChunks(true,
+                 false,
+                 true,
+                 false,
+                 true,
+                 false,
+                 false,
+                 false,
+                 false,
+                 1,
+                 LARGE_MESSAGE_SIZE,
+                 RECEIVE_WAIT_TIME,
+                 2000);
    }
 
    public void testFilePersistenceDelayedXAConsumer() throws Exception
    {
-      testChunks(true, false, true, false, true, false, false, false, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 2000);
+      testChunks(true,
+                 false,
+                 true,
+                 false,
+                 true,
+                 false,
+                 false,
+                 false,
+                 true,
+                 1,
+                 LARGE_MESSAGE_SIZE,
+                 RECEIVE_WAIT_TIME,
+                 2000);
    }
 
    public void testNullPersistence() throws Exception
    {
-      testChunks(false, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+      testChunks(false,
+                 false,
+                 true,
+                 false,
+                 false,
+                 false,
+                 false,
+                 true,
+                 true,
+                 1,
+                 LARGE_MESSAGE_SIZE,
+                 RECEIVE_WAIT_TIME,
+                 0);
    }
 
    public void testNullPersistenceConsumer() throws Exception
    {
-      testChunks(false, false, true, false, false, false, false, true, true, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+      testChunks(false,
+                 false,
+                 true,
+                 false,
+                 false,
+                 false,
+                 false,
+                 true,
+                 true,
+                 1,
+                 LARGE_MESSAGE_SIZE,
+                 RECEIVE_WAIT_TIME,
+                 0);
    }
 
    public void testNullPersistenceXA() throws Exception
    {
-      testChunks(true, false, true, false, false, false, false, true, false, 1, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 0);
+      testChunks(true,
+                 false,
+                 true,
+                 false,
+                 false,
+                 false,
+                 false,
+                 true,
+                 false,
+                 1,
+                 LARGE_MESSAGE_SIZE,
+                 RECEIVE_WAIT_TIME,
+                 0);
    }
 
    public void testNullPersistenceXAConsumer() throws Exception
@@ -1094,22 +1431,70 @@
 
    public void testNullPersistenceDelayed() throws Exception
    {
-      testChunks(false, false, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+      testChunks(false,
+                 false,
+                 true,
+                 false,
+                 false,
+                 false,
+                 false,
+                 false,
+                 false,
+                 100,
+                 LARGE_MESSAGE_SIZE,
+                 RECEIVE_WAIT_TIME,
+                 100);
    }
 
    public void testNullPersistenceDelayedConsumer() throws Exception
    {
-      testChunks(false, false, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+      testChunks(false,
+                 false,
+                 true,
+                 false,
+                 false,
+                 false,
+                 false,
+                 false,
+                 true,
+                 100,
+                 LARGE_MESSAGE_SIZE,
+                 RECEIVE_WAIT_TIME,
+                 100);
    }
 
    public void testNullPersistenceDelayedXA() throws Exception
    {
-      testChunks(true, false, true, false, false, false, false, false, false, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+      testChunks(true,
+                 false,
+                 true,
+                 false,
+                 false,
+                 false,
+                 false,
+                 false,
+                 false,
+                 100,
+                 LARGE_MESSAGE_SIZE,
+                 RECEIVE_WAIT_TIME,
+                 100);
    }
 
    public void testNullPersistenceDelayedXAConsumer() throws Exception
    {
-      testChunks(true, false, true, false, false, false, false, false, true, 100, LARGE_MESSAGE_SIZE, RECEIVE_WAIT_TIME, 100);
+      testChunks(true,
+                 false,
+                 true,
+                 false,
+                 false,
+                 false,
+                 false,
+                 false,
+                 true,
+                 100,
+                 LARGE_MESSAGE_SIZE,
+                 RECEIVE_WAIT_TIME,
+                 100);
    }
 
    public void testPageOnLargeMessage() throws Exception
@@ -1355,7 +1740,7 @@
    {
       internalTestSendRollback(true, true);
    }
-   
+
    public void testSendRollbackXANonDurable() throws Exception
    {
       internalTestSendRollback(true, false);
@@ -1365,7 +1750,7 @@
    {
       internalTestSendRollback(false, true);
    }
-   
+
    public void testSendRollbackNonDurable() throws Exception
    {
       internalTestSendRollback(false, false);
@@ -2032,7 +2417,7 @@
 
             producer.send(message);
          }
-         
+
          ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
 
          producer.send(clientFile);

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2009-11-03 16:55:02 UTC (rev 8195)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2009-11-03 18:03:04 UTC (rev 8196)
@@ -86,7 +86,7 @@
       testFlowControl(false, 1000, 0, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
    }
 
-   public void testFlowControlLargeMessagesSmallWindowSize() throws Exception
+   public void testFlowControlLargerMessagesSmallWindowSize() throws Exception
    {
       testFlowControl(false, 1000, 10 * 1024, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
    }
@@ -141,7 +141,7 @@
       testFlowControl(true, 1000, 0, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
    }
 
-   public void testFlowControlLargeMessagesSmallWindowSizeNetty() throws Exception
+   public void testFlowControlLargerMessagesSmallWindowSizeNetty() throws Exception
    {
       testFlowControl(true, 1000, 10 * 1024, 10 * 1024, 1024, 1024, 1024, 1, 1, 0, false);
    }
@@ -173,11 +173,50 @@
                                 final long consumerDelay,
                                 final boolean anon) throws Exception
    {
+      testFlowControl(netty,
+                      numMessages,
+                      messageSize,
+                      maxSize,
+                      producerWindowSize,
+                      consumerWindowSize,
+                      ackBatchSize,
+                      numConsumers,
+                      numProducers,
+                      consumerDelay,
+                      anon,
+                      -1,
+                      false);
+   }
+   
+   public void testFlowControlLargeMessages() throws Exception
+   {
+      testFlowControl(true, 1000, 10000, 100 * 1024, 1024, 1024, 0, 1, 1, 0, false, 1000, true);
+   }
+   
+   public void testFlowControlLargeMessages2() throws Exception
+   {
+      testFlowControl(true, 1000, 10000, -1, 1024, 0, 0, 1, 1, 0, false, 1000, true);
+   }
+
+   private void testFlowControl(final boolean netty,
+                                final int numMessages,
+                                final int messageSize,
+                                final int maxSize,
+                                final int producerWindowSize,
+                                final int consumerWindowSize,
+                                final int ackBatchSize,
+                                final int numConsumers,
+                                final int numProducers,
+                                final long consumerDelay,
+                                final boolean anon,
+                                final int minLargeMessageSize,
+                                final boolean realFiles) throws Exception
+   {
       final SimpleString address = new SimpleString("testaddress");
 
       Configuration config = super.createDefaultConfig(netty);
 
-      HornetQServer server = createServer(false, config);
+      HornetQServer server = createServer(realFiles, config);
 
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setMaxSizeBytes(maxSize);
@@ -203,6 +242,11 @@
       sf.setConsumerWindowSize(consumerWindowSize);
       sf.setAckBatchSize(ackBatchSize);
 
+      if (minLargeMessageSize != -1)
+      {
+         sf.setMinLargeMessageSize(minLargeMessageSize);
+      }
+
       ClientSession session = sf.createSession(false, true, true, true);
 
       final String queueName = "testqueue";
@@ -212,8 +256,7 @@
          session.createQueue(address, new SimpleString(queueName + i), null, false);
       }
 
-      session.start();
-
+      
       class MyHandler implements MessageHandler
       {
          int count = 0;
@@ -226,6 +269,16 @@
          {
             try
             {
+               log.info("got message " + count);
+               
+               int availBytes = message.getBody().readableBytes();
+               
+               assertEquals(messageSize, availBytes);
+               
+               byte[] bytes = new byte[availBytes];
+               
+               message.getBody().readBytes(bytes);
+               
                message.acknowledge();
 
                if (++count == numMessages * numProducers)
@@ -253,6 +306,8 @@
       {
          handlers[i] = new MyHandler();
 
+         log.info("created consumer");
+         
          ClientConsumer consumer = session.createConsumer(new SimpleString(queueName + i));
 
          consumer.setMessageHandler(handlers[i]);
@@ -289,10 +344,16 @@
             else
             {
                producers[j].send(message);
+               
+               //log.info("sent message " + i);
             }
 
          }
       }
+      
+      log.info("sent messages");
+      
+      session.start();
 
       for (int i = 0; i < numConsumers; i++)
       {
@@ -713,7 +774,6 @@
       assertFalse(store.isExceededAvailableCredits());
 
       server.stop();
-   }     
-   
-   
+   }
+
 }



More information about the hornetq-commits mailing list