Author: clebert.suconic(a)jboss.com
Date: 2011-06-17 12:37:51 -0400 (Fri, 17 Jun 2011)
New Revision: 10849
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
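tweaks: add toString() and debug/trace logging, merge the client and cluster-connection topology listener sets into one, and remove pause()/resume()/isPaused() and handleReplicatedAddBinding()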
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-06-17
15:52:20 UTC (rev 10848)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/api/core/DiscoveryGroupConfiguration.java 2011-06-17
16:37:51 UTC (rev 10849)
@@ -179,4 +179,26 @@
result = 31 * result + (int) (discoveryInitialWaitTimeout ^
(discoveryInitialWaitTimeout >>> 32));
return result;
}
+
+ /**
+ * Returns a bracketed listing of the wait timeout, group address/port, local bind address, name and refresh timeout.
+ */
+ @Override
+ public String toString()
+ {
+ return "DiscoveryGroupConfiguration [discoveryInitialWaitTimeout=" +
discoveryInitialWaitTimeout +
+ ", groupAddress=" +
+ groupAddress +
+ ", groupPort=" +
+ groupPort +
+ ", localBindAddress=" +
+ localBindAddress +
+ ", name=" +
+ name +
+ ", refreshTimeout=" +
+ refreshTimeout +
+ "]";
+ }
+
+
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-17
15:52:20 UTC (rev 10848)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-17
16:37:51 UTC (rev 10849)
@@ -422,6 +422,13 @@
return;
}
+ synchronized (exitLock)
+ {
+ exitLock.notifyAll();
+ }
+
+ forceReturnChannel1();
+
// we need to stop the factory from connecting if it is in the middle of trying to
failover before we get the lock
causeExit();
synchronized (createSessionLock)
@@ -942,7 +949,7 @@
{
if (isDebug)
{
- log.debug("Trying reconnection attempt " + count);
+ log.debug("Trying reconnection attempt " + count + "/"
+ reconnectAttempts);
}
getConnection();
@@ -1055,6 +1062,11 @@
try
{
DelegatingBufferHandler handler = new DelegatingBufferHandler();
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Trying to connect with connector = " +
connectorFactory + ", parameters = " + connectorConfig.getParams());
+ }
connector = connectorFactory.createConnector(connectorConfig.getParams(),
handler,
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-17
15:52:20 UTC (rev 10848)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-17
16:37:51 UTC (rev 10849)
@@ -65,7 +65,7 @@
private StaticConnector staticConnector = new StaticConnector();
- private Topology topology = new Topology();
+ private final Topology topology = new Topology();
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
@@ -599,7 +599,7 @@
threadPool,
scheduledThreadPool,
interceptors);
- factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+ factory.connect(reconnectAttempts, failoverOnInitialConnection);
}
catch (HornetQException e)
{
@@ -1096,8 +1096,10 @@
{
staticConnector.disconnect();
}
+
+ Set<ClientSessionFactory> clonedFactory = new
HashSet<ClientSessionFactory>(factories);
- for (ClientSessionFactory factory : factories)
+ for (ClientSessionFactory factory : clonedFactory)
{
factory.close();
}
@@ -1263,16 +1265,6 @@
public synchronized void factoryClosed(final ClientSessionFactory factory)
{
factories.remove(factory);
-
- if (factories.isEmpty())
- {
- // Go back to using the broadcast or static list
-
- receivedTopology = false;
-
- topology = null;
-
- }
}
public Topology getTopology()
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-06-17
15:52:20 UTC (rev 10848)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-06-17
16:37:51 UTC (rev 10849)
@@ -41,14 +41,6 @@
*/
Map<String, String> getNodes();
- void handleReplicatedAddBinding(SimpleString address,
- SimpleString uniqueName,
- SimpleString routingName,
- long queueID,
- SimpleString filterString,
- SimpleString queueName,
- int distance) throws Exception;
-
void activate() throws Exception;
TransportConfiguration getConnector();
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2011-06-17
15:52:20 UTC (rev 10848)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2011-06-17
16:37:51 UTC (rev 10849)
@@ -33,14 +33,8 @@
Bridge getBridge();
void close() throws Exception;
-
- public void resume() throws Exception;
-
+
boolean isClosed();
void reset() throws Exception;
-
- void pause() throws Exception;
-
- boolean isPaused();
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-17
15:52:20 UTC (rev 10848)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-17
16:37:51 UTC (rev 10849)
@@ -617,7 +617,7 @@
protected ClientSessionFactoryInternal createSessionFactory() throws Exception
{
ClientSessionFactoryInternal csf =
(ClientSessionFactoryInternal)serverLocator.createSessionFactory();
- // csf.setReconnectAttempts(0);
+ csf.setReconnectAttempts(0);
//csf.setInitialReconnectAttempts(1);
return csf;
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-17
15:52:20 UTC (rev 10848)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-17
16:37:51 UTC (rev 10849)
@@ -489,7 +489,7 @@
// TODO: does it need to be sync?
- public synchronized void nodeUP(final String nodeID,
+ public void nodeUP(final String nodeID,
final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
final boolean last)
{
@@ -563,10 +563,6 @@
else
{
log.info("Reattaching nodeID=" + nodeID);
- if (record.isPaused())
- {
- record.resume();
- }
}
}
catch (Exception e)
@@ -808,26 +804,8 @@
bridge.stop();
}
- public void pause() throws Exception
+ public boolean isClosed()
{
- paused = true;
- clearBindings();
- bridge.pause();
- }
-
- public boolean isPaused()
- {
- return paused;
- }
-
- public void resume() throws Exception
- {
- paused = false;
- bridge.resume();
- }
-
- public boolean isClosed()
- {
return isClosed;
}
@@ -836,7 +814,6 @@
clearBindings();
}
-
public void setBridge(final Bridge bridge)
{
this.bridge = bridge;
@@ -972,6 +949,7 @@
private synchronized void clearBindings() throws Exception
{
+ log.debug(ClusterConnectionImpl.this + " clearing bindings");
for (RemoteQueueBinding binding : new
HashSet<RemoteQueueBinding>(bindings.values()))
{
removeBinding(binding.getClusterName());
@@ -980,6 +958,10 @@
private synchronized void doBindingAdded(final ClientMessage message) throws
Exception
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(ClusterConnectionImpl.this + " Adding binding " +
message);
+ }
if (!message.containsProperty(ManagementHelper.HDR_DISTANCE))
{
throw new IllegalStateException("distance is null");
@@ -1039,6 +1021,11 @@
return;
}
+
+ if (isTrace)
+ {
+ log.trace("Adding binding " + clusterName + " into " +
ClusterConnectionImpl.this);
+ }
bindings.put(clusterName, binding);
@@ -1058,6 +1045,10 @@
private void doBindingRemoved(final ClientMessage message) throws Exception
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(ClusterConnectionImpl.this + " Removing binding " +
message);
+ }
if (!message.containsProperty(ManagementHelper.HDR_CLUSTER_NAME))
{
throw new IllegalStateException("clusterName is null");
@@ -1082,6 +1073,10 @@
private synchronized void doConsumerCreated(final ClientMessage message) throws
Exception
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(ClusterConnectionImpl.this + " Consumer created " +
message);
+ }
if (!message.containsProperty(ManagementHelper.HDR_DISTANCE))
{
throw new IllegalStateException("distance is null");
@@ -1136,6 +1131,10 @@
private synchronized void doConsumerClosed(final ClientMessage message) throws
Exception
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(ClusterConnectionImpl.this + " Consumer closed " +
message);
+ }
if (!message.containsProperty(ManagementHelper.HDR_DISTANCE))
{
throw new IllegalStateException("distance is null");
@@ -1189,49 +1188,6 @@
}
- public void handleReplicatedAddBinding(final SimpleString address,
- final SimpleString uniqueName,
- final SimpleString routingName,
- final long queueID,
- final SimpleString filterString,
- final SimpleString queueName,
- final int distance) throws Exception
- {
- Binding queueBinding = postOffice.getBinding(queueName);
-
- if (queueBinding == null)
- {
- throw new IllegalStateException("Cannot find s & f queue " +
queueName);
- }
-
- Queue queue = (Queue)queueBinding.getBindable();
-
- RemoteQueueBinding binding = new
RemoteQueueBindingImpl(server.getStorageManager().generateUniqueID(),
- address,
- uniqueName,
- routingName,
- queueID,
- filterString,
- queue,
- queueName,
- distance);
-
- if (postOffice.getBinding(uniqueName) != null)
- {
- ClusterConnectionImpl.log.warn("Remoting queue binding " + uniqueName
+
- " has already been bound in the post office.
Most likely cause for this is you have a loop " +
- "in your cluster due to cluster max-hops
being too large or you have multiple cluster connections to the same nodes using
overlapping addresses");
-
- return;
- }
-
- postOffice.addBinding(binding);
-
- Bindings theBindings = postOffice.getBindingsForAddress(address);
-
- theBindings.setRouteWhenNoConsumers(routeWhenNoConsumers);
- }
-
// for testing only
public Map<String, MessageFlowRecord> getRecords()
{
@@ -1286,6 +1242,17 @@
return null;
}
}
+
+ /**
+ * Returns the "StaticClusterConnector" label followed by tcConfigs rendered with Arrays.toString.
+ */
+ @Override
+ public String toString()
+ {
+ return "StaticClusterConnector [tcConfigs=" +
Arrays.toString(tcConfigs) + "]";
+ }
+
+
}
private class DiscoveryClusterConnector implements ClusterConnector
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-17
15:52:20 UTC (rev 10848)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-17
16:37:51 UTC (rev 10849)
@@ -97,14 +97,8 @@
// the cluster connections which links this node to other cluster nodes
private final Map<String, ClusterConnection> clusterConnections = new
HashMap<String, ClusterConnection>();
- // regular client listeners to be notified of cluster topology changes.
- // they correspond to regular clients using a HA ServerLocator
- private Set<ClusterTopologyListener> clientListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
+ private Set<ClusterTopologyListener> topologyListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
- // cluster connections listeners to be notified of cluster topology changes
- // they correspond to cluster connections on *other nodes connected to this one*
- private Set<ClusterTopologyListener> clusterConnectionListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
-
private Topology topology = new Topology();
private volatile ServerLocatorInternal backupServerLocator;
@@ -208,8 +202,7 @@
managementService.unregisterCluster(clusterConnection.getName().toString());
}
- clusterConnectionListeners.clear();
- clientListeners.clear();
+ topologyListeners.clear();
clusterConnections.clear();
topology.clear();
@@ -249,15 +242,10 @@
if (removed)
{
- for (ClusterTopologyListener listener : clientListeners)
+ for (ClusterTopologyListener listener : topologyListeners)
{
listener.nodeDown(nodeID);
}
-
- for (ClusterTopologyListener listener : clusterConnectionListeners)
- {
- listener.nodeDown(nodeID);
- }
}
}
@@ -274,16 +262,11 @@
return;
}
- for (ClusterTopologyListener listener : clientListeners)
+ for (ClusterTopologyListener listener : topologyListeners)
{
listener.nodeUP(nodeID, member.getConnector(), last);
}
- for (ClusterTopologyListener listener : clusterConnectionListeners)
- {
- listener.nodeUP(nodeID, member.getConnector(), last);
- }
-
// if this is a node being announced we are hearing it direct from the nodes CM so
need to inform our cluster
// connections.
if (nodeAnnounce)
@@ -322,18 +305,7 @@
public void addClusterTopologyListener(final ClusterTopologyListener listener, final
boolean clusterConnection)
{
- synchronized (this)
- {
- if (clusterConnection)
- {
- this.clusterConnectionListeners.add(listener);
- }
- else
- {
- this.clientListeners.add(listener);
- }
- }
-
+ topologyListeners.add(listener);
// We now need to send the current topology to the client
topology.sendTopology(listener);
}
@@ -341,14 +313,7 @@
public void removeClusterTopologyListener(final ClusterTopologyListener listener,
final boolean
clusterConnection)
{
- if (clusterConnection)
- {
- this.clusterConnectionListeners.remove(listener);
- }
- else
- {
- this.clientListeners.remove(listener);
- }
+ topologyListeners.remove(listener); // unregister from the single shared listener set
}
public Topology getTopology()
@@ -425,15 +390,14 @@
}
}
- for (ClusterTopologyListener listener : clientListeners)
+ for (ClusterTopologyListener listener : topologyListeners)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Informing client listener " + listener + " about
itself node " + nodeID + " with connector=" + member.getConnector());
+ }
listener.nodeUP(nodeID, member.getConnector(), false);
}
-
- for (ClusterTopologyListener listener : clusterConnectionListeners)
- {
- listener.nodeUP(nodeID, member.getConnector(), false);
- }
}
}
@@ -499,16 +463,10 @@
// Propagate the announcement
- for (ClusterTopologyListener listener : clientListeners)
+ for (ClusterTopologyListener listener : topologyListeners)
{
listener.nodeUP(nodeID, member.getConnector(), false);
}
-
- for (ClusterTopologyListener listener : clusterConnectionListeners)
- {
- listener.nodeUP(nodeID, member.getConnector(), false);
- }
-
}
private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration
config) throws Exception