[jboss-cvs] JBoss Messaging SVN: r6641 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Apr 30 13:22:52 EDT 2009


Author: jmesnil
Date: 2009-04-30 13:22:52 -0400 (Thu, 30 Apr 2009)
New Revision: 6641

Modified:
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
   trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControl2Test.java
   trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSUtil.java
   trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
Log:
JBMESSAGING-1421: Server resources are not cleaned up when the client crashes/exits without properly closing its JBM resources

* if the server resources have not been properly closed before the connection is closed, we clean them up when the connection TTL is hit
* the only exception is when the connection is closed through the management API. In that case, the cleanup is immediate

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-04-30 17:22:52 UTC (rev 6641)
@@ -430,6 +430,7 @@
          String remoteAddress = connection.getRemoteAddress();
          if (remoteAddress.contains(ipAddress))
          {
+            remotingService.removeConnection(connection.getID());
             connection.fail(new MessagingException(MessagingException.INTERNAL_ERROR, "connections for " + ipAddress +
                                                                                       " closed by management"));
             closed = true;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-04-30 17:22:52 UTC (rev 6641)
@@ -449,6 +449,8 @@
       // Then call the listeners
       callFailureListeners(me);
 
+      callClosingListeners();
+
       internalClose();
 
       for (ChannelImpl channel : channels.values())

Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java	2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/RemotingService.java	2009-04-30 17:22:52 UTC (rev 6641)
@@ -38,13 +38,22 @@
 {
    RemotingConnection getConnection(Object remotingConnectionID);
 
+   /**
+    * Remove a connection from the connections held by the remoting service.
+    * <strong>This method must be used only from the management API.
+    * RemotingConnections are removed from the remoting service when their connectionTTL is hit.</strong>
+    * @param remotingConnectionID the ID of the RemotingConnection to remove
+    * @return the removed RemotingConnection
+    */
+   RemotingConnection removeConnection(Object remotingConnectionID);
+
    Set<RemotingConnection> getConnections();
-   
+
    void addInterceptor(Interceptor interceptor);
-   
+
    boolean removeInterceptor(Interceptor interceptor);
-   
+
    void freeze();
-   
+
    RemotingConnection getServerSideReplicatingConnection();
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java	2009-04-30 17:22:52 UTC (rev 6641)
@@ -242,6 +242,11 @@
       return connections.get(remotingConnectionID);
    }
 
+   public RemotingConnection removeConnection(final Object remotingConnectionID)
+   {
+      return connections.remove(remotingConnectionID);
+   }
+   
    public synchronized Set<RemotingConnection> getConnections()
    {
       return new HashSet<RemotingConnection>(connections.values());
@@ -281,11 +286,9 @@
 
    public void connectionDestroyed(final Object connectionID)
    {  
-      RemotingConnection conn = connections.remove(connectionID);
-      if (conn != null)
-      {
-         conn.destroy();
-      }      
+      // We DO NOT destroy the connection when this event is received.
+      // Instead, the connection will be cleaned up when the connection TTL
+      // is hit in FailedConnectionsTask.
    }
 
    public void connectionException(final Object connectionID, final MessagingException me)
@@ -346,6 +349,8 @@
 
          for (RemotingConnection conn : failedConnections)
          {
+
+            connections.remove(conn.getID());
             MessagingException me = new MessagingException(MessagingException.CONNECTION_TIMEDOUT,
                                                            "Did not receive ping on connection. It is likely a client has exited or crashed without " + "closing its connection, or the network between the server and client has failed. The connection will now be closed.");
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java	2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/TemporaryQueueTest.java	2009-04-30 17:22:52 UTC (rev 6641)
@@ -28,6 +28,8 @@
 import org.jboss.messaging.core.client.ClientSession;
 import org.jboss.messaging.core.client.ClientSessionFactory;
 import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionInternal;
+import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.remoting.CloseListener;
@@ -47,18 +49,13 @@
  *
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  */
-/**
- * A TemporaryQueueTest
- *
- * @author jmesnil
- *
- *
- */
 public class TemporaryQueueTest extends ServiceTestBase
 {
 
    // Constants -----------------------------------------------------
 
+   private static final long CONNECTION_TTL = 2000;
+
    // Attributes ----------------------------------------------------
 
    private MessagingServer server;
@@ -134,7 +131,7 @@
       });
       session.close();
       //wait for the closing listeners to be fired
-      assertTrue("connection close listeners not fired", latch.await(1, TimeUnit.SECONDS));
+      assertTrue("connection close listeners not fired", latch.await(2 * CONNECTION_TTL, TimeUnit.MILLISECONDS));
       session = sf.createSession(false, true, true);
       session.start();
       
@@ -200,11 +197,12 @@
             latch.countDown();
          }
       });
