[hornetq-commits] JBoss hornetq SVN: r10753 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/client/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue May 31 14:19:22 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-05-31 14:19:22 -0400 (Tue, 31 May 2011)
New Revision: 10753

Modified:
   branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
Log:
HORNETQ-708 - Fixing consumeImmediate on paging as part of my current work on paging

Modified: branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java
===================================================================
--- branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java	2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/examples/jms/paging/src/org/hornetq/jms/example/PagingExample.java	2011-05-31 18:19:22 UTC (rev 10753)
@@ -87,7 +87,7 @@
          messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
          // Step 13. Send the message for about 1K, which should be over the memory limit imposed by the server
-         for (int i = 0; i < 1000; i++)
+         for (int i = 0; i < 10000; i++)
          {
             messageProducer.send(message);
          }
@@ -106,7 +106,7 @@
          // paging
          // until messages are ACKed
 
-         for (int i = 0; i < 1000; i++)
+         for (int i = 0; i < 10000; i++)
          {
             message = (BytesMessage)messageConsumer.receive(3000);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-05-31 18:19:22 UTC (rev 10753)
@@ -17,7 +17,6 @@
 import java.util.Iterator;
 import java.util.concurrent.Executor;
 
-import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientMessage;
@@ -48,6 +47,8 @@
    // ------------------------------------------------------------------------------------
 
    private static final Logger log = Logger.getLogger(ClientConsumerImpl.class);
+   
+   private static final boolean isTrace = log.isTraceEnabled();
 
    private static final boolean trace = ClientConsumerImpl.log.isTraceEnabled();
 
@@ -223,6 +224,10 @@
                      // we only force delivery once per call to receive
                      if (!deliveryForced)
                      {
+                        if (isTrace)
+                        {
+                           log.trace("Forcing delivery");
+                        }
                         session.forceDelivery(id, forceDeliveryCount++);
 
                         deliveryForced = true;
@@ -258,15 +263,26 @@
                {
                   long seq = m.getLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE);
 
-                  if (forcingDelivery && seq == forceDeliveryCount - 1)
+                  // Need to check if forceDelivery was called at this call
+                  // As we could be receiving a message that came from a previous call
+                  if (forcingDelivery && deliveryForced && seq == forceDeliveryCount - 1)
                   {
                      // forced delivery messages are discarded, nothing has been delivered by the queue
                      resetIfSlowConsumer();
+                     
+                     if (isTrace)
+                     {
+                        log.trace("There was nothing on the queue, leaving it now:: returning null");
+                     }
 
                      return null;
                   }
                   else
                   {
+                     if (isTrace)
+                     {
+                        log.trace("Ignored force delivery answer as it belonged to another call");
+                     }
                      // Ignore the message
                      continue;
                   }
@@ -301,11 +317,20 @@
                {
                   largeMessageReceived = m;
                }
+               
+               if (isTrace)
+               {
+                  log.trace("Returning " + m);
+               }
 
                return m;
             }
             else
             {
+               if (isTrace)
+               {
+                  log.trace("Returning null");
+               }
                resetIfSlowConsumer();
                return null;
             }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-05-31 18:19:22 UTC (rev 10753)
@@ -71,6 +71,9 @@
    void cancel(MessageReference reference, long timeBase) throws Exception;
 
    void deliverAsync();
+   
+   /** This method will make sure that any pending message (including paged message) will be delivered  */
+   void forceDelivery();
 
    long getMessageCount();
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-05-31 18:19:22 UTC (rev 10753)
@@ -403,6 +403,25 @@
       executor.execute(concurrentPoller);
    }
 
+   public void forceDelivery()
+   {
+      if (pageSubscription != null && pageSubscription.isPaging())
+      {
+         if (isTrace)
+         {
+         	log.trace("Force delivery scheduling depage");
+         }
+         scheduleDepage();
+      }
+      
+      if (isTrace)
+      {
+      	log.trace("Force delivery deliverying async");
+      }
+      
+      deliverAsync();
+   }
+   
    public void deliverAsync()
    {
       getExecutor().execute(deliverRunner);
@@ -1670,6 +1689,11 @@
       
       if (isTrace)
       {
+         if (depaged == 0 && queueMemorySize.get() >= maxSize)
+         {
+            log.trace("Couldn't depage any message as the maxSize on the queue was achieved. There are too many pending messages to be acked in reference to the page configuration");
+         }
+         
          log.trace("Queue Memory Size after depage on queue="+this.getName() + " is " + queueMemorySize.get() + " with maxSize = " + maxSize + ". Depaged " + depaged + " messages");
       }
       

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2011-05-31 18:19:22 UTC (rev 10753)
@@ -673,7 +673,7 @@
             }
             else
             {
-               messageQueue.deliverAsync();
+               messageQueue.forceDelivery();
             }
          }
       }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-05-31 18:19:22 UTC (rev 10753)
