[jboss-cvs] JBoss Messaging SVN: r5467 - in trunk: tests/src/org/jboss/messaging/tests/integration/cluster/failover and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Dec 5 12:21:18 EST 2008
Author: timfox
Date: 2008-12-05 12:21:17 -0500 (Fri, 05 Dec 2008)
New Revision: 5467
Modified:
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/RandomFailoverTest.java
Log:
Fixed a deadlock and made some tweaks
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-12-05 16:40:47 UTC (rev 5466)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-12-05 17:21:17 UTC (rev 5467)
@@ -435,19 +435,21 @@
{
return;
}
+
+ destroyed = true;
+ }
- log.warn(me.getMessage());
+ log.warn(me.getMessage());
- // Then call the listeners
- callListeners(me);
+ // Then call the listeners
+ callListeners(me);
- internalClose();
+ internalClose();
- for (Channel channel : channels.values())
- {
- channel.fail();
- }
- }
+ for (Channel channel : channels.values())
+ {
+ channel.fail();
+ }
}
public void destroy()
@@ -458,14 +460,16 @@
{
return;
}
+
+ destroyed = true;
+ }
- internalClose();
+ internalClose();
- // TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1421
- // This affects clustering, so I'm keeping this out for now
- // We need to inform Listeners about the connection being closed
- // callListeners(null);
- }
+ // TODO: https://jira.jboss.org/jira/browse/JBMESSAGING-1421
+ // This affects clustering, so I'm keeping this out for now
+ // We need to inform Listeners about the connection being closed
+ // callListeners(null);
}
public boolean isExpired(final long now)
@@ -578,8 +582,6 @@
pingChannel.close();
- destroyed = true;
-
// We close the underlying transport connection
transportConnection.close();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java 2008-12-05 16:40:47 UTC (rev 5466)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTest.java 2008-12-05 17:21:17 UTC (rev 5467)
@@ -40,6 +40,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
import org.jboss.messaging.core.server.MessagingService;
@@ -290,7 +291,9 @@
if (!ok)
{
- throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) + " threadnum " + threadNum);
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
}
if (handler.failure != null)
@@ -375,7 +378,9 @@
if (!ok)
{
- throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) + " threadnum " + threadNum);
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
}
if (handler.failure != null)
@@ -465,7 +470,9 @@
if (!ok)
{
- throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) + " threadnum " + threadNum);
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
}
if (handler.failure != null)
@@ -538,7 +545,7 @@
sessions.add(sessConsume);
}
-
+
ClientSession sessSend = sf.createSession(false, false, false);
ClientProducer producer = sessSend.createProducer(ADDRESS);
@@ -546,17 +553,16 @@
sendMessages(sessSend, producer, numMessages, threadNum);
sessSend.rollback();
-
+
sendMessages(sessSend, producer, numMessages, threadNum);
-
+
sessSend.commit();
-
for (ClientSession session : sessions)
{
session.start();
}
-
+
Set<MyHandler> handlers = new HashSet<MyHandler>();
for (ClientConsumer consumer : consumers)
@@ -574,7 +580,9 @@
if (!ok)
{
- throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) + " threadnum " + threadNum);
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
}
if (handler.failure != null)
@@ -582,9 +590,9 @@
throw new Exception("Handler failed: " + handler.failure);
}
}
-
+
handlers.clear();
-
+
// Set handlers to null
for (ClientConsumer consumer : consumers)
{
@@ -595,7 +603,7 @@
{
session.rollback();
}
-
+
// New handlers
for (ClientConsumer consumer : consumers)
{
@@ -605,27 +613,29 @@
handlers.add(handler);
}
-
+
for (MyHandler handler : handlers)
{
boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
if (!ok)
{
- throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) + " threadnum " + threadNum);
+ throw new Exception("Timed out waiting for messages on handler " + System.identityHashCode(handler) +
+ " threadnum " +
+ threadNum);
}
-
+
if (handler.failure != null)
{
throw new Exception("Handler failed on rollback: " + handler.failure);
}
}
-
+
for (ClientSession session : sessions)
{
session.commit();
}
-
+
sessSend.close();
for (ClientSession session : sessions)
{
@@ -1236,8 +1246,12 @@
final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams));
-
+ backupParams),
+ 0,
+ 1,
+ ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
+
sf.setSendWindowSize(32 * 1024);
ClientSession session = sf.createSession(false, false, false);
@@ -1298,6 +1312,8 @@
}
while (!failer.isExecuted());
+ InVMConnector.resetFailures();
+
session.close();
assertEquals(0, sf.numSessions());
@@ -1339,7 +1355,8 @@
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
TransportConfiguration backupTC = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams, "backup-connector");
+ backupParams,
+ "backup-connector");
connectors.put(backupTC.getName(), backupTC);
liveConf.setConnectorConfigurations(connectors);
liveConf.setBackupConnectorName(backupTC.getName());
@@ -1402,9 +1419,9 @@
int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-
- // log.info("Got message " + tn + ":" + cnt);
+ // log.info("Got message " + tn + ":" + cnt);
+
Integer c = consumerCounts.get(tn);
if (c == null)
{
@@ -1450,6 +1467,8 @@
RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+ InVMConnector.numberOfFailures = 1;
+ InVMConnector.failOnCreateConnection = true;
conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
log.info("** Fail complete");
@@ -1538,11 +1557,11 @@
c = new Integer(cnt);
}
- //log.info(System.identityHashCode(this) + " consumed message " + threadNum + ":" + cnt);
-
+ // log.info(System.identityHashCode(this) + " consumed message " + threadNum + ":" + cnt);
+
if (tn == threadNum && cnt != c.intValue())
{
- failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;
+ failure = "Invalid count, expected " + threadNum + ":" + c + " got " + cnt;
log.error(failure);
latch.countDown();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/RandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/RandomFailoverTest.java 2008-12-05 16:40:47 UTC (rev 5466)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/RandomFailoverTest.java 2008-12-05 17:21:17 UTC (rev 5467)
@@ -36,6 +36,7 @@
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
import org.jboss.messaging.core.server.MessagingService;
@@ -65,7 +66,7 @@
private MessagingService backupService;
private final Map<String, Object> backupParams = new HashMap<String, Object>();
-
+
private Timer timer;
// Static --------------------------------------------------------
@@ -74,7 +75,6 @@
// Public --------------------------------------------------------
-
public void testA() throws Exception
{
runTest(new RunnableT()
@@ -85,7 +85,7 @@
}
});
}
-
+
public void testB() throws Exception
{
runTest(new RunnableT()
@@ -96,7 +96,7 @@
}
});
}
-
+
public void testC() throws Exception
{
runTest(new RunnableT()
@@ -107,7 +107,7 @@
}
});
}
-
+
public void testD() throws Exception
{
runTest(new RunnableT()
@@ -118,7 +118,7 @@
}
});
}
-
+
public void testE() throws Exception
{
runTest(new RunnableT()
@@ -129,7 +129,7 @@
}
});
}
-
+
public void testF() throws Exception
{
runTest(new RunnableT()
@@ -140,7 +140,7 @@
}
});
}
-
+
public void testG() throws Exception
{
runTest(new RunnableT()
@@ -151,7 +151,7 @@
}
});
}
-
+
public void testH() throws Exception
{
runTest(new RunnableT()
@@ -162,7 +162,7 @@
}
});
}
-
+
public void testI() throws Exception
{
runTest(new RunnableT()
@@ -173,7 +173,7 @@
}
});
}
-
+
public void testJ() throws Exception
{
runTest(new RunnableT()
@@ -184,7 +184,7 @@
}
});
}
-
+
public void testK() throws Exception
{
runTest(new RunnableT()
@@ -195,7 +195,7 @@
}
});
}
-
+
public void testL() throws Exception
{
runTest(new RunnableT()
@@ -206,7 +206,7 @@
}
});
}
-
+
public void testN() throws Exception
{
runTest(new RunnableT()
@@ -221,17 +221,21 @@
public void runTest(final RunnableT runnable) throws Exception
{
final int numIts = getNumIterations();
-
+
for (int its = 0; its < numIts; its++)
{
start();
ClientSessionFactoryImpl sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams));
+ backupParams),
+ 0,
+ 1,
+ ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_BEFORE_FAILOVER,
+ ClientSessionFactoryImpl.DEFAULT_MAX_RETRIES_AFTER_FAILOVER);
sf.setSendWindowSize(32 * 1024);
-
+
ClientSession session = sf.createSession(false, false, false);
Failer failer = startFailer(1000, session);
@@ -242,25 +246,27 @@
runnable.run(sf);
}
while (!failer.isExecuted());
+
+ InVMConnector.resetFailures();
session.close();
assertEquals(0, sf.numSessions());
-
+
assertEquals(0, sf.numConnections());
-
+
stop();
}
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
protected void doTestA(final ClientSessionFactory sf) throws Exception
{
long start = System.currentTimeMillis();
-
+
log.info("starting================");
ClientSession s = sf.createSession(false, false, false);
@@ -902,7 +908,7 @@
Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
Set<ClientSession> sessions = new HashSet<ClientSession>();
-
+
for (int i = 0; i < numSessions; i++)
{
SimpleString subName = new SimpleString("sub" + i);
@@ -933,12 +939,12 @@
message.getBody().flip();
producer.send(message);
}
-
+
for (ClientSession session : sessions)
{
session.start();
}
-
+
for (int i = 0; i < numMessages; i++)
{
for (ClientConsumer consumer : consumers)
@@ -949,7 +955,7 @@
{
throw new IllegalStateException("Failed to receive message " + i);
}
-
+
assertNotNull(msg);
assertEquals(i, msg.getProperty(new SimpleString("count")));
@@ -982,7 +988,7 @@
}
s.close();
-
+
assertEquals(1, ((ClientSessionFactoryImpl)sf).numSessions());
long end = System.currentTimeMillis();
@@ -1051,7 +1057,7 @@
}
sessSend.commit();
-
+
log.info("sent and committed");
for (int i = 0; i < numMessages; i++)
@@ -1305,7 +1311,7 @@
sessCreate.deleteQueue(ADDRESS);
- sessCreate.close();
+ sessCreate.close();
}
protected void doTestJ(final ClientSessionFactory sf) throws Exception
@@ -1379,7 +1385,6 @@
s.close();
}
-
protected void doTestN(final ClientSessionFactory sf) throws Exception
{
@@ -1426,44 +1431,44 @@
sessCreate.close();
}
-
+
protected int getNumIterations()
{
return 20;
}
-
+
protected void setUp() throws Exception
{
super.setUp();
-
+
log.info("*********** created timer");
timer = new Timer(true);
-
+
log.info("************ Starting test " + this.getName());
}
-
+
protected void tearDown() throws Exception
{
timer.cancel();
-
+
log.info("************ Ended test " + this.getName());
-
+
InVMRegistry.instance.clear();
-
+
super.tearDown();
}
-
+
// Private -------------------------------------------------------
-
+
private Failer startFailer(final long time, final ClientSession session)
{
Failer failer = new Failer((ClientSessionInternal)session);
timer.schedule(failer, (long)(time * Math.random()), 100);
-
+
return failer;
}
-
+
private void start() throws Exception
{
Configuration backupConf = new ConfigurationImpl();
@@ -1486,7 +1491,8 @@
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
TransportConfiguration backupTC = new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
- backupParams, "backup-connector");
+ backupParams,
+ "backup-connector");
connectors.put(backupTC.getName(), backupTC);
liveConf.setConnectorConfigurations(connectors);
liveConf.setBackupConnectorName(backupTC.getName());
@@ -1506,26 +1512,28 @@
assertEquals(0, InVMRegistry.instance.size());
}
-
+
// Inner classes -------------------------------------------------
-
+
class Failer extends TimerTask
- {
+ {
private final ClientSessionInternal session;
private boolean executed;
public Failer(final ClientSessionInternal session)
- {
+ {
this.session = session;
}
public synchronized void run()
{
log.info("** Failing connection");
-
+
+ InVMConnector.numberOfFailures = 1;
+ InVMConnector.failOnCreateConnection = true;
session.getConnection().fail(new MessagingException(MessagingException.NOT_CONNECTED, "oops"));
-
+
log.info("** Fail complete");
cancel();
@@ -1538,9 +1546,9 @@
return executed;
}
}
-
+
public abstract class RunnableT
{
- abstract void run(final ClientSessionFactory sf) throws Exception;
+ abstract void run(final ClientSessionFactory sf) throws Exception;
}
}
More information about the jboss-cvs-commits mailing list