[jboss-cvs] JBoss Messaging SVN: r4594 - trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 26 09:12:42 EDT 2008
Author: jmesnil
Date: 2008-06-26 09:12:42 -0400 (Thu, 26 Jun 2008)
New Revision: 4594
Modified:
trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java
Log:
added unit test for the XA use cases
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java 2008-06-26 10:03:40 UTC (rev 4593)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java 2008-06-26 13:12:42 UTC (rev 4594)
@@ -24,6 +24,7 @@
import static org.easymock.EasyMock.anyBoolean;
import static org.easymock.EasyMock.anyInt;
+import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.createStrictMock;
import static org.easymock.EasyMock.expect;
@@ -44,10 +45,16 @@
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
import junit.framework.TestCase;
+import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.jboss.messaging.jms.bridge.Bridge;
import org.jboss.messaging.jms.bridge.ConnectionFactoryFactory;
@@ -773,21 +780,64 @@
// with batch size of 1, receive 2 messages and send 2
doSendMessagesByBatchInNoTx(1, 2, 2, 2);
}
-
+
public void testSendMessagesInNoTx_2() throws Exception
{
// with batch size of 2, receive 2 messages and send 2
doSendMessagesByBatchInNoTx(2, 2, 2, 1);
}
-
+
public void testSendMessagesInNoTx_3() throws Exception
{
// with batch size of 2, receive 1 messages and do not send any
doSendMessagesByBatchInNoTx(2, 1, 0, 0);
}
-
- public void doSendMessagesByBatchInNoTx(int batchSize, int reveivedCount, int sendCount, int batchCount) throws Exception
+
+ public void testSendMessagesInLocalTx_1() throws Exception
{
+ // with batch size of 1, receive 2 messages and send 2
+ doSendMessagesByBatchInLocalTx(1, 2, 2, 2);
+ }
+
+ public void testSendMessagesInLocalTx_2() throws Exception
+ {
+ // with batch size of 2, receive 2 messages and send 2
+ doSendMessagesByBatchInLocalTx(2, 2, 2, 1);
+ }
+
+ public void testSendMessagesInLocalTx_3() throws Exception
+ {
+ // with batch size of 2, receive 1 messages and do not send any
+ doSendMessagesByBatchInLocalTx(2, 1, 0, 0);
+ }
+
+ public void testSendMessagesInXA_1() throws Exception
+ {
+ // with batch size of 1, receive 2 messages and send 2
+ doSendMessagesByBatchInXA(1, 2, 2, 2);
+ }
+
+ public void testSendMessagesInXA_2() throws Exception
+ {
+ // with batch size of 2, receive 2 messages and send 2
+ doSendMessagesByBatchInXA(2, 2, 2, 1);
+ }
+
+ public void testSendMessagesInXA_3() throws Exception
+ {
+ // with batch size of 2, receive 1 messages and do not send any
+ doSendMessagesByBatchInXA(2, 1, 0, 0);
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void doSendMessagesByBatchInNoTx(int batchSize, int receivedCount,
+ int sendCount, int batchCount) throws Exception
+ {
ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
Connection sourceConn = createStrictMock(Connection.class);
@@ -825,7 +875,7 @@
targetSession);
expect(targetSession.createProducer(null)).andReturn(targetProducer);
sourceConn.start();
-
+
if (sendCount > 0)
{
targetProducer.send(targetDest, message, 0, 0, 0);
@@ -863,7 +913,7 @@
bridge.start();
assertTrue(bridge.isStarted());
- for (int i = 0; i < reveivedCount; i++)
+ for (int i = 0; i < receivedCount; i++)
{
answer.listener.onMessage(message);
}
@@ -875,27 +925,14 @@
verify(tm);
verify(message);
}
-
- public void testSendMessagesInLocalTx_1() throws Exception
+
+ /*
+ * The source and target share the same ConnectionFactoryFactory
+ * => the will be handled using a local transaction
+ */
+ private void doSendMessagesByBatchInLocalTx(int batchSize,
+ int receivedCount, int sendCount, int batchCount) throws Exception
{
- // with batch size of 1, receive 2 messages and send 2
- doSendMessagesByBatchInLocalTx(1, 2, 2, 2);
- }
-
- public void testSendMessagesInLocalTx_2() throws Exception
- {
- // with batch size of 2, receive 2 messages and send 2
- doSendMessagesByBatchInLocalTx(2, 2, 2, 1);
- }
-
- public void testSendMessagesInLocalTx_3() throws Exception
- {
- // with batch size of 2, receive 1 messages and do not send any
- doSendMessagesByBatchInLocalTx(2, 1, 0, 0);
- }
-
- public void doSendMessagesByBatchInLocalTx(int batchSize, int reveivedCount, int sendCount, int batchCount) throws Exception
- {
ConnectionFactoryFactory commonCFF = createStrictMock(ConnectionFactoryFactory.class);
ConnectionFactory commonCF = createStrictMock(ConnectionFactory.class);
Connection commonConn = createStrictMock(Connection.class);
@@ -924,7 +961,7 @@
sourceConsumer.setMessageListener(isA(MessageListener.class));
expectLastCall().andAnswer(answer);
commonConn.start();
-
+
if (sendCount > 0)
{
targetProducer.send(targetDest, message, 0, 0, 0);
@@ -961,7 +998,7 @@
bridge.start();
assertTrue(bridge.isStarted());
- for (int i = 0; i < reveivedCount; i++)
+ for (int i = 0; i < receivedCount; i++)
{
answer.listener.onMessage(message);
}
@@ -972,22 +1009,178 @@
verify(tm);
verify(message);
}
-
- // Package protected ---------------------------------------------
- // Protected -----------------------------------------------------
+ /*
+ * QualityOfServiceMode is ONCE_AND_ONLY_ONCE
+ * => XA is used to handle transactions
+ */
+ private void doSendMessagesByBatchInXA(int batchSize, int receivedCount,
+ int sendCount, int batchCount) throws Exception
+ {
+ ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+ XAConnectionFactory sourceXACF = createStrictMock(XAConnectionFactory.class);
+ ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+ XAConnFactoryStub sourceCFStub = new XAConnFactoryStub(sourceXACF, sourceCF);
+ XAConnection sourceXAConn = createStrictMock(XAConnection.class);
+ XASession sourceXASession = createStrictMock(XASession.class);
+ XAResource sourceXARes = createStrictMock(XAResource.class);
+ Session sourceSession = createStrictMock(Session.class);
+ MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+ DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+ Destination sourceDest = createStrictMock(Destination.class);
+ ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+ XAConnectionFactory targetXACF = createStrictMock(XAConnectionFactory.class);
+ ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+ XAConnFactoryStub targetCFStub = new XAConnFactoryStub(targetXACF, targetCF);
+ XAConnection targetXAConn = createStrictMock(XAConnection.class);
+ XASession targetXASession = createStrictMock(XASession.class);
+ XAResource targetXARes = createStrictMock(XAResource.class);
+ Session targetSession = createStrictMock(Session.class);
+ MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+ DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+ Destination targetDest = createStrictMock(Destination.class);
+ TransactionManager tm = createStrictMock(TransactionManager.class);
+ Transaction tx = createStrictMock(Transaction.class);
+ Message message = createNiceMock(Message.class);
- // Private -------------------------------------------------------
+ expect(sourceDF.createDestination()).andReturn(sourceDest);
+ expect(targetDF.createDestination()).andReturn(targetDest);
+ expect(sourceCFF.createConnectionFactory()).andReturn(sourceCFStub);
+ expect(sourceXACF.createXAConnection()).andReturn(sourceXAConn);
+ sourceXAConn.setExceptionListener(isA(ExceptionListener.class));
+ expect(sourceXAConn.createXASession()).andReturn(sourceXASession);
+ expect(sourceXASession.getSession()).andReturn(sourceSession);
+ expect(sourceXASession.getXAResource()).andStubReturn(sourceXARes);
+ expect(sourceSession.createConsumer(sourceDest))
+ .andReturn(sourceConsumer);
+ SetMessageListenerAnswer answer = new SetMessageListenerAnswer();
+ sourceConsumer.setMessageListener(isA(MessageListener.class));
+ expectLastCall().andAnswer(answer);
+ expect(targetCFF.createConnectionFactory()).andReturn(targetCFStub);
+ expect(targetXACF.createXAConnection()).andReturn(targetXAConn);
+ targetXAConn.setExceptionListener(isA(ExceptionListener.class));
+ expect(targetXAConn.createXASession()).andReturn(targetXASession);
+ expect(targetXASession.getSession()).andReturn(targetSession);
+ expect(targetXASession.getXAResource()).andStubReturn(targetXARes);
+ expect(targetSession.createProducer(null)).andReturn(targetProducer);
+
+ // when starting the bridge
+ expect(tm.suspend()).andReturn(null);
+ tm.setTransactionTimeout(anyInt());
+ tm.begin();
+ expect(tm.getTransaction()).andReturn(tx);
+ expect(tm.suspend()).andReturn(null);
+ expect(tx.enlistResource(sourceXARes)).andReturn(true);
+ expect(tx.enlistResource(targetXARes)).andReturn(true);
+ // everytime a batchXA is sent
+ for (int i = 0; i < batchCount; i++)
+ {
+ expect(tx.delistResource(sourceXARes, XAResource.TMSUCCESS)).andReturn(true);
+ expect(tx.delistResource(targetXARes, XAResource.TMSUCCESS)).andReturn(true);
+ tx.commit();
+ tm.setTransactionTimeout(anyInt());
+ tm.begin();
+ expect(tm.getTransaction()).andReturn(tx);
+ expect(tm.suspend()).andReturn(null);
+ expect(tx.enlistResource(sourceXARes)).andReturn(true);
+ expect(tx.enlistResource(targetXARes)).andReturn(true);
+ }
+
+ sourceXAConn.start();
+
+ if (sendCount > 0)
+ {
+ targetProducer.send(targetDest, message, 0, 0, 0);
+ expectLastCall().times(sendCount);
+ }
+
+ replay(sourceCFF, sourceXACF, sourceCF, sourceXAConn, sourceXASession, sourceXARes, sourceSession, sourceConsumer,
+ sourceDF, sourceDest);
+ replay(targetCFF, targetXACF, targetCF, targetXAConn, targetXASession, targetXARes, targetSession, targetProducer,
+ targetDF, targetDest);
+ replay(tm, tx);
+ replay(message);
+
+ BridgeImpl bridge = new BridgeImpl();
+ assertNotNull(bridge);
+
+ bridge.setSourceConnectionFactoryFactory(sourceCFF);
+ bridge.setSourceDestinationFactory(sourceDF);
+ bridge.setTargetConnectionFactoryFactory(targetCFF);
+ bridge.setTargetDestinationFactory(targetDF);
+ bridge.setFailureRetryInterval(-1);
+ bridge.setMaxRetries(-1);
+ bridge.setMaxBatchSize(batchSize);
+ bridge.setMaxBatchTime(-1);
+ bridge.setTransactionManager(tm);
+ bridge.setQualityOfServiceMode(QualityOfServiceMode.ONCE_AND_ONLY_ONCE);
+
+ assertFalse(bridge.isStarted());
+ bridge.start();
+ assertTrue(bridge.isStarted());
+
+ for (int i = 0; i < receivedCount; i++)
+ {
+ answer.listener.onMessage(message);
+ }
+
+ verify(sourceCFF, sourceXACF, sourceCF, sourceXAConn, sourceXASession, sourceXARes, sourceSession, sourceConsumer,
+ sourceDF, sourceDest);
+ verify(targetCFF, targetXACF, targetCF, targetXAConn, targetXASession, targetXARes, targetSession, targetProducer,
+ targetDF, targetDest);
+ verify(tm, tx);
+ verify(message);
+ }
+
// Inner classes -------------------------------------------------
-
+
class SetMessageListenerAnswer implements IAnswer
{
MessageListener listener = null;
+
public Object answer() throws Throwable
{
listener = (MessageListener) getCurrentArguments()[0];
return null;
}
}
+
+ /*
+ * A stub which implements both XAConnectionFactory & ConnectionFactory
+ */
+ class XAConnFactoryStub implements XAConnectionFactory, ConnectionFactory
+ {
+ private XAConnectionFactory xacf;
+ private ConnectionFactory cf;
+
+ public XAConnFactoryStub(XAConnectionFactory xacf, ConnectionFactory cf)
+ {
+ this.xacf = xacf;
+ this.cf = cf;
+ }
+
+ public XAConnection createXAConnection() throws JMSException
+ {
+ return xacf.createXAConnection();
+ }
+
+ public XAConnection createXAConnection(String arg0, String arg1)
+ throws JMSException
+ {
+ return xacf.createXAConnection(arg0, arg1);
+ }
+
+ public Connection createConnection() throws JMSException
+ {
+ return cf.createConnection();
+ }
+
+ public Connection createConnection(String arg0, String arg1)
+ throws JMSException
+ {
+ return cf.createConnection(arg0, arg1);
+ }
+
+ }
}
More information about the jboss-cvs-commits
mailing list