[jboss-cvs] JBoss Messaging SVN: r6252 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Apr 1 03:00:03 EDT 2009


Author: timfox
Date: 2009-04-01 03:00:03 -0400 (Wed, 01 Apr 2009)
New Revision: 6252

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyMultiThreadRandomFailoverTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
Log:
more tweaks to failover + netty multi thread failover test + disabled createproducer security checks

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java	2009-04-01 07:00:03 UTC (rev 6252)
@@ -512,13 +512,6 @@
 
          boolean attemptFailover = (backupConnectorFactory) != null && (failoverOnServerShutdown || me.getCode() != MessagingException.SERVER_DISCONNECTED);
 
-//         log.info(System.identityHashCode(this) + " in failover or reconnect, attemptFailover is " +
-//                  attemptFailover +
-//                  " failoveronservers:" +
-//                  failoverOnServerShutdown +
-//                  " me.getcode " +
-//                  me.getCode());
-
          boolean done = false;
 
          if (attemptFailover || reconnectAttempts != 0)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java	2009-04-01 07:00:03 UTC (rev 6252)
@@ -1196,14 +1196,16 @@
       public void replicatePacket(final Packet packet, final long replicatedChannelID, final Runnable action)
       {
          packet.setChannelID(replicatedChannelID);
+         
+         boolean runItNow = false;
 
          synchronized (replicationLock)
          {
             if (playedResponsesOnFailure && action != null)
-            {
+            {               
                // Already replicating channel failed, so just play the action now
-
-               action.run();
+               
+               runItNow = true;               
             }
             else
             {
@@ -1221,6 +1223,13 @@
                connection.transportConnection.write(buffer);
             }
          }
+         
+         //Execute outside lock
+         
+         if (runItNow)
+         {
+            action.run();
+         }
       }
 
       public void executeOutstandingDelayedResults()
@@ -1238,6 +1247,8 @@
 
       private void doExecuteOutstandingDelayedResults()
       {
+         List<Runnable> toRun = new ArrayList<Runnable>();
+         
          synchronized (replicationLock)
          {
             // Execute all the response actions now
@@ -1248,7 +1259,7 @@
 
                if (action != null)
                {
-                  action.run();
+                  toRun.add(action);
                }
                else
                {
@@ -1260,6 +1271,12 @@
 
             this.playedResponsesOnFailure = true;
          }
+         
+         //Run outside lock
+         for (Runnable action: toRun)
+         {
+            action.run();
+         }
       }
 
       public void setHandler(final ChannelHandler handler)

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2009-04-01 07:00:03 UTC (rev 6252)
@@ -220,9 +220,9 @@
       }
 
       configuration.start();
-      
+
       storageManager.start();
-      
+
       securityManager.start();
 
       managementService.start();
@@ -282,7 +282,7 @@
       if (uuid == null)
       {
          uuid = storageManager.getPersistentID();
-         
+
          if (uuid == null && !configuration.isBackup())
          {
             uuid = UUIDGenerator.getInstance().generateUUID();
@@ -306,7 +306,7 @@
          }
 
       }
-      
+
       serverManagement = managementService.registerServer(postOffice,
                                                           storageManager,
                                                           configuration,
@@ -435,26 +435,26 @@
 
       if (!configuration.isBackup())
       {
-         //Once we ready we can start the remoting service so we can start accepting connections
+         // Once we ready we can start the remoting service so we can start accepting connections
          remotingService.start();
       }
-      
+
       started = true;
    }
 
    public synchronized void start() throws Exception
-   {      
+   {
       if (started)
       {
          return;
       }
-   
+
       remotingService.setMessagingServer(this);
 
       if (configuration.isBackup())
       {
          remotingService.start();
-         
+
          // We defer actually initialisation until the live node has contacted the backup
          log.info("Backup server will await live server before becoming operational");
       }
@@ -470,20 +470,20 @@
       {
          return;
       }
-      
+
       if (clusterManager != null)
-      {       
-         clusterManager.stop();         
+      {
+         clusterManager.stop();
       }
-      
+
       remotingService.stop();
-      
+
       managementService.stop();
-      
+
       storageManager.stop();
-      
+
       securityManager.stop();
-      
+
       asyncDeliveryPool.shutdown();
 
       try
@@ -526,7 +526,7 @@
       resourceManager = null;
       serverManagement = null;
 
-      sessions.clear();      
+      sessions.clear();
 
       started = false;
       initialised = false;
@@ -643,27 +643,30 @@
       return clusterManager;
    }
 
-   private synchronized void checkActivate(final RemotingConnection connection)
+   private void checkActivate(final RemotingConnection connection)
    {
       if (configuration.isBackup())
-      {      
-         freezeBackupConnection();
-
-         List<Queue> toActivate = postOffice.activate();
-
-         for (Queue queue : toActivate)
+      {
+         synchronized (this)
          {
-            scheduledExecutor.schedule(new ActivateRunner(queue),
-                                       configuration.getQueueActivationTimeout(),
-                                       TimeUnit.MILLISECONDS);
+            freezeBackupConnection();
+   
+            List<Queue> toActivate = postOffice.activate();
+   
+            for (Queue queue : toActivate)
+            {
+               scheduledExecutor.schedule(new ActivateRunner(queue),
+                                          configuration.getQueueActivationTimeout(),
+                                          TimeUnit.MILLISECONDS);
+            }
+   
+            configuration.setBackup(false);
+   
+            if (clusterManager != null)
+            {
+               clusterManager.activate();
+            }
          }
-
-         configuration.setBackup(false);
-
-         if (clusterManager != null)
-         {
-            clusterManager.activate();
-         }
       }
 
       connection.activate();
@@ -823,7 +826,7 @@
    public boolean isInitialised()
    {
       synchronized (initialiseLock)
-      {         
+      {
          return initialised;
       }
    }
@@ -843,7 +846,7 @@
             {
                throw new IllegalStateException("Backup node already has a unique id but it's not the same as the live node id");
             }
-            
+
             return;
          }
 
@@ -1072,7 +1075,7 @@
             if (config.isDurable())
             {
                storageManager.addQueueBinding(queueBinding);
-            }                      
+            }
          }
       }
    }
@@ -1171,7 +1174,7 @@
       // some other people
       // security my be screwed up, on account of thread local security stack
       // being corrupted.
-      
+
       securityStore.authenticate(username, password);
 
       ServerSession currentSession = sessions.remove(name);

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-04-01 07:00:03 UTC (rev 6252)
@@ -1050,20 +1050,24 @@
       }
       
       remotingConnection.removeFailureListener(this);
