[jboss-cvs] JBoss Messaging SVN: r4594 - trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 26 09:12:42 EDT 2008


Author: jmesnil
Date: 2008-06-26 09:12:42 -0400 (Thu, 26 Jun 2008)
New Revision: 4594

Modified:
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java
Log:
added unit test for the XA use cases

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java	2008-06-26 10:03:40 UTC (rev 4593)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java	2008-06-26 13:12:42 UTC (rev 4594)
@@ -24,6 +24,7 @@
 
 import static org.easymock.EasyMock.anyBoolean;
 import static org.easymock.EasyMock.anyInt;
+import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.createStrictMock;
 import static org.easymock.EasyMock.expect;
@@ -44,10 +45,16 @@
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
 
 import junit.framework.TestCase;
 
+import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.jboss.messaging.jms.bridge.Bridge;
 import org.jboss.messaging.jms.bridge.ConnectionFactoryFactory;
@@ -773,21 +780,64 @@
       // with batch size of 1, receive 2 messages and send 2
       doSendMessagesByBatchInNoTx(1, 2, 2, 2);
    }
-   
+
    public void testSendMessagesInNoTx_2() throws Exception
    {
       // with batch size of 2, receive 2 messages and send 2
       doSendMessagesByBatchInNoTx(2, 2, 2, 1);
    }
-   
+
    public void testSendMessagesInNoTx_3() throws Exception
    {
       // with batch size of 2, receive 1 messages and do not send any
       doSendMessagesByBatchInNoTx(2, 1, 0, 0);
    }
