[hornetq-commits] JBoss hornetq SVN: r11185 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: remoting/server/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Aug 10 13:30:15 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-10 13:30:15 -0400 (Wed, 10 Aug 2011)
New Revision: 11185

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
tweaks on my latest changes

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-10 16:57:38 UTC (rev 11184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-10 17:30:15 UTC (rev 11185)
@@ -49,8 +49,6 @@
     *  */
    private volatile Object owner;
 
-   private volatile Executor executor;
-
    public Topology(final Object owner)
    {
       this.owner = owner;
@@ -66,11 +64,6 @@
     */
    private final Map<String, TopologyMember> topology = new ConcurrentHashMap<String, TopologyMember>();
 
-   public void setExecutor(Executor executor)
-   {
-      this.executor = executor;
-   }
-
    public void addClusterTopologyListener(final ClusterTopologyListener listener)
    {
       if (log.isDebugEnabled())
@@ -95,7 +88,7 @@
       }
    }
 
-   public  boolean addMember(final String nodeId, final TopologyMember member, final boolean last)
+   public boolean addMember(final String nodeId, final TopologyMember member, final boolean last)
    {
       boolean replaced = false;
 
@@ -157,8 +150,7 @@
 
          if (Topology.log.isDebugEnabled())
          {
-            Topology.log.debug(this +
-                               " Add member nodeId=" +
+            Topology.log.debug(this + " Add member nodeId=" +
                                nodeId +
                                " member = " +
                                member +
@@ -169,35 +161,28 @@
          }
 
       }
-      
+
       if (replaced)
       {
          final ArrayList<ClusterTopologyListener> copy = copyListeners();
-         
 
          // Has to use a different thread otherwise we may get dead locks case the remove is coming from the channel
-         execute(new Runnable(){
-            public void run()
+         for (ClusterTopologyListener listener : copy)
+         {
+            if (Topology.log.isTraceEnabled())
             {
-               for (ClusterTopologyListener listener : copy)
-               {
-                  if (Topology.log.isTraceEnabled())
-                  {
-                     Topology.log.trace(this + " informing " + listener + " about node up = " + nodeId);
-                  }
+               Topology.log.trace(this + " informing " + listener + " about node up = " + nodeId);
+            }
 
-                  try
-                  {
-                     listener.nodeUP(nodeId, member.getConnector(), last);
-                  }
-                  catch (Throwable e)
-                  {
-                     log.warn (e.getMessage(), e);
-                  }
-               }
+            try
+            {
+               listener.nodeUP(nodeId, member.getConnector(), last);
             }
-         });
-         
+            catch (Throwable e)
+            {
+               log.warn(e.getMessage(), e);
+            }
+         }
       }
 
       return replaced;
@@ -208,7 +193,7 @@
     */
    private ArrayList<ClusterTopologyListener> copyListeners()
    {
-      ArrayList <ClusterTopologyListener> listenersCopy;
+      ArrayList<ClusterTopologyListener> listenersCopy;
       synchronized (topologyListeners)
       {
          listenersCopy = new ArrayList<ClusterTopologyListener>(topologyListeners);
@@ -219,12 +204,11 @@
    public boolean removeMember(final String nodeId)
    {
       TopologyMember member;
-      
+
       synchronized (this)
       {
          member = topology.remove(nodeId);
       }
-      
 
       if (Topology.log.isDebugEnabled())
       {
@@ -241,20 +225,14 @@
       {
          final ArrayList<ClusterTopologyListener> copy = copyListeners();
 
-         // Has to use a different thread otherwise we may get dead locks case the remove is coming from the channel
-         execute(new Runnable(){
-            public void run()
+         for (ClusterTopologyListener listener : copy)
+         {
+            if (Topology.log.isTraceEnabled())
             {
-               for (ClusterTopologyListener listener : copy)
-               {
-                  if (Topology.log.isTraceEnabled())
-                  {
-                     Topology.log.trace(this + " informing " + listener + " about node down = " + nodeId);
-                  }
-                  listener.nodeDown(nodeId);
-               }
+               Topology.log.trace(this + " informing " + listener + " about node down = " + nodeId);
             }
-         });
+            listener.nodeDown(nodeId);
+         }
       }
       return member != null;
    }
@@ -268,7 +246,7 @@
    {
       // To make sure it was updated
       addMember(nodeID, member, false);
-      
+
       ArrayList<ClusterTopologyListener> copy = copyListeners();
 
       // Now force sending it
@@ -394,18 +372,6 @@
       }
       return null;
    }
-   
-   private void execute(Runnable runnable)
-   {
-      if (executor != null)
-      {
-         executor.execute(runnable);
-      }
-      else
-      {
-         runnable.run();
-      }
-   }
 
    /* (non-Javadoc)
     * @see java.lang.Object#toString()

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2011-08-10 16:57:38 UTC (rev 11184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2011-08-10 17:30:15 UTC (rev 11185)
@@ -162,7 +162,7 @@
       // This needs to be a different thread pool to the main thread pool especially for OIO where we may need
       // to support many hundreds of connections, but the main thread pool must be kept small for better performance
 
-      ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads" + System.identityHashCode(this),
+      ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-remoting-threads-" + server.toString() + "-" + System.identityHashCode(this),
                                                         false,
                                                         tccl);
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-10 16:57:38 UTC (rev 11184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-10 17:30:15 UTC (rev 11185)
@@ -95,7 +95,7 @@
    private final long connectionTTL;
 
    private final long retryInterval;
-   
+
    private final long callTimeout;
 
    private final double retryIntervalMultiplier;
@@ -107,7 +107,7 @@
    private final boolean useDuplicateDetection;
 
    private final boolean routeWhenNoConsumers;
-   
+
    private final int confirmationWindowSize;
 
    private final Map<String, MessageFlowRecord> records = new ConcurrentHashMap<String, MessageFlowRecord>();
@@ -195,7 +195,7 @@
       this.useDuplicateDetection = useDuplicateDetection;
 
       this.routeWhenNoConsumers = routeWhenNoConsumers;
-      
+
       this.confirmationWindowSize = confirmationWindowSize;
 
       this.executorFactory = executorFactory;
@@ -221,7 +221,7 @@
       this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
 
       this.manager = manager;
-      
+
       this.callTimeout = callTimeout;
 
       this.clusterManagerTopology = clusterManagerTopology;
@@ -293,7 +293,7 @@
       this.maxRetryInterval = maxRetryInterval;
 
       this.reconnectAttempts = reconnectAttempts;
-      
+
       this.callTimeout = callTimeout;
 
       this.useDuplicateDetection = useDuplicateDetection;
@@ -353,7 +353,7 @@
       {
          return;
       }
-      
+
       if (log.isDebugEnabled())
       {
          log.debug(this + "::stopping ClusterConnection");
@@ -382,32 +382,32 @@
             {
             }
          }
+      }
 
-         if (managementService != null)
-         {
-            TypedProperties props = new TypedProperties();
-            props.putSimpleStringProperty(new SimpleString("name"), name);
-            Notification notification = new Notification(nodeUUID.toString(),
-                                                         NotificationType.CLUSTER_CONNECTION_STOPPED,
-                                                         props);
-            managementService.sendNotification(notification);
-         }
+      if (managementService != null)
+      {
+         TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(new SimpleString("name"), name);
+         Notification notification = new Notification(nodeUUID.toString(),
+                                                      NotificationType.CLUSTER_CONNECTION_STOPPED,
+                                                      props);
+         managementService.sendNotification(notification);
+      }
 
-         executor.execute(new Runnable()
+      executor.execute(new Runnable()
+      {
+         public void run()
          {
-            public void run()
+            if (serverLocator != null)
             {
-               if (serverLocator != null)
-               {
-                  serverLocator.close();
-                  serverLocator = null;
-               }
-
+               serverLocator.close();
+               serverLocator = null;
             }
-         });
 
-         started = false;
-      }
+         }
+      });
+
+      started = false;
    }
 
    public boolean isStarted()
@@ -636,8 +636,7 @@
             {
                if (isTrace)
                {
-                  log.trace(this +
-                            " ignored nodeUp record for " +
+                  log.trace(this + " ignored nodeUp record for " +
                             connectorPair +
                             " on nodeID=" +
                             nodeID +

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-10 16:57:38 UTC (rev 11184)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-08-10 17:30:15 UTC (rev 11185)
@@ -378,8 +378,6 @@
          backup = false;
 
          String nodeID = server.getNodeID().toString();
-         
-         topology.setExecutor(executor);
 
          TopologyMember member = topology.getMember(nodeID);
          //swap backup as live and send it to everybody



More information about the hornetq-commits mailing list