@@ -354,6 +354,149 @@
 
    }
 
+   public void testReceiveImmediate() throws Exception
+   {
+      clearData();
+
+      Configuration config = createDefaultConfig();
+
+      config.setJournalSyncNonTransactional(false);
+
+      HornetQServer server = createServer(true,
+                                          config,
+                                          PagingTest.PAGE_SIZE,
+                                          PagingTest.PAGE_MAX,
+                                          new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      final int messageSize = 1024;
+
+      final int numberOfMessages = 1000;
+
+      try
+      {
+         ServerLocator locator = createInVMNonHALocator();
+
+         locator.setBlockOnNonDurableSend(true);
+         locator.setBlockOnDurableSend(true);
+         locator.setBlockOnAcknowledge(true);
+
+         ClientSessionFactory sf = locator.createSessionFactory();
+
+         ClientSession session = sf.createSession(false, false, false);
+
+         session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+         ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+         ClientMessage message = null;
+
+         byte[] body = new byte[messageSize];
+
+         ByteBuffer bb = ByteBuffer.wrap(body);
+
+         for (int j = 1; j <= messageSize; j++)
+         {
+            bb.put(getSamplebyte(j));
+         }
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            message = session.createMessage(true);
+
+            HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+            bodyLocal.writeBytes(body);
+
+            message.putIntProperty(new SimpleString("id"), i);
+
+            producer.send(message);
+            if (i % 1000 == 0)
+            {
+               session.commit();
+            }
+         }
+         session.commit();
+         session.close();
+         
+         session = null;
+
+         sf.close();
+         locator.close();
+
+         server.stop();
+
+         server = createServer(true,
+                               config,
+                               PagingTest.PAGE_SIZE,
+                               PagingTest.PAGE_MAX,
+                               new HashMap<String, AddressSettings>());
+         server.start();
+
+         locator = createInVMNonHALocator();
+         sf = locator.createSessionFactory();
+
+         Queue queue = server.locateQueue(ADDRESS);
+
+         assertEquals(numberOfMessages, queue.getMessageCount());
+
+         LinkedList<Xid> xids = new LinkedList<Xid>();
+
+         int msgReceived = 0;
+         ClientSession sessionConsumer = sf.createSession(false, false, false);
+         sessionConsumer.start();
+         ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
+         for (int msgCount = 0; msgCount < numberOfMessages; msgCount++)
+         {
+            log.info("Received " + msgCount);
+            msgReceived++;
+            ClientMessage msg = consumer.receiveImmediate();
+            if (msg == null)
+            {
+               log.info("It's null. leaving now");
+               sessionConsumer.commit();
+               fail("Didn't receive a message");
+            }
+            msg.acknowledge();
+            
+            if (msgCount % 5 == 0)
+            {
+               log.info("commit");
+               sessionConsumer.commit();
+            }
+         }
+         
+         sessionConsumer.commit();
+         
+         sessionConsumer.close();
+
+         sf.close();
+
+         locator.close();
+
+         assertEquals(0, queue.getMessageCount());
+
+         long timeout = System.currentTimeMillis() + 5000;
+         while (timeout > System.currentTimeMillis() && queue.getPageSubscription().getPagingStore().isPaging())
+         {
+            Thread.sleep(100);
+         }
+         assertFalse(queue.getPageSubscription().getPagingStore().isPaging());
+      }
+      finally
+      {
+         try
+         {
+            server.stop();
+         }
+         catch (Throwable ignored)
+         {
+         }
+      }
+
+   }
+
    public void testMissingTXEverythingAcked() throws Exception
    {
       clearData();
@@ -3642,11 +3785,18 @@
                }
             });
 
-            if (!message.waitOutputStreamCompletion(5000))
+            try
             {
-               log.info(threadDump("dump"));
-               fail("Couldn't finish large message sending");
+               if (!message.waitOutputStreamCompletion(5000))
+               {
+                  log.info(threadDump("dump"));
+                  fail("Couldn't finish large message receiving");
+               }
             }
+            catch (Throwable e)
+            {
+               fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") + " with messageID=" + message.getMessageID());
+            }
 
          }
          

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-05-31 18:19:22 UTC (rev 10753)
@@ -656,4 +656,13 @@
       return 0;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.Queue#forceDelivery()
+    */
+   public void forceDelivery()
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
 }
\ No newline at end of file

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java	2011-05-28 16:34:41 UTC (rev 10752)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java	2011-05-31 18:19:22 UTC (rev 10753)
@@ -650,6 +650,15 @@
          return null;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.Queue#forceDelivery()
+       */
+      public void forceDelivery()
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 
 }



More information about the hornetq-commits mailing list