[hornetq-commits] JBoss hornetq SVN: r11102 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq: core/protocol/core/impl and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Aug 3 15:53:18 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-08-03 15:53:17 -0400 (Wed, 03 Aug 2011)
New Revision: 11102
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/utils/HornetQThreadFactory.java
Log:
tweaks on code (fixing tests)
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-08-03 19:53:17 UTC (rev 11102)
@@ -21,8 +21,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -117,7 +117,7 @@
private final ExecutorFactory orderedExecutorFactory;
- private final ExecutorService threadPool;
+ private final Executor threadPool;
private final ScheduledExecutorService scheduledThreadPool;
@@ -168,7 +168,7 @@
final double retryIntervalMultiplier,
final long maxRetryInterval,
final int reconnectAttempts,
- final ExecutorService threadPool,
+ final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool,
final List<Interceptor> interceptors)
{
@@ -1402,7 +1402,7 @@
if (log.isTraceEnabled())
{
- log.trace("XXX Disconnect being called on client:" + msg, new Exception ("trace"));
+ log.trace("ZZZ Disconnect being called on client:" + msg + " server locator = " + serverLocator, new Exception ("trace"));
}
closeExecutor.execute(new Runnable()
@@ -1414,7 +1414,7 @@
SimpleString nodeID = msg.getNodeID();
if (log.isTraceEnabled())
{
- log.trace("XXX YYY notifyDown nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator);
+ log.trace("ZZZ notifyDown nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator + " csf created at ", ClientSessionFactoryImpl.this.e);
}
if (nodeID != null)
{
@@ -1443,11 +1443,11 @@
{
if (isDebug)
{
- log.debug("Node " + topMessage.getNodeID() +
+ log.debug("ZZZ Node " + topMessage.getNodeID() +
" going up, connector = " +
topMessage.getPair() +
", isLast=" +
- topMessage.isLast());
+ topMessage.isLast() + " csf created at\nserverLocator=" + serverLocator, ClientSessionFactoryImpl.this.e);
}
serverLocator.notifyNodeUp(topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-03 19:53:17 UTC (rev 11102)
@@ -84,6 +84,9 @@
private boolean receivedTopology;
private boolean compressLargeMessage;
+
+ // if the system should shutdown the pool when shutting down
+ private transient boolean shutdownPool;
private ExecutorService threadPool;
@@ -249,6 +252,11 @@
private void setThreadPools()
{
+ if (threadPool != null)
+ {
+ return;
+ }
+ else
if (useGlobalPools)
{
threadPool = getGlobalThreadPool();
@@ -257,6 +265,8 @@
}
else
{
+ this.shutdownPool = true;
+
ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
true,
getThisClassLoader());
@@ -359,12 +369,19 @@
private ServerLocatorImpl(final Topology topology,
final boolean useHA,
+ final ExecutorService threadPool,
+ final ScheduledExecutorService scheduledExecutor,
final DiscoveryGroupConfiguration discoveryGroupConfiguration,
final TransportConfiguration[] transportConfigs)
{
e.fillInStackTrace();
+ this.scheduledThreadPool = scheduledExecutor;
+
+ this.threadPool = threadPool;
+
this.topology = topology;
+
this.ha = useHA;
this.discoveryGroupConfiguration = discoveryGroupConfiguration;
@@ -442,7 +459,7 @@
*/
public ServerLocatorImpl(final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
{
- this(new Topology(null), useHA, groupConfiguration, null);
+ this(new Topology(null), useHA, null, null, groupConfiguration, null);
topology.setOwner(this);
}
@@ -453,7 +470,7 @@
*/
public ServerLocatorImpl(final boolean useHA, final TransportConfiguration... transportConfigs)
{
- this(new Topology(null), useHA, null, transportConfigs);
+ this(new Topology(null), useHA, null, null, null, transportConfigs);
topology.setOwner(this);
}
@@ -463,9 +480,10 @@
* @param discoveryAddress
* @param discoveryPort
*/
- public ServerLocatorImpl(final Topology topology, final boolean useHA, final DiscoveryGroupConfiguration groupConfiguration)
+ public ServerLocatorImpl(final Topology topology, final boolean useHA, final ExecutorService threadPool, final ScheduledExecutorService scheduledExecutor, final DiscoveryGroupConfiguration groupConfiguration)
{
- this(topology, useHA, groupConfiguration, null);
+ this(topology, useHA, threadPool, scheduledExecutor, groupConfiguration, null);
+
}
/**
@@ -473,9 +491,9 @@
*
* @param transportConfigs
*/
- public ServerLocatorImpl(final Topology topology, final boolean useHA, final TransportConfiguration... transportConfigs)
+ public ServerLocatorImpl(final Topology topology, final boolean useHA, final ExecutorService threadPool, final ScheduledExecutorService scheduledExecutor, final TransportConfiguration... transportConfigs)
{
- this(topology, useHA, null, transportConfigs);
+ this(topology, useHA, threadPool, scheduledExecutor, null, transportConfigs);
}
private TransportConfiguration selectConnector()
@@ -1163,7 +1181,7 @@
factories.clear();
- if (!useGlobalPools)
+ if (shutdownPool)
{
if (threadPool != null)
{
@@ -1264,7 +1282,7 @@
TopologyMember actMember = topology.getMember(nodeID);
- if (actMember.getConnector().a != null && actMember.getConnector().b != null)
+ if (actMember != null && actMember.getConnector().a != null && actMember.getConnector().b != null)
{
for (ClientSessionFactory factory : factories)
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-08-03 19:53:17 UTC (rev 11102)
@@ -55,7 +55,7 @@
public Topology(final Object owner)
{
this.owner = owner;
- Topology.log.debug("ZZZ III Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE",
+ Topology.log.debug("Topology@" + Integer.toHexString(System.identityHashCode(this)) + " CREATE",
new Exception("trace")); // Delete this line
}
@@ -115,7 +115,7 @@
replaced = true;
if (Topology.log.isDebugEnabled())
{
- Topology.log.debug("ZZZ " + this +
+ Topology.log.debug("Add " + this +
" MEMBER WAS NULL, Add member nodeId=" +
nodeId +
" member = " +
@@ -158,7 +158,7 @@
if (Topology.log.isDebugEnabled())
{
- Topology.log.debug("ZZZ " + this +
+ Topology.log.debug(this +
" Add member nodeId=" +
nodeId +
" member = " +
@@ -178,10 +178,17 @@
{
if (Topology.log.isTraceEnabled())
{
- Topology.log.trace("XXX ZZZ " + this + " informing " + listener + " about node up = " + nodeId);
+ Topology.log.trace(this + " informing " + listener + " about node up = " + nodeId);
}
- listener.nodeUP(nodeId, member.getConnector(), last);
+ try
+ {
+ listener.nodeUP(nodeId, member.getConnector(), last);
+ }
+ catch (Throwable e)
+ {
+ log.warn (e.getMessage(), e);
+ }
}
}
@@ -209,7 +216,19 @@
{
member = topology.remove(nodeId);
}
+
+ if (Topology.log.isDebugEnabled())
+ {
+ Topology.log.debug("ZZZ removeMember " + this +
+ " removing nodeID=" +
+ nodeId +
+ ", result=" +
+ member +
+ ", size = " +
+ topology.size(), new Exception("trace"));
+ }
+
if (member != null)
{
ArrayList<ClusterTopologyListener> copy = copyListeners();
@@ -218,23 +237,11 @@
{
if (Topology.log.isTraceEnabled())
{
- Topology.log.trace("XXX ZZZ " + this + " informing " + listener + " about node down = " + nodeId);
+ Topology.log.trace(this + " informing " + listener + " about node down = " + nodeId);
}
listener.nodeDown(nodeId);
}
}
-
- if (Topology.log.isDebugEnabled())
- {
- Topology.log.debug("ZZZ " + this +
- " removing nodeID=" +
- nodeId +
- ", result=" +
- member +
- ", size = " +
- topology.size(), new Exception("trace"));
- }
-
return member != null;
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-03 19:53:17 UTC (rev 11102)
@@ -118,6 +118,7 @@
{
public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
+ Logger.getLogger(this.getClass()).info("ZZZ send nodeDown on " + this);
channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java 2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/HornetQServer.java 2011-08-03 19:53:17 UTC (rev 11102)
@@ -15,6 +15,7 @@
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import javax.management.MBeanServer;
@@ -152,6 +153,8 @@
ScheduledExecutorService getScheduledPool();
+ ExecutorService getThreadPool();
+
ExecutorFactory getExecutorFactory();
void setGroupingHandler(GroupingHandler groupingHandler);
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-03 19:53:17 UTC (rev 11102)
@@ -71,13 +71,13 @@
public class ClusterConnectionImpl implements ClusterConnection
{
private static final Logger log = Logger.getLogger(ClusterConnectionImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
private final ExecutorFactory executorFactory;
-
+
private final Topology clusterManagerTopology;
-
+
private final Executor executor;
private final HornetQServer server;
@@ -91,15 +91,15 @@
private final SimpleString address;
private final long clientFailureCheckPeriod;
-
+
private final long connectionTTL;
-
+
private final long retryInterval;
-
+
private final double retryIntervalMultiplier;
-
+
private final long maxRetryInterval;
-
+
private final int reconnectAttempts;
private final boolean useDuplicateDetection;
@@ -125,15 +125,15 @@
private final ClusterConnector clusterConnector;
private ServerLocatorInternal serverLocator;
-
+
private final TransportConfiguration connector;
private final boolean allowDirectConnectionsOnly;
private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
-
+
private final ClusterManagerImpl manager;
-
+
public ClusterConnectionImpl(final ClusterManagerImpl manager,
final Topology clusterManagerTopology,
final TransportConfiguration[] tcConfigs,
@@ -176,15 +176,15 @@
this.address = address;
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
-
+
this.connectionTTL = connectionTTL;
-
+
this.retryInterval = retryInterval;
-
+
this.retryIntervalMultiplier = retryIntervalMultiplier;
-
+
this.maxRetryInterval = maxRetryInterval;
-
+
this.reconnectAttempts = reconnectAttempts;
this.useDuplicateDetection = useDuplicateDetection;
@@ -192,7 +192,7 @@
this.routeWhenNoConsumers = routeWhenNoConsumers;
this.executorFactory = executorFactory;
-
+
this.executor = executorFactory.getExecutor();
this.server = server;
@@ -212,9 +212,9 @@
this.clusterPassword = clusterPassword;
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
-
+
this.manager = manager;
-
+
this.clusterManagerTopology = clusterManagerTopology;
clusterConnector = new StaticClusterConnector(tcConfigs);
@@ -223,7 +223,7 @@
{
// a cluster connection will connect to other nodes only if they are directly connected
// through a static list of connectors or broadcasting using UDP.
- if(allowDirectConnectionsOnly)
+ if (allowDirectConnectionsOnly)
{
allowableConnections.addAll(Arrays.asList(tcConfigs));
}
@@ -237,12 +237,12 @@
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
- final long clientFailureCheckPeriod,
- final long connectionTTL,
- final long retryInterval,
- final double retryIntervalMultiplier,
- final long maxRetryInterval,
- final int reconnectAttempts,
+ final long clientFailureCheckPeriod,
+ final long connectionTTL,
+ final long retryInterval,
+ final double retryIntervalMultiplier,
+ final long maxRetryInterval,
+ final int reconnectAttempts,
final boolean useDuplicateDetection,
final boolean routeWhenNoConsumers,
final int confirmationWindowSize,
@@ -273,15 +273,15 @@
this.address = address;
this.clientFailureCheckPeriod = clientFailureCheckPeriod;
-
+
this.connectionTTL = connectionTTL;
-
+
this.retryInterval = retryInterval;
-
+
this.retryIntervalMultiplier = retryIntervalMultiplier;
-
+
this.maxRetryInterval = maxRetryInterval;
-
+
this.reconnectAttempts = reconnectAttempts;
this.useDuplicateDetection = useDuplicateDetection;
@@ -289,7 +289,7 @@
this.routeWhenNoConsumers = routeWhenNoConsumers;
this.executorFactory = executorFactory;
-
+
this.executor = executorFactory.getExecutor();
this.server = server;
@@ -311,9 +311,9 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
clusterConnector = new DiscoveryClusterConnector(dg);
-
+
this.manager = manager;
-
+
this.clusterManagerTopology = clusterManagerTopology;
}
@@ -325,13 +325,12 @@
}
started = true;
-
- if(!backup)
+
+ if (!backup)
{
activate();
}
-
}
public void stop() throws Exception
@@ -340,14 +339,23 @@
{
return;
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::stopping ClusterConnection");
+ }
if (serverLocator != null)
{
serverLocator.removeClusterTopologyListener(this);
}
-
- log.debug("Cluster connection being stopped for node" + nodeUUID + ", server = " + this.server + " serverLocator = " + serverLocator );
+ log.debug("Cluster connection being stopped for node" + nodeUUID +
+ ", server = " +
+ this.server +
+ " serverLocator = " +
+ serverLocator);
+
synchronized (this)
{
for (MessageFlowRecord record : records.values())
@@ -370,14 +378,19 @@
props);
managementService.sendNotification(notification);
}
-
- executor.execute(new Runnable(){
+
+ executor.execute(new Runnable()
+ {
public void run()
{
- if(serverLocator != null)
+ synchronized (ClusterConnectionImpl.this)
{
- serverLocator.close();
- serverLocator = null;
+ if (serverLocator != null)
+ {
+ serverLocator.removeClusterTopologyListener(ClusterConnectionImpl.this);
+ serverLocator.close();
+ serverLocator = null;
+ }
}
}
@@ -401,7 +414,7 @@
{
return nodeUUID.toString();
}
-
+
public HornetQServer getServer()
{
return server;
@@ -429,7 +442,7 @@
{
return;
}
-
+
if (log.isDebugEnabled())
{
log.debug("Activating cluster connection nodeID=" + nodeUUID + " for server=" + this.server);
@@ -439,7 +452,6 @@
serverLocator = clusterConnector.createServerLocator();
-
if (serverLocator != null)
{
serverLocator.setNodeID(nodeUUID.toString());
@@ -451,17 +463,17 @@
serverLocator.setClusterTransportConfiguration(connector);
serverLocator.setBackup(server.getConfiguration().isBackup());
serverLocator.setInitialConnectAttempts(-1);
-
+
serverLocator.setClientFailureCheckPeriod(clientFailureCheckPeriod);
serverLocator.setConnectionTTL(connectionTTL);
if (serverLocator.getConfirmationWindowSize() < 0)
{
- // We can't have confirmationSize = -1 on the cluster Bridge
- // Otherwise we won't have confirmation working
+ // We can't have confirmationSize = -1 on the cluster Bridge
+ // Otherwise we won't have confirmation working
serverLocator.setConfirmationWindowSize(0);
}
-
+
if (!useDuplicateDetection)
{
log.debug("DuplicateDetection is disabled, sending clustered messages blocked");
@@ -470,7 +482,7 @@
serverLocator.setBlockOnDurableSend(!useDuplicateDetection);
serverLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
- if(retryInterval > 0)
+ if (retryInterval > 0)
{
this.serverLocator.setRetryInterval(retryInterval);
}
@@ -491,7 +503,7 @@
managementService.sendNotification(notification);
}
}
-
+
public TransportConfiguration getConnector()
{
return connector;
@@ -509,9 +521,9 @@
{
return;
}
-
- //Remove the flow record for that node
-
+
+ // Remove the flow record for that node
+
MessageFlowRecord record = records.remove(nodeID);
if (record != null)
@@ -528,62 +540,64 @@
{
log.error("Failed to close flow record", e);
}
-
+
server.getClusterManager().notifyNodeDown(nodeID);
}
}
-
public void nodeUP(final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last)
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean last)
{
- if (log.isDebugEnabled())
- {
- String ClusterTestBase = "receiving nodeUP for nodeID=";
- log.debug(this + ClusterTestBase + nodeID +
- " connectionPair=" + connectorPair);
- }
- // discard notifications about ourselves unless its from our backup
- if (nodeID.equals(nodeUUID.toString()))
+ // we propagate the node notifications to all cluster topology listeners
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
+
+ synchronized (this)
{
- if(connectorPair.b != null)
+ if (serverLocator == null)
{
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
+ log.debug("ClusterConnection nodeID=" + nodeID + " has already been stopped, ignoring call");
+ return;
}
- return;
- }
- // we propagate the node notifications to all cluster topology listeners
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
+ if (log.isDebugEnabled())
+ {
+ String ClusterTestBase = "receiving nodeUP for nodeID=";
+ log.debug(this + ClusterTestBase + nodeID + " connectionPair=" + connectorPair);
+ }
+ // discard notifications about ourselves unless its from our backup
- // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
- if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
- {
- return;
- }
+ if (nodeID.equals(nodeUUID.toString()))
+ {
+ if (connectorPair.b != null)
+ {
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
+ }
+ return;
+ }
- // FIXME required to prevent cluster connections w/o discovery group
- // and empty static connectors to create bridges... ulgy!
- if (serverLocator == null)
- {
- log.warn("ServerLocator==null FixME!!!!!");
- return;
- }
- /*we dont create bridges to backups*/
- if(connectorPair.a == null)
- {
- if (isTrace)
+ // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
+ if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
{
- log.trace(this + " ignoring call with nodeID=" + nodeID +
- ", connectorPair=" + connectorPair + ", last=" + last);
+ return;
}
- return;
- }
- synchronized (records)
- {
+ /*we dont create bridges to backups*/
+ if (connectorPair.a == null)
+ {
+ if (isTrace)
+ {
+ log.trace(this + " ignoring call with nodeID=" +
+ nodeID +
+ ", connectorPair=" +
+ connectorPair +
+ ", last=" +
+ last);
+ }
+ return;
+ }
+
try
{
MessageFlowRecord record = records.get(nodeID);
@@ -620,32 +634,42 @@
{
if (isTrace)
{
- log.trace ("XXX " + this + " ignored nodeUp record for " + connectorPair + " on nodeID=" + nodeID + " as the record already existed");
+ log.trace("XXX " + this +
+ " ignored nodeUp record for " +
+ connectorPair +
+ " on nodeID=" +
+ nodeID +
+ " as the record already existed");
}
}
}
catch (Exception e)
{
- log.error("Failed to update topology", e);
+ log.error(this + "::Failed to update topology", e);
}
}
}
- private void createNewRecord(final String targetNodeID,
- final TransportConfiguration connector,
- final SimpleString queueName,
- final Queue queue,
- final boolean start) throws Exception
+ private synchronized void createNewRecord(final String targetNodeID,
+ final TransportConfiguration connector,
+ final SimpleString queueName,
+ final Queue queue,
+ final boolean start) throws Exception
{
+ if (serverLocator != null)
+ {
+ return;
+ }
+
MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetNodeID, connector, queueName, queue);
Bridge bridge = createClusteredBridge(record);
-
+
if (log.isDebugEnabled())
{
log.debug("XXX creating record between " + this.connector + " and " + connector + bridge);
}
-
+
record.setBridge(bridge);
records.put(targetNodeID, record);
@@ -663,9 +687,12 @@
*/
protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
{
-
- final ServerLocatorInternal targetLocator = new ServerLocatorImpl(this.clusterManagerTopology, false, record.getConnector());
-
+
+ final ServerLocatorInternal targetLocator = new ServerLocatorImpl(this.clusterManagerTopology,
+ false,
+ server.getThreadPool(), server.getScheduledPool(),
+ record.getConnector());
+
targetLocator.setReconnectAttempts(0);
targetLocator.setInitialConnectAttempts(0);
@@ -677,20 +704,20 @@
targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
targetLocator.setClusterConnection(true);
-
+
targetLocator.setRetryInterval(retryInterval);
targetLocator.setMaxRetryInterval(maxRetryInterval);
targetLocator.setRetryIntervalMultiplier(retryIntervalMultiplier);
-
+
targetLocator.setNodeID(serverLocator.getNodeID());
targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
-
- if(retryInterval > 0)
+
+ if (retryInterval > 0)
{
targetLocator.setRetryInterval(retryInterval);
}
-
+
manager.addClusterLocator(targetLocator);
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
@@ -719,9 +746,8 @@
record,
record.getConnector());
+ targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");
- targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");
-
return bridge;
}
@@ -732,12 +758,15 @@
private Bridge bridge;
private final String targetNodeID;
+
private final TransportConfiguration connector;
+
private final SimpleString queueName;
+
private final Queue queue;
private final Map<SimpleString, RemoteQueueBinding> bindings = new HashMap<SimpleString, RemoteQueueBinding>();
-
+
private volatile boolean isClosed = false;
private volatile boolean firstReset = false;
@@ -752,8 +781,6 @@
this.connector = connector;
this.queueName = queueName;
}
-
-
/* (non-Javadoc)
* @see java.lang.Object#toString()
@@ -771,12 +798,10 @@
", isClosed=" +
isClosed +
", firstReset=" +
- firstReset +
+ firstReset +
"]";
}
-
-
public String getAddress()
{
return address.toString();
@@ -825,14 +850,14 @@
{
log.trace("Stopping bridge " + bridge);
}
-
+
isClosed = true;
clearBindings();
-
+
bridge.stop();
}
- public boolean isClosed()
+ public boolean isClosed()
{
return isClosed;
}
@@ -842,7 +867,7 @@
clearBindings();
}
- public void setBridge(final Bridge bridge)
+ public void setBridge(final Bridge bridge)
{
this.bridge = bridge;
}
@@ -856,7 +881,7 @@
{
if (isTrace)
{
- log.trace("Flow record on " + clusterConnector + " Receiving message " + message);
+ log.trace("Flow record on " + clusterConnector + " Receiving message " + message);
}
try
{
@@ -1049,12 +1074,12 @@
return;
}
-
+
if (isTrace)
{
log.trace("Adding binding " + clusterName + " into " + ClusterConnectionImpl.this);
}
-
+
bindings.put(clusterName, binding);
try
@@ -1083,7 +1108,7 @@
}
SimpleString clusterName = message.getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
-
+
System.out.println("Removing clusterName=" + clusterName + " on " + ClusterConnectionImpl.this);
removeBinding(clusterName);
@@ -1126,10 +1151,12 @@
SimpleString filterString = message.getSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING);
RemoteQueueBinding binding = bindings.get(clusterName);
-
+
if (binding == null)
{
- throw new IllegalStateException("Cannot find binding for " + clusterName + " on " + ClusterConnectionImpl.this);
+ throw new IllegalStateException("Cannot find binding for " + clusterName +
+ " on " +
+ ClusterConnectionImpl.this);
}
binding.addConsumer(filterString);
@@ -1223,7 +1250,7 @@
{
return records;
}
-
+
/* (non-Javadoc)
* @see java.lang.Object#toString()
*/
@@ -1245,18 +1272,16 @@
StringWriter str = new StringWriter();
PrintWriter out = new PrintWriter(str);
-
out.println(this);
out.println("***************************************");
out.println(name + " connected to");
for (Entry<String, MessageFlowRecord> messageFlow : records.entrySet())
{
- out.println("\t Bridge = " + messageFlow.getValue().getBridge());
+ out.println("\t Bridge = " + messageFlow.getValue().getBridge());
out.println("\t Flow Record = " + messageFlow.getValue());
}
out.println("***************************************");
-
-
+
return str.toString();
}
@@ -1276,13 +1301,13 @@
public ServerLocatorInternal createServerLocator()
{
- if(tcConfigs != null && tcConfigs.length > 0)
+ if (tcConfigs != null && tcConfigs.length > 0)
{
if (log.isDebugEnabled())
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
}
- return new ServerLocatorImpl(clusterManagerTopology, true, tcConfigs);
+ return new ServerLocatorImpl(clusterManagerTopology, true, server.getThreadPool(), server.getScheduledPool(), tcConfigs);
}
else
{
@@ -1298,8 +1323,7 @@
{
return "StaticClusterConnector [tcConfigs=" + Arrays.toString(tcConfigs) + "]";
}
-
-
+
}
private class DiscoveryClusterConnector implements ClusterConnector
@@ -1313,7 +1337,7 @@
public ServerLocatorInternal createServerLocator()
{
- return new ServerLocatorImpl(clusterManagerTopology, true, dg);
+ return new ServerLocatorImpl(clusterManagerTopology, true, server.getThreadPool(), server.getScheduledPool(), dg);
}
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-03 19:53:17 UTC (rev 11102)
@@ -864,6 +864,11 @@
return scheduledPool;
}
+ public ExecutorService getThreadPool()
+ {
+ return threadPool;
+ }
+
public Configuration getConfiguration()
{
return configuration;
@@ -1349,7 +1354,7 @@
{
// Create the pools - we have two pools - one for non scheduled - and another for scheduled
- ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-server-threads" + System.identityHashCode(this),
+ ThreadFactory tFactory = new HornetQThreadFactory("HornetQ-server-" + this.toString(),
false,
getThisClassLoader());
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/utils/HornetQThreadFactory.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/utils/HornetQThreadFactory.java 2011-08-03 10:47:25 UTC (rev 11101)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/utils/HornetQThreadFactory.java 2011-08-03 19:53:17 UTC (rev 11102)
@@ -63,7 +63,7 @@
// when sandboxed, the code does not have the RuntimePermission modifyThreadGroup
if (System.getSecurityManager() == null)
{
- t = new Thread(group, command, "Thread-" + threadCount.getAndIncrement() + " (group:" + group.getName() + ")");
+ t = new Thread(group, command, "Thread-" + threadCount.getAndIncrement() + " (" + group.getName() + ")");
}
else
{
More information about the hornetq-commits
mailing list