[hornetq-commits] JBoss hornetq SVN: r11194 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Aug 11 22:20:13 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-11 22:20:12 -0400 (Thu, 11 Aug 2011)
New Revision: 11194

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
Improving testsuite

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-12 02:20:12 UTC (rev 11194)
@@ -1137,9 +1137,19 @@
 
       super.finalize();
    }
+   
+   public void cleanup()
+   {
+      doClose(false);
+   }
 
    public void close()
    {
+      doClose(true);
+   }
+   
+   protected void doClose(final boolean sendClose)
+   {
       if (closed)
       {
          if (log.isDebugEnabled())
@@ -1176,7 +1186,14 @@
 
       for (ClientSessionFactory factory : clonedFactory)
       {
-         factory.close();
+         if (sendClose)
+         {
+            factory.close();
+         }
+         else
+         {
+            factory.cleanup();
+         }
       }
 
       factories.clear();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-08-12 02:20:12 UTC (rev 11194)
@@ -39,6 +39,8 @@
    void setNodeID(String nodeID);
 
    String getNodeID();
+   
+   void cleanup();
 
    ClientSessionFactory connect() throws  Exception;
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-12 02:20:12 UTC (rev 11194)
@@ -104,7 +104,7 @@
 
    private volatile ServerLocatorInternal backupServerLocator;
 
-   private final Set<ServerLocator> clusterLocators = new ConcurrentHashSet<ServerLocator>();
+   private final Set<ServerLocatorInternal> clusterLocators = new ConcurrentHashSet<ServerLocatorInternal>();
 
    private final Executor executor;
 
@@ -143,21 +143,21 @@
 
       this.clustered = clustered;
    }
-   
+
    public String describe()
    {
       StringWriter str = new StringWriter();
       PrintWriter out = new PrintWriter(str);
-      
+
       out.println("Information on " + this);
       out.println("*******************************************************");
       out.println("Topology: " + topology.describe("Toopology on " + this));
-      
+
       for (ClusterConnection conn : this.clusterConnections.values())
       {
          out.println(conn.describe());
       }
-      
+
       out.println("*******************************************************");
 
       return str.toString();
@@ -167,7 +167,7 @@
    {
       return "ClusterManagerImpl[server=" + server + "]@" + System.identityHashCode(this);
    }
-   
+
    public synchronized void start() throws Exception
    {
       if (started)
@@ -204,56 +204,60 @@
       started = true;
    }
 
-   public synchronized void stop() throws Exception
+   public void stop() throws Exception
    {
-      if (!started)
+      synchronized (this)
       {
-         return;
-      }
+         if (!started)
+         {
+            return;
+         }
 
-      if (clustered)
-      {
-         for (BroadcastGroup group : broadcastGroups.values())
+         if (clustered)
          {
-            group.stop();
-            managementService.unregisterBroadcastGroup(group.getName());
+            for (BroadcastGroup group : broadcastGroups.values())
+            {
+               group.stop();
+               managementService.unregisterBroadcastGroup(group.getName());
+            }
+
+            broadcastGroups.clear();
+
+            for (ClusterConnection clusterConnection : clusterConnections.values())
+            {
+               clusterConnection.stop();
+               managementService.unregisterCluster(clusterConnection.getName().toString());
+            }
+
          }
 
-         broadcastGroups.clear();
-
-         for (ClusterConnection clusterConnection : clusterConnections.values())
+         for (Bridge bridge : bridges.values())
          {
-            clusterConnection.stop();
-            managementService.unregisterCluster(clusterConnection.getName().toString());
+            bridge.stop();
+            managementService.unregisterBridge(bridge.getName().toString());
          }
 
-      }
+         bridges.clear();
 
-      for (Bridge bridge : bridges.values())
-      {
-         bridge.stop();
-         managementService.unregisterBridge(bridge.getName().toString());
+         if (backupServerLocator != null)
+         {
+            backupServerLocator.close();
+            backupServerLocator = null;
+         }
       }
 
-      bridges.clear();
-
-      if (backupServerLocator != null)
+      for (ServerLocatorInternal clusterLocator : clusterLocators)
       {
-         backupServerLocator.close();
-         backupServerLocator = null;
-      }
-
-      executor.execute(new Runnable()
-      {
-         public void run()
+         try
          {
-            for (ServerLocator clusterLocator : clusterLocators)
-            {
-               clusterLocator.close();
-            }
-            clusterLocators.clear();
+            clusterLocator.close();
          }
-      });
+         catch (Exception e)
+         {
+            log.warn("Error closing serverLocator=" + clusterLocator + ", message=" + e.getMessage(), e);
+         }
+      }
+      clusterLocators.clear();
       started = false;
 
       clusterConnections.clear();
@@ -265,9 +269,9 @@
       {
          return;
       }
-      
-      log.debug(this + "::removing nodeID=" + nodeID, new Exception ("trace"));
 
+      log.debug(this + "::removing nodeID=" + nodeID, new Exception("trace"));
+
       topology.removeMember(nodeID);
 
    }
@@ -284,22 +288,32 @@
 
       TopologyMember member = new TopologyMember(connectorPair);
       boolean updated = topology.addMember(nodeID, member, last);
-      
+
       if (!updated)
       {
          if (log.isDebugEnabled())
          {
-            log.debug(this + " ignored notifyNodeUp on nodeID=" + nodeID + " pair=" + connectorPair + " as the topology already knew about it");
+            log.debug(this + " ignored notifyNodeUp on nodeID=" +
+                      nodeID +
+                      " pair=" +
+                      connectorPair +
+                      " as the topology already knew about it");
          }
          return;
       }
 
       if (log.isDebugEnabled())
       {
-         log.debug(this + " received notifyNodeUp nodeID=" + nodeID + " connectorPair=" + connectorPair + 
-                   ", nodeAnnounce=" + nodeAnnounce + ", last=" + last);
+         log.debug(this + " received notifyNodeUp nodeID=" +
+                   nodeID +
+                   " connectorPair=" +
+                   connectorPair +
+                   ", nodeAnnounce=" +
+                   nodeAnnounce +
+                   ", last=" +
+                   last);
       }
-      
+
       // if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
       // connections.
       if (nodeAnnounce)
@@ -312,8 +326,14 @@
          {
             if (log.isTraceEnabled())
             {
-               log.trace(this + " information clusterConnection=" + clusterConnection + 
-                         " nodeID=" + nodeID + " connectorPair=" + connectorPair + " last=" + last);
+               log.trace(this + " information clusterConnection=" +
+                         clusterConnection +
+                         " nodeID=" +
+                         nodeID +
+                         " connectorPair=" +
+                         connectorPair +
+                         " last=" +
+                         last);
             }
             clusterConnection.nodeUP(nodeID, connectorPair, last);
          }
@@ -350,17 +370,17 @@
       topology.addClusterTopologyListener(listener);
 
       // We now need to send the current topology to the client
-      executor.execute(new Runnable(){
+      executor.execute(new Runnable()
+      {
          public void run()
          {
             topology.sendTopology(listener);
-            
+
          }
       });
    }
 
-   public void removeClusterTopologyListener(final ClusterTopologyListener listener,
-                                                          final boolean clusterConnection)
+   public void removeClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
    {
       topology.removeClusterTopologyListener(listener);
    }
@@ -380,8 +400,9 @@
          String nodeID = server.getNodeID().toString();
 
          TopologyMember member = topology.getMember(nodeID);
-         //swap backup as live and send it to everybody
-         member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b, null));
+         // swap backup as live and send it to everybody
+         member = new TopologyMember(new Pair<TransportConfiguration, TransportConfiguration>(member.getConnector().b,
+                                                                                              null));
          topology.addMember(nodeID, member, false);
 
          if (backupServerLocator != null)
