[jboss-cvs] JBoss Messaging SVN: r4607 - in trunk: tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jun 27 05:36:16 EDT 2008
Author: jmesnil
Date: 2008-06-27 05:36:16 -0400 (Fri, 27 Jun 2008)
New Revision: 4607
Modified:
trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java
Log:
more bridge unit tests
Modified: trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java 2008-06-27 00:47:24 UTC (rev 4606)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java 2008-06-27 09:36:16 UTC (rev 4607)
@@ -626,7 +626,7 @@
checkValidValue(maxRetries, "maxRetries");
if (failureRetryInterval == -1 && maxRetries > 0)
{
- throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be 0");
+ throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be set to -1");
}
checkMaxBatchSize(maxBatchSize);
checkValidValue(maxBatchTime, "maxBatchTime");
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java 2008-06-27 00:47:24 UTC (rev 4606)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java 2008-06-27 09:36:16 UTC (rev 4607)
@@ -35,6 +35,8 @@
import static org.easymock.EasyMock.verify;
import static org.jboss.messaging.tests.util.RandomUtil.randomString;
+import java.util.Enumeration;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
@@ -61,6 +63,7 @@
import org.jboss.messaging.jms.bridge.DestinationFactory;
import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
import org.jboss.messaging.jms.bridge.impl.BridgeImpl;
+import org.jboss.messaging.jms.client.JBossMessage;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -125,7 +128,51 @@
verify(sourceCFF, sourceDF, targetCFF, targetDF);
}
+
+ public void testMaxRetriesMustBeDisabledWhenFailureRetryIntervalIsDisabled() throws Exception
+ {
+ long disabledFailureRetryInterval = -1;
+
+ ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+ ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+ DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+ DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+ String sourceUsername = randomString();
+ String sourcePassword = randomString();
+ String targetUsername = randomString();
+ String targetPassword = randomString();
+ String selector = "color = 'green'";
+ QualityOfServiceMode qosMode = QualityOfServiceMode.AT_MOST_ONCE;
+ int maxBatchSize = 1;
+ long maxBatchTime = -1;
+ String subName = randomString();
+ String clientID = randomString();
+ boolean addMessageIDInHeader = false;
+ replay(sourceCFF, sourceDF, targetCFF, targetDF);
+
+ int maxRetriesGreaterThanZero = 1;
+ try
+ {
+ new BridgeImpl(sourceCFF, targetCFF, sourceDF, targetDF,
+ sourceUsername, sourcePassword, targetUsername, targetPassword,
+ selector, disabledFailureRetryInterval, maxRetriesGreaterThanZero, qosMode, maxBatchSize,
+ maxBatchTime, subName, clientID, addMessageIDInHeader);
+ fail("IllegalArgumentException");
+ } catch(IllegalArgumentException e)
+ {
+ }
+
+ int disabledMaxRetries = -1;
+ Bridge bridge = new BridgeImpl(sourceCFF, targetCFF, sourceDF, targetDF,
+ sourceUsername, sourcePassword, targetUsername, targetPassword,
+ selector, disabledFailureRetryInterval, disabledMaxRetries, qosMode, maxBatchSize,
+ maxBatchTime, subName, clientID, addMessageIDInHeader);
+ assertNotNull(bridge);
+
+ verify(sourceCFF, sourceDF, targetCFF, targetDF);
+ }
+
public void testSetSourceDestinationFactoryWithNull() throws Exception
{
Bridge bridge = new BridgeImpl();
@@ -828,7 +875,285 @@
// with batch size of 2, receive 1 messages and do not send any
doSendMessagesByBatchInXA(2, 1, 0, 0);
}
+
+ public void testExceptionOnSourceAndRetrySucceeds() throws Exception
+ {
+ ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+ ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+ Connection sourceConn = createStrictMock(Connection.class);
+ Session sourceSession = createStrictMock(Session.class);
+ MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+ DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+ Destination sourceDest = createStrictMock(Destination.class);
+ ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+ ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+ Connection targetConn = createStrictMock(Connection.class);
+ Session targetSession = createStrictMock(Session.class);
+ MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+ DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+ Destination targetDest = createStrictMock(Destination.class);
+ TransactionManager tm = createStrictMock(TransactionManager.class);
+ Message message = createNiceMock(Message.class);
+ expect(tm.suspend()).andReturn(null);
+ expect(sourceDF.createDestination()).andReturn(sourceDest);
+ expect(targetDF.createDestination()).andReturn(targetDest);
+ expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+ expect(sourceCF.createConnection()).andReturn(sourceConn);
+ SetExceptionListenerAnswer exceptionListenerAnswer = new SetExceptionListenerAnswer();
+ sourceConn.setExceptionListener(isA(ExceptionListener.class));
+ expectLastCall().andAnswer(exceptionListenerAnswer);
+ expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+ sourceSession);
+ expect(sourceSession.createConsumer(sourceDest))
+ .andReturn(sourceConsumer);
+ sourceConsumer.setMessageListener(isA(MessageListener.class));
+ expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+ expect(targetCF.createConnection()).andReturn(targetConn);
+ targetConn.setExceptionListener(isA(ExceptionListener.class));
+ expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+ targetSession);
+ expect(targetSession.createProducer(null)).andReturn(targetProducer);
+ sourceConn.start();
+
+ //after failure detection, we retry to start the bridge:
+ expect(sourceDF.createDestination()).andReturn(sourceDest);
+ expect(targetDF.createDestination()).andReturn(targetDest);
+ expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+ expect(sourceCF.createConnection()).andReturn(sourceConn);
+ sourceConn.setExceptionListener(isA(ExceptionListener.class));
+ expectLastCall().andAnswer(exceptionListenerAnswer);
+ expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+ sourceSession);
+ expect(sourceSession.createConsumer(sourceDest))
+ .andReturn(sourceConsumer);
+ sourceConsumer.setMessageListener(isA(MessageListener.class));
+ expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+ expect(targetCF.createConnection()).andReturn(targetConn);
+ targetConn.setExceptionListener(isA(ExceptionListener.class));
+ expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+ targetSession);
+ expect(targetSession.createProducer(null)).andReturn(targetProducer);
+ sourceConn.start();
+
+
+ replay(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+ sourceDF, sourceDest);
+ replay(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+ targetDF, targetDest);
+ replay(tm);
+ replay(message);
+
+ BridgeImpl bridge = new BridgeImpl();
+ assertNotNull(bridge);
+
+ bridge.setSourceConnectionFactoryFactory(sourceCFF);
+ bridge.setSourceDestinationFactory(sourceDF);
+ bridge.setTargetConnectionFactoryFactory(targetCFF);
+ bridge.setTargetDestinationFactory(targetDF);
+ bridge.setFailureRetryInterval(10);
+ bridge.setMaxRetries(2);
+ bridge.setMaxBatchSize(1);
+ bridge.setMaxBatchTime(-1);
+ bridge.setTransactionManager(tm);
+ bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+
+ assertFalse(bridge.isStarted());
+ bridge.start();
+ assertTrue(bridge.isStarted());
+
+ exceptionListenerAnswer.listener.onException(new JMSException("exception on the source"));
+ Thread.sleep(4 * bridge.getFailureRetryInterval());
+ // reconnection must have succeded
+ assertTrue(bridge.isStarted());
+
+ verify(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+ sourceDF, sourceDest);
+ verify(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+ targetDF, targetDest);
+ verify(tm);
+ verify(message);
+ }
+
+ public void testExceptionOnSourceAndRetryFails() throws Exception
+ {
+ ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+ ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+ Connection sourceConn = createStrictMock(Connection.class);
+ Session sourceSession = createStrictMock(Session.class);
+ MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+ DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+ Destination sourceDest = createStrictMock(Destination.class);
+ ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+ ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+ Connection targetConn = createStrictMock(Connection.class);
+ Session targetSession = createStrictMock(Session.class);
+ MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+ DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+ Destination targetDest = createStrictMock(Destination.class);
+ TransactionManager tm = createStrictMock(TransactionManager.class);
+ Message message = createNiceMock(Message.class);
+
+ expect(tm.suspend()).andReturn(null);
+ expect(sourceDF.createDestination()).andReturn(sourceDest);
+ expect(targetDF.createDestination()).andReturn(targetDest);
+ expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+ expect(sourceCF.createConnection()).andReturn(sourceConn);
+ SetExceptionListenerAnswer exceptionListenerAnswer = new SetExceptionListenerAnswer();
+ sourceConn.setExceptionListener(isA(ExceptionListener.class));
+ expectLastCall().andAnswer(exceptionListenerAnswer);
+ expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+ sourceSession);
+ expect(sourceSession.createConsumer(sourceDest))
+ .andReturn(sourceConsumer);
+ sourceConsumer.setMessageListener(isA(MessageListener.class));
+ expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+ expect(targetCF.createConnection()).andReturn(targetConn);
+ targetConn.setExceptionListener(isA(ExceptionListener.class));
+ expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+ targetSession);
+ expect(targetSession.createProducer(null)).andReturn(targetProducer);
+ sourceConn.start();
+
+ //after failure detection, we clean up...
+ // and it is stopped
+ sourceConn.close();
+ targetConn.close();
+ // ...retry to start the bridge but it fails...
+ expect(sourceDF.createDestination()).andReturn(sourceDest);
+ expect(targetDF.createDestination()).andReturn(targetDest);
+ expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+ expect(sourceCF.createConnection()).andThrow(new JMSException("exception while retrying to connect"));
+ // ... so we clean up again...
+ sourceConn.close();
+ targetConn.close();
+ // ... and finally stop the bridge
+ sourceConn.close();
+ targetConn.close();
+
+ replay(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+ sourceDF, sourceDest);
+ replay(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+ targetDF, targetDest);
+ replay(tm);
+ replay(message);
+
+ BridgeImpl bridge = new BridgeImpl();
+ assertNotNull(bridge);
+
+ bridge.setSourceConnectionFactoryFactory(sourceCFF);
+ bridge.setSourceDestinationFactory(sourceDF);
+ bridge.setTargetConnectionFactoryFactory(targetCFF);
+ bridge.setTargetDestinationFactory(targetDF);
+ bridge.setFailureRetryInterval(10);
+ bridge.setMaxRetries(1);
+ bridge.setMaxBatchSize(1);
+ bridge.setMaxBatchTime(-1);
+ bridge.setTransactionManager(tm);
+ bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+
+ assertFalse(bridge.isStarted());
+ bridge.start();
+ assertTrue(bridge.isStarted());
+
+ exceptionListenerAnswer.listener.onException(new JMSException("exception on the source"));
+ Thread.sleep(4 * bridge.getFailureRetryInterval());
+ // reconnection must have failed
+ assertFalse(bridge.isStarted());
+
+ verify(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+ sourceDF, sourceDest);
+ verify(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+ targetDF, targetDest);
+ verify(tm);
+ verify(message);
+ }
+
+ public void testAddMessageIDInHeader() throws Exception
+ {
+ String messageID = randomString();
+
+ ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+ ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+ Connection sourceConn = createStrictMock(Connection.class);
+ Session sourceSession = createStrictMock(Session.class);
+ MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+ DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+ Destination sourceDest = createStrictMock(Destination.class);
+ ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+ ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+ Connection targetConn = createStrictMock(Connection.class);
+ Session targetSession = createStrictMock(Session.class);
+ MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+ DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+ Destination targetDest = createStrictMock(Destination.class);
+ TransactionManager tm = createStrictMock(TransactionManager.class);
+ Message message = createNiceMock(Message.class);
+ Enumeration propsEnum = createStrictMock(Enumeration.class);
+
+ expect(tm.suspend()).andReturn(null);
+ expect(sourceDF.createDestination()).andReturn(sourceDest);
+ expect(targetDF.createDestination()).andReturn(targetDest);
+ expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+ expect(sourceCF.createConnection()).andReturn(sourceConn);
+ sourceConn.setExceptionListener(isA(ExceptionListener.class));
+ expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+ sourceSession);
+ expect(sourceSession.createConsumer(sourceDest))
+ .andReturn(sourceConsumer);
+ SetMessageListenerAnswer answer = new SetMessageListenerAnswer();
+ sourceConsumer.setMessageListener(isA(MessageListener.class));
+ expectLastCall().andAnswer(answer);
+ expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+ expect(targetCF.createConnection()).andReturn(targetConn);
+ targetConn.setExceptionListener(isA(ExceptionListener.class));
+ expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+ targetSession);
+ expect(targetSession.createProducer(null)).andReturn(targetProducer);
+ sourceConn.start();
+
+ expect(message.getJMSMessageID()).andReturn(messageID);
+ expect(message.getPropertyNames()).andReturn(propsEnum);
+ expect(propsEnum.hasMoreElements()).andReturn(false);
+ message.setStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST, messageID);
+ targetProducer.send(targetDest, message, 0, 0, 0);
+
+ replay(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+ sourceDF, sourceDest);
+ replay(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+ targetDF, targetDest);
+ replay(tm);
+ replay(message, propsEnum);
+
+ BridgeImpl bridge = new BridgeImpl();
+ assertNotNull(bridge);
+
+ bridge.setSourceConnectionFactoryFactory(sourceCFF);
+ bridge.setSourceDestinationFactory(sourceDF);
+ bridge.setTargetConnectionFactoryFactory(targetCFF);
+ bridge.setTargetDestinationFactory(targetDF);
+ bridge.setFailureRetryInterval(-1);
+ bridge.setMaxRetries(-1);
+ bridge.setMaxBatchSize(1);
+ bridge.setMaxBatchTime(-1);
+ bridge.setTransactionManager(tm);
+ bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+ bridge.setAddMessageIDInHeader(true);
+
+ assertFalse(bridge.isStarted());
+ bridge.start();
+ assertTrue(bridge.isStarted());
+
+ answer.listener.onMessage(message);
+
+ verify(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+ sourceDF, sourceDest);
+ verify(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+ targetDF, targetDest);
+ verify(tm);
+ verify(message, propsEnum);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -1146,6 +1471,17 @@
}
}
+ class SetExceptionListenerAnswer implements IAnswer
+ {
+ ExceptionListener listener = null;
+
+ public Object answer() throws Throwable
+ {
+ listener = (ExceptionListener) getCurrentArguments()[0];
+ return null;
+ }
+ }
+
/*
* A stub which implements both XAConnectionFactory & ConnectionFactory
*/
More information about the jboss-cvs-commits
mailing list