[hornetq-commits] JBoss hornetq SVN: r11820 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/failover and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Dec 2 16:35:47 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-02 16:35:46 -0500 (Fri, 02 Dec 2011)
New Revision: 11820

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java
Log:
https://issues.jboss.org/browse/JBPAPP-6543, https://issues.jboss.org/browse/HORNETQ-685 (Fix provided by Andy Taylor.. I'm just doing the commit after a review)

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-12-02 21:23:10 UTC (rev 11819)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-12-02 21:35:46 UTC (rev 11820)
@@ -921,6 +921,17 @@
     */
    private void reconnectSessions(final CoreRemotingConnection oldConnection, final int reconnectAttempts)
    {
+      HashSet<ClientSessionInternal> sessionsToFailover;
+      synchronized (sessions)
+      {
+         sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
+      }
+
+      for (ClientSessionInternal session : sessionsToFailover)
+      {
+         session.preHandleFailover(connection);
+      }
+
       getConnectionWithRetry(reconnectAttempts);
 
       if (connection == null)
@@ -946,12 +957,6 @@
 
       connection.setFailureListeners(newListeners);
 
-      HashSet<ClientSessionInternal> sessionsToFailover;
-      synchronized (sessions)
-      {
-         sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
-      }
-
       for (ClientSessionInternal session : sessionsToFailover)
       {
          session.handleFailover(connection);
@@ -968,7 +973,7 @@
                                             " multiplier = " +
                                             retryIntervalMultiplier, new Exception("trace"));
       }
-
+      
       long interval = retryInterval;
 
       int count = 0;
@@ -1042,6 +1047,10 @@
             }
             else
             {
+               if (log.isDebugEnabled())
+               {
+            	   log.debug("Reconnection successfull");
+               }
                return;
             }
          }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-12-02 21:23:10 UTC (rev 11819)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-12-02 21:35:46 UTC (rev 11820)
@@ -757,7 +757,10 @@
       }
 
       checkClosed();
-
+      if (log.isDebugEnabled())
+      {
+         log.debug("client ack messageID = " + messageID);
+      }
       SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(consumerID, messageID, blockOnAcknowledge);
 
       if (blockOnAcknowledge)
@@ -935,6 +938,14 @@
       sendAckHandler = handler;
    }
 
+   public void preHandleFailover(CoreRemotingConnection connection)
+   {
+      // We lock the channel to prevent any packets to be added to the resend
+      // cache during the failover process
+      //we also do this before the connection fails over to give the session a chance to block for failover
+      channel.lock();
+   }
+
    // Needs to be synchronized to prevent issues with occurring concurrently with close()
 
    public void handleFailover(final CoreRemotingConnection backupConnection)
@@ -948,9 +959,6 @@
 
          boolean resetCreditManager = false;
 
-         // We lock the channel to prevent any packets to be added to the resend
-         // cache during the failover process
-         channel.lock();
          try
          {
             channel.transferConnection(backupConnection);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2011-12-02 21:23:10 UTC (rev 11819)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionInternal.java	2011-12-02 21:35:46 UTC (rev 11820)
@@ -59,6 +59,8 @@
 
    void handleReceiveContinuation(long consumerID, SessionReceiveContinuationMessage continuation) throws Exception;
 
+   void preHandleFailover(CoreRemotingConnection connection);
+
    void handleFailover(CoreRemotingConnection backupConnection);
 
    RemotingConnection getConnection();
@@ -92,5 +94,4 @@
    void setPacketSize(int packetSize);
 
    void resetIfNeeded() throws HornetQException;
-
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2011-12-02 21:23:10 UTC (rev 11819)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2011-12-02 21:35:46 UTC (rev 11820)
@@ -373,6 +373,11 @@
       return session.getXAResource();
    }
 
+   public void preHandleFailover(CoreRemotingConnection connection)
+   {
+      session.preHandleFailover(connection);
+   }
+
    public void handleFailover(final CoreRemotingConnection backupConnection)
    {
       session.handleFailover(backupConnection);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-12-02 21:23:10 UTC (rev 11819)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-12-02 21:35:46 UTC (rev 11820)
@@ -44,6 +44,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.transaction.impl.XidImpl;
 import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.tests.integration.cluster.util.InVMNodeManager;
 import org.hornetq.tests.integration.cluster.util.TestableServer;
 import org.hornetq.tests.util.RandomUtil;
 
@@ -129,6 +130,246 @@
       return sf.createSession(xa, autoCommitSends, autoCommitAcks);
    }
 
