[hornetq-commits] JBoss hornetq SVN: r11063 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Jul 27 23:47:42 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-07-27 23:47:39 -0400 (Wed, 27 Jul 2011)
New Revision: 11063
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
fixes
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28 03:01:31 UTC (rev 11062)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-28 03:47:39 UTC (rev 11063)
@@ -648,28 +648,22 @@
if (ha || clusterConnection)
{
- long toWait = 30000;
- long start = System.currentTimeMillis();
- while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing && !receivedTopology && toWait > 0)
+ long timeout = System.currentTimeMillis() + 30000;
+ while (!ServerLocatorImpl.this.closed && !ServerLocatorImpl.this.closing && !receivedTopology && timeout > System.currentTimeMillis())
{
// Now wait for the topology
-
+
try
{
- wait(toWait);
+ wait(1000);
}
catch (InterruptedException ignore)
{
}
- long now = System.currentTimeMillis();
-
- toWait -= now - start;
-
- start = now;
}
- if (toWait <= 0)
+ if (System.currentTimeMillis() > timeout && ! receivedTopology && !closed && !closing)
{
throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
"Timed out waiting to receive cluster topology");
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28 03:01:31 UTC (rev 11062)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-28 03:47:39 UTC (rev 11063)
@@ -24,7 +24,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
@@ -125,7 +124,10 @@
private final Set<TransportConfiguration> allowableConnections = new HashSet<TransportConfiguration>();
- public ClusterConnectionImpl(final TransportConfiguration[] tcConfigs,
+ private final ClusterManagerImpl manager;
+
+ public ClusterConnectionImpl(final ClusterManagerImpl manager,
+ final TransportConfiguration[] tcConfigs,
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
@@ -199,6 +201,8 @@
this.clusterPassword = clusterPassword;
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
+
+ this.manager = manager;
clusterConnector = new StaticClusterConnector(tcConfigs);
@@ -214,7 +218,8 @@
}
- public ClusterConnectionImpl(DiscoveryGroupConfiguration dg,
+ public ClusterConnectionImpl(final ClusterManagerImpl manager,
+ DiscoveryGroupConfiguration dg,
final TransportConfiguration connector,
final SimpleString name,
final SimpleString address,
@@ -290,6 +295,8 @@
this.allowDirectConnectionsOnly = allowDirectConnectionsOnly;
clusterConnector = new DiscoveryClusterConnector(dg);
+
+ this.manager = manager;
}
public synchronized void start() throws Exception
@@ -646,7 +653,6 @@
targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
targetLocator.setClusterConnection(true);
- targetLocator.setIdentity("(Cluster-connection-bridge::" + this.toString() + ")");
targetLocator.setRetryInterval(retryInterval);
targetLocator.setMaxRetryInterval(maxRetryInterval);
@@ -660,6 +666,8 @@
{
targetLocator.setRetryInterval(retryInterval);
}
+
+ manager.addClusterLocator(targetLocator);
ClusterConnectionBridge bridge = new ClusterConnectionBridge(this,
targetLocator,
@@ -687,7 +695,10 @@
record,
record.getConnector());
- return bridge;
+
+ targetLocator.setIdentity("(Cluster-connection-bridge::" + bridge.toString() + "::" + this.toString() + ")");
+
+ return bridge;
}
// Inner classes -----------------------------------------------------------------------------------
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28 03:01:31 UTC (rev 11062)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-28 03:47:39 UTC (rev 11063)
@@ -19,7 +19,6 @@
import java.io.StringWriter;
import java.lang.reflect.Array;
import java.net.InetAddress;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
@@ -37,6 +36,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.client.impl.TopologyMember;
@@ -106,7 +106,7 @@
private volatile ServerLocatorInternal backupServerLocator;
- private final List<ServerLocatorInternal> clusterLocators = new ArrayList<ServerLocatorInternal>();
+ private final Set<ServerLocator> clusterLocators = new ConcurrentHashSet<ServerLocator>();
private final Executor executor;
@@ -245,9 +245,11 @@
backupServerLocator = null;
}
- for (ServerLocatorInternal clusterLocator : clusterLocators)
+ for (ServerLocator clusterLocator : clusterLocators)
{
+ log.info("WWW Closing clusterLocator " + clusterLocator);
clusterLocator.close();
+ log.info("WWW Closed clusterLocator " + clusterLocator);
}
clusterLocators.clear();
started = false;
@@ -483,6 +485,11 @@
log.warn("no cluster connections defined, unable to announce backup");
}
}
+
+ void addClusterLocator(final ServerLocatorInternal serverLocator)
+ {
+ this.clusterLocators.add(serverLocator);
+ }
private synchronized void announceNode()
{
@@ -721,7 +728,9 @@
log.debug("Bridge " + config.getName() +
" is configured to not use duplicate detecion, it will send messages synchronously");
}
+
clusterLocators.add(serverLocator);
+
Bridge bridge = new BridgeImpl(serverLocator,
config.getReconnectAttempts(),
config.getRetryInterval(),
@@ -819,7 +828,8 @@
log.debug("XXX " + this + " Starting a Discovery Group Cluster Connection, name=" + config.getDiscoveryGroupName() + ", dg=" + dg);
}
- clusterConnection = new ClusterConnectionImpl(dg,
+ clusterConnection = new ClusterConnectionImpl(this,
+ dg,
connector,
new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
@@ -854,7 +864,8 @@
log.debug("XXX " + this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
}
- clusterConnection = new ClusterConnectionImpl(tcConfigs,
+ clusterConnection = new ClusterConnectionImpl(this,
+ tcConfigs,
connector,
new SimpleString(config.getName()),
new SimpleString(config.getAddress()),
More information about the hornetq-commits mailing list