[hornetq-commits] JBoss hornetq SVN: r11046 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: postoffice and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Jul 26 18:58:42 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-07-26 18:58:42 -0400 (Tue, 26 Jul 2011)
New Revision: 11046
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/QueueInfo.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
test fixes and debug
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-26 00:13:47 UTC (rev 11045)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-07-26 22:58:42 UTC (rev 11046)
@@ -1400,7 +1400,7 @@
if (log.isTraceEnabled())
{
- log.trace("Disconnect being called on client:" + msg);
+ log.trace("XXX Disconnect being called on client:" + msg, new Exception ("trace"));
}
closeExecutor.execute(new Runnable()
@@ -1410,6 +1410,10 @@
public void run()
{
SimpleString nodeID = msg.getNodeID();
+ if (log.isTraceEnabled())
+ {
+ log.trace("XXX notify nodeID=" + msg.getNodeID() + " on serverLocator=" + serverLocator);
+ }
if (nodeID != null)
{
serverLocator.notifyNodeDown(msg.getNodeID().toString());
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-26 00:13:47 UTC (rev 11045)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-26 22:58:42 UTC (rev 11046)
@@ -644,7 +644,7 @@
}
while (retry);
- if (ha)
+ if (ha || clusterConnection)
{
long toWait = 30000;
long start = System.currentTimeMillis();
@@ -1164,10 +1164,19 @@
{
boolean removed = false;
- if (!ha)
+ if (!clusterConnection && !ha)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("ignoring notifyNodeDown=" + nodeID + " as isHA=false");
+ }
return;
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("XXX " + this + "::Notify nodeID=" + nodeID + " as being down");
+ }
removed = topology.removeMember(nodeID);
@@ -1200,8 +1209,12 @@
final Pair<TransportConfiguration, TransportConfiguration> connectorPair,
final boolean last)
{
- if (!ha)
+ if (!clusterConnection && !ha)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::Ignoring notifyNodeUp for " + nodeID + " connectorPair=" + connectorPair + ", since ha=false and clusterConnection=false");
+ }
return;
}
@@ -1274,7 +1287,7 @@
this.initialConnectors[count++] = entry.getConnector();
}
- if (ha && clusterConnection && !receivedTopology && initialConnectors.length > 0)
+ if (clusterConnection && !receivedTopology && initialConnectors.length > 0)
{
// FIXME the node is alone in the cluster. We create a connection to the new node
// to trigger the node notification to form the cluster.
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-26 00:13:47 UTC (rev 11045)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-07-26 22:58:42 UTC (rev 11046)
@@ -110,9 +110,9 @@
public synchronized boolean removeMember(String nodeId)
{
TopologyMember member = topology.remove(nodeId);
- if (debug)
+ if (log.isDebugEnabled())
{
- log.debug("Removing member " + member);
+ log.debug("XXX Removing member " + member, new Exception ("trace"));
}
return (member != null);
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/QueueInfo.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/QueueInfo.java 2011-07-26 00:13:47 UTC (rev 11045)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/QueueInfo.java 2011-07-26 22:58:42 UTC (rev 11046)
@@ -129,4 +129,30 @@
{
numberOfConsumers--;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "QueueInfo [routingName=" + routingName +
+ ", clusterName=" +
+ clusterName +
+ ", address=" +
+ address +
+ ", filterString=" +
+ filterString +
+ ", id=" +
+ id +
+ ", filterStrings=" +
+ filterStrings +
+ ", numberOfConsumers=" +
+ numberOfConsumers +
+ ", distance=" +
+ distance +
+ "]";
+ }
+
+
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-07-26 00:13:47 UTC (rev 11045)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-07-26 22:58:42 UTC (rev 11046)
@@ -753,6 +753,12 @@
{
throw new IllegalStateException("Cannot find queue " + queueName);
}
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("XXX PostOffice.sendQueueInfoToQueue on server=" + this.server + ", queueName=" + queueName + " and address=" + address,
+ new Exception ("trace"));
+ }
Queue queue = (Queue)binding.getBindable();
@@ -769,6 +775,10 @@
for (QueueInfo info : queueInfos.values())
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("XXX QueueInfo on sendQueueInfoToQueue = " + info);
+ }
if (info.getAddress().startsWith(address))
{
message = createQueueInfoMessage(NotificationType.BINDING_ADDED, queueName);
@@ -789,7 +799,7 @@
message = createQueueInfoMessage(NotificationType.CONSUMER_CREATED, queueName);
message.putStringProperty(ManagementHelper.HDR_ADDRESS, info.getAddress());
- message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
+ message.putStringProperty(ManagementHelper.HDR_CLUSTER_NAME, info.getClusterName());
message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-26 00:13:47 UTC (rev 11045)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-07-26 22:58:42 UTC (rev 11046)
@@ -24,6 +24,7 @@
import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
@@ -49,6 +50,10 @@
*/
public class CoreProtocolManager implements ProtocolManager
{
+ private static final Logger log = Logger.getLogger(CoreProtocolManager.class);
+
+ private static final boolean isTrace = log.isTraceEnabled();
+
private final HornetQServer server;
private final List<Interceptor> interceptors;
@@ -147,6 +152,10 @@
{
pair = new Pair<TransportConfiguration, TransportConfiguration>(msg.getConnector(), null);
}
+ if (isTrace)
+ {
+ log.trace("XXX Server " + server + " receiving nodeUp from NodeID=" + msg.getNodeID() + ", pair=" + pair);
+ }
server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, true);
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-07-26 00:13:47 UTC (rev 11045)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-07-26 22:58:42 UTC (rev 11046)
@@ -135,7 +135,7 @@
if (log.isDebugEnabled())
{
- log.debug("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception ("trace"));
+ log.debug("XXX Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception ("trace"));
}
}
@@ -247,7 +247,7 @@
ClientMessage message = session.createMessage(false);
- System.out.println("Requesting sendQueueInfoToQueue");
+ log.debug("XXX Requesting sendQueueInfoToQueue through " + this, new Exception ("trace"));
ManagementHelper.putOperationInvocation(message,
ResourceNames.CORE_SERVER,
"sendQueueInfoToQueue",
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-26 00:13:47 UTC (rev 11045)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-07-26 22:58:42 UTC (rev 11046)
@@ -542,6 +542,11 @@
/*we dont create bridges to backups*/
if(connectorPair.a == null)
{
+ if (isTrace)
+ {
+ log.trace(this + " ignoring call with nodeID=" + nodeID +
+ ", connectorPair=" + connectorPair + ", last=" + last);
+ }
return;
}
@@ -553,6 +558,10 @@
if (record == null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::Creating record for nodeID=" + nodeID + ", connectorPair=" + connectorPair);
+ }
// New node - create a new flow record
@@ -575,6 +584,13 @@
createNewRecord(nodeID, connectorPair.a, queueName, queue, true);
}
+ else
+ {
+ if (isTrace)
+ {
+ log.trace ("XXX " + this + " ignored nodeUp record for " + connectorPair + " on nodeID=" + nodeID + " as the record already existed");
+ }
+ }
}
catch (Exception e)
{
@@ -595,7 +611,7 @@
if (log.isDebugEnabled())
{
- log.debug("creating record between " + this.connector + " and " + connector + bridge);
+ log.debug("XXX creating record between " + this.connector + " and " + connector + bridge);
}
record.setBridge(bridge);
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-26 00:13:47 UTC (rev 11045)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-07-26 22:58:42 UTC (rev 11046)
@@ -20,6 +20,7 @@
import java.lang.reflect.Array;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -262,6 +263,8 @@
{
return;
}
+
+ log.info("XXX " + this + "::removing nodeID=" + nodeID);
boolean removed = topology.removeMember(nodeID);
@@ -280,16 +283,20 @@
final boolean last,
final boolean nodeAnnounce)
{
- TopologyMember member = new TopologyMember(connectorPair);
- boolean updated = topology.addMember(nodeID, member);
-
if (log.isDebugEnabled())
{
- log.debug(this + "::NodeUp " + nodeID + connectorPair);
+ log.debug("XXX " + this + "::NodeUp " + nodeID + connectorPair + ", nodeAnnounce=" + nodeAnnounce);
}
+ TopologyMember member = new TopologyMember(connectorPair);
+ boolean updated = topology.addMember(nodeID, member);
+
if (!updated)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("XXX " + this + " ignored notifyNodeUp on nodeID=" + nodeID + " pair=" + connectorPair + " as the topology already knew about it");
+ }
return;
}
@@ -298,12 +305,27 @@
listener.nodeUP(nodeID, member.getConnector(), last);
}
+ if (log.isDebugEnabled())
+ {
+ log.debug("XXX " + this + " received notifyNodeUp nodeID=" + nodeID + " connectorPair=" + connectorPair +
+ ", nodeAnnounce=" + nodeAnnounce + ", last=" + last);
+ }
+
// if this is a node being announced we are hearing it direct from the nodes CM so need to inform our cluster
// connections.
if (nodeAnnounce)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug("Informing " + nodeID + " to " + clusterConnections.toString());
+ }
for (ClusterConnection clusterConnection : clusterConnections.values())
{
+ if (log.isTraceEnabled())
+ {
+ log.trace("XXX " + this + " information clusterConnection=" + clusterConnection +
+ " nodeID=" + nodeID + " connectorPair=" + connectorPair + " last=" + last);
+ }
clusterConnection.nodeUP(nodeID, connectorPair, last);
}
}
@@ -828,7 +850,7 @@
if (log.isDebugEnabled())
{
- log.debug("XXX " + this + " defining cluster connection towards " + tcConfigs);
+ log.debug("XXX " + this + " defining cluster connection towards " + Arrays.toString(tcConfigs));
}
clusterConnection = new ClusterConnectionImpl(tcConfigs,
More information about the hornetq-commits
mailing list