[hornetq-commits] JBoss hornetq SVN: r9145 - trunk/tests/src/org/hornetq/tests/integration/cluster/failover.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Apr 21 19:20:38 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-04-21 19:20:37 -0400 (Wed, 21 Apr 2010)
New Revision: 9145

Modified:
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Adding a new test

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-04-21 16:35:31 UTC (rev 9144)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2010-04-21 23:20:37 UTC (rev 9145)
@@ -31,7 +31,13 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
 import org.hornetq.core.client.impl.ClientSessionInternal;
 import org.hornetq.core.logging.Logger;
@@ -150,6 +156,102 @@
       Assert.assertEquals(0, sf.numConnections());
    }
 
+   public void testConsumeTransacted() throws Exception
+   {
+      ClientSessionFactoryInternal sf = getSessionFactory();
+      // Scenario: consume with explicit commits, force a failure mid-way, and expect the pending commit to fail.
+      sf.setBlockOnNonDurableSend(true);
+      sf.setBlockOnDurableSend(true);
+      // Both flags false: presumably no auto-commit of sends/acks (the test calls commit() itself) -- TODO confirm
+      ClientSession session = sf.createSession(false, false);
+      // The queue is created with the durable flag set to true
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+      // Counted down by the failure listener below when connectionFailed is invoked
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      class MyListener extends BaseListener
+      {
+         public void connectionFailed(final HornetQException me)
+         {
+            latch.countDown();
+         }
+      }
+
+      session.addFailureListener(new MyListener());
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      final int numMessages = 10;
+      // Send numMessages messages, each tagged with its index in the "counter" property
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createMessage(true);
+
+         setBody(i, message);
+
+         message.putIntProperty("counter", i);
+
+         producer.send(message);
+      }
+      // The sends are committed explicitly before consuming starts
+      session.commit();
+
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+      session.start();
+      // First pass: acknowledge each message and trigger the failure after the sixth one (i == 5)
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+         // NOTE(review): unlike the second loop, message is not null-checked; a null from receive() would NPE below
+         message.acknowledge();
+
+         // TODO: The test won't pass if you uncomment this line
+         // assertEquals(i, (int)message.getIntProperty("counter"));
+         // fail(...) is defined elsewhere; presumably it forces a connection failure and waits on the latch -- verify
+         if (i == 5)
+         {
+            fail(session, latch);
+         }
+      }
+      // Committing the acks made across the failure is expected to throw; the outcome is asserted near the end
+      boolean exception = false;
+
+      try
+      {
+         session.commit();
+      }
+      catch (HornetQException e)
+      {
+         exception = true;
+      }
+
+      consumer.close();
+
+      consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+      session.start();
+      // Second pass on a fresh consumer: all numMessages messages are expected to be received again
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+         
+         assertNotNull(message);
+
+         message.acknowledge();
+      }
+
+      session.commit();
+
+      assertTrue("Exception was expected!", exception);
+
+      session.close();
+
+      Assert.assertEquals(0, sf.numSessions());
+
+      Assert.assertEquals(0, sf.numConnections());
+   }
+
    /** It doesn't fail, but it restart both servers, live and backup, and the data should be received after the restart,
     *  and the servers should be able to connect without any problems. */
    public void testRestartServers() throws Exception
@@ -218,7 +320,7 @@
 
       Assert.assertEquals(0, sf.numConnections());
    }
-   
+
    // https://jira.jboss.org/jira/browse/HORNETQ-285
    public void testFailoverOnInitialConnection() throws Exception
    {
@@ -227,9 +329,9 @@
       sf.setBlockOnNonDurableSend(true);
       sf.setBlockOnDurableSend(true);
       sf.setFailoverOnInitialConnection(true);
-      
+
       // Stop live server
-      
+
       this.server0Service.stop();
 
       ClientSession session = sf.createSession();
@@ -1405,7 +1507,7 @@
 
       session.close();
 
-      sf = (ClientSessionFactoryInternal) HornetQClient.createClientSessionFactory(getConnectorTransportConfiguration(false));
+      sf = (ClientSessionFactoryInternal)HornetQClient.createClientSessionFactory(getConnectorTransportConfiguration(false));
 
       session = sendAndConsume(sf, false);
 
@@ -1789,22 +1891,22 @@
    {
       testSimpleSendAfterFailover(true, true);
    }
-   
+
    public void testSimpleSendAfterFailoverNonDurableTemporary() throws Exception
    {
       testSimpleSendAfterFailover(false, true);
    }
-   
+
    public void testSimpleSendAfterFailoverDurableNonTemporary() throws Exception
    {
       testSimpleSendAfterFailover(true, false);
    }
-   
+
    public void testSimpleSendAfterFailoverNonDurableNonTemporary() throws Exception
    {
       testSimpleSendAfterFailover(false, false);
    }
-   
+
    private void testSimpleSendAfterFailover(final boolean durable, final boolean temporary) throws Exception
    {
       ClientSessionFactoryInternal sf = getSessionFactory();
@@ -1821,9 +1923,9 @@
       }
       else
       {
-         session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, durable);         
+         session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, durable);
       }
-                
+
       final CountDownLatch latch = new CountDownLatch(1);
 
       class MyListener extends BaseListener
@@ -1996,11 +2098,11 @@
 
          producer.send(message);
       }
-      
+
       class Committer extends Thread
       {
          DelayInterceptor2 interceptor = new DelayInterceptor2();
-         
+
          @Override
          public void run()
          {
@@ -2037,25 +2139,25 @@
       }
 
       Committer committer = new Committer();
-      
-      //Commit will occur, but response will never get back, connetion is failed, and commit should be unblocked
-      //with transaction rolled back
 
+      // Commit will occur, but response will never get back, connection is failed, and commit should be unblocked
+      // with transaction rolled back
+
       committer.start();
 
-      //Wait for the commit to occur and the response to be discarded
+      // Wait for the commit to occur and the response to be discarded
       assertTrue(committer.interceptor.await());
-      
+
       Thread.sleep(500);
-      
+
       fail(session, latch);
 
       committer.join();
-      
+
       Assert.assertFalse(committer.failed);
 
       session.close();
-      
+
       ClientSession session2 = sf.createSession(false, false);
 
       producer = session2.createProducer(FailoverTestBase.ADDRESS);
@@ -2081,7 +2183,7 @@
       }
 
       session2.commit();
-      
+
       ClientConsumer consumer = session2.createConsumer(FailoverTestBase.ADDRESS);
 
       session2.start();
@@ -2322,10 +2424,10 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
-                                                             false,
-                                                             0,
-                                                             System.currentTimeMillis(),
-                                                             (byte)1);
+                                                       false,
+                                                       0,
+                                                       System.currentTimeMillis(),
+                                                       (byte)1);
          message.putIntProperty(new SimpleString("count"), i);
          message.getBodyBuffer().writeString("aardvarks");
          producer.send(message);



More information about the hornetq-commits mailing list