[hornetq-commits] JBoss hornetq SVN: r11875 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 8 00:13:36 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-08 00:13:35 -0500 (Thu, 08 Dec 2011)
New Revision: 11875

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7673 - Changing the behaviour of MessageConsumer to do a round trip when no message is in the client buffer, so that it verifies whether the consumer is still connected before the message consumer returns null

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-12-08 03:05:11 UTC (rev 11874)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2011-12-08 05:13:35 UTC (rev 11875)
@@ -104,6 +104,8 @@
    private volatile boolean closed;
 
    private volatile int creditsToSend;
+   
+   private volatile boolean failedOver;
 
    private volatile Exception lastException;
 
@@ -165,7 +167,7 @@
    // ClientConsumer implementation
    // -----------------------------------------------------------------
 
-   private ClientMessage receive(long timeout, final boolean forcingDelivery) throws HornetQException
+   private ClientMessage receive(final long timeout, final boolean forcingDelivery) throws HornetQException
    {
       checkClosed();
 
@@ -194,17 +196,14 @@
 
       receiverThread = Thread.currentThread();
 
-      if (timeout == 0)
-      {
-         // Effectively infinite
-         timeout = Long.MAX_VALUE;
-      }
-
+      // To verify if forceDelivery was already called
       boolean deliveryForced = false;
+      // To control when to call forceDelivery
+      boolean callForceDelivery = false;
 
       long start = -1;
 
-      long toWait = timeout;
+      long toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
 
       try
       {
@@ -231,13 +230,8 @@
                      // we only force delivery once per call to receive
                      if (!deliveryForced)
                      {
-                        if (isTrace)
-                        {
-                           log.trace("Forcing delivery");
-                        }
-                        session.forceDelivery(id, forceDeliveryCount++);
-
-                        deliveryForced = true;
+                        callForceDelivery = true;
+                        break;
                      }
                   }
 
@@ -262,6 +256,35 @@
                }
             }
 
+            if (failedOver)
+            {
+               if (m == null)
+               {
+                  // if we failed over and no message was obtained (m == null), reset the state and try again
+                  failedOver = false;
+                  deliveryForced = false;
+                  toWait = timeout == 0 ? Long.MAX_VALUE : timeout;
+                  continue;
+               }
+               else
+               {
+                  failedOver = false;
+               }
+            }
+            
+            if (callForceDelivery)
+            {
+               if (isTrace)
+               {
+                  log.trace("Forcing delivery");
+               }
+               // JBPAPP-6030 - Calling forceDelivery outside of the lock to avoid distributed deadlocks
+               session.forceDelivery(id, forceDeliveryCount++);
+               callForceDelivery = false;
+               deliveryForced = true;
+               continue;
+            }
+
             if (m != null)
             {
                session.workDone();
@@ -351,19 +374,14 @@
 
    public ClientMessage receive(final long timeout) throws HornetQException
    {
-      if (isBrowseOnly())
+      ClientMessage msg = receive(timeout, false);
+      
+      if (msg == null && !closed)
       {
-         ClientMessage msg = receive(timeout, false);
-         if (msg == null)
-         {
-            msg = receive(0, true);
-         }
-         return msg;
+         msg = receive(0, true);
       }
-      else
-      {
-         return receive(timeout, false);
-      }
+      
+      return msg;
    }
 
    public ClientMessage receive() throws HornetQException
@@ -465,6 +483,8 @@
       lastAckedMessage = null;
 
       creditsToSend = 0;
+      
+      failedOver = true;
 
       ackIndividually = false;
    }
@@ -887,6 +907,8 @@
          {
             rateLimiter.limit();
          }
+         
+         failedOver = false;
 
          synchronized (this)
          {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-12-08 03:05:11 UTC (rev 11874)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-12-08 05:13:35 UTC (rev 11875)
@@ -398,15 +398,8 @@
    {
       checkClosed();
 
-      // JBPAPP-6030 - Using the executor to avoid distributed dead locks 
-      executor.execute(new Runnable()
-      {
-         public void run()
-         {
-            SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
-            channel.send(request);
-         }
-      });
+      SessionForceConsumerDelivery request = new SessionForceConsumerDelivery(consumerID, sequence);
+      channel.send(request);
    }
 
    public ClientConsumer createConsumer(final SimpleString queueName) throws HornetQException

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java	2011-12-08 03:05:11 UTC (rev 11874)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/InterceptorTest.java	2011-12-08 05:13:35 UTC (rev 11875)
@@ -19,6 +19,7 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.*;
+import org.hornetq.core.client.impl.ClientConsumerImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -127,16 +128,38 @@
    {
       public boolean intercept(final Packet packet, final RemotingConnection connection) throws HornetQException
       {
+         if (isForceDeliveryResponse(packet))
+         {
+            return true;
+         }
+         
          if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
          {
             return false;
          }
-
+         
          return true;
       }
 
    }
+   /**
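+    * @param packet the packet to inspect; the result is true only for a SESS_RECEIVE_MSG whose message contains the FORCED_DELIVERY_MESSAGE property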
+    */
+   private boolean isForceDeliveryResponse(final Packet packet)
+   {
+      if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
+      {
+         SessionReceiveMessage msg = (SessionReceiveMessage) packet;
+         if (msg.getMessage().containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE))
+         {
+            return true;
+         }
+      }
+      
+      return false;
+   }
 
+
    private class MyInterceptor5 implements Interceptor
    {
       private final String key;
@@ -224,6 +247,12 @@
 
       public boolean intercept(final Packet packet, final RemotingConnection connection) throws HornetQException
       {
+         
+         if (isForceDeliveryResponse(packet))
+         {
+            return true;
+         }
+         
          if (packet.getType() == PacketImpl.SESS_RECEIVE_MSG)
          {
             SessionReceiveMessage p = (SessionReceiveMessage)packet;
@@ -262,6 +291,8 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(false);
+         
+         message.putIntProperty("count", i);
 
          message.putStringProperty(InterceptorTest.key, "apple");
 
@@ -275,7 +306,11 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = consumer.receive(1000);
-
+         
+         assertNotNull(message);
+         
+         assertEquals(i, message.getIntProperty("count").intValue());
+         
          Assert.assertEquals("orange", message.getStringProperty(InterceptorTest.key));
       }
 
@@ -413,7 +448,7 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(false);
-
+         
          producer.send(message);
       }
 
@@ -422,7 +457,7 @@
       session.start();
 
       ClientMessage message = consumer.receive(100);
-
+      
       Assert.assertNull(message);
 
       session.close();

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-12-08 03:05:11 UTC (rev 11874)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-12-08 05:13:35 UTC (rev 11875)
@@ -195,7 +195,7 @@
          ClientMessage m = consumer.receive(1000);
          assertNotNull(m);
          System.out.println("received message " + i);
- //        assertEquals(i, m.getIntProperty("counter").intValue());
+         // assertEquals(i, m.getIntProperty("counter").intValue());
       }
    }
 
@@ -271,10 +271,125 @@
       crash(session);
       endLatch.await(60, TimeUnit.SECONDS);
       assertTrue("received only " + received.size(), received.size() == 500);
-      
+
       session.close();
    }
+   
+   public void testTimeoutOnFailoverConsumeBlocked() throws Exception
+   {
+      locator.setCallTimeout(5000);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setConsumerWindowSize(0);
+      locator.setBlockOnDurableSend(true);
+      locator.setAckBatchSize(0);
+      locator.setBlockOnAcknowledge(true);
+      locator.setReconnectAttempts(-1);
+      locator.setRetryInterval(500);
+      locator.setAckBatchSize(0);
+      ((InVMNodeManager)nodeManager).failoverPause = 5000l;
 
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+      final ClientSession session = createSession(sf, true, true);
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      for (int i = 0; i < 500; i++)
+      {
+         ClientMessage message = session.createMessage(true);
+         message.putIntProperty("counter", i);
+         message.putBooleanProperty("end", i == 499);
+         producer.send(message);
+      }
+
+      final CountDownLatch latch = new CountDownLatch(1);
+      final CountDownLatch endLatch = new CountDownLatch(1);
+
+      final ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      session.start();
+
+      final Map<Integer, ClientMessage> received = new HashMap<Integer, ClientMessage>();
+
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            ClientMessage message = null;
+            try
+            {
+               while ((message = getMessage()) != null)
+               {
+                  Integer counter = message.getIntProperty("counter");
+                  received.put(counter, message);
+                  try
+                  {
+                     log.info("acking message = id = " + message.getMessageID() +
+                              ", counter = " +
+                              message.getIntProperty("counter"));
+                     message.acknowledge();
+                  }
+                  catch (HornetQException e)
+                  {
+                     e.printStackTrace();
+                     continue;
+                  }
+                  log.info("Acked counter = " + counter);
+                  if (counter.equals(10))
+                  {
+                     latch.countDown();
+                  }
+                  if (received.size() == 500)
+                  {
+                     endLatch.countDown();
+                  }
+                  
+                  if (message.getBooleanProperty("end"))
+                  {
+                     break;
+                  }
+               }
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+
+         }
+         
+         private ClientMessage getMessage()
+         {
+            while (true)
+            {
+               try
+               {
+                  ClientMessage msg = consumer.receive(20000);
+                  if (msg == null)
+                  {
+                     log.info("Returning null message on consuming");
+                  }
+                  return msg;
+               }
+               catch (Exception ignored)
+               {
+                  // retry
+                  ignored.printStackTrace();
+               }
+            }
+         }
+      };
+      t.start();
+      latch.await(10, TimeUnit.SECONDS);
+      log.info("crashing session");
+      crash(session);
+      endLatch.await(60, TimeUnit.SECONDS);
+      t.join();
+      assertTrue("received only " + received.size(), received.size() == 500);
+
+      session.close();
+   }
+
    // https://issues.jboss.org/browse/HORNETQ-685
    public void testTimeoutOnFailoverTransactionCommit() throws Exception
    {



More information about the hornetq-commits mailing list