Author: clebert.suconic(a)jboss.com
Date: 2011-06-30 01:48:56 -0400 (Thu, 30 Jun 2011)
New Revision: 10902
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Log:
changes on my branch
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -366,8 +366,18 @@
public void connectionDestroyed(final Object connectionID)
{
- handleConnectionFailure(connectionID,
- new HornetQException(HornetQException.NOT_CONNECTED,
"Channel disconnected"));
+ // It has to use the same executor as the disconnect message is being sent through
+
+ final HornetQException ex = new HornetQException(HornetQException.DISCONNECTED,
"Channel disconnected");
+
+ closeExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ handleConnectionFailure(connectionID, ex);
+ }
+ });
+
}
public void connectionException(final Object connectionID, final HornetQException me)
@@ -1365,6 +1375,8 @@
if (type == PacketImpl.DISCONNECT)
{
final DisconnectMessage msg = (DisconnectMessage)packet;
+
+ log.info("PUTZ10 Disconnect arrived: " + msg);
closeExecutor.execute(new Runnable()
{
@@ -1378,6 +1390,7 @@
serverLocator.notifyNodeDown(msg.getNodeID().toString());
}
+ log.info("Disconnect being called on connection");
conn.fail(new HornetQException(HornetQException.DISCONNECTED,
"The connection was disconnected
because of server shutdown"));
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -19,6 +19,7 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -1232,6 +1233,18 @@
// Notify if waiting on getting topology
notify();
}
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "ServerLocatorImpl [initialConnectors=" +
Arrays.toString(initialConnectors) +
+ ", discoveryGroupConfiguration=" +
+ discoveryGroupConfiguration +
+ "]";
+ }
private void updateArraysAndPairs()
{
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -816,6 +816,15 @@
}
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "PostOfficeImpl [server=" + server + "]";
+ }
+
// Private -----------------------------------------------------------------
/**
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -305,7 +305,7 @@
protected String getParentString()
{
- return "PACKET[type=" + type + ", channelID=" + channelID +
", packetObject=" + this.getClass().getSimpleName();
+ return "PACKET(" + this.getClass().getSimpleName() + ")[type="
+ type + ", channelID=" + channelID + ", packetObject=" +
this.getClass().getSimpleName();
}
// Protected -----------------------------------------------------
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -158,8 +158,6 @@
public String toString()
{
return "RemotingConnectionImpl [clientID=" + clientID +
- ", decoder=" +
- decoder +
", nodeID=" +
nodeID +
", transportConnection=" +
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -228,20 +228,22 @@
public void connectionDestroyed(final Object connectionID)
{
- if (connections.remove(connectionID) != null)
+ InVMConnection connection = (InVMConnection)connections.remove(connectionID);
+
+ if (connection != null)
{
+
listener.connectionDestroyed(connectionID);
- // Execute on different thread to avoid deadlocks
- new Thread()
+ // Execute on different thread after all the packets are sent, to avoid
deadlocks
+ connection.getExecutor().execute(new Runnable()
{
- @Override
public void run()
{
- // Remove on the other side too
- connector.disconnect((String)connectionID);
+ // Remove on the other side too
+ connector.disconnect((String)connectionID);
}
- }.start();
+ });
}
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -169,6 +169,11 @@
public void removeReadyListener(ReadyListener listener)
{
}
+
+ public Executor getExecutor()
+ {
+ return executor;
+ }
/* (non-Javadoc)
* @see java.lang.Object#toString()
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -472,7 +472,7 @@
if (isTrace)
{
- log.trace("Bridge " + name + " is handling reference=" +
ref);
+ log.trace("Bridge " + this + " is handling reference=" +
ref);
}
ref.handled();
@@ -518,16 +518,11 @@
public final void connectionFailed(final HornetQException me, boolean failedOver)
{
- log.warn(name + "::Connection failed with failedOver=" + failedOver,
me);
- if (isTrace)
- {
- log.trace("Calling BridgeImpl::connectionFailed(HOrnetQException me="
+ me +
- ", boolean failedOver=" +
- failedOver);
- }
+ log.warn(this + "::Connection failed with failedOver=" + failedOver +
"-" + me, me);
+
try
{
- csf.cleanup();
+ // csf.cleanup();
}
catch (Throwable dontCare)
{
@@ -535,21 +530,29 @@
try
{
- session.cleanUp(false);
+ // session.cleanUp(false);
}
catch (Throwable dontCare)
{
}
-
- fail(false);
-
- scheduleRetryConnect();
+
+ if (me.getCode() == HornetQException.DISCONNECTED)
+ {
+ log.warn(this + "::Connection failed with failedOver=" + failedOver +
"-" + me, me);
+ fail(true);
+ }
+ else
+ {
+ log.warn(this + "::Connection failed with failedOver=" + failedOver +
"-" + me, me);
+ fail(false);
+ scheduleRetryConnect();
+ }
}
public void beforeReconnect(final HornetQException exception)
{
- log.warn(name + "::Connection failed before reconnect ", exception);
- fail(false);
+// log.warn(name + "::Connection failed before reconnect ", exception);
+// fail(false);
}
// Package protected ---------------------------------------------
@@ -564,28 +567,12 @@
@Override
public String toString()
{
- return this.getClass().getName() + " [name=" +
- name +
- ", nodeUUID=" +
- nodeUUID +
- ", queue=" +
- queue +
- ", filter=" +
- filter +
- ", forwardingAddress=" +
- forwardingAddress +
- ", useDuplicateDetection=" +
- useDuplicateDetection +
- ", active=" +
- active +
- ", stopping=" +
- stopping +
- "]";
+ return this.getClass().getName() + " [name=" + name + ",
queue=" + queue + " targetConnector=" + this.serverLocator +
"]";
}
protected void fail(final boolean permanently)
{
- log.debug(name + "::BridgeImpl::fail being called, permanently=" +
permanently);
+ log.debug(this + "::fail being called, permanently=" + permanently);
if (queue != null)
{
@@ -625,7 +612,7 @@
/* This is called only when the bridge is activated */
protected void connect()
{
- BridgeImpl.log.info("Connecting bridge " + name + " to its
destination [" + nodeUUID.toString() + "], csf=" + this.csf);
+ BridgeImpl.log.info("Connecting " + this + " to its destination
[" + nodeUUID.toString() + "], csf=" + this.csf);
retryCount++;
@@ -702,7 +689,7 @@
queue.addConsumer(BridgeImpl.this);
queue.deliverAsync();
- BridgeImpl.log.info("Bridge " + name + " is connected [" +
nodeUUID + "-> " + name + "]");
+ BridgeImpl.log.info("Bridge " + this + " is connected");
return;
}
@@ -720,12 +707,12 @@
}
else
{
- BridgeImpl.log.warn("Bridge " + name + " is unable to connect
to destination. Retrying", e);
+ BridgeImpl.log.warn("Bridge " + this + " is unable to connect
to destination. Retrying", e);
}
}
catch (Exception e)
{
- BridgeImpl.log.warn("Bridge " + name + " is unable to connect to
destination. It will be disabled.", e);
+ BridgeImpl.log.warn("Bridge " + this + " is unable to connect to
destination. It will be disabled.", e);
}
scheduleRetryConnect();
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -1433,7 +1433,7 @@
@Override
public String toString()
{
- return "QueueImpl[name=" + name.toString() + "]@" +
Integer.toHexString(System.identityHashCode(this));
+ return "QueueImpl[name=" + name.toString() + ", postOffice=" +
this.postOffice + "]@" + Integer.toHexString(System.identityHashCode(this));
}
// Private
Modified:
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/src/main/org/hornetq/integration/logging/HornetQLoggerFormatter.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -59,15 +59,21 @@
{
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
- record.getThrown().printStackTrace(pw);
+
+ pw.println(record.getThrown() );
+ StackTraceElement[] trace = record.getThrown().getStackTrace();
+ for (int i=0; i < trace.length; i++)
+ pw.println("\tat " + trace[i]);
pw.close();
+
sb.append(sw.toString());
}
catch (Exception ex)
{
}
}
- //sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
+
+ sb.append(HornetQLoggerFormatter.LINE_SEPARATOR);
return sb.toString();
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -13,9 +13,11 @@
package org.hornetq.tests.integration.cluster.distribution;
+import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -360,25 +362,27 @@
System.out.println("Binding = " + qBinding + ", queue=" +
qBinding.getQueue());
}
}
-
System.out.println("=======================================================================");
+ StringWriter writer = new StringWriter();
+ PrintWriter out = new PrintWriter(writer);
+
try
{
for (HornetQServer hornetQServer : servers)
{
if (hornetQServer != null)
{
- System.out.println(clusterDescription(hornetQServer));
- System.out.println(debugBindings(hornetQServer,
hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+ out.println(clusterDescription(hornetQServer));
+ out.println(debugBindings(hornetQServer,
hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
}
}
for (HornetQServer hornetQServer : servers)
{
- System.out.println("Management bindings on " + hornetQServer);
+ out.println("Management bindings on " + hornetQServer);
if (hornetQServer != null)
{
- System.out.println(debugBindings(hornetQServer,
hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
+ out.println(debugBindings(hornetQServer,
hornetQServer.getConfiguration().getManagementNotificationAddress().toString()));
}
}
}
@@ -387,7 +391,7 @@
}
- throw new IllegalStateException(msg);
+ throw new IllegalStateException(msg + "\n" + writer.toString());
}
@@ -2016,6 +2020,7 @@
protected void stopServers(final int... nodes) throws Exception
{
+ log.info("Stopping nodes " + Arrays.toString(nodes));
for (int node : nodes)
{
if (servers[node] != null && servers[node].isStarted())
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -52,21 +52,6 @@
{
return false;
}
-
- public void _testLoop() throws Throwable
- {
- for (int i = 0 ; i < 1000; i++)
- {
- log.info("#test " + i);
- testStopAllStartAll();
- if (i + 1 < 1000)
- {
- tearDown();
- setUp();
- }
- }
- }
-
public void testStopAllStartAll() throws Throwable
{
try
@@ -1620,6 +1605,22 @@
verifyReceiveRoundRobinInSomeOrder(10, 26, 27);
}
+
+ public void _testLoop() throws Throwable
+ {
+ for (int i = 0 ; i < 1000; i++)
+ {
+ log.info("#test " + i);
+ testStopSuccessiveServers();
+ if (i + 1 < 1000)
+ {
+ tearDown();
+ setUp();
+ }
+ }
+ }
+
+
public void testStopSuccessiveServers() throws Exception
{
setupCluster();
@@ -1842,5 +1843,12 @@
stopServers(0, 1, 2, 3, 4);
}
+
+ protected boolean isFileStorage()
+ {
+ return false;
+ }
+
+
}
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-06-29
16:54:35 UTC (rev 10901)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-06-30
05:48:56 UTC (rev 10902)
@@ -79,7 +79,7 @@
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForServerTopology(servers[1], 3, 5);
+ waitForTopology(servers[1], 3);
sendWithProperty(0, "queues.testaddress", 10, false,
Message.HDR_GROUP_ID, new SimpleString("id1"));
@@ -162,7 +162,7 @@
waitForBindings(0, "queues.testaddress", 1, 1, true);
waitForBindings(1, "queues.testaddress", 1, 1, true);
- waitForServerTopology(servers[1], 3, 5);
+ waitForTopology(servers[1], 3);
sendWithProperty(0, "queues.testaddress", 10, false,
Message.HDR_GROUP_ID, new SimpleString("id1"));
@@ -215,21 +215,6 @@
}
}
- private void waitForServerTopology(HornetQServer server, int nodes, int seconds)
- throws InterruptedException
- {
- Topology topology = server.getClusterManager().getTopology();
- long timeToWait = System.currentTimeMillis() + (seconds * 1000);
- while(topology.nodes()!= nodes)
- {
- Thread.sleep(100);
- if(System.currentTimeMillis() > timeToWait)
- {
- fail("timed out waiting for server topology");
- }
- }
- }
-
public boolean isNetty()
{
return true;