[hornetq-commits] JBoss hornetq SVN: r10894 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jun 28 20:59:58 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-06-28 20:59:58 -0400 (Tue, 28 Jun 2011)
New Revision: 10894

Modified:
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Tweaks on my branch

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-06-29 00:59:58 UTC (rev 10894)
@@ -1245,7 +1245,7 @@
             }
          }
 
-         if (serverLocator.isHA())
+         if (serverLocator.isHA() || serverLocator.isClusterConnection())
          {
             if (isDebug)
             {

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-06-29 00:59:58 UTC (rev 10894)
@@ -76,7 +76,7 @@
 
    private StaticConnector staticConnector = new StaticConnector();
 
-   private final Topology topology = new Topology();
+   private final Topology topology = new Topology(this);
 
    private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
 

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java	2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java	2011-06-29 00:59:58 UTC (rev 10894)
@@ -37,6 +37,21 @@
    
 
    private static final Logger log = Logger.getLogger(Topology.class);
+   
+   /** Used to debug operations.
+    * 
+    *  Someone may argue this is not needed. But it's impossible to debg anything related to topology without knowing what node
+    *  or what object missed a Topology update.
+    *  
+    *  Hence I added some information to locate debugging here. 
+    *  */
+   private final Object owner;
+   
+   
+   public Topology(final Object owner)
+   {
+      this.owner = owner;
+   }
 
    /*
     * topology describes the other cluster nodes that this server knows about:
@@ -54,7 +69,7 @@
       TopologyMember currentMember = topology.get(nodeId);
       if (debug)
       {
-         log.debug("adding = " + nodeId + ":" + member.getConnector());
+         log.debug(this + "::adding = " + nodeId + ":" + member.getConnector(), new Exception ("trace"));
          log.debug("before----------------------------------");
          log.debug(describe());
       }
@@ -87,7 +102,7 @@
       }
       if(debug)
       {
-         log.debug("Topology updated=" + replaced);
+         log.debug(this + "::Topology updated=" + replaced);
          log.debug(describe());
       }
       return replaced;
@@ -192,4 +207,21 @@
    {
       debug = b;
    }
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      if (owner == null)
+      {
+         return super.toString();
+      }
+      else
+      {
+         return "Topology [owner=" + owner + "]";
+      }
+   }
+   
 }

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java	2011-06-29 00:59:58 UTC (rev 10894)
@@ -118,8 +118,6 @@
             activated,
             storageManager);
 
-      System.out.println("ClusterConnectionBridge");
-
       this.discoveryLocator = discoveryLocator;
 
       idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
@@ -134,6 +132,11 @@
 
       // we need to disable DLQ check on the clustered bridges
       queue.setInternalQueue(true);
+      
+      if (log.isDebugEnabled())
+      {
+         log.debug("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception ("trace"));
+      }
    }
 
    @Override

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-06-29 00:59:58 UTC (rev 10894)
@@ -613,7 +613,7 @@
    protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
    {
       
-      ServerLocator targetLocator = HornetQClient.createServerLocatorWithoutHA(record.getConnector());
+      ServerLocatorInternal targetLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(record.getConnector());
       
       targetLocator.setReconnectAttempts(0);
 
@@ -625,6 +625,11 @@
       targetLocator.setConfirmationWindowSize(serverLocator.getConfirmationWindowSize());
       targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
       targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
+      targetLocator.setClusterConnection(true);
+      
+      targetLocator.setNodeID(serverLocator.getNodeID());
+      
+      targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
 
       if(retryInterval > 0)
       {

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-06-29 00:59:58 UTC (rev 10894)
@@ -101,7 +101,7 @@
 
    private Set<ClusterTopologyListener> topologyListeners = new ConcurrentHashSet<ClusterTopologyListener>();
 
-   private Topology topology = new Topology();
+   private Topology topology = new Topology(this);
 
    private volatile ServerLocatorInternal backupServerLocator;
 
@@ -164,6 +164,11 @@
       return str.toString();
    }
 
+   public String toString()
+   {
+      return "ClusterManagerImpl[server=" + server + "]";
+   }
+   
    public synchronized void start() throws Exception
    {
       if (started)
@@ -327,8 +332,15 @@
    public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
    {
       topologyListeners.add(listener);
+
       // We now need to send the current topology to the client
-      topology.sendTopology(listener);
+      executor.execute(new Runnable(){
+         public void run()
+         {
+            topology.sendTopology(listener);
+            
+         }
+      });
    }
 
    public void removeClusterTopologyListener(final ClusterTopologyListener listener,

Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-06-29 00:59:58 UTC (rev 10894)
@@ -500,9 +500,9 @@
                nodeManager.interrupt();
 
                backupActivationThread.interrupt();
+               
+               backupActivationThread.join(500);
 
-               // TODO: do we really need this?
-               Thread.sleep(1000);
             }
 
             if (System.currentTimeMillis() - start >= timeout)



More information about the hornetq-commits mailing list