[hornetq-commits] JBoss hornetq SVN: r11104 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Aug 3 16:44:02 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-03 16:44:02 -0400 (Wed, 03 Aug 2011)
New Revision: 11104

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
test fixes

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-08-03 20:06:41 UTC (rev 11103)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-08-03 20:44:02 UTC (rev 11104)
@@ -118,7 +118,6 @@
                {
                   public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
                   {
-                      Logger.getLogger(this.getClass()).info("ZZZ send nodeDown on " + this);
                       channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
                   }
                   

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-03 20:06:41 UTC (rev 11103)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-03 20:44:02 UTC (rev 11104)
@@ -383,14 +383,10 @@
          {
             public void run()
             {
-               synchronized (ClusterConnectionImpl.this)
+               if (serverLocator != null)
                {
-                  if (serverLocator != null)
-                  {
-                     serverLocator.removeClusterTopologyListener(ClusterConnectionImpl.this);
-                     serverLocator.close();
-                     serverLocator = null;
-                  }
+                  serverLocator.close();
+                  serverLocator = null;
                }
 
             }
@@ -549,55 +545,55 @@
                       final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
                       final boolean last)
    {
+      if (log.isDebugEnabled())
+      {
+         String ClusterTestBase = "receiving nodeUP for nodeID=";
+         log.debug(this + ClusterTestBase + nodeID + " connectionPair=" + connectorPair);
+      }
+      // discard notifications about ourselves unless its from our backup
 
-      // we propagate the node notifications to all cluster topology listeners
-      server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
-
-      synchronized (this)
+      if (nodeID.equals(nodeUUID.toString()))
       {
-         if (serverLocator == null)
+         if (connectorPair.b != null)
          {
-            log.debug("ClusterConnection nodeID=" + nodeID + " has already been stopped, ignoring call");
-            return;
+            server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
          }
+         return;
+      }
 
-         if (log.isDebugEnabled())
-         {
-            String ClusterTestBase = "receiving nodeUP for nodeID=";
-            log.debug(this + ClusterTestBase + nodeID + " connectionPair=" + connectorPair);
-         }
-         // discard notifications about ourselves unless its from our backup
+      // we propagate the node notifications to all cluster topology listeners
+      server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
 
-         if (nodeID.equals(nodeUUID.toString()))
-         {
-            if (connectorPair.b != null)
-            {
-               server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
-            }
-            return;
-         }
+      // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
+      if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
+      {
+         return;
+      }
 
-         // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
-         if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
+      // FIXME required to prevent cluster connections w/o discovery group
+      // and empty static connectors to create bridges... ugly!
+      if (serverLocator == null)
+      {
+         log.warn("ServerLocator==null FixME!!!!!");
+         return;
+      }
+      /*we dont create bridges to backups*/
+      if (connectorPair.a == null)
+      {
+         if (isTrace)
          {
-            return;
+            log.trace(this + " ignoring call with nodeID=" +
+                      nodeID +
+                      ", connectorPair=" +
+                      connectorPair +
+                      ", last=" +
+                      last);
          }
+         return;
+      }
 
-         /*we dont create bridges to backups*/
-         if (connectorPair.a == null)
-         {
-            if (isTrace)
-            {
-               log.trace(this + " ignoring call with nodeID=" +
-                         nodeID +
-                         ", connectorPair=" +
-                         connectorPair +
-                         ", last=" +
-                         last);
-            }
-            return;
-         }
-
+      synchronized (records)
+      {
          try
          {
             MessageFlowRecord record = records.get(nodeID);
@@ -645,22 +641,17 @@
          }
          catch (Exception e)
          {
-            log.error(this + "::Failed to update topology", e);
+            log.error("Failed to update topology", e);
          }
       }
    }
 
-   private synchronized void createNewRecord(final String targetNodeID,
-                                             final TransportConfiguration connector,
-                                             final SimpleString queueName,
-                                             final Queue queue,
-                                             final boolean start) throws Exception
+   private void createNewRecord(final String targetNodeID,
+                                final TransportConfiguration connector,
+                                final SimpleString queueName,
+                                final Queue queue,
+                                final boolean start) throws Exception
    {
-      if (serverLocator != null)
-      {
-         return;
-      }
-
       MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetNodeID, connector, queueName, queue);
 
       Bridge bridge = createClusteredBridge(record);
@@ -690,7 +681,8 @@
 
       final ServerLocatorInternal targetLocator = new ServerLocatorImpl(this.clusterManagerTopology,
                                                                         false,
-                                                                        server.getThreadPool(), server.getScheduledPool(), 
+                                                                        server.getThreadPool(),
+                                                                        server.getScheduledPool(),
                                                                         record.getConnector());
 
       targetLocator.setReconnectAttempts(0);
@@ -1307,7 +1299,11 @@
             {
                log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
             }
-            return new ServerLocatorImpl(clusterManagerTopology, true, server.getThreadPool(), server.getScheduledPool(), tcConfigs);
+            return new ServerLocatorImpl(clusterManagerTopology,
+                                         true,
+                                         server.getThreadPool(),
+                                         server.getScheduledPool(),
+                                         tcConfigs);
          }
          else
          {
@@ -1337,7 +1333,11 @@
 
       public ServerLocatorInternal createServerLocator()
       {
-         return new ServerLocatorImpl(clusterManagerTopology, true, server.getThreadPool(), server.getScheduledPool(), dg);
+         return new ServerLocatorImpl(clusterManagerTopology,
+                                      true,
+                                      server.getThreadPool(),
+                                      server.getScheduledPool(),
+                                      dg);
       }
    }
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-08-03 20:06:41 UTC (rev 11103)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-08-03 20:44:02 UTC (rev 11104)
@@ -24,6 +24,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -426,7 +427,13 @@
    
    public void deliverAsync()
    {
-      getExecutor().execute(deliverRunner);
+      try
+      {
+         getExecutor().execute(deliverRunner);
+      }
+      catch (RejectedExecutionException ignored)
+      {
+      }
    }
 
    public void close() throws Exception



More information about the hornetq-commits mailing list