[hornetq-commits] JBoss hornetq SVN: r9145 - trunk/tests/src/org/hornetq/tests/integration/cluster/failover.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Apr 21 19:20:38 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-04-21 19:20:37 -0400 (Wed, 21 Apr 2010)
New Revision: 9145
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Adding a new test
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-04-21 16:35:31 UTC (rev 9144)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-04-21 23:20:37 UTC (rev 9145)
@@ -31,7 +31,13 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
@@ -150,6 +156,102 @@
Assert.assertEquals(0, sf.numConnections());
}
+ public void testConsumeTransacted() throws Exception
+ {
+ ClientSessionFactoryInternal sf = getSessionFactory();
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+
+ ClientSession session = sf.createSession(false, false);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ class MyListener extends BaseListener
+ {
+ public void connectionFailed(final HornetQException me)
+ {
+ latch.countDown();
+ }
+ }
+
+ session.addFailureListener(new MyListener());
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+
+ setBody(i, message);
+
+ message.putIntProperty("counter", i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ message.acknowledge();
+
+ // TODO: The test won't pass if you uncomment this line
+ // assertEquals(i, (int)message.getIntProperty("counter"));
+
+ if (i == 5)
+ {
+ fail(session, latch);
+ }
+ }
+
+ boolean exception = false;
+
+ try
+ {
+ session.commit();
+ }
+ catch (HornetQException e)
+ {
+ exception = true;
+ }
+
+ consumer.close();
+
+ consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ assertNotNull(message);
+
+ message.acknowledge();
+ }
+
+ session.commit();
+
+ assertTrue("Exception was expected!", exception);
+
+ session.close();
+
+ Assert.assertEquals(0, sf.numSessions());
+
+ Assert.assertEquals(0, sf.numConnections());
+ }
+
/** It doesn't fail, but it restart both servers, live and backup, and the data should be received after the restart,
* and the servers should be able to connect without any problems. */
public void testRestartServers() throws Exception
@@ -218,7 +320,7 @@
Assert.assertEquals(0, sf.numConnections());
}
-
+
// https://jira.jboss.org/jira/browse/HORNETQ-285
public void testFailoverOnInitialConnection() throws Exception
{
@@ -227,9 +329,9 @@
sf.setBlockOnNonDurableSend(true);
sf.setBlockOnDurableSend(true);
sf.setFailoverOnInitialConnection(true);
-
+
// Stop live server
-
+
this.server0Service.stop();
ClientSession session = sf.createSession();
@@ -1405,7 +1507,7 @@
session.close();
- sf = (ClientSessionFactoryInternal) HornetQClient.createClientSessionFactory(getConnectorTransportConfiguration(false));
+ sf = (ClientSessionFactoryInternal)HornetQClient.createClientSessionFactory(getConnectorTransportConfiguration(false));
session = sendAndConsume(sf, false);
@@ -1789,22 +1891,22 @@
{
testSimpleSendAfterFailover(true, true);
}
-
+
public void testSimpleSendAfterFailoverNonDurableTemporary() throws Exception
{
testSimpleSendAfterFailover(false, true);
}
-
+
public void testSimpleSendAfterFailoverDurableNonTemporary() throws Exception
{
testSimpleSendAfterFailover(true, false);
}
-
+
public void testSimpleSendAfterFailoverNonDurableNonTemporary() throws Exception
{
testSimpleSendAfterFailover(false, false);
}
-
+
private void testSimpleSendAfterFailover(final boolean durable, final boolean temporary) throws Exception
{
ClientSessionFactoryInternal sf = getSessionFactory();
@@ -1821,9 +1923,9 @@
}
else
{
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, durable);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, durable);
}
-
+
final CountDownLatch latch = new CountDownLatch(1);
class MyListener extends BaseListener
@@ -1996,11 +2098,11 @@
producer.send(message);
}
-
+
class Committer extends Thread
{
DelayInterceptor2 interceptor = new DelayInterceptor2();
-
+
@Override
public void run()
{
@@ -2037,25 +2139,25 @@
}
Committer committer = new Committer();
-
- //Commit will occur, but response will never get back, connetion is failed, and commit should be unblocked
- //with transaction rolled back
+ // Commit will occur, but response will never get back, connetion is failed, and commit should be unblocked
+ // with transaction rolled back
+
committer.start();
- //Wait for the commit to occur and the response to be discarded
+ // Wait for the commit to occur and the response to be discarded
assertTrue(committer.interceptor.await());
-
+
Thread.sleep(500);
-
+
fail(session, latch);
committer.join();
-
+
Assert.assertFalse(committer.failed);
session.close();
-
+
ClientSession session2 = sf.createSession(false, false);
producer = session2.createProducer(FailoverTestBase.ADDRESS);
@@ -2081,7 +2183,7 @@
}
session2.commit();
-
+
ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
session2.start();
@@ -2322,10 +2424,10 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.putIntProperty(new SimpleString("count"), i);
message.getBodyBuffer().writeString("aardvarks");
producer.send(message);
More information about the hornetq-commits
mailing list