[hornetq-commits] JBoss hornetq SVN: r7943 - in trunk: src/main/org/hornetq/core/remoting/server/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Sep 8 12:41:34 EDT 2009


Author: timfox
Date: 2009-09-08 12:41:34 -0400 (Tue, 08 Sep 2009)
New Revision: 7943

Added:
   trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-121

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-09-08 16:41:34 UTC (rev 7943)
@@ -149,7 +149,7 @@
       if (handler != null)
       {
          throw new HornetQException(HornetQException.ILLEGAL_STATE,
-                                      "Cannot call receive(...) - a MessageHandler is set");
+                                    "Cannot call receive(...) - a MessageHandler is set");
       }
 
       if (clientWindowSize == 0)
@@ -210,7 +210,7 @@
             {
                // if we have already pre acked we cant expire
                boolean expired = m.isExpired();
-               
+
                flowControlBeforeConsumption(m);
 
                if (expired)
@@ -279,7 +279,7 @@
       if (receiverThread != null)
       {
          throw new HornetQException(HornetQException.ILLEGAL_STATE,
-                                      "Cannot set MessageHandler - consumer is in receive(...)");
+                                    "Cannot set MessageHandler - consumer is in receive(...)");
       }
 
       boolean noPreviousHandler = handler == null;
@@ -400,7 +400,7 @@
 
       // Flow control for the first packet, we will have others
       flowControl(packet.getPacketSize(), false);
-            
+
       ClientMessageInternal currentChunkMessage = new ClientMessageImpl(packet.getDeliveryCount());
 
       currentChunkMessage.decodeProperties(ChannelBuffers.wrappedBuffer(packet.getLargeMessageHeader()));
@@ -439,18 +439,18 @@
    {
       synchronized (this)
       {
-         //Need to send credits for the messages in the buffer
-         
-         for (ClientMessageInternal message: this.buffer)
+         // Need to send credits for the messages in the buffer
+
+         for (ClientMessageInternal message : this.buffer)
          {
             flowControlBeforeConsumption(message);
          }
 
          buffer.clear();
       }
-      
-      //Need to send credits for the messages in the buffer
 
+      // Need to send credits for the messages in the buffer
+
       waitForOnMessageToComplete();
    }
 
@@ -742,17 +742,19 @@
 
          flushAcks();
 
+         clearBuffer();
+
          if (sendCloseMessage)
          {
             channel.sendBlocking(new SessionConsumerCloseMessage(id));
          }
-
-         clearBuffer();
       }
-      finally
+      catch (Throwable t)
       {
-         session.removeConsumer(this);
+         // Consumer close should always return without exception
       }
+
+      session.removeConsumer(this);
    }
 
    private void clearBuffer()

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2009-09-08 16:41:34 UTC (rev 7943)
@@ -461,7 +461,7 @@
       }
 
       public void run()
-      {
+      {         
          while (!closed)
          {
             long now = System.currentTimeMillis();

Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java	2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionTest.java	2009-09-08 16:41:34 UTC (rev 7943)
@@ -21,8 +21,10 @@
 import org.hornetq.core.client.ClientProducer;
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.server.HornetQServer;
@@ -107,6 +109,61 @@
          }
       }
    }