-   
-   public void doSendMessagesByBatchInNoTx(int batchSize, int reveivedCount, int sendCount, int batchCount) throws Exception
+
+   public void testSendMessagesInLocalTx_1() throws Exception
    {
+      // with batch size of 1, receive 2 messages and send 2
+      doSendMessagesByBatchInLocalTx(1, 2, 2, 2);
+   }
+
+   public void testSendMessagesInLocalTx_2() throws Exception
+   {
+      // with batch size of 2, receive 2 messages and send 2
+      doSendMessagesByBatchInLocalTx(2, 2, 2, 1);
+   }
+
+   public void testSendMessagesInLocalTx_3() throws Exception
+   {
+      // with batch size of 2, receive 1 message and do not send any
+      doSendMessagesByBatchInLocalTx(2, 1, 0, 0);
+   }
+
+   public void testSendMessagesInXA_1() throws Exception
+   {
+      // with batch size of 1, receive 2 messages and send 2
+      doSendMessagesByBatchInXA(1, 2, 2, 2);
+   }
+
+   public void testSendMessagesInXA_2() throws Exception
+   {
+      // with batch size of 2, receive 2 messages and send 2
+      doSendMessagesByBatchInXA(2, 2, 2, 1);
+   }
+
+   public void testSendMessagesInXA_3() throws Exception
+   {
+      // with batch size of 2, receive 1 message and do not send any
+      doSendMessagesByBatchInXA(2, 1, 0, 0);
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private void doSendMessagesByBatchInNoTx(int batchSize, int receivedCount,
+         int sendCount, int batchCount) throws Exception
+   {
       ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
       ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
       Connection sourceConn = createStrictMock(Connection.class);
@@ -825,7 +875,7 @@
             targetSession);
       expect(targetSession.createProducer(null)).andReturn(targetProducer);
       sourceConn.start();
-      
+
       if (sendCount > 0)
       {
          targetProducer.send(targetDest, message, 0, 0, 0);
@@ -863,7 +913,7 @@
       bridge.start();
       assertTrue(bridge.isStarted());
 
-      for (int i = 0; i < reveivedCount; i++)
+      for (int i = 0; i < receivedCount; i++)
       {
          answer.listener.onMessage(message);
       }
@@ -875,27 +925,14 @@
       verify(tm);
       verify(message);
    }
-   
-   public void testSendMessagesInLocalTx_1() throws Exception
+
+   /*
+    * The source and target share the same ConnectionFactoryFactory
+    * => they will be handled using a local transaction
+    */
+   private void doSendMessagesByBatchInLocalTx(int batchSize,
+         int receivedCount, int sendCount, int batchCount) throws Exception
    {
-      // with batch size of 1, receive 2 messages and send 2
-      doSendMessagesByBatchInLocalTx(1, 2, 2, 2);
-   }
-   
-   public void testSendMessagesInLocalTx_2() throws Exception
-   {
-      // with batch size of 2, receive 2 messages and send 2
-      doSendMessagesByBatchInLocalTx(2, 2, 2, 1);
-   }
-   
-   public void testSendMessagesInLocalTx_3() throws Exception
-   {
-      // with batch size of 2, receive 1 messages and do not send any
-      doSendMessagesByBatchInLocalTx(2, 1, 0, 0);
-   }
-   
-   public void doSendMessagesByBatchInLocalTx(int batchSize, int reveivedCount, int sendCount, int batchCount) throws Exception
-   {
       ConnectionFactoryFactory commonCFF = createStrictMock(ConnectionFactoryFactory.class);
       ConnectionFactory commonCF = createStrictMock(ConnectionFactory.class);
       Connection commonConn = createStrictMock(Connection.class);
@@ -924,7 +961,7 @@
       sourceConsumer.setMessageListener(isA(MessageListener.class));
       expectLastCall().andAnswer(answer);
       commonConn.start();
-      
+
       if (sendCount > 0)
       {
          targetProducer.send(targetDest, message, 0, 0, 0);
@@ -961,7 +998,7 @@
       bridge.start();
       assertTrue(bridge.isStarted());
 
-      for (int i = 0; i < reveivedCount; i++)
+      for (int i = 0; i < receivedCount; i++)
       {
          answer.listener.onMessage(message);
       }
@@ -972,22 +1009,178 @@
       verify(tm);
       verify(message);
    }
-   
-   // Package protected ---------------------------------------------
 
-   // Protected -----------------------------------------------------
+   /*
+    * QualityOfServiceMode is ONCE_AND_ONLY_ONCE
+    * => XA is used to handle transactions
+    */
+   private void doSendMessagesByBatchInXA(int batchSize, int receivedCount,
+         int sendCount, int batchCount) throws Exception
+   {
+      ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+      XAConnectionFactory sourceXACF = createStrictMock(XAConnectionFactory.class);
+      ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+      XAConnFactoryStub sourceCFStub = new XAConnFactoryStub(sourceXACF, sourceCF);
+      XAConnection sourceXAConn = createStrictMock(XAConnection.class);
+      XASession sourceXASession = createStrictMock(XASession.class);
+      XAResource sourceXARes = createStrictMock(XAResource.class);
+      Session sourceSession = createStrictMock(Session.class);
+      MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+      DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+      Destination sourceDest = createStrictMock(Destination.class);
+      ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+      XAConnectionFactory targetXACF = createStrictMock(XAConnectionFactory.class);
+      ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+      XAConnFactoryStub targetCFStub = new XAConnFactoryStub(targetXACF, targetCF);
+      XAConnection targetXAConn = createStrictMock(XAConnection.class);
+      XASession targetXASession = createStrictMock(XASession.class);
+      XAResource targetXARes = createStrictMock(XAResource.class);
+      Session targetSession = createStrictMock(Session.class);
+      MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+      DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+      Destination targetDest = createStrictMock(Destination.class);
+      TransactionManager tm = createStrictMock(TransactionManager.class);
+      Transaction tx = createStrictMock(Transaction.class);
+      Message message = createNiceMock(Message.class);
 
-   // Private -------------------------------------------------------
+      expect(sourceDF.createDestination()).andReturn(sourceDest);
+      expect(targetDF.createDestination()).andReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andReturn(sourceCFStub);
+      expect(sourceXACF.createXAConnection()).andReturn(sourceXAConn);
+      sourceXAConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(sourceXAConn.createXASession()).andReturn(sourceXASession);
+      expect(sourceXASession.getSession()).andReturn(sourceSession);
+      expect(sourceXASession.getXAResource()).andStubReturn(sourceXARes);
+      expect(sourceSession.createConsumer(sourceDest))
+            .andReturn(sourceConsumer);
+      SetMessageListenerAnswer answer = new SetMessageListenerAnswer();
+      sourceConsumer.setMessageListener(isA(MessageListener.class));
+      expectLastCall().andAnswer(answer);
+      expect(targetCFF.createConnectionFactory()).andReturn(targetCFStub);
+      expect(targetXACF.createXAConnection()).andReturn(targetXAConn);
+      targetXAConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(targetXAConn.createXASession()).andReturn(targetXASession);
+      expect(targetXASession.getSession()).andReturn(targetSession);
+      expect(targetXASession.getXAResource()).andStubReturn(targetXARes);
+      expect(targetSession.createProducer(null)).andReturn(targetProducer);
+      
+      // when starting the bridge
+      expect(tm.suspend()).andReturn(null);
+      tm.setTransactionTimeout(anyInt());
+      tm.begin();
+      expect(tm.getTransaction()).andReturn(tx);
+      expect(tm.suspend()).andReturn(null);
+      expect(tx.enlistResource(sourceXARes)).andReturn(true);
+      expect(tx.enlistResource(targetXARes)).andReturn(true);
 
+      // every time an XA batch is sent
+      for (int i = 0; i < batchCount; i++)
+      {
+         expect(tx.delistResource(sourceXARes, XAResource.TMSUCCESS)).andReturn(true);
+         expect(tx.delistResource(targetXARes, XAResource.TMSUCCESS)).andReturn(true);
+         tx.commit();
+         tm.setTransactionTimeout(anyInt());
+         tm.begin();
+         expect(tm.getTransaction()).andReturn(tx);
+         expect(tm.suspend()).andReturn(null);
+         expect(tx.enlistResource(sourceXARes)).andReturn(true);
+         expect(tx.enlistResource(targetXARes)).andReturn(true);         
+      }
+      
+      sourceXAConn.start();
+
+      if (sendCount > 0)
+      {
+         targetProducer.send(targetDest, message, 0, 0, 0);
+         expectLastCall().times(sendCount);
+      }
+
+      replay(sourceCFF, sourceXACF, sourceCF, sourceXAConn, sourceXASession, sourceXARes, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      replay(targetCFF, targetXACF, targetCF, targetXAConn, targetXASession, targetXARes, targetSession, targetProducer,
+            targetDF, targetDest);
+      replay(tm, tx);
+      replay(message);
+
+      BridgeImpl bridge = new BridgeImpl();
+      assertNotNull(bridge);
+
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      bridge.setFailureRetryInterval(-1);
+      bridge.setMaxRetries(-1);
+      bridge.setMaxBatchSize(batchSize);
+      bridge.setMaxBatchTime(-1);
+      bridge.setTransactionManager(tm);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.ONCE_AND_ONLY_ONCE);
+
+      assertFalse(bridge.isStarted());
+      bridge.start();
+      assertTrue(bridge.isStarted());
+
+      for (int i = 0; i < receivedCount; i++)
+      {
+         answer.listener.onMessage(message);
+      }
+
+      verify(sourceCFF, sourceXACF, sourceCF, sourceXAConn, sourceXASession, sourceXARes, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      verify(targetCFF, targetXACF, targetCF, targetXAConn, targetXASession, targetXARes, targetSession, targetProducer,
+            targetDF, targetDest);
+      verify(tm, tx);
+      verify(message);
+   }
+
    // Inner classes -------------------------------------------------
-   
+
    class SetMessageListenerAnswer implements IAnswer
    {
       MessageListener listener = null;
+
       public Object answer() throws Throwable
       {
          listener = (MessageListener) getCurrentArguments()[0];
          return null;
       }
    }
+   
+   /*
+    * A stub which implements both XAConnectionFactory & ConnectionFactory
+    */
+   class XAConnFactoryStub implements XAConnectionFactory, ConnectionFactory
+   {
+      private XAConnectionFactory xacf;
+      private ConnectionFactory cf;
+
+      public XAConnFactoryStub(XAConnectionFactory xacf, ConnectionFactory cf)
+      {
+         this.xacf = xacf;
+         this.cf = cf;
+      }
+
+      public XAConnection createXAConnection() throws JMSException
+      {
+         return xacf.createXAConnection();
+      }
+
+      public XAConnection createXAConnection(String arg0, String arg1)
+            throws JMSException
+      {
+         return xacf.createXAConnection(arg0, arg1);
+      }
+
+      public Connection createConnection() throws JMSException
+      {
+         return cf.createConnection();
+      }
+
+      public Connection createConnection(String arg0, String arg1)
+            throws JMSException
+      {
+         return cf.createConnection(arg0, arg1);
+      }
+      
+   }
 }




More information about the jboss-cvs-commits mailing list