[hornetq-commits] JBoss hornetq SVN: r11130 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Aug 5 00:17:52 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-08-05 00:17:52 -0400 (Fri, 05 Aug 2011)
New Revision: 11130
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
Log:
tweaks
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-05 03:13:03 UTC (rev 11129)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-08-05 04:17:52 UTC (rev 11130)
@@ -84,7 +84,7 @@
private boolean receivedTopology;
private boolean compressLargeMessage;
-
+
// if the system should shutdown the pool when shutting down
private transient boolean shutdownPool;
@@ -252,13 +252,12 @@
private void setThreadPools()
{
- if (threadPool != null)
- {
- return;
- }
- else
- if (useGlobalPools)
+ if (threadPool != null)
{
+ return;
+ }
+ else if (useGlobalPools)
+ {
threadPool = getGlobalThreadPool();
scheduledThreadPool = getGlobalScheduledThreadPool();
@@ -266,7 +265,7 @@
else
{
this.shutdownPool = true;
-
+
ThreadFactory factory = new HornetQThreadFactory("HornetQ-client-factory-threads-" + System.identityHashCode(this),
true,
getThisClassLoader());
@@ -369,19 +368,19 @@
private ServerLocatorImpl(final Topology topology,
final boolean useHA,
- final ExecutorService threadPool,
- final ScheduledExecutorService scheduledExecutor,
+ final ExecutorService threadPool,
+ final ScheduledExecutorService scheduledExecutor,
final DiscoveryGroupConfiguration discoveryGroupConfiguration,
final TransportConfiguration[] transportConfigs)
{
e.fillInStackTrace();
-
+
this.scheduledThreadPool = scheduledExecutor;
-
+
this.threadPool = threadPool;
-
+
this.topology = topology;
-
+
this.ha = useHA;
this.discoveryGroupConfiguration = discoveryGroupConfiguration;
@@ -480,10 +479,14 @@
* @param discoveryAddress
* @param discoveryPort
*/
- public ServerLocatorImpl(final Topology topology, final boolean useHA, final ExecutorService threadPool, final ScheduledExecutorService scheduledExecutor, final DiscoveryGroupConfiguration groupConfiguration)
+ public ServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final ExecutorService threadPool,
+ final ScheduledExecutorService scheduledExecutor,
+ final DiscoveryGroupConfiguration groupConfiguration)
{
this(topology, useHA, threadPool, scheduledExecutor, groupConfiguration, null);
-
+
}
/**
@@ -491,7 +494,11 @@
*
* @param transportConfigs
*/
- public ServerLocatorImpl(final Topology topology, final boolean useHA, final ExecutorService threadPool, final ScheduledExecutorService scheduledExecutor, final TransportConfiguration... transportConfigs)
+ public ServerLocatorImpl(final Topology topology,
+ final boolean useHA,
+ final ExecutorService threadPool,
+ final ScheduledExecutorService scheduledExecutor,
+ final TransportConfiguration... transportConfigs)
{
this(topology, useHA, threadPool, scheduledExecutor, null, transportConfigs);
}
@@ -565,7 +572,7 @@
addFactory(sf);
return sf;
}
-
+
public boolean isClosed()
{
return closed || closing;
@@ -697,10 +704,12 @@
if (ha || clusterConnection)
{
long timeout = System.currentTimeMillis() + 30000;
- while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing && !receivedTopology && timeout > System.currentTimeMillis())
+ while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing &&
+ !receivedTopology &&
+ timeout > System.currentTimeMillis())
{
// Now wait for the topology
-
+
try
{
wait(1000);
@@ -711,7 +720,7 @@
}
- if (System.currentTimeMillis() > timeout && ! receivedTopology && !closed && !closing)
+ if (System.currentTimeMillis() > timeout && !receivedTopology && !closed && !closing)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster topology");
@@ -1237,7 +1246,7 @@
}
topology.removeMember(nodeID);
-
+
if (!topology.isEmpty())
{
updateArraysAndPairs();
@@ -1257,8 +1266,8 @@
}
public void notifyNodeUp(final String nodeID,
- final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
- final boolean last)
+ final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
+ final boolean last)
{
if (!clusterConnection && !ha)
{
@@ -1275,7 +1284,8 @@
if (log.isDebugEnabled())
{
- log.debug("XXX ZZZ NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair, new Exception ("trace"));
+ log.debug("XXX ZZZ NodeUp " + this + "::nodeID=" + nodeID + ", connectorPair=" + connectorPair,
+ new Exception("trace"));
}
topology.addMember(nodeID, new TopologyMember(connectorPair), last);
@@ -1335,7 +1345,7 @@
private synchronized void updateArraysAndPairs()
{
Collection<TopologyMember> membersCopy = topology.getMembers();
-
+
topologyArray = (Pair<TransportConfiguration, TransportConfiguration>[])Array.newInstance(Pair.class,
membersCopy.size());
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-08-05 03:13:03 UTC (rev 11129)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-08-05 04:17:52 UTC (rev 11130)
@@ -791,19 +791,24 @@
try
{
log.debug("stopping bridge " + BridgeImpl.this);
-
- if (session != null)
- {
- log.debug("Cleaning up session " + session);
- session.close();
- session.removeFailureListener(BridgeImpl.this);
- }
+///////////////
+// if (session != null)
+// {
+// log.debug("Cleaning up session " + session);
+// session.close();
+//
+// }
+
+ session.removeFailureListener(BridgeImpl.this);
+
if (csf != null)
{
csf.cleanup();
}
+ serverLocator.close();
+
synchronized (BridgeImpl.this)
{
log.debug("Closing Session for bridge " + BridgeImpl.this.name);
More information about the hornetq-commits mailing list