[hornetq-commits] JBoss hornetq SVN: r11205 - in branches/Branch_2_2_EAP/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Sun Aug 14 03:07:05 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-08-14 03:07:04 -0400 (Sun, 14 Aug 2011)
New Revision: 11205
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
Improvements on tests & fixes
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-14 07:07:04 UTC (rev 11205)
@@ -97,6 +97,8 @@
private TransportConfiguration backupConfig;
private ConnectorFactory connectorFactory;
+
+ private transient boolean finalizeCheck = true;
private final long callTimeout;
@@ -207,6 +209,11 @@
this.interceptors = interceptors;
}
+
+ public void disableFinalizeCheck()
+ {
+ finalizeCheck = false;
+ }
public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
{
@@ -1268,7 +1275,7 @@
}
}
- if (serverLocator.isHA() || serverLocator.isClusterConnection())
+ if (serverLocator.getTopology() != null)
{
if (isTrace)
{
@@ -1298,7 +1305,7 @@
public void finalize() throws Throwable
{
- if (!closed)
+ if (!closed && finalizeCheck)
{
log.warn("I'm closing a core ClientSessionFactory you left open. Please make sure you close all ClientSessionFactories explicitly " + "before letting them go out of scope! " +
System.identityHashCode(this));
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryInternal.java 2011-08-14 07:07:04 UTC (rev 11205)
@@ -31,6 +31,8 @@
boolean removeFailureListener(SessionFailureListener listener);
+ void disableFinalizeCheck();
+
// for testing
int numConnections();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-14 07:07:04 UTC (rev 11205)
@@ -1569,6 +1569,9 @@
threadPool,
scheduledThreadPool,
interceptors);
+
+ factory.disableFinalizeCheck();
+
connectors.add(new Connector(initialConnector, factory));
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-14 07:07:04 UTC (rev 11205)
@@ -34,11 +34,15 @@
public class Topology implements Serializable
{
+ private static final int BACKOF_TIMEOUT = 500;
+
private static final long serialVersionUID = -9037171688692471371L;
private final Set<ClusterTopologyListener> topologyListeners = new HashSet<ClusterTopologyListener>();
private static final Logger log = Logger.getLogger(Topology.class);
+
+ private transient HashMap<String, Long> mapBackof = new HashMap<String, Long>();
private Executor executor = null;
@@ -88,7 +92,7 @@
{
if (log.isDebugEnabled())
{
- log.debug(this + "::PPP Removing topology listener " + listener, new Exception("Trace"));
+ log.debug(this + "::Removing topology listener " + listener, new Exception("Trace"));
}
synchronized (topologyListeners)
{
@@ -102,12 +106,21 @@
synchronized (this)
{
+ Long lastTime = mapBackof.get(nodeId);
+
+ if (lastTime != null && System.currentTimeMillis() - lastTime.longValue() < BACKOF_TIMEOUT)
+ {
+ // The cluster may get in loop without this..
+ // Case one node is stll sending nodeDown while another member is sending nodeUp
+ log.debug("Node was considered down too fast, ignoring addMember on Topology");
+ return false;
+ }
+
TopologyMember currentMember = topology.get(nodeId);
if (Topology.log.isDebugEnabled())
{
Topology.log.debug(this + "::adding = " + nodeId + ":" + member.getConnector(), new Exception("trace"));
- Topology.log.debug(describe("Before:"));
}
if (currentMember == null)
@@ -152,12 +165,6 @@
if (Topology.log.isDebugEnabled())
{
- Topology.log.debug(this + "::Topology updated=" + replaced);
- Topology.log.debug(describe(this + "::After:"));
- }
-
- if (Topology.log.isDebugEnabled())
- {
Topology.log.debug(this + " Add member nodeId=" +
nodeId +
" member = " +
@@ -222,6 +229,7 @@
synchronized (this)
{
+ mapBackof.put(nodeId, new Long(System.currentTimeMillis()));
member = topology.remove(nodeId);
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-08-14 07:07:04 UTC (rev 11205)
@@ -610,7 +610,7 @@
protected void fail(final boolean permanently)
{
- log.debug(this + "::fail being called, permanently=" + permanently);
+ log.debug(this + "\n\t::fail being called, permanently=" + permanently);
if (queue != null)
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-13 17:22:40 UTC (rev 11204)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-14 07:07:04 UTC (rev 11205)
@@ -688,8 +688,6 @@
targetLocator.disableFinalizeCheck();
- targetLocator.connect();
-
final ClusterTopologyListener listenerOnBridgeTopology = new ClusterTopologyListener()
{
@@ -728,7 +726,13 @@
clusterManagerTopology.addClusterTopologyListener(listenerOnMainTopology);
- MessageFlowRecordImpl record = new MessageFlowRecordImpl(listenerOnMainTopology, listenerOnBridgeTopology, targetLocator, targetNodeID, connector, queueName, queue);
+ MessageFlowRecordImpl record = new MessageFlowRecordImpl(listenerOnMainTopology,
+ listenerOnBridgeTopology,
+ targetLocator,
+ targetNodeID,
+ connector,
+ queueName,
+ queue);
topology.setOwner(record);
@@ -798,8 +802,10 @@
private final SimpleString queueName;
- private boolean disconnected = false;;
+ private boolean disconnected = false;
+ private boolean sentInitialTopology = false;
+
private final Queue queue;
private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
@@ -807,9 +813,9 @@
private volatile boolean isClosed = false;
private volatile boolean firstReset = false;
-
+
private final ClusterTopologyListener listenerOnMainTopology;
-
+
private final ClusterTopologyListener listenerOnBridgeTopology;
public MessageFlowRecordImpl(final ClusterTopologyListener listenerOnMainTopology,
@@ -902,7 +908,7 @@
{
log.trace("Stopping bridge " + bridge);
}
-
+
clusterManagerTopology.removeClusterTopologyListener(listenerOnMainTopology);
targetLocator.getTopology().removeClusterTopologyListener(listenerOnBridgeTopology);
More information about the hornetq-commits
mailing list