@@ -434,7 +455,7 @@
                log.warn("unable to start bridge " + bridge.getName(), e);
             }
          }
-         
+
          topology.sendMemberToListeners(nodeID, member);
       }
    }
@@ -460,7 +481,7 @@
          log.warn("no cluster connections defined, unable to announce backup");
       }
    }
-   
+
    void addClusterLocator(final ServerLocatorInternal serverLocator)
    {
       this.clusterLocators.add(serverLocator);
@@ -681,7 +702,7 @@
       }
 
       serverLocator.setConfirmationWindowSize(config.getConfirmationWindowSize());
-      
+
       // We are going to manually retry on the bridge in case of failure
       serverLocator.setReconnectAttempts(0);
       serverLocator.setInitialConnectAttempts(-1);
@@ -693,12 +714,12 @@
       serverLocator.setBlockOnNonDurableSend(!config.isUseDuplicateDetection());
       if (!config.isUseDuplicateDetection())
       {
-         log.debug("Bridge " + config.getName() + 
+         log.debug("Bridge " + config.getName() +
                    " is configured to not use duplicate detecion, it will send messages synchronously");
       }
-      
+
       clusterLocators.add(serverLocator);
-      
+
       Bridge bridge = new BridgeImpl(serverLocator,
                                      config.getReconnectAttempts(),
                                      config.getRetryInterval(),
@@ -731,7 +752,7 @@
    public void destroyBridge(final String name) throws Exception
    {
       Bridge bridge;
-      
+
       synchronized (this)
       {
          bridge = bridges.remove(name);
@@ -741,7 +762,7 @@
             managementService.unregisterBridge(name);
          }
       }
-      
+
       bridge.flushExecutor();
    }
 
@@ -790,10 +811,13 @@
                                         "'. The cluster connection will not be deployed.");
             return;
          }
-         
+
          if (log.isDebugEnabled())
          {
-            log.debug(this + " Starting a Discovery Group Cluster Connection, name=" + config.getDiscoveryGroupName() + ", dg=" + dg);
+            log.debug(this + " Starting a Discovery Group Cluster Connection, name=" +
+                      config.getDiscoveryGroupName() +
+                      ", dg=" +
+                      dg);
          }
 
          clusterConnection = new ClusterConnectionImpl(this,
@@ -828,7 +852,7 @@
       {
          TransportConfiguration[] tcConfigs = config.getStaticConnectors() != null ? connectorNameListToArray(config.getStaticConnectors())
                                                                                   : null;
-         
+
          if (log.isDebugEnabled())
          {
             log.debug(this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
@@ -869,7 +893,7 @@
 
       if (log.isDebugEnabled())
       {
-         log.debug("ClusterConnection.start at " + clusterConnection, new Exception ("trace"));
+         log.debug("ClusterConnection.start at " + clusterConnection, new Exception("trace"));
       }
       clusterConnection.start();
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-12 00:05:39 UTC (rev 11193)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-12 02:20:12 UTC (rev 11194)
@@ -2059,6 +2059,7 @@
             {
                ClusterTestBase.log.info("stopping server " + node);
                servers[node].stop();
+               Thread.sleep(500);
                ClusterTestBase.log.info("server " + node + " stopped");
             }
             catch (Exception e)



More information about the hornetq-commits mailing list