[hornetq-commits] JBoss hornetq SVN: r10796 - in branches/Branch_2_2_EAP-cluster-cleanup: src/main/org/hornetq/core/server/cluster/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jun 14 01:08:29 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-06-14 01:08:28 -0400 (Tue, 14 Jun 2011)
New Revision: 10796

Modified:
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
   branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
   branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Cluster cleanup

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-14 05:08:28 UTC (rev 10796)
@@ -406,10 +406,10 @@
 
    public void causeExit()
    {
-      exitLoop = true;
       synchronized (waitLock)
       {
-         waitLock.notify();
+         exitLoop = true;
+         waitLock.notifyAll();
       }
    }
 
@@ -420,7 +420,7 @@
          return;
       }
 
-      // we need to stopthe factory from connecting if it is in the middle aof trying to failover before we get the lock
+      // we need to stop the factory from connecting if it is in the middle of trying to failover before we get the lock
       causeExit();
       synchronized (createSessionLock)
       {
@@ -764,6 +764,8 @@
                ChannelHandler handler = new ClientSessionPacketHandler(session, sessionChannel);
 
                sessionChannel.setHandler(handler);
+               
+               log.info("Creating session "  + session);
 
                return new DelegatingSession(session);
             }
@@ -889,13 +891,8 @@
 
       synchronized (waitLock)
       {
-         while (true)
+         while (!exitLoop)
          {
-            if (exitLoop)
-            {
-               return;
-            }
-
             if (log.isDebugEnabled())
             {
                log.debug("Trying reconnection attempt " + count);
@@ -927,12 +924,12 @@
                   
                   try
                   {
-                     waitLock.wait(interval);
+                      waitLock.wait(interval);
                   }
                   catch (InterruptedException ignore)
                   {
                   }
-
+                  
                   // Exponential back-off
                   long newInterval = (long)(interval * retryIntervalMultiplier);
 
@@ -1283,6 +1280,21 @@
 
          if (type == PacketImpl.DISCONNECT)
          {
+//            ClientSessionFactoryImpl.this.closed = true;
+//            
+//            for (ClientSessionInternal session : ClientSessionFactoryImpl.this.sessions)
+//            {
+//               try
+//               {
+//                  log.info("cleanup session on Factory " + session);
+//                  session.cleanUp(false);
+//               }
+//               catch (Exception e)
+//               {
+//                  log.warn("Error cleaning up session " + session, e);
+//               }
+//             }
+//            
             final DisconnectMessage msg = (DisconnectMessage)packet;
             
             closeExecutor.execute(new Runnable()

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-06-14 05:08:28 UTC (rev 10796)
@@ -843,8 +843,11 @@
    {
       if (closed)
       {
+         log.info("Session was already closed, giving up now, this=" + this);
          return;
       }
+      
+      log.info("Calling close on session "  + this);
 
       try
       {
@@ -1607,6 +1610,10 @@
       return remotingConnection;
    }
    
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
    public String toString()
    {
       StringBuffer buffer = new StringBuffer();
@@ -1614,7 +1621,8 @@
       {
          buffer.append(entry.getKey() + "=" + entry.getValue() + ",");
       }
-      return "ClientSessionImpl::(" + buffer.toString() + ")";
+      
+      return "ClientSessionImpl [name=" + name + ", username=" + username + ", closed=" + closed + " metaData=(" + buffer + ")]@" + Integer.toHexString(hashCode()) ;
    }
 
    // Protected
@@ -1776,13 +1784,25 @@
    private void doCleanup(boolean failingOver)
    {
       remotingConnection.removeFailureListener(this);
+      
+      log.info("calling cleanup on " + this);
 
       synchronized (this)
       {
          closed = true;
 
          channel.close();
+
+         
+         log.info("Calling unblock on " + this);
+         
+         // if the server is sending a disconnect
+         // any pending blocked operation could hang without this
+         channel.returnBlocking();
       }
+      
+      
+      log.info("Cleaned up " + this);
 
       sessionFactory.removeSession(this, failingOver);
    }

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2011-06-14 05:08:28 UTC (rev 10796)
@@ -578,4 +578,10 @@
    {
       return session.isCompressLargeMessages();
    }
+   
+   public String toString()
+   {
+      return "DelegatingSession [session=" + session + "]";
+   }
+
 }

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-06-14 05:08:28 UTC (rev 10796)
@@ -242,24 +242,33 @@
 
    public void stop() throws Exception
    {
-      if (started)
-      {
-         executor.execute(new Runnable()
-         {
-            public void run()
-            {
-               // We need to stop the csf here otherwise the stop runnable never runs since the createobjectsrunnable is
-               // trying to connect to the target
-               // server which isn't up in an infinite loop
-               if (csf != null)
-               {
-                  //csf.close();
-                  csf = null; 
-               }
-            }
-         });
-      }
       
+         // TODO: Remove this during merge:
+         // If we close the csf, at the same time we could have the bridge calling close and proper cancellations (not just reseting as we used to do)
+         // we could have either Dead locks or very slow shutdowns on the testsuite.
+         // The solution I could find so far was to just leave the csf on
+      
+          // TODO: Need to find a better way to close the CSF
+      
+      
+//      if (started)
+//      {
+//         executor.execute(new Runnable()
+//         {
+//            public void run()
+//            {
+//               // We need to stop the csf here otherwise the stop runnable never runs since the createobjectsrunnable is
+//               // trying to connect to the target
+//               // server which isn't up in an infinite loop
+//               if (csf != null)
+//               {
+//                  csf.close();
+//                  csf = null; 
+//               }
+//            }
+//         });
+//      }
+//      
       log.info("Bridge " + this.name + " being stopped");
       
       stopping = true;
@@ -590,7 +599,18 @@
             csf = createSessionFactory();
             // Session is pre-acknowledge
             session = (ClientSessionInternal)csf.createSession(user, password, false, true, true, true, 1);
+            
+            try
+            {
+               session.addMetaData("Session-for-bridge", name.toString());
+               session.addMetaData("nodeUUID", nodeUUID.toString());
+            }
+            catch (Throwable dontCare)
+            {
+               // addMetaData here is just for debug purposes
+            }
 
+
             if (forwardingAddress != null)
             {
                BindingQuery query = null;
@@ -661,6 +681,8 @@
          }
          catch (HornetQException e)
          {
+            e.printStackTrace();
+            System.out.println("ex " + e);
             if (csf != null)
             {
                csf.close();
@@ -707,14 +729,18 @@
 
    private class StopRunnable implements Runnable
    {
+      Exception created = new Exception ("Stop bridge called at for session = " + session);
       public void run()
       {
          try
          {
             // We need to close the session outside of the lock,
             // so any pending operation will be canceled right away
+            csf = null;
             if (session != null)
             {
+               log.info("Stopping bridge called at ", created);
+               log.info("Cleaning up session " + session);
                session.close();
                session.removeFailureListener(BridgeImpl.this);
             }

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-06-14 05:08:28 UTC (rev 10796)
@@ -149,7 +149,6 @@
    {
       if (flowRecord != null)
       {
-         // TODO: can I really remove this? nope
          flowRecord.reset();
 
          if (notifConsumer != null)

Modified: branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-06-14 05:08:28 UTC (rev 10796)
@@ -63,13 +63,11 @@
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.GroupingInfo;
-import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.config.PersistedAddressSetting;
 import org.hornetq.core.persistence.config.PersistedRoles;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -159,6 +157,8 @@
    private final MBeanServer mbeanServer;
 
    private volatile boolean started;
+   
+   private volatile boolean stopped;
 
    private volatile SecurityStore securityStore;
 
@@ -353,6 +353,11 @@
 
             nodeManager.startLiveNode();
 
+            if (stopped)
+            {
+               return;
+            }
+            
             initialisePart2();
             
             log.info("Server is now live");
@@ -379,6 +384,8 @@
 
    private class SharedStoreBackupActivation implements Activation
    {
+      
+      volatile boolean closed = false;
       public void run()
       {
          try
@@ -397,6 +404,11 @@
             
             configuration.setBackup(false);
             
+            if (stopped)
+            {
+               return;
+            }
+            
             initialisePart2();
             
             clusterManager.activate();
@@ -480,6 +492,7 @@
 
                backupActivationThread.interrupt();
 
+               // TODO: do we really need this?
                Thread.sleep(1000);
             }
 
@@ -536,6 +549,8 @@
 
    public synchronized void start() throws Exception
    {
+      stopped = false;
+      
       initialiseLogging();
 
       checkJournalDirectory();
@@ -618,6 +633,7 @@
 
    public void stop() throws Exception
    {
+      stopped = true;
       stop(configuration.isFailoverOnServerShutdown());
    }
 
@@ -1463,7 +1479,13 @@
    private void initialisePart2() throws Exception
    {
       // Load the journal and populate queues, transactions and caches in memory
-
+      
+      
+      if (stopped)
+      {
+         return;
+      }
+      
       pagingManager.reloadStores();
       
       JournalLoadInformation[] journalInfo = loadJournals();

Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeReconnectTest.java	2011-06-14 05:08:28 UTC (rev 10796)
@@ -21,10 +21,15 @@
 import junit.framework.Assert;
 
 import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.Pair;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.config.BridgeConfiguration;
 import org.hornetq.core.config.CoreQueueConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;

Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-06-14 05:08:28 UTC (rev 10796)
@@ -1812,6 +1812,7 @@
          * we need to wait a lil while between server start up to allow the server to communicate in some order.
          * This is to avoid split brain on startup
          * */
+         // TODO: Do we really need this?
          Thread.sleep(500);
       }
       for (int node : nodes)

Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-06-14 05:08:28 UTC (rev 10796)
@@ -140,6 +140,7 @@
             staticConnectors, false);
       backupConfig.getClusterConfigurations().add(cccLive);
       backupServer = createBackupServer();
+      backupServer.getServer().setIdentity("bkpIdentityServer");
 
       liveConfig = super.createDefaultConfig();
       liveConfig.getAcceptorConfigurations().clear();

Modified: branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-06-13 18:30:39 UTC (rev 10795)
+++ branches/Branch_2_2_EAP-cluster-cleanup/tests/src/org/hornetq/tests/util/UnitTestCase.java	2011-06-14 05:08:28 UTC (rev 10796)
@@ -971,7 +971,15 @@
       }
       
       // We shutdown the global pools to give a better isolation between tests
-      ServerLocatorImpl.clearThreadPools();
+      try
+      {
+         ServerLocatorImpl.clearThreadPools();
+      }
+      catch (Throwable e)
+      {
+         log.info(threadDump(e.getMessage()));
+         throw new RuntimeException (e.getMessage(), e);
+      }
    }
 
    protected byte[] autoEncode(final Object... args)



More information about the hornetq-commits mailing list