[hornetq-commits] JBoss hornetq SVN: r11104 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Aug 3 16:44:02 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-08-03 16:44:02 -0400 (Wed, 03 Aug 2011)
New Revision: 11104
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
test fixes
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-03 20:06:41 UTC (rev 11103)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-03 20:44:02 UTC (rev 11104)
@@ -118,7 +118,6 @@
{
public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
{
- Logger.getLogger(this.getClass()).info("ZZZ send nodeDown on " + this);
channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-03 20:06:41 UTC (rev 11103)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-08-03 20:44:02 UTC (rev 11104)
@@ -383,14 +383,10 @@
{
public void run()
{
- synchronized (ClusterConnectionImpl.this)
+ if (serverLocator != null)
{
- if (serverLocator != null)
- {
- serverLocator.removeClusterTopologyListener(ClusterConnectionImpl.this);
- serverLocator.close();
- serverLocator = null;
- }
+ serverLocator.close();
+ serverLocator = null;
}
}
@@ -549,55 +545,55 @@
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
+ if (log.isDebugEnabled())
+ {
+ String ClusterTestBase = "receiving nodeUP for nodeID=";
+ log.debug(this + ClusterTestBase + nodeID + " connectionPair=" + connectorPair);
+ }
+ // discard notifications about ourselves unless its from our backup
- // we propagate the node notifications to all cluster topology listeners
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
-
- synchronized (this)
+ if (nodeID.equals(nodeUUID.toString()))
{
- if (serverLocator == null)
+ if (connectorPair.b != null)
{
- log.debug("ClusterConnection nodeID=" + nodeID + " has already been stopped, ignoring call");
- return;
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
}
+ return;
+ }
- if (log.isDebugEnabled())
- {
- String ClusterTestBase = "receiving nodeUP for nodeID=";
- log.debug(this + ClusterTestBase + nodeID + " connectionPair=" + connectorPair);
- }
- // discard notifications about ourselves unless its from our backup
+ // we propagate the node notifications to all cluster topology listeners
+ server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
- if (nodeID.equals(nodeUUID.toString()))
- {
- if (connectorPair.b != null)
- {
- server.getClusterManager().notifyNodeUp(nodeID, connectorPair, last, false);
- }
- return;
- }
+ // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
+ if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
+ {
+ return;
+ }
- // if the node is more than 1 hop away, we do not create a bridge for direct cluster connection
- if (allowDirectConnectionsOnly && !allowableConnections.contains(connectorPair.a))
+ // FIXME required to prevent cluster connections w/o discovery group
+ // and empty static connectors to create bridges... ulgy!
+ if (serverLocator == null)
+ {
+ log.warn("ServerLocator==null FixME!!!!!");
+ return;
+ }
+ /*we dont create bridges to backups*/
+ if (connectorPair.a == null)
+ {
+ if (isTrace)
{
- return;
+ log.trace(this + " ignoring call with nodeID=" +
+ nodeID +
+ ", connectorPair=" +
+ connectorPair +
+ ", last=" +
+ last);
}
+ return;
+ }
- /*we dont create bridges to backups*/
- if (connectorPair.a == null)
- {
- if (isTrace)
- {
- log.trace(this + " ignoring call with nodeID=" +
- nodeID +
- ", connectorPair=" +
- connectorPair +
- ", last=" +
- last);
- }
- return;
- }
-
+ synchronized (records)
+ {
try
{
MessageFlowRecord record = records.get(nodeID);
@@ -645,22 +641,17 @@
}
catch (Exception e)
{
- log.error(this + "::Failed to update topology", e);
+ log.error("Failed to update topology", e);
}
}
}
- private synchronized void createNewRecord(final String targetNodeID,
- final TransportConfiguration connector,
- final SimpleString queueName,
- final Queue queue,
- final boolean start) throws Exception
+ private void createNewRecord(final String targetNodeID,
+ final TransportConfiguration connector,
+ final SimpleString queueName,
+ final Queue queue,
+ final boolean start) throws Exception
{
- if (serverLocator != null)
- {
- return;
- }
-
MessageFlowRecordImpl record = new MessageFlowRecordImpl(targetNodeID, connector, queueName, queue);
Bridge bridge = createClusteredBridge(record);
@@ -690,7 +681,8 @@
final ServerLocatorInternal targetLocator = new ServerLocatorImpl(this.clusterManagerTopology,
false,
- server.getThreadPool(), server.getScheduledPool(),
+ server.getThreadPool(),
+ server.getScheduledPool(),
record.getConnector());
targetLocator.setReconnectAttempts(0);
@@ -1307,7 +1299,11 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for " + Arrays.toString(tcConfigs));
}
- return new ServerLocatorImpl(clusterManagerTopology, true, server.getThreadPool(), server.getScheduledPool(), tcConfigs);
+ return new ServerLocatorImpl(clusterManagerTopology,
+ true,
+ server.getThreadPool(),
+ server.getScheduledPool(),
+ tcConfigs);
}
else
{
@@ -1337,7 +1333,11 @@
public ServerLocatorInternal createServerLocator()
{
- return new ServerLocatorImpl(clusterManagerTopology, true, server.getThreadPool(), server.getScheduledPool(), dg);
+ return new ServerLocatorImpl(clusterManagerTopology,
+ true,
+ server.getThreadPool(),
+ server.getScheduledPool(),
+ dg);
}
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-08-03 20:06:41 UTC (rev 11103)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-08-03 20:44:02 UTC (rev 11104)
@@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -426,7 +427,13 @@
public void deliverAsync()
{
- getExecutor().execute(deliverRunner);
+ try
+ {
+ getExecutor().execute(deliverRunner);
+ }
+ catch (RejectedExecutionException ignored)
+ {
+ }
}
public void close() throws Exception
More information about the hornetq-commits mailing list