JBoss hornetq SVN: r10903 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-30 01:53:02 -0400 (Thu, 30 Jun 2011)
New Revision: 10903
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithDiscoveryTest.java
Log:
changes on my branch
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java 2011-06-30 05:48:56 UTC (rev 10902)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterTest.java 2011-06-30 05:53:02 UTC (rev 10903)
@@ -27,4 +27,12 @@
{
return true;
}
+
+
+ protected boolean isFileStorage()
+ {
+ return true;
+ }
+
+
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java 2011-06-30 05:48:56 UTC (rev 10902)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithBackupTest.java 2011-06-30 05:53:02 UTC (rev 10903)
@@ -36,5 +36,11 @@
{
return true;
}
+
+ protected boolean isFileStorage()
+ {
+ return true;
+ }
+
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithDiscoveryTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithDiscoveryTest.java 2011-06-30 05:48:56 UTC (rev 10902)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/NettyFileStorageSymmetricClusterWithDiscoveryTest.java 2011-06-30 05:53:02 UTC (rev 10903)
@@ -27,5 +27,11 @@
{
return true;
}
+
+ protected boolean isFileStorage()
+ {
+ return true;
+ }
+
}
13 years, 5 months
JBoss hornetq SVN: r10902 - in branches/Branch_2_2_EAP_cluster_clean2: src/main/org/hornetq/core/postoffice/impl and 7 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-30 01:48:56 -0400 (Thu, 30 Jun 2011)
New Revision: 10902
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Log:
changes on my branch
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -366,8 +366,18 @@
public void connectionDestroyed(final Object connectionID)
{
- handleConnectionFailure(connectionID,
- new HornetQException(HornetQException.NOT_CONNECTED, "Channel disconnected"));
+ // It has to use the same executor as the disconnect message is being sent through
+
+ final HornetQException ex = new HornetQException(HornetQException.DISCONNECTED, "Channel disconnected");
+
+ closeExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ handleConnectionFailure(connectionID, ex);
+ }
+ });
+
}
public void connectionException(final Object connectionID, final HornetQException me)
@@ -1365,6 +1375,8 @@
if (type == PacketImpl.DISCONNECT)
{
final DisconnectMessage msg = (DisconnectMessage)packet;
+
+ log.info("PUTZ10 Disconnect arrived: " + msg);
closeExecutor.execute(new Runnable()
{
@@ -1378,6 +1390,7 @@
serverLocator.notifyNodeDown(msg.getNodeID().toString());
}
+ log.info("Disconnect being called on connection");
conn.fail(new HornetQException(HornetQException.DISCONNECTED,
"The connection was disconnected because of server shutdown"));
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -19,6 +19,7 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -1232,6 +1233,18 @@
// Notify if waiting on getting topology
notify();
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors) +
+ ", discoveryGroupConfiguration=" +
+ discoveryGroupConfiguration +
+ "]";
+ }
private void updateArraysAndPairs()
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -816,6 +816,15 @@
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "PostOfficeImpl [server=" + server + "]";
+ }
+
// Private -----------------------------------------------------------------
/**
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -305,7 +305,7 @@
protected String getParentString()
{
- return "PACKET[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName();
+ return "PACKET(" + this.getClass().getSimpleName() + ")[type=" + type + ", channelID=" + channelID + ", packetObject=" + this.getClass().getSimpleName();
}
// Protected -----------------------------------------------------
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -158,8 +158,6 @@
public String toString()
{
return "RemotingConnectionImpl [clientID=" + clientID +
- ", decoder=" +
- decoder +
", nodeID=" +
nodeID +
", transportConnection=" +
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -228,20 +228,22 @@
public void connectionDestroyed(final Object connectionID)
{
- if (connections.remove(connectionID) != null)
+ InVMConnection connection = (InVMConnection)connections.remove(connectionID);
+
+ if (connection != null)
{
+
listener.connectionDestroyed(connectionID);
- // Execute on different thread to avoid deadlocks
- new Thread()
+ // Execute on different thread after all the packets are sent, to avoid deadlocks
+ connection.getExecutor().execute(new Runnable()
{
- @Override
public void run()
{
- // Remove on the other side too
- connector.disconnect((String)connectionID);
+ // Remove on the other side too
+ connector.disconnect((String)connectionID);
}
- }.start();
+ });
}
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -169,6 +169,11 @@
public void removeReadyListener(ReadyListener listener)
{
}
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
/* (non-Javadoc)
* @see java.lang.Object#toString()
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -472,7 +472,7 @@
if (isTrace)
{
- log.trace("Bridge " + name + " is handling reference=" + ref);
+ log.trace("Bridge " + this + " is handling reference=" + ref);
}
ref.handled();
@@ -518,16 +518,11 @@
public final void connectionFailed(final HornetQException me, boolean failedOver)
{
- log.warn(name + "::Connection failed with failedOver=" + failedOver, me);
- if (isTrace)
- {
- log.trace("Calling BridgeImpl::connectionFailed(HOrnetQException me=" + me +
- ", boolean failedOver=" +
- failedOver);
- }
+ log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
+
try
{
- csf.cleanup();
+ // csf.cleanup();
}
catch (Throwable dontCare)
{
@@ -535,21 +530,29 @@
try
{
- session.cleanUp(false);
+ // session.cleanUp(false);
}
catch (Throwable dontCare)
{
}
-
- fail(false);
-
- scheduleRetryConnect();
+
+ if (me.getCode() == HornetQException.DISCONNECTED)
+ {
+ log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
+ fail(true);
+ }
+ else
+ {
+ log.warn(this + "::Connection failed with failedOver=" + failedOver + "-" + me, me);
+ fail(false);
+ scheduleRetryConnect();
+ }
}
public void beforeReconnect(final HornetQException exception)
{
- log.warn(name + "::Connection failed before reconnect ", exception);
- fail(false);
+// log.warn(name + "::Connection failed before reconnect ", exception);
+// fail(false);
}
// Package protected ---------------------------------------------
@@ -564,28 +567,12 @@
@Override
public String toString()
{
- return this.getClass().getName() + " [name=" +
- name +
- ", nodeUUID=" +
- nodeUUID +
- ", queue=" +
- queue +
- ", filter=" +
- filter +
- ", forwardingAddress=" +
- forwardingAddress +
- ", useDuplicateDetection=" +
- useDuplicateDetection +
- ", active=" +
- active +
- ", stopping=" +
- stopping +
- "]";
+ return this.getClass().getName() + " [name=" + name + ", queue=" + queue + " targetConnector=" + this.serverLocator + "]";
}
protected void fail(final boolean permanently)
{
- log.debug(name + "::BridgeImpl::fail being called, permanently=" + permanently);
+ log.debug(this + "::fail being called, permanently=" + permanently);
if (queue != null)
{
@@ -625,7 +612,7 @@
/* This is called only when the bridge is activated */
protected void connect()
{
- BridgeImpl.log.info("Connecting bridge " + name + " to its destination [" + nodeUUID.toString() + "], csf=" + this.csf);
+ BridgeImpl.log.info("Connecting " + this + " to its destination [" + nodeUUID.toString() + "], csf=" + this.csf);
retryCount++;
@@ -702,7 +689,7 @@
queue.addConsumer(BridgeImpl.this);
queue.deliverAsync();
- BridgeImpl.log.info("Bridge " + name + " is connected [" + nodeUUID + "-> " + name + "]");
+ BridgeImpl.log.info("Bridge " + this + " is connected");
return;
}
@@ -720,12 +707,12 @@
}
else
{
- BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. Retrying", e);
+ BridgeImpl.log.warn("Bridge " + this + " is unable to connect to destination. Retrying", e);
}
}
catch (Exception e)
{
- BridgeImpl.log.warn("Bridge " + name + " is unable to connect to destination. It will be disabled.", e);
+ BridgeImpl.log.warn("Bridge " + this + " is unable to connect to destination. It will be disabled.", e);
}
scheduleRetryConnect();
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -1433,7 +1433,7 @@
@Override
public String toString()
{
- return "QueueImpl[name=" + name.toString() + "]@" + Integer.toHexString(System.identityHashCode(this));
+ return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + "]@" + Integer.toHexString(System.identityHashCode(this));
}
// Private
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -59,15 +59,21 @@
{
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
- record.getThrown().printStackTrace(pw);
+
+ pw.println(record.getThrown() );
+ StackTraceElement[] trace = record.getThrown().getStackTrace();
+ for (int i=0; i < trace.length; i++)
+ pw.println("\tat " + trace[i]);
pw.close();
+
sb.append(sw.toString());
}
catch (Exception ex)
{
}
}
- //sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
+
+ sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
return sb.toString();
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -13,9 +13,11 @@
package org.hornetq.tests.integration.cluster.distribution;
+import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -360,25 +362,27 @@
System.out.println("Binding = " + qBinding + ", queue=" + qBinding.getQueue());
}
}
- System.out.println("=======================================================================");
+ StringWriter writer = new StringWriter();
+ PrintWriter out = new PrintWriter(writer);
+
try
{
for (HornetQServer hornetQServer : servers)
{
if (hornetQServer != null)
{
- System.out.println(clusterDescription(hornetQServer));
- System.out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+ out.println(clusterDescription(hornetQServer));
+ out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
}
}
for (HornetQServer hornetQServer : servers)
{
- System.out.println("Management bindings on " + hornetQServer);
+ out.println("Management bindings on " + hornetQServer);
if (hornetQServer != null)
{
- System.out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+ out.println(debugBindings(hornetQServer, hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
}
}
}
@@ -387,7 +391,7 @@
}
- throw new IllegalStateException(msg);
+ throw new IllegalStateException(msg + "\n" + writer.toString());
}
@@ -2016,6 +2020,7 @@
protected void stopServers(final int... nodes) throws Exception
{
+ log.info("Stopping nodes " + Arrays.toString(nodes));
for (int node : nodes)
{
if (servers[node] != null && servers[node].isStarted())
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -52,21 +52,6 @@
{
return false;
}
-
- public void _testLoop() throws Throwable
- {
- for (int i = 0 ; i < 1000; i++)
- {
- log.info("#test " + i);
- testStopAllStartAll();
- if (i + 1 < 1000)
- {
- tearDown();
- setUp();
- }
- }
- }
-
public void testStopAllStartAll() throws Throwable
{
try
@@ -1620,6 +1605,22 @@
verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
}
+
+ public void _testLoop() throws Throwable
+ {
+ for (int i = 0 ; i < 1000; i++)
+ {
+ log.info("#test " + i);
+ testStopSuccessiveServers();
+ if (i + 1 < 1000)
+ {
+ tearDown();
+ setUp();
+ }
+ }
+ }
+
+
public void testStopSuccessiveServers() throws Exception
{
setupCluster();
@@ -1842,5 +1843,12 @@
stopServers(0, 1, 2, 3, 4);
}
+
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+
+
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-06-29 16:54:35 UTC (rev 10901)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-06-30 05:48:56 UTC (rev 10902)
@@ -79,7 +79,7 @@
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForServerTopology(servers[1], 3, 5);
+ waitForTopology(servers[1], 3);
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
@@ -162,7 +162,7 @@
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForServerTopology(servers[1], 3, 5);
+ waitForTopology(servers[1], 3);
sendWithProperty(0, "queues.testaddress", 10, false, Message.HDR_GROUP_ID, new SimpleString("id1"));
@@ -215,21 +215,6 @@
}
}
- private void waitForServerTopology(HornetQServer server, int nodes, int seconds)
- throws InterruptedException
- {
- Topology topology = server.getClusterManager().getTopology();
- long timeToWait = System.currentTimeMillis() + (seconds * 1000);
- while(topology.nodes()!= nodes)
- {
- Thread.sleep(100);
- if(System.currentTimeMillis() > timeToWait)
- {
- fail("timed out waiting for server topology");
- }
- }
- }
-
public boolean isNetty()
{
return true;
13 years, 5 months
JBoss hornetq SVN: r10901 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/api/core/management and 2 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-29 12:54:35 -0400 (Wed, 29 Jun 2011)
New Revision: 10901
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/DiscoveryGroupControl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DiscoveryGroupControlTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DiscoveryGroupControlUsingCoreTest.java
Modified:
branches/HORNETQ-720_Replication/
Log:
merge from trunk
Property changes on: branches/HORNETQ-720_Replication
___________________________________________________________________
Modified: svn:mergeinfo
- /trunk:10878-10896
+ /trunk:10878-10900
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/api/core/management/DiscoveryGroupControl.java
===================================================================
Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
===================================================================
Deleted: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DiscoveryGroupControlTest.java
===================================================================
Deleted: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DiscoveryGroupControlUsingCoreTest.java
===================================================================
13 years, 5 months
JBoss hornetq SVN: r10900 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/management and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-29 11:40:52 -0400 (Wed, 29 Jun 2011)
New Revision: 10900
Removed:
trunk/hornetq-core/src/main/java/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DiscoveryGroupControlTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DiscoveryGroupControlUsingCoreTest.java
Log:
remove more emtpy(!) files
Deleted: trunk/hornetq-core/src/main/java/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
===================================================================
Deleted: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DiscoveryGroupControlTest.java
===================================================================
Deleted: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/management/DiscoveryGroupControlUsingCoreTest.java
===================================================================
13 years, 6 months
JBoss hornetq SVN: r10899 - trunk/hornetq-core/src/main/java/org/hornetq/api/core/management.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-29 11:38:08 -0400 (Wed, 29 Jun 2011)
New Revision: 10899
Removed:
trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/DiscoveryGroupControl.java
Log:
remove empty(!) file
Deleted: trunk/hornetq-core/src/main/java/org/hornetq/api/core/management/DiscoveryGroupControl.java
===================================================================
13 years, 6 months
JBoss hornetq SVN: r10898 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-29 09:51:40 -0400 (Wed, 29 Jun 2011)
New Revision: 10898
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Initialize replicationEndpoint
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-29 09:58:47 UTC (rev 10897)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-29 13:51:40 UTC (rev 10898)
@@ -87,6 +87,7 @@
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.Role;
@@ -992,11 +993,17 @@
public synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel channel) throws Exception
{
- if (!configuration.isBackup())
+ if (configuration.isBackup())
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is not a backup server");
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is a backup server");
}
+ assert replicationEndpoint == null;
+ replicationEndpoint = new ReplicationEndpointImpl(this);
+
+ if (replicationEndpoint == null)
+ System.err.println("endpoint is null!");
+
if (replicationEndpoint.getChannel() != null)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE,
13 years, 6 months
JBoss hornetq SVN: r10897 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-06-29 05:58:47 -0400 (Wed, 29 Jun 2011)
New Revision: 10897
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/list/
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/aardvark/
Modified:
branches/HORNETQ-720_Replication/
Log:
merge changes from trunk
Property changes on: branches/HORNETQ-720_Replication
___________________________________________________________________
Modified: svn:mergeinfo
- /trunk:10878-10884
+ /trunk:10878-10896
13 years, 6 months
JBoss hornetq SVN: r10895 - branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-28 21:07:24 -0400 (Tue, 28 Jun 2011)
New Revision: 10895
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
Log:
tweaks on clustering
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-29 00:59:58 UTC (rev 10894)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-29 01:07:24 UTC (rev 10895)
@@ -37,6 +37,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
@@ -84,7 +85,7 @@
TransportConstants.DEFAULT_PORT + 8,
TransportConstants.DEFAULT_PORT + 9, };
- private static final long WAIT_TIMEOUT = 5000;
+ private static final long WAIT_TIMEOUT = 10000;
@Override
protected void setUp() throws Exception
@@ -243,7 +244,37 @@
throw new IllegalStateException(msg);
}
+
+ protected void waitForTopology(final HornetQServer server, final int nodes) throws Exception
+ {
+ log.debug("waiting for " + nodes + " on the topology for server = " + server);
+
+ long start = System.currentTimeMillis();
+
+ Topology topology = server.getClusterManager().getTopology();
+
+ do
+ {
+ if (nodes == topology.getMembers().size())
+ {
+ return;
+ }
+
+ Thread.sleep(10);
+ }
+ while (System.currentTimeMillis() - start < ClusterTestBase.WAIT_TIMEOUT);
+
+ String msg = "Timed out waiting for cluster topology of " + nodes + " (received " + topology.getMembers().size() + ") nodes on server = " + server + ")\n Current topology:" + topology.describe();
+
+ ClusterTestBase.log.error(msg);
+
+ throw new Exception (msg);
+
+
+
+ }
+
protected void waitForBindings(final int node,
final String address,
final int expectedBindingCount,
@@ -287,14 +318,6 @@
{
if (binding instanceof LocalQueueBinding && local || binding instanceof RemoteQueueBinding && !local)
{
- if (local)
- {
- log.debug("found binding " + binding + " on node " + server);
- }
- else
- {
- log.debug("found remote binding " + binding + " on node " + server);
- }
QueueBinding qBinding = (QueueBinding)binding;
bindingCount++;
@@ -1995,7 +2018,7 @@
{
for (int node : nodes)
{
- if (servers[node].isStarted())
+ if (servers[node] != null && servers[node].isStarted())
{
try
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-29 00:59:58 UTC (rev 10894)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-06-29 01:07:24 UTC (rev 10895)
@@ -18,7 +18,6 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.server.HornetQServer;
/**
* A SimpleSymmetricClusterTest
@@ -78,7 +77,8 @@
public void tearDown() throws Exception
{
- stopServers(0, 1, 2);
+ log.info("#test tearDown " + loopNumber);
+ stopServers(0, 1, 2, 3, 4);
super.tearDown();
}
@@ -114,8 +114,6 @@
// startServers(3, 4, 5, 0, 1, 2);
startServers(0, 1, 2, 3, 4, 5);
-
- Thread.sleep(1000);
log.info("");
for (int i = 0; i <= 5; i++)
@@ -148,7 +146,7 @@
}
-
+
public void testSimple() throws Exception
{
setupServer(0, true, isNetty());
@@ -157,12 +155,10 @@
setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2);
setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 2, 0);
- setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 2, 0, 1);
+ setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1);
startServers(0, 1, 2);
- Thread.sleep(1000);
-
for (int i = 0; i < 10; i++)
log.info("****************************");
for (int i = 0; i <= 2; i++)
@@ -202,7 +198,84 @@
waitForBindings(2, "queues.testaddress", 2, 2, false);
}
+ static int loopNumber;
+ public void _testLoop() throws Throwable
+ {
+ for (int i = 0 ; i < 1000; i++)
+ {
+ loopNumber = i;
+ log.info("#test " + i);
+ testSimple2();
+ if (i + 1 < 1000)
+ {
+ tearDown();
+ setUp();
+ }
+ }
+ }
+
+
+
+ public void testSimple2() throws Exception
+ {
+ setupServer(0, true, isNetty());
+ setupServer(1, true, isNetty());
+ setupServer(2, true, isNetty());
+ setupServer(3, true, isNetty());
+ setupServer(4, true, isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2, 3, 4);
+
+ setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2, 3, 4);
+
+ setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1, 3, 4);
+
+ setupClusterConnection("cluster3", "queues", false, 1, isNetty(), 3, 0, 1, 2, 4);
+
+ setupClusterConnection("cluster4", "queues", false, 1, isNetty(), 4, 0, 1, 2, 3);
+
+ startServers(0, 1, 2, 3, 4);
+
+ for (int i = 0 ; i <= 4; i++)
+ {
+ waitForTopology(servers[i], 5);
+ }
+
+ log.info("All the servers have been started already!");
+
+ for (int i = 0; i <= 4; i++)
+ {
+ log.info("*************************************\n " + servers[i] +
+ " topology:\n" +
+ servers[i].getClusterManager().getTopology().describe());
+ }
+
+ for (int i = 0; i <= 4; i++)
+ {
+ setupSessionFactory(i, isNetty());
+ }
+
+ for (int i = 0 ; i <= 4; i++)
+ {
+ createQueue(i, "queues.testaddress", "queue0", null, false);
+ }
+
+ for (int i = 0 ; i <= 4; i++)
+ {
+ addConsumer(i, i, "queue0", null);
+ }
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+
+ }
+
public void testSimpleRoundRobbin() throws Exception
{
setupServer(0, true, isNetty());
Modified: branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-06-29 00:59:58 UTC (rev 10894)
+++ branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-06-29 01:07:24 UTC (rev 10895)
@@ -42,6 +42,7 @@
@Override
protected void tearDown() throws Exception
{
+ log.info("#test tearDown");
stopServers();
super.tearDown();
@@ -51,6 +52,20 @@
{
return false;
}
+
+ public void _testLoop() throws Throwable
+ {
+ for (int i = 0 ; i < 1000; i++)
+ {
+ log.info("#test " + i);
+ testStopAllStartAll();
+ if (i + 1 < 1000)
+ {
+ tearDown();
+ setUp();
+ }
+ }
+ }
public void testStopAllStartAll() throws Throwable
{
13 years, 6 months
JBoss hornetq SVN: r10894 - in branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core: server/cluster/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-28 20:59:58 -0400 (Tue, 28 Jun 2011)
New Revision: 10894
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Tweaks on my branch
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-29 00:59:58 UTC (rev 10894)
@@ -1245,7 +1245,7 @@
}
}
- if (serverLocator.isHA())
+ if (serverLocator.isHA() || serverLocator.isClusterConnection())
{
if (isDebug)
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-29 00:59:58 UTC (rev 10894)
@@ -76,7 +76,7 @@
private StaticConnector staticConnector = new StaticConnector();
- private final Topology topology = new Topology();
+ private final Topology topology = new Topology(this);
private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/Topology.java 2011-06-29 00:59:58 UTC (rev 10894)
@@ -37,6 +37,21 @@
private static final Logger log = Logger.getLogger(Topology.class);
+
+ /** Used to debug operations.
+ *
+ * Someone may argue this is not needed. But it's impossible to debg anything related to topology without knowing what node
+ * or what object missed a Topology update.
+ *
+ * Hence I added some information to locate debugging here.
+ * */
+ private final Object owner;
+
+
+ public Topology(final Object owner)
+ {
+ this.owner = owner;
+ }
/*
* topology describes the other cluster nodes that this server knows about:
@@ -54,7 +69,7 @@
TopologyMember currentMember = topology.get(nodeId);
if (debug)
{
- log.debug("adding = " + nodeId + ":" + member.getConnector());
+ log.debug(this + "::adding = " + nodeId + ":" + member.getConnector(), new Exception ("trace"));
log.debug("before----------------------------------");
log.debug(describe());
}
@@ -87,7 +102,7 @@
}
if(debug)
{
- log.debug("Topology updated=" + replaced);
+ log.debug(this + "::Topology updated=" + replaced);
log.debug(describe());
}
return replaced;
@@ -192,4 +207,21 @@
{
debug = b;
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ if (owner == null)
+ {
+ return super.toString();
+ }
+ else
+ {
+ return "Topology [owner=" + owner + "]";
+ }
+ }
+
}
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionBridge.java 2011-06-29 00:59:58 UTC (rev 10894)
@@ -118,8 +118,6 @@
activated,
storageManager);
- System.out.println("ClusterConnectionBridge");
-
this.discoveryLocator = discoveryLocator;
idsHeaderName = MessageImpl.HDR_ROUTE_TO_IDS.concat(name);
@@ -134,6 +132,11 @@
// we need to disable DLQ check on the clustered bridges
queue.setInternalQueue(true);
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("Setting up bridge between " + clusterConnection.getConnector() + " and " + targetLocator, new Exception ("trace"));
+ }
}
@Override
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-06-29 00:59:58 UTC (rev 10894)
@@ -613,7 +613,7 @@
protected Bridge createClusteredBridge(MessageFlowRecordImpl record) throws Exception
{
- ServerLocator targetLocator = HornetQClient.createServerLocatorWithoutHA(record.getConnector());
+ ServerLocatorInternal targetLocator = (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(record.getConnector());
targetLocator.setReconnectAttempts(0);
@@ -625,6 +625,11 @@
targetLocator.setConfirmationWindowSize(serverLocator.getConfirmationWindowSize());
targetLocator.setBlockOnDurableSend(!useDuplicateDetection);
targetLocator.setBlockOnNonDurableSend(!useDuplicateDetection);
+ targetLocator.setClusterConnection(true);
+
+ targetLocator.setNodeID(serverLocator.getNodeID());
+
+ targetLocator.setClusterTransportConfiguration(serverLocator.getClusterTransportConfiguration());
if(retryInterval > 0)
{
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-06-29 00:59:58 UTC (rev 10894)
@@ -101,7 +101,7 @@
private Set<ClusterTopologyListener> topologyListeners = new ConcurrentHashSet<ClusterTopologyListener>();
- private Topology topology = new Topology();
+ private Topology topology = new Topology(this);
private volatile ServerLocatorInternal backupServerLocator;
@@ -164,6 +164,11 @@
return str.toString();
}
+ public String toString()
+ {
+ return "ClusterManagerImpl[server=" + server + "]";
+ }
+
public synchronized void start() throws Exception
{
if (started)
@@ -327,8 +332,15 @@
public void addClusterTopologyListener(final ClusterTopologyListener listener, final boolean clusterConnection)
{
topologyListeners.add(listener);
+
// We now need to send the current topology to the client
- topology.sendTopology(listener);
+ executor.execute(new Runnable(){
+ public void run()
+ {
+ topology.sendTopology(listener);
+
+ }
+ });
}
public void removeClusterTopologyListener(final ClusterTopologyListener listener,
Modified: branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-28 15:21:14 UTC (rev 10893)
+++ branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-29 00:59:58 UTC (rev 10894)
@@ -500,9 +500,9 @@
nodeManager.interrupt();
backupActivationThread.interrupt();
+
+ backupActivationThread.join(500);
- // TODO: do we really need this?
- Thread.sleep(1000);
}
if (System.currentTimeMillis() - start >= timeout)
13 years, 6 months