Author: clebert.suconic(a)jboss.com
Date: 2011-06-28 20:59:58 -0400 (Tue, 28 Jun 2011)
New Revision: 10894
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Tweaks on my branch
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-28
15:21:14 UTC (rev 10893)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-29
00:59:58 UTC (rev 10894)
@@ -1245,7 +1245,7 @@
}
}
- if (serverLocator.isHA())
+ if (serverLocator.isHA() || serverLocator.isClusterConnection())
{
if (isDebug)
{
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-28
15:21:14 UTC (rev 10893)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-29
00:59:58 UTC (rev 10894)
@@ -76,7 +76,7 @@
private StaticConnector staticConnector = new StaticConnector();
- private final Topology topology = new Topology();
+ private final Topology topology = new Topology(this);
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-06-28
15:21:14 UTC (rev 10893)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-06-29
00:59:58 UTC (rev 10894)
@@ -37,6 +37,21 @@
private static final Logger log = Logger.getLogger(Topology.class);
+
+ /** Used to debug operations.
+ *
+ * Someone may argue this is not needed. But it's impossible to debg anything
related to topology without knowing what node
+ * or what object missed a Topology update.
+ *
+ * Hence I added some information to locate debugging here.
+ * */
+ private final Object owner;
+
+
+ public Topology(final Object owner)
+ {
+ this.owner = owner;
+ }
/*
* topology describes the other cluster nodes that this server knows about:
@@ -54,7 +69,7 @@
TopologyMember currentMember = topology.get(nodeId);
if (debug)
{
- log.debug("adding = " + nodeId + ":" +
member.getConnector());
+ log.debug(this + "::adding = " + nodeId + ":" +
member.getConnector(), new Exception ("trace"));
log.debug("before----------------------------------");
log.debug(describe());
}
@@ -87,7 +102,7 @@
}
if(debug)
{
- log.debug("Topology updated=" + replaced);
+ log.debug(this + "::Topology updated=" + replaced);
log.debug(describe());
}
return replaced;
@@ -192,4 +207,21 @@
{
debug = b;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ if (owner == null)
+ {
+ return super.toString();
+ }
+ else
+ {
+ return "Topology [owner=" + owner + "]";
+ }
+ }
+
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-28
15:21:14 UTC (rev 10893)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-29
00:59:58 UTC (rev 10894)
@@ -118,8 +118,6 @@
activated,
storageManager);
- System.out.println("ClusterConnectionBridge");
-
this.discoveryLocator = discoveryLocator;
idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
@@ -134,6 +132,11 @@
// we need to disable DLQ check on the clustered bridges
queue.setInternalQueue(true);
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Setting up bridge between " +
clusterConnection.getConnector() + " and " + targetLocator, new Exception
("trace"));
+ }
}
@Override
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-28
15:21:14 UTC (rev 10893)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-29
00:59:58 UTC (rev 10894)
@@ -613,7 +613,7 @@
protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
{
- ServerLocator targetLocator =
HornetQClient.createServerLocatorWithoutHA(record.getConnector());
+ ServerLocatorInternal targetLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(record.getConnector());
targetLocator.setReconnectAttempts(0);
@@ -625,6 +625,11 @@
targetLocator.setConfirmationWindowSize(serverLocator.getConfirmationWindowSize());
targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
+ targetLocator.setClusterConnection(true);
+
+ targetLocator.setNodeID(serverLocator.getNodeID());
+
+
targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
if(retryInterval > 0)
{
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-28
15:21:14 UTC (rev 10893)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-29
00:59:58 UTC (rev 10894)
@@ -101,7 +101,7 @@
private Set<ClusterTopologyListener> topologyListeners = new
ConcurrentHashSet<ClusterTopologyListener>();
- private Topology topology = new Topology();
+ private Topology topology = new Topology(this);
private volatile ServerLocatorInternal backupServerLocator;
@@ -164,6 +164,11 @@
return str.toString();
}
+ public String toString()
+ {
+ return "ClusterManagerImpl[server=" + server + "]";
+ }
+
public synchronized void start() throws Exception
{
if (started)
@@ -327,8 +332,15 @@
public void addClusterTopologyListener(final ClusterTopologyListener listener, final
boolean clusterConnection)
{
topologyListeners.add(listener);
+
// We now need to send the current topology to the client
- topology.sendTopology(listener);
+ executor.execute(new Runnable(){
+ public void run()
+ {
+ topology.sendTopology(listener);
+
+ }
+ });
}
public void removeClusterTopologyListener(final ClusterTopologyListener listener,
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-28
15:21:14 UTC (rev 10893)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-29
00:59:58 UTC (rev 10894)
@@ -500,9 +500,9 @@
nodeManager.interrupt();
backupActivationThread.interrupt();
+
+ backupActivationThread.join(500);
- // TODO: do we really need this?
- Thread.sleep(1000);
}
if (System.currentTimeMillis() - start >= timeout)