[hornetq-commits] JBoss hornetq SVN: r9836 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/server/cluster/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 3 09:02:02 EDT 2010


Author: ataylor
Date: 2010-11-03 09:02:01 -0400 (Wed, 03 Nov 2010)
New Revision: 9836

Modified:
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
   branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
call listeners before reconnect and test fixes

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-11-03 08:37:11 UTC (rev 9835)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2010-11-03 13:02:01 UTC (rev 9836)
@@ -165,7 +165,7 @@
                                    final int reconnectAttempts,
                                    final ExecutorService threadPool,
                                    final ScheduledExecutorService scheduledThreadPool,
-                                   final List<Interceptor> interceptors) throws HornetQException
+                                   final List<Interceptor> interceptors)
    {
 
       e.fillInStackTrace();
@@ -613,15 +613,19 @@
             connection = null;
          }
 
-         callFailureListeners(me, true, connection != null);
 
          if (connection == null)
          {
             sessionsToClose = new HashSet<ClientSessionInternal>(sessions);
+            callFailureListeners(me, true, false);
          }
       }
 
       // This needs to be outside the failover lock to prevent deadlock
+      if(connection != null)
+      {
+         callFailureListeners(me, true, true);  
+      }
       if (sessionsToClose != null)
       {
          // If connection is null it means we didn't succeed in failing over or reconnecting
@@ -1191,18 +1195,24 @@
          if (type == PacketImpl.DISCONNECT)
          {
             final DisconnectMessage msg = (DisconnectMessage)packet;
+            if (msg.getNodeID() != null)
+                  {
+                     System.out.println("received disconnect from node " + msg.getNodeID());
+                  }
             closeExecutor.execute(new Runnable()
             {
                // Must be executed on new thread since cannot block the netty thread for a long time and fail can
                // cause reconnect loop
                public void run()
                {
-                  conn.fail(new HornetQException(msg.isFailoverOnServerShutdown()?HornetQException.NOT_CONNECTED:HornetQException.DISCONNECTED,
-                                                 "The connection was disconnected because of server shutdown"));
                   if (msg.getNodeID() != null)
                   {
                      serverLocator.notifyNodeDown(msg.getNodeID().toString());
                   }
+
+                  conn.fail(new HornetQException(msg.isFailoverOnServerShutdown()?HornetQException.NOT_CONNECTED:HornetQException.DISCONNECTED,
+                                                 "The connection was disconnected because of server shutdown"));
+
                }
             });
          }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-11-03 08:37:11 UTC (rev 9835)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2010-11-03 13:02:01 UTC (rev 9836)
@@ -1297,7 +1297,19 @@
          connectors = new ArrayList<Connector>();
          for (TransportConfiguration initialConnector : initialConnectors)
          {
-            connectors.add(new Connector(initialConnector));
+            ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
+                     initialConnector,
+                     callTimeout,
+                     clientFailureCheckPeriod,
+                     connectionTTL,
+                     retryInterval,
+                     retryIntervalMultiplier,
+                     maxRetryInterval,
+                     reconnectAttempts,
+                     threadPool,
+                     scheduledThreadPool,
+                     interceptors);
+            connectors.add(new Connector(initialConnector, factory));
          }
       }
 
@@ -1336,18 +1348,14 @@
          private boolean interrupted = false;
          private Exception e;
 
-         public Connector(TransportConfiguration initialConnector)
+         public Connector(TransportConfiguration initialConnector, ClientSessionFactoryInternal factory)
          {
             this.initialConnector = initialConnector;
+            this.factory = factory;
          }
 
          public ClientSessionFactory call() throws HornetQException
          {
-            factory = getFactory();
-            if(factory == null)
-            {
-               return null;
-            }
             try
             {
                factory.connect(reconnectAttempts, failoverOnInitialConnection);
@@ -1359,11 +1367,11 @@
                   this.e = e;
                   throw e;
                }
-               if(factory != null)
+               /*if(factory != null)
                {
                   factory.close();
                   factory = null;
-               }
+               }*/
                return null;
             }
             isConnected = true;
@@ -1382,7 +1390,7 @@
             return isConnected;
          }
 
-         public synchronized void disconnect()
+         public void disconnect()
          {
             interrupted = true;
             
@@ -1393,39 +1401,6 @@
                factory = null;
             }
          }
-
-         private synchronized ClientSessionFactoryInternal getFactory() throws HornetQException
-         {
-            if(interrupted)
-            {
-               return null;
-            }
-            if (factory == null)
-            {
-               try
-              {
-                 initialise();
-              }
-              catch (Exception e)
-              {
-                 throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
-              }
-
-               factory = new ClientSessionFactoryImpl(ServerLocatorImpl.this,
-                     initialConnector,
-                     callTimeout,
-                     clientFailureCheckPeriod,
-                     connectionTTL,
-                     retryInterval,
-                     retryIntervalMultiplier,
-                     maxRetryInterval,
-                     reconnectAttempts,
-                     threadPool,
-                     scheduledThreadPool,
-                     interceptors);
-            }
-            return factory;
-         }
       }
    }
 }

Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-11-03 08:37:11 UTC (rev 9835)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2010-11-03 13:02:01 UTC (rev 9836)
@@ -238,7 +238,7 @@
       {
          return;
       }
-      
+
       if (serverLocator != null)
       {
          serverLocator.removeClusterTopologyListener(this);
@@ -259,10 +259,10 @@
 
          if (serverLocator != null)
          {
+            //serverLocator.removeClusterTopologyListener(this);
             serverLocator.close();
          }
 
-
          if (managementService != null)
          {
             TypedProperties props = new TypedProperties();
@@ -331,14 +331,13 @@
       
       //Remove the flow record for that node
       
-      MessageFlowRecord record = records.remove(nodeID);
+      MessageFlowRecord record = records.get(nodeID);
 
       if (record != null)
       {
          try
          {
             record.reset();
-            //record.close();
          }
          catch (Exception e)
          {

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	2010-11-03 08:37:11 UTC (rev 9835)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/integration/cluster/topology/TopologyClusterTestBase.java	2010-11-03 13:02:01 UTC (rev 9836)
@@ -59,7 +59,7 @@
 
    private static final Logger log = Logger.getLogger(TopologyClusterTestBase.class);
 
-   private static final long WAIT_TIMEOUT = 30000;
+   private static final long WAIT_TIMEOUT = 5000;
 
    abstract protected ServerLocator createHAServerLocator();
 
@@ -239,8 +239,6 @@
 
       stopServers(2, 3, 1, 4);
 
-      waitForClusterConnections(0, 0);
-
       assertTrue("Was not notified that all servers are DOWN", downLatch.await(10, SECONDS));
       checkContains(new int[] { 0 }, nodeIDs, nodes);
 

Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2010-11-03 08:37:11 UTC (rev 9835)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2010-11-03 13:02:01 UTC (rev 9836)
@@ -93,7 +93,9 @@
       {
          for (ClientSessionFactoryImpl factory : ClientSessionFactoryImpl.factories)
          {
+          //  System.out.println(threadDump("oops"));
             //factory.e.printStackTrace();
+           // System.exit(0);
          }
       }
       super.tearDown();    //To change body of overridden methods use File | Settings | File Templates.



More information about the hornetq-commits mailing list