+      
+      //Note: we do not destroy the replicating connection here. If the live server has really crashed,
+      //the connection will get cleaned up anyway when the server ping timeout kicks in.
+      //If the live server is in fact still up, i.e. a split brain situation (or in tests), then closing
+      //the replicating connection would cause the outstanding responses to be replayed on the live server.
+      //If these reached a client that subsequently failed over, then on reconnection to the backup it would
+      //have received responses that the backup did not know about.
 
       channel.transferConnection(newConnection, this.id, replicatingChannel);
 
       newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
 
-      // Destroy the old connection
-      remotingConnection.destroy();
-
       remotingConnection = newConnection;
 
       remotingConnection.addFailureListener(this);
 
       int serverLastReceivedCommandID = channel.getLastReceivedCommandID();
-
+ 
       channel.replayCommands(lastReceivedCommandID, this.id);
 
       if (wasStarted)
@@ -1086,7 +1090,7 @@
    {
       try
       {
-         log.info("Connection timed out, so clearing up resources for session " + name);
+         log.warn("Client connection failed, clearing up resources for session " + name);
 
          for (Runnable runner : failureRunners)
          {
@@ -1102,7 +1106,7 @@
 
          handleClose(new PacketImpl(PacketImpl.SESS_CLOSE));
 
-         log.info("Cleared up resources for session " + name);
+         log.warn("Cleared up resources for session " + name);
       }
       catch (Throwable t)
       {

Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java	2009-04-01 07:00:03 UTC (rev 6252)
@@ -331,27 +331,28 @@
       {
          JBossDestination jbd = (JBossDestination)destination;
 
-         if (jbd != null)
-         {
-            if (jbd instanceof Queue)
-            {
-               SessionQueueQueryResponseMessage response = session.queueQuery(jbd.getSimpleAddress());
-   
-               if (!response.isExists())
-               {
-                  throw new InvalidDestinationException("Queue " + jbd.getName() + " does not exist");
-               }
-            }
-            else
-            {
-               SessionBindingQueryResponseMessage response = session.bindingQuery(jbd.getSimpleAddress());
-   
-               if (!response.isExists())
-               {
-                  throw new InvalidDestinationException("Topic " + jbd.getName() + " does not exist");
-               }
-            }
-         }
+         //TODO Uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1565 is complete
+//         if (jbd != null)
+//         {
+//            if (jbd instanceof Queue)
+//            {
+//               SessionQueueQueryResponseMessage response = session.queueQuery(jbd.getSimpleAddress());
+//   
+//               if (!response.isExists())
+//               {
+//                  throw new InvalidDestinationException("Queue " + jbd.getName() + " does not exist");
+//               }
+//            }
+//            else
+//            {
+//               SessionBindingQueryResponseMessage response = session.bindingQuery(jbd.getSimpleAddress());
+//   
+//               if (!response.isExists())
+//               {
+//                  throw new InvalidDestinationException("Topic " + jbd.getName() + " does not exist");
+//               }
+//            }
+//         }
 
          ClientProducer producer = session.createProducer(jbd == null ? null : jbd.getSimpleAddress());
 

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadFailoverSupport.java	2009-04-01 07:00:03 UTC (rev 6252)
@@ -102,6 +102,8 @@
    {
       for (int its = 0; its < numIts; its++)
       {
+         log.info("Beginning iteration " + its);
+         
          start();
 
          final ClientSessionFactoryInternal sf = createSessionFactory();

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-04-01 06:05:36 UTC (rev 6251)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java	2009-04-01 07:00:03 UTC (rev 6252)
@@ -1321,7 +1321,7 @@
                                        final int numThreads,
                                        final boolean failOnCreateConnection) throws Exception
    {
-      this.runTestMultipleThreads(runnable, numThreads, failOnCreateConnection, 1000);
+      runTestMultipleThreads(runnable, numThreads, failOnCreateConnection, 1000);
    }
 
    private void runTestMultipleThreads(final RunnableT runnable,

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyMultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyMultiThreadRandomFailoverTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/NettyMultiThreadRandomFailoverTest.java	2009-04-01 07:00:03 UTC (rev 6252)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.Messaging;
+import org.jboss.messaging.integration.transports.netty.TransportConstants;
+
+/**
+ * Runs the {@link MultiThreadRandomFailoverTest} scenarios with Netty acceptors and connectors.
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ * Created 18 Feb 2009 08:01:20
+ */
+public class NettyMultiThreadRandomFailoverTest extends MultiThreadRandomFailoverTest
+{
+   /** Starts the backup server, then a live server whose backup connector is built from backupParams. */
+   @Override
+   protected void start() throws Exception
+   {
+      // Backup: the acceptor port is stored in backupParams, which the connectors created later reuse
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setJMXManagementEnabled(false);
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT + 1);
+      backupConf.getAcceptorConfigurations().clear();
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory",
+                                                backupParams));
+      backupConf.setBackup(true);
+      backupServer = Messaging.newNullStorageMessagingServer(backupConf);
+      backupServer.start();
+
+      // Live: JMX management has to be disabled on liveConf itself (backupConf is already configured above)
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setJMXManagementEnabled(false);
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations().clear();
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory"));
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+      TransportConfiguration backupTC = new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory",
+                                                                   backupParams,
+                                                                   "backup-connector");
+      connectors.put(backupTC.getName(), backupTC);
+      liveConf.setConnectorConfigurations(connectors);
+      liveConf.setBackupConnectorName(backupTC.getName());
+      liveServer = Messaging.newNullStorageMessagingServer(liveConf);
+      liveServer.start();
+   }
+
+   /** Builds a factory from a default Netty connector plus one using backupParams; send window is 32 * 1024. */
+   @Override
+   protected ClientSessionFactoryInternal createSessionFactory()
+   {
+      final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory"),
+                                                                           new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory",
+                                                                                                      backupParams));
+      sf.setSendWindowSize(32 * 1024);
+      return sf;
+   }
+}




More information about the jboss-cvs-commits mailing list