[hornetq-commits] JBoss hornetq SVN: r10514 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/protocol/core and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Apr 15 01:17:27 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-04-15 01:17:26 -0400 (Fri, 15 Apr 2011)
New Revision: 10514

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RequestorTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
Log:
JBPAPP-6314 and JBPAPP-6316 - Client ping timeout adding TTL and deleting temporary queues if session can't be reconnected

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-04-15 02:35:06 UTC (rev 10513)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-04-15 05:17:26 UTC (rev 10514)
@@ -1093,6 +1093,8 @@
                                                                          0,
                                                                          clientFailureCheckPeriod,
                                                                          TimeUnit.MILLISECONDS);
+               // To make sure the first ping will be sent
+               pingRunnable.send();
             }
             // send a ping every time we create a new remoting connection
             // to set up its TTL on the server side
@@ -1313,8 +1315,8 @@
          first = false;
 
          long now = System.currentTimeMillis();
-
-         if (clientFailureCheckPeriod != -1 && now >= lastCheck + clientFailureCheckPeriod)
+         
+         if (clientFailureCheckPeriod != -1 && connectionTTL != -1 && now >= lastCheck + connectionTTL )
          {
             if (!connection.checkDataReceived())
             {
@@ -1340,6 +1342,14 @@
             }
          }
 
+         send();
+      }
+
+      /**
+       * 
+       */
+      public void send()
+      {
          // Send a ping
 
          Ping ping = new Ping(connectionTTL);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-04-15 02:35:06 UTC (rev 10513)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-04-15 05:17:26 UTC (rev 10514)
@@ -1606,6 +1606,16 @@
    {
       return remotingConnection;
    }
+   
+   public String toString()
+   {
+      StringBuffer buffer = new StringBuffer();
+      for (Map.Entry<String, String> entry : metadata.entrySet())
+      {
+         buffer.append(entry.getKey() + "=" + entry.getValue() + ",");
+      }
+      return "ClientSessionImpl::(" + buffer.toString() + ")";
+   }
 
    // Protected
    // ----------------------------------------------------------------------------

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2011-04-15 02:35:06 UTC (rev 10513)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2011-04-15 05:17:26 UTC (rev 10514)
@@ -587,6 +587,20 @@
       }
    }
    
+   public void closeListeners()
+   {
+      List<CloseListener> listeners = remotingConnection.removeCloseListeners();
+      
+      for (CloseListener closeListener: listeners)
+      {
+         closeListener.connectionClosed();
+         if (closeListener instanceof FailureListener)
+         {
+            remotingConnection.removeFailureListener((FailureListener)closeListener);
+         }
+      }
+   }
+   
    public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID)
    {
       // We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2011-04-15 02:35:06 UTC (rev 10513)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2011-04-15 05:17:26 UTC (rev 10514)
@@ -235,17 +235,15 @@
             response = new ReattachSessionResponseMessage(-1, false);
          }
 
+         log.debug("Reattaching request from " +  connection.getRemoteAddress());
+
+
          ServerSessionPacketHandler sessionHandler = protocolManager.getSessionHandler(request.getName());
 
-         if (!server.checkActivate())
+         if (!server.checkActivate() || sessionHandler == null)
          {
             response = new ReattachSessionResponseMessage(-1, false);
          }
-
-         if (sessionHandler == null)
-         {
-            response = new ReattachSessionResponseMessage(-1, false);
-         }
          else
          {
             if (sessionHandler.getChannel().getConfirmationWindowSize() == -1)
@@ -253,7 +251,10 @@
                // Even though session exists, we can't reattach since confi window size == -1,
                // i.e. we don't have a resend cache for commands, so we just close the old session
                // and let the client recreate
+               
+               log.warn("Reattach request from " + connection.getRemoteAddress() + " failed as there is no confirmationWindowSize configured, which may be ok for your system");
 
+               sessionHandler.closeListeners();
                sessionHandler.close();
 
                response = new ReattachSessionResponseMessage(-1, false);

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-04-15 02:35:06 UTC (rev 10513)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-04-15 05:17:26 UTC (rev 10514)
@@ -447,6 +447,11 @@
       {
          run();
       }
+      
+      public String toString()
+      {
+         return "Temporary Cleaner for queue " + bindingName;
+      }
 
    }
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RequestorTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RequestorTest.java	2011-04-15 02:35:06 UTC (rev 10513)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RequestorTest.java	2011-04-15 05:17:26 UTC (rev 10514)
@@ -114,7 +114,6 @@
 
       for (int i = 0 ; i < 2000; i++)
       {
-         System.out.println("i = " + i);
          if (i % 100 == 0)
          {
             System.out.println(i);

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2011-04-15 02:35:06 UTC (rev 10513)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/TemporaryQueueTest.java	2011-04-15 05:17:26 UTC (rev 10514)
@@ -23,7 +23,13 @@
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.logging.Logger;
@@ -31,7 +37,6 @@
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
 import org.hornetq.core.remoting.CloseListener;
-import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -43,6 +48,7 @@
  * A TemporaryQueueTest
  *
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @author Clebert Suconic
  */
 public class TemporaryQueueTest extends ServiceTestBase
 {
@@ -288,7 +294,7 @@
       session.close();
    }
 
-   public void _testQueueWithWildcard3() throws Exception
+   public void testQueueWithWildcard3() throws Exception
    {
       session.createQueue("a.b", "queue1");
       session.createTemporaryQueue("a.#", "queue2");
@@ -323,7 +329,41 @@
 
       session2.close();
    }
+   
+   public void testRecreateConsumerOverServerFailure() throws Exception
+   {
+      ServerLocator serverWithReattach = createLocator();
+      serverWithReattach.setReconnectAttempts(-1);
+      serverWithReattach.setRetryInterval(1000);
+      serverWithReattach.setConfirmationWindowSize(-1);
+      ClientSessionFactory reattachSF = serverWithReattach.createSessionFactory();
+      
+      ClientSession session = reattachSF.createSession(false, false);
+      session.createTemporaryQueue("tmpAd", "tmpQ");
+      ClientConsumer consumer = session.createConsumer("tmpQ");
+      
+      ClientProducer prod = session.createProducer("tmpAd");
+      
+      session.start();
+      
+      RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();
 
+      conn.fail(new HornetQException(HornetQException.IO_ERROR));
+      
+      prod.send(session.createMessage(false));
+      session.commit();
+      
+      assertNotNull(consumer.receive(1000));
+      
+      session.close();
+      
+      reattachSF.close();
+      
+      serverWithReattach.close();
+      
+      
+   }
+
    public void testDeleteTemporaryQueueWhenClientCrash() throws Exception
    {
       session.close();
@@ -416,12 +456,18 @@
       server = createServer(false, configuration);
       server.start();
 
-      locator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
-      locator.setConnectionTTL(TemporaryQueueTest.CONNECTION_TTL);
+      locator = createLocator();
       sf = locator.createSessionFactory();
       session = sf.createSession(false, true, true);
    }
 
+   protected ServerLocator createLocator()
+   {
+      ServerLocator retlocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY));
+      retlocator.setConnectionTTL(TemporaryQueueTest.CONNECTION_TTL);
+      return retlocator;
+   }
+
    @Override
    protected void tearDown() throws Exception
    {



More information about the hornetq-commits mailing list