-      remotingConnection.fail(new MessagingException(MessagingException.INTERNAL_ERROR, "simulate a client failure"));
+      
+      ((ClientSessionInternal)session).getConnection().fail(new MessagingException(MessagingException.INTERNAL_ERROR, "simulate a client failure"));
 
 
       // let some time for the server to clean the connections
-      latch.await(1, TimeUnit.SECONDS);
+      latch.await(2 * CONNECTION_TTL, TimeUnit.MILLISECONDS);
 
       assertEquals(0, server.getConnectionCount());
 
@@ -233,10 +231,34 @@
    {
       super.setUp();
 
-      server = createServer(false);
+      
+      Configuration configuration = createDefaultConfig();
+      configuration.setSecurityEnabled(false);
+      server = createServer(false, configuration );
       server.start();
 
-      sf = createInVMFactory();
+      sf = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY),
+                                        null,
+                                        ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+                                        ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+                                        ClientSessionFactoryImpl.DEFAULT_PING_PERIOD, 
+                                        CONNECTION_TTL,
+                                        ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                        ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE, 
+                                        ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE,
+                                        ClientSessionFactoryImpl.DEFAULT_PRODUCER_WINDOW_SIZE,
+                                        ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE,
+                                        ClientSessionFactoryImpl.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                        ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+                                        ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+                                        ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+                                        ClientSessionFactoryImpl.DEFAULT_AUTO_GROUP,
+                                        ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
+                                        ClientSessionFactoryImpl.DEFAULT_PRE_ACKNOWLEDGE,
+                                        ClientSessionFactoryImpl.DEFAULT_ACK_BATCH_SIZE,
+                                        ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL,
+                                        ClientSessionFactoryImpl.DEFAULT_RETRY_INTERVAL_MULTIPLIER,
+                                        ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS);
       session = sf.createSession(false, true, true);
    }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java	2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java	2009-04-30 17:22:52 UTC (rev 6641)
@@ -126,20 +126,20 @@
 
       System.out.println("VM Exited");
 
-      Thread.sleep(1000);
+      Thread.sleep(2 * CONNECTION_TTL);
 
       assertActiveConnections(1);
       // FIXME https://jira.jboss.org/jira/browse/JBMESSAGING-1421
-      // assertActiveSession(1);
+      assertActiveSession(1);
 
       session.close();
 
-      Thread.sleep(1000);
+      Thread.sleep(2 * CONNECTION_TTL);
 
       // the crash must have been detected and the resources cleaned up
       assertActiveConnections(0);
       // FIXME https://jira.jboss.org/jira/browse/JBMESSAGING-1421
-      // assertActiveSession(0);
+      assertActiveSession(0);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControl2Test.java	2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSServerControl2Test.java	2009-04-30 17:22:52 UTC (rev 6641)
@@ -22,10 +22,17 @@
 
 package org.jboss.messaging.tests.integration.jms.server.management;
 