+   
+   //Closing a session if the underlying remoting connection is deaad should cleanly
+   //release all resources
+   public void testCloseSessionOnDestroyedConnection() throws Exception
+   {
+      HornetQServer server = createServer(false);
+      try
+      {
+         //Make sure we have a short connection TTL so sessions will be quickly closed on the server
+         long ttl = 500;
+         server.getConfiguration().setConnectionTTLOverride(ttl);
+         server.start();
+         ClientSessionFactory cf = createInVMFactory();
+         ClientSessionInternal clientSession = (ClientSessionInternal)cf.createSession(false, true, true);
+         clientSession.createQueue(queueName, queueName, false);
+         ClientProducer producer = clientSession.createProducer();
+         ClientConsumer consumer = clientSession.createConsumer(queueName);   
+         
+         assertEquals(1, server.getRemotingService().getConnections().size());
+         
+         RemotingConnection rc = clientSession.getConnection();
+         
+         rc.fail(new HornetQException(HornetQException.INTERNAL_ERROR));
+         
+         clientSession.close();
+         
+         long start = System.currentTimeMillis();
+         
+         while (true)
+         {           
+            int cons = server.getRemotingService().getConnections().size();
+            
+            if (cons == 0)
+            {
+               break;
+            }               
+            
+            long now = System.currentTimeMillis();
+            
+            if (now - start > 10000)
+            {
+               throw new Exception("Timed out waiting for connections to close");
+            }
+            
+            Thread.sleep(50);
+         }         
+      }
+      finally
+      {
+         if (server.isStarted())
+         {
+            server.stop();
+         }
+      }
+   }
 
    public void testBindingQuery() throws Exception
    {

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java	2009-09-08 14:28:34 UTC (rev 7942)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java	2009-09-08 16:41:34 UTC (rev 7943)
@@ -16,16 +16,9 @@
 import javax.jms.Session;
 
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.server.HornetQ;
-import org.hornetq.core.server.HornetQServer;
 import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.server.impl.JMSServerManagerImpl;
-import org.hornetq.tests.integration.jms.server.management.NullInitialContext;
 import org.hornetq.tests.util.JMSTestBase;
-import org.hornetq.tests.util.UnitTestCase;
 
 /**
  * 

Added: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseDestroyedConnectionTest.java	2009-09-08 16:41:34 UTC (rev 7943)
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.jms.connection;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.jms.HornetQQueue;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.JMSTestBase;
+
+/**
+ * 
+ * A CloseDestroyedConnectionTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class CloseDestroyedConnectionTest extends JMSTestBase
+{
+   private static final Logger log = Logger.getLogger(CloseDestroyedConnectionTest.class);
+
+   private HornetQConnectionFactory cf;
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      cf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+      cf.setBlockOnPersistentSend(true);
+      cf.setPreAcknowledge(true);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      cf = null;
+
+      super.tearDown();
+   }
+
+   /*
+    * Closing a connection that is destroyed should cleanly close everything without throwing exceptions
+    */
+   public void testCloseDestroyedConnection() throws Exception
+   {
+      long connectionTTL = 500;
+      cf.setClientFailureCheckPeriod(connectionTTL / 2);
+      // Need to set connection ttl to a low figure so connections get removed quickly on the server
+      cf.setConnectionTTL(connectionTTL);
+      
+      Connection conn = cf.createConnection();
+           
+      assertEquals(1, server.getRemotingService().getConnections().size());
+            
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      
+      //Give time for the initial ping to reach the server before we fail (it has connection TTL in it)
+      Thread.sleep(500);
+      
+      String queueName = "myqueue";
+      
+      Queue queue = new HornetQQueue(queueName);
+      
+      super.createQueue(queueName);
+      
+      MessageConsumer consumer = sess.createConsumer(queue);
+      
+      MessageProducer producer = sess.createProducer(queue);
+      
+      QueueBrowser browser = sess.createBrowser(queue);
+            
+      // Now fail the underlying connection
+      
+      ClientSessionInternal sessi = (ClientSessionInternal)((HornetQSession)sess).getCoreSession();
+      
+      RemotingConnection rc = sessi.getConnection();
+      
+      rc.fail(new HornetQException(HornetQException.INTERNAL_ERROR));
+      
+      // Now close the connection
+      
+      conn.close();
+      
+      long start = System.currentTimeMillis();
+      
+      while (true)
+      {           
+         int cons = server.getRemotingService().getConnections().size();
+         
+         if (cons == 0)
+         {
+            break;
+         }               
+         
+         long now = System.currentTimeMillis();
+         
+         if (now - start > 10000)
+         {
+            throw new Exception("Timed out waiting for connections to close");
+         }
+         
+         Thread.sleep(50);
+      }
+   }
+}



More information about the hornetq-commits mailing list