[hornetq-commits] JBoss hornetq SVN: r11205 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Aug 14 03:07:05 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-08-14 03:07:04 -0400 (Sun, 14 Aug 2011)
New Revision: 11205

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
Improvements on tests & fixes

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-08-14 07:07:04 UTC (rev 11205)
@@ -97,6 +97,8 @@
    private TransportConfiguration backupConfig;
 
    private ConnectorFactory connectorFactory;
+   
+   private transient boolean finalizeCheck = true; 
 
    private final long callTimeout;
 
@@ -207,6 +209,11 @@
       this.interceptors = interceptors;
 
    }
+   
+   public void disableFinalizeCheck()
+   {
+      finalizeCheck = false;
+   }
 
    public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
    {
@@ -1268,7 +1275,7 @@
             }
          }
 
-         if (serverLocator.isHA() || serverLocator.isClusterConnection())
+         if (serverLocator.getTopology() != null)
          {
             if (isTrace)
             {
@@ -1298,7 +1305,7 @@
 
    public void finalize() throws Throwable
    {
-      if (!closed)
+      if (!closed && finalizeCheck)
       {
          log.warn("I'm closing a core ClientSessionFactory you left open. Please make sure you close all ClientSessionFactories explicitly " + "before letting them go out of scope! " +
                   System.identityHashCode(this));

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java	2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java	2011-08-14 07:07:04 UTC (rev 11205)
@@ -31,6 +31,8 @@
 
    boolean removeFailureListener(SessionFailureListener listener);
    
+   void disableFinalizeCheck();
+
    // for testing
 
    int numConnections();

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-08-14 07:07:04 UTC (rev 11205)
@@ -1569,6 +1569,9 @@
                                                                                 threadPool,
                                                                                 scheduledThreadPool,
                                                                                 interceptors);
+            
+            factory.disableFinalizeCheck();
+            
             connectors.add(new Connector(initialConnector, factory));
          }
       }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java	2011-08-14 07:07:04 UTC (rev 11205)
@@ -34,11 +34,15 @@
 public class Topology implements Serializable
 {
 
+   private static final int BACKOF_TIMEOUT = 500;
+	
    private static final long serialVersionUID = -9037171688692471371L;
 
    private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
 
    private static final Logger log = Logger.getLogger(Topology.class);
+   
+   private transient HashMap<String, Long> mapBackof = new HashMap<String, Long>();
 
    private Executor executor = null;
 
@@ -88,7 +92,7 @@
    {
       if (log.isDebugEnabled())
       {
-         log.debug(this + "::PPP Removing topology listener " + listener, new Exception("Trace"));
+         log.debug(this + "::Removing topology listener " + listener, new Exception("Trace"));
       }
       synchronized (topologyListeners)
       {
@@ -102,12 +106,21 @@
 
       synchronized (this)
       {
+         Long lastTime = mapBackof.get(nodeId);
+         
+         if (lastTime != null && System.currentTimeMillis() - lastTime.longValue() < BACKOF_TIMEOUT)
+         {
+            // The cluster may get into a loop without this.
+            // Case: one node is still sending nodeDown while another member is sending nodeUp
+            log.debug("Node was considered down too fast, ignoring addMember on Topology");
+            return false;
+         }
+
          TopologyMember currentMember = topology.get(nodeId);
 
          if (Topology.log.isDebugEnabled())
          {
             Topology.log.debug(this + "::adding = " + nodeId + ":" + member.getConnector(), new Exception("trace"));
-            Topology.log.debug(describe("Before:"));
          }
 
          if (currentMember == null)
@@ -152,12 +165,6 @@
 
          if (Topology.log.isDebugEnabled())
          {
-            Topology.log.debug(this + "::Topology updated=" + replaced);
-            Topology.log.debug(describe(this + "::After:"));
-         }
-
-         if (Topology.log.isDebugEnabled())
-         {
             Topology.log.debug(this + " Add member nodeId=" +
                                nodeId +
                                " member = " +
@@ -222,6 +229,7 @@
 
       synchronized (this)
       {
+         mapBackof.put(nodeId, new Long(System.currentTimeMillis()));
          member = topology.remove(nodeId);
       }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2011-08-14 07:07:04 UTC (rev 11205)
@@ -610,7 +610,7 @@
 
    protected void fail(final boolean permanently)
    {
-      log.debug(this + "::fail being called, permanently=" + permanently);
+      log.debug(this + "\n\t::fail being called, permanently=" + permanently);
 
       if (queue != null)
       {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-08-14 07:07:04 UTC (rev 11205)
@@ -688,8 +688,6 @@
 
       targetLocator.disableFinalizeCheck();
 
-      targetLocator.connect();
-      
       final ClusterTopologyListener listenerOnBridgeTopology = new ClusterTopologyListener()
       {
 
@@ -728,7 +726,13 @@
 
       clusterManagerTopology.addClusterTopologyListener(listenerOnMainTopology);
 
-      MessageFlowRecordImpl record = new MessageFlowRecordImpl(listenerOnMainTopology, listenerOnBridgeTopology, targetLocator, targetNodeID, connector, queueName, queue);
+      MessageFlowRecordImpl record = new MessageFlowRecordImpl(listenerOnMainTopology,
+                                                               listenerOnBridgeTopology,
+                                                               targetLocator,
+                                                               targetNodeID,
+                                                               connector,
+                                                               queueName,
+                                                               queue);
 
       topology.setOwner(record);
 
@@ -798,8 +802,10 @@
 
       private final SimpleString queueName;
 
-      private boolean disconnected = false;;
+      private boolean disconnected = false;
 
+      private boolean sentInitialTopology = false;
+
       private final Queue queue;
 
       private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
@@ -807,9 +813,9 @@
       private volatile boolean isClosed = false;
 
       private volatile boolean firstReset = false;
-      
+
       private final ClusterTopologyListener listenerOnMainTopology;
-      
+
       private final ClusterTopologyListener listenerOnBridgeTopology;
 
       public MessageFlowRecordImpl(final ClusterTopologyListener listenerOnMainTopology,
@@ -902,7 +908,7 @@
          {
             log.trace("Stopping bridge " + bridge);
          }
-         
+
          clusterManagerTopology.removeClusterTopologyListener(listenerOnMainTopology);
          targetLocator.getTopology().removeClusterTopologyListener(listenerOnBridgeTopology);
 



More information about the hornetq-commits mailing list