Author: clebert.suconic(a)jboss.com
Date: 2011-08-03 22:34:21 -0400 (Wed, 03 Aug 2011)
New Revision: 11108
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
Log:
tweak
Modified:
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
---
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-08-03
22:16:02 UTC (rev 11107)
+++
branches/Branch_2_2_EAP_cluster_clean2/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2011-08-04
02:34:21 UTC (rev 11108)
@@ -81,10 +81,10 @@
}
});
}
-
+
public void _testLoop() throws Throwable
{
- for (int i = 0 ; i < 1000; i++)
+ for (int i = 0; i < 1000; i++)
{
log.info("#test " + i);
testTransactional();
@@ -182,69 +182,73 @@
locator.setBlockOnDurableSend(true);
locator.setReconnectAttempts(-1);
locator.setConfirmationWindowSize(10 * 1024 * 1024);
- sf = (ClientSessionFactoryInternal)
createSessionFactoryAndWaitForTopology(locator, 2);
+ sf =
(ClientSessionFactoryInternal)createSessionFactoryAndWaitForTopology(locator, 2);
+ try
+ {
+ ClientSession createSession = sf.createSession(true, true);
- ClientSession createSession = sf.createSession(true, true);
+ createSession.createQueue(FailoverTestBase.ADDRESS,
FailoverTestBase.ADDRESS, null, true);
- createSession.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS,
null, true);
+ RemotingConnection conn =
((ClientSessionInternal)createSession).getConnection();
- RemotingConnection conn =
((ClientSessionInternal)createSession).getConnection();
+ Thread t = new Thread(runnable);
- Thread t = new Thread(runnable);
+ t.setName("MainTEST");
- t.setName("MainTEST");
+ t.start();
- t.start();
+ long randomDelay = (long)(2000 * Math.random());
- long randomDelay = (long)(2000 * Math.random());
+ AsynchronousFailoverTest.log.info("Sleeping " + randomDelay);
- AsynchronousFailoverTest.log.info("Sleeping " + randomDelay);
+ Thread.sleep(randomDelay);
- Thread.sleep(randomDelay);
+ AsynchronousFailoverTest.log.info("Failing asynchronously");
- AsynchronousFailoverTest.log.info("Failing asynchronously");
-
- MyListener listener = this.listener;
-
- // Simulate failure on connection
- synchronized (lockFail)
- {
- if (log.isDebugEnabled())
+ // Simulate failure on connection
+ synchronized (lockFail)
{
- log.debug("#test crashing test");
+ if (log.isDebugEnabled())
+ {
+ log.debug("#test crashing test");
+ }
+ crash((ClientSession)createSession);
}
- crash((ClientSession) createSession);
- }
- /*if (listener != null)
- {
- boolean ok = listener.latch.await(10000, TimeUnit.MILLISECONDS);
+ /*if (listener != null)
+ {
+ boolean ok = listener.latch.await(10000, TimeUnit.MILLISECONDS);
- Assert.assertTrue(ok);
- }*/
+ Assert.assertTrue(ok);
+ }*/
- runnable.setFailed();
+ runnable.setFailed();
- AsynchronousFailoverTest.log.info("Fail complete");
+ AsynchronousFailoverTest.log.info("Fail complete");
- t.join();
+ t.join();
- runnable.checkForExceptions();
+ runnable.checkForExceptions();
- createSession.close();
+ createSession.close();
- if (sf.numSessions() != 0)
+ if (sf.numSessions() != 0)
+ {
+ DelegatingSession.dumpSessionCreationStacks();
+ }
+
+ Assert.assertEquals(0, sf.numSessions());
+
+ locator.close();
+ }
+ finally
{
- DelegatingSession.dumpSessionCreationStacks();
+ locator.close();
+
+ Assert.assertEquals(0, sf.numConnections());
}
- Assert.assertEquals(0, sf.numSessions());
-
- locator.close();
-
- Assert.assertEquals(0, sf.numConnections());
-
if (i != numIts - 1)
{
tearDown();
@@ -259,7 +263,7 @@
DelegatingSession.debug = false;
}
}
-
+
protected void addPayload(ClientMessage msg)
{
}
@@ -294,7 +298,7 @@
message.getBodyBuffer().writeString("message" + i);
message.putIntProperty("counter", i);
-
+
addPayload(message);
producer.send(message);
@@ -304,7 +308,7 @@
catch (HornetQException e)
{
AsynchronousFailoverTest.log.info("exception when sending message
with counter " + i);
- if(e.getCode() != HornetQException.UNBLOCKED)
+ if (e.getCode() != HornetQException.UNBLOCKED)
{
e.printStackTrace();
}
@@ -386,13 +390,13 @@
{
// For duplication detection
int executionId = 0;
-
+
while (!runner.isFailed())
{
ClientSession session = null;
executionId++;
-
+
log.info("#test doTestTransactional starting now. Execution " +
executionId);
try
@@ -426,14 +430,14 @@
message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new
SimpleString("id:" + i +
",exec:" +
executionId));
-
+
addPayload(message);
if (log.isDebugEnabled())
{
log.debug("Sending message " + message);
}
-
+
producer.send(message);
}
@@ -449,8 +453,7 @@
logAndSystemOut("#test duplicate id rejected on
sending");
break;
}
- else
- if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK ||
e.getCode() == HornetQException.UNBLOCKED)
+ else if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK ||
e.getCode() == HornetQException.UNBLOCKED)
{
log.info("#test transaction rollback retrying on
sending");
// OK
@@ -466,14 +469,13 @@
while (retry);
logAndSystemOut("#test Finished sending, starting consumption
now");
-
-
+
boolean blocked = false;
retry = false;
ArrayList<Integer> msgs = new ArrayList<Integer>();
-
- ClientConsumer consumer = null;
+
+ ClientConsumer consumer = null;
do
{
msgs.clear();
@@ -496,14 +498,14 @@
{
break;
}
-
+
if (log.isDebugEnabled())
{
log.debug("Received message " + message);
}
int count = message.getIntProperty("counter");
-
+
if (count != i)
{
log.warn("count was received out of order, " + count +
"!=" + i);
@@ -521,11 +523,13 @@
{
if (blocked)
{
- assertTrue("msgs.size is expected to be 0 or " +
numMessages + " but it was " + msgs.size(), msgs.size() == 0 || msgs.size() ==
numMessages);
+ assertTrue("msgs.size is expected to be 0 or " +
numMessages + " but it was " + msgs.size(),
+ msgs.size() == 0 || msgs.size() == numMessages);
}
else
{
- assertTrue("msgs.size is expected to be " +
numMessages + " but it was " + msgs.size(), msgs.size() == numMessages);
+ assertTrue("msgs.size is expected to be " + numMessages
+ " but it was " + msgs.size(),
+ msgs.size() == numMessages);
}
}
catch (Throwable e)