Author: clebert.suconic(a)jboss.com
Date: 2011-07-27 21:32:47 -0400 (Wed, 27 Jul 2011)
New Revision: 11060
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
tweaks
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28 01:32:47 UTC (rev 11060)
@@ -46,6 +46,7 @@
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUIDGenerator;
@@ -65,10 +66,10 @@
private boolean finalizeCheck = true;
private boolean clusterConnection;
-
+
private String identity;
- private final Set<ClusterTopologyListener> topologyListeners = new
HashSet<ClusterTopologyListener>();
+ private final Set<ClusterTopologyListener> topologyListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
private Set<ClientSessionFactory> factories = new
HashSet<ClientSessionFactory>();
@@ -159,7 +160,7 @@
private final List<Interceptor> interceptors = new
CopyOnWriteArrayList<Interceptor>();
private static ExecutorService globalThreadPool;
-
+
private Executor startExecutor;
private static ScheduledExecutorService globalScheduledThreadPool;
@@ -476,7 +477,7 @@
public void start(Executor executor) throws Exception
{
initialise();
-
+
this.startExecutor = executor;
executor.execute(new Runnable()
@@ -650,7 +651,7 @@
{
long toWait = 30000;
long start = System.currentTimeMillis();
- while (!receivedTopology && toWait > 0)
+ while (!ServerLocatorImpl.this.closed &&
!ServerLocatorImpl.this.closing && !receivedTopology && toWait > 0)
{
// Now wait for the topology
@@ -674,12 +675,14 @@
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster
topology");
}
+
}
addFactory(factory);
return factory;
}
+
}
public boolean isHA()
@@ -1037,7 +1040,7 @@
throw new IllegalStateException("Cannot set attribute on SessionFactory
after it has been used");
}
}
-
+
public void setIdentity(String identity)
{
this.identity = identity;
@@ -1107,7 +1110,7 @@
if (log.isDebugEnabled())
{
- log.debug("YYY " + this + " is calling close", new Exception
("trace"));
+ log.debug("YYY " + this + " is calling close", new
Exception("trace"));
}
closing = true;
@@ -1188,7 +1191,7 @@
}
return;
}
-
+
if (log.isDebugEnabled())
{
log.debug("XXX YYY nodeDown " + this + " nodeID=" + nodeID +
" as being down", new Exception("trace"));
@@ -1229,7 +1232,11 @@
{
if (log.isDebugEnabled())
{
- log.debug(this + "::Ignoring notifyNodeUp for " + nodeID + "
connectorPair=" + connectorPair + ", since ha=false and
clusterConnection=false");
+ log.debug(this + "::Ignoring notifyNodeUp for " +
+ nodeID +
+ " connectorPair=" +
+ connectorPair +
+ ", since ha=false and clusterConnection=false");
}
return;
}
@@ -1277,9 +1284,11 @@
@Override
public String toString()
{
- if (clusterConnection)
+ if (identity != null)
{
- return "ServerLocatorImpl (clusterConnection identity=" + identity +
") [initialConnectors=" + Arrays.toString(initialConnectors) +
+ return "ServerLocatorImpl (identity=" + identity +
+ ") [initialConnectors=" +
+ Arrays.toString(initialConnectors) +
", discoveryGroupConfiguration=" +
discoveryGroupConfiguration +
"]";
@@ -1444,10 +1453,14 @@
}
}
});
-
+
if (log.isDebugEnabled())
{
- log.debug("XXX Returning " + csf + " after "
+ retryNumber + " retries on StaticConnector " + ServerLocatorImpl.this);
+ log.debug("XXX Returning " + csf +
+ " after " +
+ retryNumber +
+ " retries on StaticConnector " +
+ ServerLocatorImpl.this);
}
return csf;
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-07-28 01:32:47 UTC (rev 11060)
@@ -625,7 +625,7 @@
BridgeImpl.log.debug("Connecting " + this + " to its destination
[" + nodeUUID.toString() + "], csf=" + this.csf);
retryCount++;
-
+
try
{
if (csf == null || csf.isClosed())
@@ -712,12 +712,15 @@
// We are not going to count this one as a retry
retryCount--;
- scheduleRetryConnectFixedTimeout(100);
+ scheduleRetryConnectFixedTimeout(this.retryInterval);
return;
}
else
{
- BridgeImpl.log.warn("Bridge " + this + " is unable to connect
to destination. Retrying", e);
+ if (log.isDebugEnabled())
+ {
+ log.debug("Bridge " + this + " is unable to connect to
destination. Retrying", e);
+ }
}
}
catch (Exception e)
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28 01:32:47 UTC (rev 11060)
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -31,9 +32,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -498,9 +497,9 @@
{
log.error("Failed to close flow record", e);
}
+
+ server.getClusterManager().notifyNodeDown(nodeID);
}
-
- server.getClusterManager().notifyNodeDown(nodeID);
}
@@ -538,6 +537,7 @@
// and empty static connectors to create bridges... ulgy!
if (serverLocator == null)
{
+ log.warn("ServerLocator==null FixME!!!!!");
return;
}
/*we dont create bridges to backups*/
@@ -633,7 +633,7 @@
protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
{
- ServerLocatorInternal targetLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(record.getConnector());
+ final ServerLocatorInternal targetLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(record.getConnector());
targetLocator.setReconnectAttempts(0);
@@ -656,8 +656,6 @@
targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
- targetLocator.addClusterTopologyListener(this);
-
if(retryInterval > 0)
{
targetLocator.setRetryInterval(retryInterval);
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-27 20:03:39 UTC (rev 11059)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28 01:32:47 UTC (rev 11060)
@@ -265,7 +265,7 @@
return;
}
- log.info("XXX " + this + "::removing nodeID=" + nodeID);
+ log.debug("XXX " + this + "::removing nodeID=" + nodeID, new
Exception ("trace"));
boolean removed = topology.removeMember(nodeID);