+   // https://issues.jboss.org/browse/HORNETQ-685
+   public void testTimeoutOnFailover() throws Exception
+   {
+      locator.setCallTimeout(5000);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setAckBatchSize(0);
+      locator.setReconnectAttempts(-1);
+      ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+      final ClientSession session = createSession(sf, true, true);
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      final CountDownLatch latch = new CountDownLatch(10);
+
+      Runnable r = new Runnable()
+      {
+         public void run()
+         {
+            for (int i = 0; i < 500; i++)
+            {
+               ClientMessage message = session.createMessage(true);
+               message.putIntProperty("counter", i);
+               try
+               {
+                  System.out.println("sending message: " + i);
+                  producer.send(message);
+                  if (i < 10)
+                  {
+                     latch.countDown();
+                  }
+               }
+               catch (HornetQException e)
+               {
+                  // this is our retry
+                  try
+                  {
+                     producer.send(message);
+                  }
+                  catch (HornetQException e1)
+                  {
+                     e1.printStackTrace();
+                  }
+               }
+            }
+         }
+      };
+      Thread t = new Thread(r);
+      t.start();
+      latch.await(10, TimeUnit.SECONDS);
+      log.info("crashing session");
+      crash(session);
+      t.join(5000);
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      session.start();
+      for (int i = 0; i < 500; i++)
+      {
+         ClientMessage m = consumer.receive(1000);
+         assertNotNull(m);
+         System.out.println("received message " + i);
+ //        assertEquals(i, m.getIntProperty("counter").intValue());
+      }
+   }
+
+   // https://issues.jboss.org/browse/HORNETQ-685
+   public void testTimeoutOnFailoverConsume() throws Exception
+   {
+      locator.setCallTimeout(5000);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setAckBatchSize(0);
+      locator.setBlockOnAcknowledge(true);
+      locator.setReconnectAttempts(-1);
+      locator.setRetryInterval(500);
+      locator.setAckBatchSize(0);
+      ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+      final ClientSession session = createSession(sf, true, true);
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      for (int i = 0; i < 500; i++)
+      {
+         ClientMessage message = session.createMessage(true);
+         message.putIntProperty("counter", i);
+         producer.send(message);
+      }
+
+      final CountDownLatch latch = new CountDownLatch(1);
+      final CountDownLatch endLatch = new CountDownLatch(1);
+
+      final ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      session.start();
+
+      final Map<Integer, ClientMessage> received = new HashMap<Integer, ClientMessage>();
+
+      consumer.setMessageHandler(new MessageHandler()
+      {
+
+         public void onMessage(ClientMessage message)
+         {
+            Integer counter = message.getIntProperty("counter");
+            received.put(counter, message);
+            try
+            {
+               log.info("acking message = id = " + message.getMessageID() +
+                        ", counter = " +
+                        message.getIntProperty("counter"));
+               message.acknowledge();
+            }
+            catch (HornetQException e)
+            {
+               e.printStackTrace();
+               return;
+            }
+            log.info("Acked counter = " + counter);
+            if (counter.equals(10))
+            {
+               latch.countDown();
+            }
+            if (received.size() == 500)
+            {
+               endLatch.countDown();
+            }
+         }
+
+      });
+      latch.await(10, TimeUnit.SECONDS);
+      log.info("crashing session");
+      crash(session);
+      endLatch.await(60, TimeUnit.SECONDS);
+      assertTrue("received only " + received.size(), received.size() == 500);
+      
+      session.close();
+   }
+
+   // https://issues.jboss.org/browse/HORNETQ-685
+   public void testTimeoutOnFailoverTransactionCommit() throws Exception
+   {
+      locator.setCallTimeout(2000);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setAckBatchSize(0);
+      locator.setReconnectAttempts(-1);
+      ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+      final ClientSession session = createSession(sf, true, false, false);
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
+
+      session.start(xid, XAResource.TMNOFLAGS);
+
+      for (int i = 0; i < 500; i++)
+      {
+         ClientMessage message = session.createMessage(true);
+         message.putIntProperty("counter", i);
+
+         System.out.println("sending message: " + i);
+         producer.send(message);
+
+      }
+      session.end(xid, XAResource.TMSUCCESS);
+      session.prepare(xid);
+      System.out.println("crashing session");
+      crash(false, session);
+
+      session.commit(xid, false);
+
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      session.start();
+      for (int i = 0; i < 500; i++)
+      {
+         ClientMessage m = consumer.receive(1000);
+         assertNotNull(m);
+         System.out.println("received message " + i);
+         assertEquals(i, m.getIntProperty("counter").intValue());
+      }
+   }
+
+   // https://issues.jboss.org/browse/HORNETQ-685
+   public void testTimeoutOnFailoverTransactionRollback() throws Exception
+   {
+      locator.setCallTimeout(2000);
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setAckBatchSize(0);
+      locator.setReconnectAttempts(-1);
+      ((InVMNodeManager)nodeManager).failoverPause = 5000l;
+
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)locator.createSessionFactory();
+
+      final ClientSession session = createSession(sf, true, false, false);
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      Xid xid = new XidImpl("uhuhuhu".getBytes(), 126512, "auhsduashd".getBytes());
+
+      session.start(xid, XAResource.TMNOFLAGS);
+
+      for (int i = 0; i < 500; i++)
+      {
+         ClientMessage message = session.createMessage(true);
+         message.putIntProperty("counter", i);
+
+         System.out.println("sending message: " + i);
+         producer.send(message);
+      }
+
+      session.end(xid, XAResource.TMSUCCESS);
+      session.prepare(xid);
+      System.out.println("crashing session");
+      crash(false, session);
+
+      session.rollback(xid);
+
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      session.start();
+
+      ClientMessage m = consumer.receive(1000);
+      assertNull(m);
+
+   }
+
    // https://jira.jboss.org/browse/HORNETQ-522
    public void testNonTransactedWithZeroConsumerWindowSize() throws Exception
    {
@@ -1334,7 +1575,7 @@
 
       session2.end(xid, XAResource.TMSUCCESS);
 
-     // session2.prepare(xid);
+      // session2.prepare(xid);
 
       crash(session2);
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java	2011-12-02 21:23:10 UTC (rev 11819)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/util/InVMNodeManager.java	2011-12-02 21:35:46 UTC (rev 11820)
@@ -37,6 +37,8 @@
 
    public State state = NOT_STARTED;
 
+   public long failoverPause = 0l;
+
    public InVMNodeManager()
    {
       liveLock = new Semaphore(1);
@@ -73,6 +75,10 @@
          }
       }
       while (true);
+      if(failoverPause > 0l)
+      {
+         Thread.sleep(failoverPause);
+      }
    }
 
    @Override



More information about the hornetq-commits mailing list