-import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
-import org.jboss.messaging.core.config.cluster.DiscoveryGroupConfiguration;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -40,17 +47,6 @@
 import org.jboss.messaging.tests.integration.management.ManagementTestBase;
 import org.jboss.messaging.tests.unit.util.InVMContext;
 
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS;
-import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
-import static org.jboss.messaging.tests.util.RandomUtil.randomString;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 /**
  * A QueueControlTest
  *
@@ -66,6 +62,10 @@
 
    private static final Logger log = Logger.getLogger(JMSServerControl2Test.class);
 
+   private static final long CONNECTION_TTL = 1000;
+
+   private static final long PING_PERIOD = CONNECTION_TTL / 2;
+
    // Attributes ----------------------------------------------------
 
    private InVMContext context;
@@ -166,21 +166,21 @@
 
          assertEquals(0, control.listConnectionIDs().length);
 
-         Connection connection = JMSUtil.createConnection(connectorFactory);
+         Connection connection = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
 
          String[] connectionIDs = control.listConnectionIDs();
          assertEquals(1, connectionIDs.length);
 
-         Connection connection2 = JMSUtil.createConnection(connectorFactory);
+         Connection connection2 = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
          assertEquals(2, control.listConnectionIDs().length);
 
          connection.close();
-         Thread.sleep(500);
+         Thread.sleep(2 * CONNECTION_TTL);
 
          assertEquals(1, control.listConnectionIDs().length);
 
          connection2.close();
-         Thread.sleep(500);
+         Thread.sleep(2 * CONNECTION_TTL);
 
          assertEquals(0, control.listConnectionIDs().length);
       }
@@ -204,7 +204,7 @@
 
          assertEquals(0, control.listConnectionIDs().length);
 
-         Connection connection = JMSUtil.createConnection(connectorFactory);
+         Connection connection = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
 
          String[] connectionIDs = control.listConnectionIDs();
          assertEquals(1, connectionIDs.length);
@@ -218,7 +218,7 @@
 
          connection.close();
 
-         Thread.sleep(500);
+         Thread.sleep(2 * CONNECTION_TTL);
 
          assertEquals(0, control.listConnectionIDs().length);
       }
@@ -242,7 +242,7 @@
 
          assertEquals(0, control.listRemoteAddresses().length);
 
-         Connection connection = JMSUtil.createConnection(connectorFactory);
+         Connection connection = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
 
          String[] remoteAddresses = control.listRemoteAddresses();
          assertEquals(1, remoteAddresses.length);
@@ -256,8 +256,7 @@
          
          connection.close();
 
-         // FIXME: with Netty, the server is not notified immediately that the connection is closed
-         Thread.sleep(1000);
+         Thread.sleep(2 * CONNECTION_TTL);
 
          assertEquals(0, control.listRemoteAddresses().length);
       }
@@ -283,7 +282,7 @@
          assertEquals(0, server.getConnectionCount());
          assertEquals(0, control.listRemoteAddresses().length);
 
-         Connection connection = JMSUtil.createConnection(connectorFactory);
+         Connection connection = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
 
          assertEquals(1, server.getConnectionCount());
 
@@ -302,8 +301,12 @@
 
          assertTrue(control.closeConnectionsForAddress(remoteAddress));
 
-         boolean gotException = exceptionLatch.await(5, TimeUnit.SECONDS);
+         boolean gotException = exceptionLatch.await(2 * CONNECTION_TTL, TimeUnit.MILLISECONDS);
          assertTrue("did not received the expected JMSException", gotException);
+         for (String string : control.listRemoteAddresses())
+         {
+            System.out.println(string);
+         }
          assertEquals(0, control.listRemoteAddresses().length);
          assertEquals(0, server.getConnectionCount());
       }
@@ -331,7 +334,7 @@
          assertEquals(0, server.getConnectionCount());
          assertEquals(0, control.listRemoteAddresses().length);
 
-         Connection connection = JMSUtil.createConnection(connectorFactory);
+         Connection connection = JMSUtil.createConnection(connectorFactory, CONNECTION_TTL, PING_PERIOD);
 
          assertEquals(1, server.getConnectionCount());
          String[] remoteAddresses = control.listRemoteAddresses();
@@ -348,7 +351,7 @@
 
          assertFalse(control.closeConnectionsForAddress(unknownAddress));
 
-         boolean gotException = exceptionLatch.await(500, TimeUnit.MILLISECONDS);
+         boolean gotException = exceptionLatch.await(2 * CONNECTION_TTL, TimeUnit.MILLISECONDS);
          assertFalse(gotException);
 
          assertEquals(1, control.listRemoteAddresses().length);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSUtil.java	2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/jms/server/management/JMSUtil.java	2009-04-30 17:22:52 UTC (rev 6641)
@@ -79,11 +79,16 @@
 
    public static Connection createConnection(String connectorFactory) throws JMSException
    {
+      return createConnection(connectorFactory, DEFAULT_CONNECTION_TTL, DEFAULT_PING_PERIOD);
+   }
+
+   public static Connection createConnection(String connectorFactory, long connectionTTL, long pingPeriod) throws JMSException
+   {
       JBossConnectionFactory cf = new JBossConnectionFactory(new TransportConfiguration(connectorFactory),
                                                              null,
                                                              DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
-                                                             DEFAULT_PING_PERIOD,
-                                                             DEFAULT_CONNECTION_TTL,
+                                                             pingPeriod,
+                                                             connectionTTL,
                                                              DEFAULT_CALL_TIMEOUT,
                                                              null,
                                                              DEFAULT_ACK_BATCH_SIZE,
@@ -106,7 +111,7 @@
 
       return cf.createConnection();
    }
-
+   
    static MessageConsumer createConsumer(Connection connection, Destination destination, String connectorFactory) throws JMSException
    {
       Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java	2009-04-30 17:11:15 UTC (rev 6640)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java	2009-04-30 17:22:52 UTC (rev 6641)
@@ -344,6 +344,8 @@
 
          Thread.sleep(PING_INTERVAL);
       }
+      
+      Thread.sleep(3 * PING_INTERVAL);
 
       assertTrue(server.getRemotingService().getConnections().isEmpty());
 
@@ -364,13 +366,24 @@
    {
       Interceptor noPongInterceptor = new Interceptor()
       {
+         boolean allowPing = true;
+         
          public boolean intercept(Packet packet, RemotingConnection conn) throws MessagingException
          {
             log.info("In interceptor, packet is " + packet.getType());
             if (packet.getType() == PacketImpl.PING)
             {
-               log.info("Ignoring Ping packet.. it will be dropped");
-               return false;
+               if (allowPing)
+               {
+                  log.info("allow 1 ping");
+                  allowPing = false;
+                  return true;
+               }
+               else
+               {
+                  log.info("Ignoring Ping packet.. it will be dropped");
+                  return false;
+               }
             }
             else
             {
@@ -434,23 +447,14 @@
 
       serverConn.addFailureListener(serverListener);
 
-      for (int i = 0; i < 40; i++)
-      {
-         // a few tries to avoid a possible race caused by GCs or similar issues
-         if (server.getRemotingService().getConnections().isEmpty() && clientListener.getException() != null)
-         {
-            break;
-         }
-
-         Thread.sleep(PING_INTERVAL);
-      }
-
+      Thread.sleep(3 * PING_INTERVAL);
+      
       assertNotNull(clientListener.getException());
 
-      // We don't receive an exception on the server in this case
-      assertNull(serverListener.getException());
+      // We receive an exception on the server in this case too
+      assertNotNull(serverListener.getException());
 
-      assertTrue(server.getRemotingService().getConnections().isEmpty());
+      assertEquals(0, server.getRemotingService().getConnections().size());
 
       server.getRemotingService().removeInterceptor(noPongInterceptor);
 




More information about the jboss-cvs-commits mailing list