[hornetq-commits] JBoss hornetq SVN: r11202 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/cluster/distribution and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Aug 13 03:07:31 EDT 2011


Author: clebert.suconic
Date: 2011-08-13 03:07:31 -0400 (Sat, 13 Aug 2011)
New Revision: 11202

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
testsuite fixes

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-13 06:36:26 UTC (rev 11201)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-13 07:07:31 UTC (rev 11202)
@@ -661,10 +661,8 @@
                                 final Queue queue,
                                 final boolean start) throws Exception
    {
-      Topology topology = new Topology(null);
-      final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology,
-                                                                        false,
-                                                                        connector);
+      final Topology topology = new Topology(null);
+      final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, false, connector);
 
       targetLocator.setReconnectAttempts(0);
 
@@ -689,35 +687,53 @@
       {
          targetLocator.setRetryInterval(retryInterval);
       }
-      
+
       targetLocator.disableFinalizeCheck();
-      
+
       targetLocator.connect();
-      
+      ClusterTopologyListener listenerOnBridgeTopology = new ClusterTopologyListener()
+      {
 
-      MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, targetNodeID, connector, queueName, queue);
+         public void nodeDown(String nodeID)
+         {
+            clusterManagerTopology.removeMember(nodeID);
+         }
 
-      topology.setOwner(record);
-      
-      // Establish a proxy to the main topology. 
-      // We are going to listen for adds and removes on the bridges as well
-      topology.addClusterTopologyListener(new ClusterTopologyListener(){
+         public void nodeUP(String nodeID,
+                            Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+                            boolean last)
+         {
+            clusterManagerTopology.addMember(nodeID, new TopologyMember(connectorPair), last);
+         }
 
+      };
+
+      ClusterTopologyListener listenerOnMainTopology = new ClusterTopologyListener()
+      {
          public void nodeDown(String nodeID)
          {
-            clusterManagerTopology.removeMember(nodeID);
+            topology.removeMember(nodeID);
          }
 
          public void nodeUP(String nodeID,
                             Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                             boolean last)
          {
-            clusterManagerTopology.addMember(nodeID,new TopologyMember(connectorPair), last);
+            topology.addMember(nodeID, new TopologyMember(connectorPair), last);
          }
-         
-      });
 
+      };
 
+      // Establish a proxy between each other topology
+      topology.addClusterTopologyListener(listenerOnBridgeTopology);
+
+      clusterManagerTopology.addClusterTopologyListener(listenerOnMainTopology);
+
+
+      MessageFlowRecordImpl record = new MessageFlowRecordImpl(listenerOnMainTopology, listenerOnBridgeTopology, targetLocator, targetNodeID, connector, queueName, queue);
+
+      topology.setOwner(record);
+
       ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
                                                                    manager,
                                                                    targetLocator,
@@ -761,7 +777,7 @@
          bridge.start();
       }
    }
-   
+
    // Inner classes -----------------------------------------------------------------------------------
 
    private class MessageFlowRecordImpl implements MessageFlowRecord
@@ -771,11 +787,11 @@
       private final String targetNodeID;
 
       private final TransportConfiguration connector;
-      
+
       private final ServerLocatorInternal targetLocator;
 
       private final SimpleString queueName;
-      
+
       private boolean disconnected = false;;
 
       private final Queue queue;
@@ -785,8 +801,14 @@
       private volatile boolean isClosed = false;
 
       private volatile boolean firstReset = false;
+      
+      private final ClusterTopologyListener listenerOnMainTopology;
+      
+      private final ClusterTopologyListener listenerOnBridgeTopology;
 
-      public MessageFlowRecordImpl(final ServerLocatorInternal targetLocator,
+      public MessageFlowRecordImpl(final ClusterTopologyListener listenerOnMainTopology,
+                                   final ClusterTopologyListener listenerOnBridgeTopology,
+                                   final ServerLocatorInternal targetLocator,
                                    final String targetNodeID,
                                    final TransportConfiguration connector,
                                    final SimpleString queueName,
@@ -797,6 +819,8 @@
          this.targetNodeID = targetNodeID;
          this.connector = connector;
          this.queueName = queueName;
+         this.listenerOnMainTopology = listenerOnMainTopology;
+         this.listenerOnBridgeTopology = listenerOnBridgeTopology;
       }
 
       /* (non-Javadoc)
@@ -818,7 +842,7 @@
                 firstReset +
                 "]";
       }
-      
+
       public void serverDisconnected()
       {
          this.disconnected = true;
@@ -872,17 +896,20 @@
          {
             log.trace("Stopping bridge " + bridge);
          }
+         
+         clusterManagerTopology.removeClusterTopologyListener(listenerOnMainTopology);
+         targetLocator.getTopology().removeClusterTopologyListener(listenerOnBridgeTopology);
 
          isClosed = true;
          clearBindings();
-         
+
          if (disconnected)
          {
             bridge.disconnect();
          }
 
          bridge.stop();
-         
+
          bridge.getExecutor().execute(new Runnable()
          {
             public void run()
@@ -1356,9 +1383,7 @@
             {
                log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
             }
-            return new ServerLocatorImpl(clusterManagerTopology,
-                                         true,
-                                         tcConfigs);
+            return new ServerLocatorImpl(clusterManagerTopology, true, tcConfigs);
          }
          else
          {
@@ -1388,9 +1413,7 @@
 
       public ServerLocatorInternal createServerLocator()
       {
-         return new ServerLocatorImpl(clusterManagerTopology,
-                                      true,
-                                      dg);
+         return new ServerLocatorImpl(clusterManagerTopology, true, dg);
       }
    }
 }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-13 06:36:26 UTC (rev 11201)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-13 07:07:31 UTC (rev 11202)
@@ -2031,7 +2031,7 @@
           * we need to wait a little while between server start up to allow the server to communicate in some order.
           * This is to avoid split brain on startup
           * */
-         Thread.sleep(1000);
+         Thread.sleep(500);
       }
 
    }



More information about the hornetq-commits mailing list