[jboss-cvs] JBoss Messaging SVN: r6014 - in trunk: src/main/org/jboss/messaging/core/remoting/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Mar 5 17:39:24 EST 2009
Author: ataylor
Date: 2009-03-05 17:39:23 -0500 (Thu, 05 Mar 2009)
New Revision: 6014
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
Log:
thread locking fix and connection destroyed fix
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-03-05 17:13:07 UTC (rev 6013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-03-05 22:39:23 UTC (rev 6014)
@@ -328,6 +328,11 @@
}
catch (Throwable t)
{
+ if(lock != null)
+ {
+ lock.unlock();
+ lock = null;
+ }
if (connection != null)
{
returnConnection(connection.getID());
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-03-05 17:13:07 UTC (rev 6013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-03-05 22:39:23 UTC (rev 6014)
@@ -542,6 +542,18 @@
log.error("Failed to execute failure listener", t);
}
}
+ for (ChannelImpl channel : channels.values())
+ {
+ channel.lock.lock();
+ try
+ {
+ channel.sendCondition.signalAll();
+ }
+ finally
+ {
+ channel.lock.unlock();
+ }
+ }
}
private void internalClose()
@@ -1076,7 +1088,10 @@
catch (InterruptedException e)
{
}
-
+ if(closed)
+ {
+ break;
+ }
final long now = System.currentTimeMillis();
toWait -= now - start;
@@ -1438,11 +1453,8 @@
if (packet.isResponse())
{
response = packet;
-
confirm(packet);
-
lock.lock();
-
try
{
sendCondition.signal();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-03-05 17:13:07 UTC (rev 6013)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-03-05 22:39:23 UTC (rev 6014)
@@ -65,99 +65,99 @@
return false;
}
-// public void testStopAllStartAll() throws Exception
-// {
-// setupCluster();
-//
-// startServers(0, 1, 2, 3, 4);
-//
-// setupSessionFactory(0, isNetty());
-// setupSessionFactory(1, isNetty());
-// setupSessionFactory(2, isNetty());
-// setupSessionFactory(3, isNetty());
-// setupSessionFactory(4, isNetty());
-//
-// createQueue(0, "queues.testaddress", "queue0", null, false);
-// createQueue(1, "queues.testaddress", "queue0", null, false);
-// createQueue(2, "queues.testaddress", "queue0", null, false);
-// createQueue(3, "queues.testaddress", "queue0", null, false);
-// createQueue(4, "queues.testaddress", "queue0", null, false);
-//
-// addConsumer(0, 0, "queue0", null);
-// addConsumer(1, 1, "queue0", null);
-// addConsumer(2, 2, "queue0", null);
-// addConsumer(3, 3, "queue0", null);
-// addConsumer(4, 4, "queue0", null);
-//
-// waitForBindings(0, "queues.testaddress", 1, 1, true);
-// waitForBindings(1, "queues.testaddress", 1, 1, true);
-// waitForBindings(2, "queues.testaddress", 1, 1, true);
-// waitForBindings(3, "queues.testaddress", 1, 1, true);
-// waitForBindings(4, "queues.testaddress", 1, 1, true);
-//
-// waitForBindings(0, "queues.testaddress", 4, 4, false);
-// waitForBindings(1, "queues.testaddress", 4, 4, false);
-// waitForBindings(2, "queues.testaddress", 4, 4, false);
-// waitForBindings(3, "queues.testaddress", 4, 4, false);
-// waitForBindings(4, "queues.testaddress", 4, 4, false);
-//
-// send(0, "queues.testaddress", 10, false, null);
-//
-// verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
-//
-// this.verifyNotReceive(0, 1, 2, 3, 4);
-//
-// this.removeConsumer(0);
-// this.removeConsumer(1);
-// this.removeConsumer(2);
-// this.removeConsumer(3);
-// this.removeConsumer(4);
-//
-// this.closeAllSessionFactories();
-//
-// stopServers(0, 1, 2, 3, 4);
-//
-// startServers(0, 1, 2, 3, 4);
-//
-// setupSessionFactory(0, isNetty());
-// setupSessionFactory(1, isNetty());
-// setupSessionFactory(2, isNetty());
-// setupSessionFactory(3, isNetty());
-// setupSessionFactory(4, isNetty());
-//
-// createQueue(0, "queues.testaddress", "queue0", null, false);
-// createQueue(1, "queues.testaddress", "queue0", null, false);
-// createQueue(2, "queues.testaddress", "queue0", null, false);
-// createQueue(3, "queues.testaddress", "queue0", null, false);
-// createQueue(4, "queues.testaddress", "queue0", null, false);
-//
-// addConsumer(0, 0, "queue0", null);
-// addConsumer(1, 1, "queue0", null);
-// addConsumer(2, 2, "queue0", null);
-// addConsumer(3, 3, "queue0", null);
-// addConsumer(4, 4, "queue0", null);
-//
-// waitForBindings(0, "queues.testaddress", 1, 1, true);
-// waitForBindings(1, "queues.testaddress", 1, 1, true);
-// waitForBindings(2, "queues.testaddress", 1, 1, true);
-// waitForBindings(3, "queues.testaddress", 1, 1, true);
-// waitForBindings(4, "queues.testaddress", 1, 1, true);
-//
-// waitForBindings(0, "queues.testaddress", 4, 4, false);
-// waitForBindings(1, "queues.testaddress", 4, 4, false);
-// waitForBindings(2, "queues.testaddress", 4, 4, false);
-// waitForBindings(3, "queues.testaddress", 4, 4, false);
-// waitForBindings(4, "queues.testaddress", 4, 4, false);
-//
-// send(0, "queues.testaddress", 10, false, null);
-//
-// verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
-//
-// this.verifyNotReceive(0, 1, 2, 3, 4);
-//
-// log.info("got here");
-// }
+ public void testStopAllStartAll() throws Exception
+ {
+ setupCluster();
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+ this.verifyNotReceive(0, 1, 2, 3, 4);
+
+ this.removeConsumer(0);
+ this.removeConsumer(1);
+ this.removeConsumer(2);
+ this.removeConsumer(3);
+ this.removeConsumer(4);
+
+ this.closeAllSessionFactories();
+
+ stopServers(0, 1, 2, 3, 4);
+
+ startServers(0, 1, 2, 3, 4);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+ setupSessionFactory(4, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+ createQueue(3, "queues.testaddress", "queue0", null, false);
+ createQueue(4, "queues.testaddress", "queue0", null, false);
+
+ addConsumer(0, 0, "queue0", null);
+ addConsumer(1, 1, "queue0", null);
+ addConsumer(2, 2, "queue0", null);
+ addConsumer(3, 3, "queue0", null);
+ addConsumer(4, 4, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 1, true);
+ waitForBindings(4, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 4, 4, false);
+ waitForBindings(1, "queues.testaddress", 4, 4, false);
+ waitForBindings(2, "queues.testaddress", 4, 4, false);
+ waitForBindings(3, "queues.testaddress", 4, 4, false);
+ waitForBindings(4, "queues.testaddress", 4, 4, false);
+
+ send(0, "queues.testaddress", 10, false, null);
+
+ verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+ this.verifyNotReceive(0, 1, 2, 3, 4);
+
+ log.info("got here");
+ }
+
public void testBasicRoundRobin() throws Exception
{
setupCluster();
More information about the jboss-cvs-commits
mailing list