[hornetq-commits] JBoss hornetq SVN: r11206 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/cluster/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Aug 14 12:00:30 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-14 12:00:29 -0400 (Sun, 14 Aug 2011)
New Revision: 11206

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
tweaks on tests

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-14 07:07:04 UTC (rev 11205)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-14 16:00:29 UTC (rev 11206)
@@ -662,7 +662,10 @@
 
             reconnectSessions(oldConnection, reconnectAttempts);
 
-            oldConnection.destroy();
+            if (oldConnection != null)
+            {
+               oldConnection.destroy();
+            }
          }
          else
          {
@@ -1342,16 +1345,28 @@
 
    private void lockChannel1()
    {
-      Channel channel1 = connection.getChannel(1, -1);
-
-      channel1.getLock().lock();
+      if (connection != null)
+      {
+         Channel channel1 = connection.getChannel(1, -1);
+   
+         if (channel1 != null)
+         {
+            channel1.getLock().lock();
+         }
+      }
    }
 
    private void unlockChannel1()
    {
-      Channel channel1 = connection.getChannel(1, -1);
-
-      channel1.getLock().unlock();
+      if (connection != null)
+      {
+         Channel channel1 = connection.getChannel(1, -1);
+   
+         if (channel1 != null)
+         {
+            channel1.getLock().unlock();
+         }
+      }
    }
 
    private void forceReturnChannel1()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-14 07:07:04 UTC (rev 11205)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-14 16:00:29 UTC (rev 11206)
@@ -658,9 +658,7 @@
                                 final Queue queue,
                                 final boolean start) throws Exception
    {
-      final Topology topology = new Topology(null);
-      topology.setExecutor(executorFactory.getExecutor());
-      final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, false, connector);
+      final ServerLocatorInternal targetLocator = new ServerLocatorImpl(clusterManagerTopology, false, connector);
 
       targetLocator.setReconnectAttempts(0);
 
@@ -687,55 +685,9 @@
       }
 
       targetLocator.disableFinalizeCheck();
+      
+      MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetLocator, targetNodeID, connector, queueName, queue);
 
-      final ClusterTopologyListener listenerOnBridgeTopology = new ClusterTopologyListener()
-      {
-
-         public void nodeDown(String nodeID)
-         {
-            clusterManagerTopology.removeMember(nodeID);
-         }
-
-         public void nodeUP(String nodeID,
-                            Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            boolean last)
-         {
-            clusterManagerTopology.addMember(nodeID, new TopologyMember(connectorPair), last);
-         }
-
-      };
-
-      final ClusterTopologyListener listenerOnMainTopology = new ClusterTopologyListener()
-      {
-         public void nodeDown(String nodeID)
-         {
-            topology.removeMember(nodeID);
-         }
-
-         public void nodeUP(String nodeID,
-                            Pair<TransportConfiguration, TransportConfiguration> connectorPair,
-                            boolean last)
-         {
-            topology.addMember(nodeID, new TopologyMember(connectorPair), last);
-         }
-
-      };
-
-      // Establish a proxy between each other topology
-      topology.addClusterTopologyListener(listenerOnBridgeTopology);
-
-      clusterManagerTopology.addClusterTopologyListener(listenerOnMainTopology);
-
-      MessageFlowRecordImpl record = new MessageFlowRecordImpl(listenerOnMainTopology,
-                                                               listenerOnBridgeTopology,
-                                                               targetLocator,
-                                                               targetNodeID,
-                                                               connector,
-                                                               queueName,
-                                                               queue);
-
-      topology.setOwner(record);
-
       ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
                                                                    manager,
                                                                    targetLocator,
@@ -777,14 +729,6 @@
       if (start)
       {
          bridge.start();
-         
-         bridge.getExecutor().execute(new Runnable(){
-            public void run()
-            {
-               topology.sendTopology(listenerOnBridgeTopology);
-               clusterManagerTopology.sendTopology(listenerOnMainTopology);
-            }
-         });
       }
    }
 
@@ -804,8 +748,6 @@
 
       private boolean disconnected = false;
 
-      private boolean sentInitialTopology = false;
-
       private final Queue queue;
 
       private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
@@ -814,13 +756,7 @@
 
       private volatile boolean firstReset = false;
 
-      private final ClusterTopologyListener listenerOnMainTopology;
-
-      private final ClusterTopologyListener listenerOnBridgeTopology;
-
-      public MessageFlowRecordImpl(final ClusterTopologyListener listenerOnMainTopology,
-                                   final ClusterTopologyListener listenerOnBridgeTopology,
-                                   final ServerLocatorInternal targetLocator,
+      public MessageFlowRecordImpl(final ServerLocatorInternal targetLocator,
                                    final String targetNodeID,
                                    final TransportConfiguration connector,
                                    final SimpleString queueName,
@@ -831,8 +767,6 @@
          this.targetNodeID = targetNodeID;
          this.connector = connector;
          this.queueName = queueName;
-         this.listenerOnMainTopology = listenerOnMainTopology;
-         this.listenerOnBridgeTopology = listenerOnBridgeTopology;
       }
 
       /* (non-Javadoc)
@@ -909,9 +843,6 @@
             log.trace("Stopping bridge " + bridge);
          }
 
-         clusterManagerTopology.removeClusterTopologyListener(listenerOnMainTopology);
-         targetLocator.getTopology().removeClusterTopologyListener(listenerOnBridgeTopology);
-
          isClosed = true;
          clearBindings();
 

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-14 07:07:04 UTC (rev 11205)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2011-08-14 16:00:29 UTC (rev 11206)
@@ -2079,7 +2079,7 @@
             {
                ClusterTestBase.log.info("stopping server " + node);
                servers[node].stop();
-               Thread.sleep(1000);
+               Thread.sleep(500);
                ClusterTestBase.log.info("server " + node + " stopped");
             }
             catch (Exception e)



More information about the hornetq-commits mailing list