[jboss-cvs] JBoss Messaging SVN: r5287 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Nov 6 06:25:42 EST 2008


Author: timfox
Date: 2008-11-06 06:25:41 -0500 (Thu, 06 Nov 2008)
New Revision: 5287

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
Make sure connection cleanup is replicated


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-11-06 11:07:01 UTC (rev 5286)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java	2008-11-06 11:25:41 UTC (rev 5287)
@@ -403,6 +403,15 @@
 
    public void connectionFailed(final MessagingException me)
    {      
+      if (me.getCode() == MessagingException.OBJECT_CLOSED)
+      {
+         //The server has closed the connection. We don't want failover to occur in this case -
+         //either the server has booted off the connection, or it didn't receive a ping in time.
+         //In either case, server-side resources on both live and backup will be removed, so the
+         //client can't fail over anyway
+         return;
+      }
+      
       synchronized (failoverLock)
       {         
          //Now get locks on all channel 1s, whilst holding the failoverLock - this makes sure

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java	2008-11-06 11:07:01 UTC (rev 5286)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java	2008-11-06 11:25:41 UTC (rev 5287)
@@ -266,7 +266,8 @@
       // conn has been closed from the server side without
       // being returned from the client side so we need to fail the conn and
       // call it's listeners
-      MessagingException me = new MessagingException(MessagingException.OBJECT_CLOSED, "The conn has been closed.");
+      MessagingException me = new MessagingException(MessagingException.OBJECT_CLOSED, "The conn has been closed by the server");
+      
       failConnection(me);
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-11-06 11:07:01 UTC (rev 5286)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java	2008-11-06 11:25:41 UTC (rev 5287)
@@ -139,7 +139,7 @@
       failedConnectionTimer = new Timer(true);
 
       failedConnectionsTask = new FailedConnectionsTask();
-
+   
       failedConnectionTimer.schedule(failedConnectionsTask, connectionScanPeriod, connectionScanPeriod);
 
       started = true;
@@ -209,7 +209,7 @@
       {
          throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
       }
-
+      
       RemotingConnection replicatingConnection = server.getReplicatingConnection();
 
       RemotingConnection rc = new RemotingConnectionImpl(connection,                                                                                             
@@ -231,7 +231,7 @@
    public void connectionDestroyed(final Object connectionID)
    {
       RemotingConnection conn = connections.remove(connectionID);
-
+      
       if (conn != null)
       {
          conn.destroy();

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-06 11:07:01 UTC (rev 5286)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-06 11:25:41 UTC (rev 5287)
@@ -2548,6 +2548,7 @@
    {
       try
       {
+         log.info("Connection failed, so clearing up resources for session " + name);
          for (Runnable runner : failureRunners)
          {
             try
@@ -2562,6 +2563,8 @@
 
          //We call handleClose() since we need to replicate the close too, if there is a backup
          handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
+         
+         log.info("Cleared up resources for session " + name);
       }
       catch (Throwable t)
       {

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/ReplicateConnectionFailureTest.java	2008-11-06 11:25:41 UTC (rev 5287)
@@ -0,0 +1,190 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A ReplicateConnectionFailureTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 6 Nov 2008 08:42:36
+ *
+ * Tests that when the server fails a connection because it received no ping in time, the
+ * resulting close is replicated to the backup.
+ *
+ */
+public class ReplicateConnectionFailureTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(ReplicateConnectionFailureTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testFailConnection() throws Exception
+   {
+      final long pingPeriod = 500;
+                
+      ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                      new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                                 backupParams),
+                                                                      pingPeriod,
+                                                                      5000,
+                                                                      1024 * 1024,
+                                                                      -1,
+                                                                      1024 * 1024,
+                                                                      -1,
+                                                                      false,
+                                                                      false,
+                                                                      false,
+                                                                      false,
+                                                                      8);
+
+      sf1.setSendWindowSize(32 * 1024);
+
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+      ClientSession session1 = sf1.createSession(false, true, true, false);
+
+      // One connection
+      assertEquals(1, liveService.getServer().getRemotingService().getConnections().size());
+
+      // One replicating connection per session + one backup connection per session
+      assertEquals(2, backupService.getServer().getRemotingService().getConnections().size());
+
+      session1.close();
+
+      log.info("recreating");
+
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+      session1 = sf1.createSession(false, true, true, false);
+
+      assertEquals(1, liveService.getServer().getRemotingService().getConnections().size());
+
+      assertEquals(2, backupService.getServer().getRemotingService().getConnections().size());
+
+      final RemotingConnectionImpl conn1 = (RemotingConnectionImpl)((ClientSessionImpl)session1).getConnection();
+
+      conn1.stopPingingAfterOne();
+      
+      Thread.sleep(3 * pingPeriod);
+      
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      //Should be one connection left to the backup - the other one (replicating connection) should be automatically closed
+      assertEquals(1, backupService.getServer().getRemotingService().getConnections().size());
+
+      session1.close();
+      
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());      
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setConnectionScanPeriod(100);
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                backupParams));
+      backupConf.setBackup(true);
+      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+      backupService.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setConnectionScanPeriod(100);
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                          backupParams));
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+      liveService.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+      backupService.stop();
+
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      liveService.stop();
+
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}




More information about the jboss-cvs-commits mailing list