[jboss-cvs] JBoss Messaging SVN: r4607 - in trunk: tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jun 27 05:36:16 EDT 2008


Author: jmesnil
Date: 2008-06-27 05:36:16 -0400 (Fri, 27 Jun 2008)
New Revision: 4607

Modified:
   trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java
Log:
more bridge unit tests

Modified: trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java	2008-06-27 00:47:24 UTC (rev 4606)
+++ trunk/src/main/org/jboss/messaging/jms/bridge/impl/BridgeImpl.java	2008-06-27 09:36:16 UTC (rev 4607)
@@ -626,7 +626,7 @@
       checkValidValue(maxRetries, "maxRetries");
       if (failureRetryInterval == -1 && maxRetries > 0)
       {
-         throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be 0");
+         throw new IllegalArgumentException("If failureRetryInterval == -1 maxRetries must be set to -1");
       }
       checkMaxBatchSize(maxBatchSize);
       checkValidValue(maxBatchTime, "maxBatchTime");

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java	2008-06-27 00:47:24 UTC (rev 4606)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/BridgeImplTest.java	2008-06-27 09:36:16 UTC (rev 4607)
@@ -35,6 +35,8 @@
 import static org.easymock.EasyMock.verify;
 import static org.jboss.messaging.tests.util.RandomUtil.randomString;
 
+import java.util.Enumeration;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
@@ -61,6 +63,7 @@
 import org.jboss.messaging.jms.bridge.DestinationFactory;
 import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
 import org.jboss.messaging.jms.bridge.impl.BridgeImpl;
+import org.jboss.messaging.jms.client.JBossMessage;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -125,7 +128,51 @@
 
       verify(sourceCFF, sourceDF, targetCFF, targetDF);
    }
+   
+   public void testMaxRetriesMustBeDisabledWhenFailureRetryIntervalIsDisabled() throws Exception
+   {
+      long disabledFailureRetryInterval = -1;
+      
+      ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+      DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+      DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+      String sourceUsername = randomString();
+      String sourcePassword = randomString();
+      String targetUsername = randomString();
+      String targetPassword = randomString();
+      String selector = "color = 'green'";
+      QualityOfServiceMode qosMode = QualityOfServiceMode.AT_MOST_ONCE;
+      int maxBatchSize = 1;
+      long maxBatchTime = -1;
+      String subName = randomString();
+      String clientID = randomString();
+      boolean addMessageIDInHeader = false;
 
+      replay(sourceCFF, sourceDF, targetCFF, targetDF);
+
+      int maxRetriesGreaterThanZero = 1;
+      try
+      {
+         new BridgeImpl(sourceCFF, targetCFF, sourceDF, targetDF,
+               sourceUsername, sourcePassword, targetUsername, targetPassword,
+               selector, disabledFailureRetryInterval, maxRetriesGreaterThanZero, qosMode, maxBatchSize,
+               maxBatchTime, subName, clientID, addMessageIDInHeader);
+         fail("IllegalArgumentException");
+      } catch(IllegalArgumentException e)
+      {         
+      }
+      
+      int disabledMaxRetries = -1;
+      Bridge bridge = new BridgeImpl(sourceCFF, targetCFF, sourceDF, targetDF,
+            sourceUsername, sourcePassword, targetUsername, targetPassword,
+            selector, disabledFailureRetryInterval, disabledMaxRetries, qosMode, maxBatchSize,
+            maxBatchTime, subName, clientID, addMessageIDInHeader);
+      assertNotNull(bridge);
+      
+      verify(sourceCFF, sourceDF, targetCFF, targetDF);
+   }
+
    public void testSetSourceDestinationFactoryWithNull() throws Exception
    {
       Bridge bridge = new BridgeImpl();
@@ -828,7 +875,285 @@
       // with batch size of 2, receive 1 messages and do not send any
       doSendMessagesByBatchInXA(2, 1, 0, 0);
    }
+   
+   public void testExceptionOnSourceAndRetrySucceeds() throws Exception
+   {
+      ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+      Connection sourceConn = createStrictMock(Connection.class);
+      Session sourceSession = createStrictMock(Session.class);
+      MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+      DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+      Destination sourceDest = createStrictMock(Destination.class);
+      ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+      Connection targetConn = createStrictMock(Connection.class);
+      Session targetSession = createStrictMock(Session.class);
+      MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+      DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+      Destination targetDest = createStrictMock(Destination.class);
+      TransactionManager tm = createStrictMock(TransactionManager.class);
+      Message message = createNiceMock(Message.class);
 
+      expect(tm.suspend()).andReturn(null);
+      expect(sourceDF.createDestination()).andReturn(sourceDest);
+      expect(targetDF.createDestination()).andReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+      expect(sourceCF.createConnection()).andReturn(sourceConn);
+      SetExceptionListenerAnswer exceptionListenerAnswer = new SetExceptionListenerAnswer();
+      sourceConn.setExceptionListener(isA(ExceptionListener.class));
+      expectLastCall().andAnswer(exceptionListenerAnswer);
+      expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+            sourceSession);
+      expect(sourceSession.createConsumer(sourceDest))
+            .andReturn(sourceConsumer);
+      sourceConsumer.setMessageListener(isA(MessageListener.class));
+      expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+      expect(targetCF.createConnection()).andReturn(targetConn);
+      targetConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+            targetSession);
+      expect(targetSession.createProducer(null)).andReturn(targetProducer);
+      sourceConn.start();
+
+      //after failure detection, we retry to start the bridge:
+      expect(sourceDF.createDestination()).andReturn(sourceDest);
+      expect(targetDF.createDestination()).andReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+      expect(sourceCF.createConnection()).andReturn(sourceConn);
+      sourceConn.setExceptionListener(isA(ExceptionListener.class));
+      expectLastCall().andAnswer(exceptionListenerAnswer);
+      expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+            sourceSession);
+      expect(sourceSession.createConsumer(sourceDest))
+            .andReturn(sourceConsumer);
+      sourceConsumer.setMessageListener(isA(MessageListener.class));
+      expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+      expect(targetCF.createConnection()).andReturn(targetConn);
+      targetConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+            targetSession);
+      expect(targetSession.createProducer(null)).andReturn(targetProducer);
+      sourceConn.start();
+      
+      
+      replay(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      replay(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      replay(tm);
+      replay(message);
+
+      BridgeImpl bridge = new BridgeImpl();
+      assertNotNull(bridge);
+
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      bridge.setFailureRetryInterval(10);
+      bridge.setMaxRetries(2);
+      bridge.setMaxBatchSize(1);
+      bridge.setMaxBatchTime(-1);
+      bridge.setTransactionManager(tm);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+
+      assertFalse(bridge.isStarted());
+      bridge.start();
+      assertTrue(bridge.isStarted());
+      
+      exceptionListenerAnswer.listener.onException(new JMSException("exception on the source"));
+      Thread.sleep(4 * bridge.getFailureRetryInterval());
+      // reconnection must have succeeded
+      assertTrue(bridge.isStarted());
+      
+      verify(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      verify(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      verify(tm);
+      verify(message);
+   }
+
+   public void testExceptionOnSourceAndRetryFails() throws Exception
+   {
+      ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+      Connection sourceConn = createStrictMock(Connection.class);
+      Session sourceSession = createStrictMock(Session.class);
+      MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+      DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+      Destination sourceDest = createStrictMock(Destination.class);
+      ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+      Connection targetConn = createStrictMock(Connection.class);
+      Session targetSession = createStrictMock(Session.class);
+      MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+      DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+      Destination targetDest = createStrictMock(Destination.class);
+      TransactionManager tm = createStrictMock(TransactionManager.class);
+      Message message = createNiceMock(Message.class);
+
+      expect(tm.suspend()).andReturn(null);
+      expect(sourceDF.createDestination()).andReturn(sourceDest);
+      expect(targetDF.createDestination()).andReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+      expect(sourceCF.createConnection()).andReturn(sourceConn);
+      SetExceptionListenerAnswer exceptionListenerAnswer = new SetExceptionListenerAnswer();
+      sourceConn.setExceptionListener(isA(ExceptionListener.class));
+      expectLastCall().andAnswer(exceptionListenerAnswer);
+      expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+            sourceSession);
+      expect(sourceSession.createConsumer(sourceDest))
+            .andReturn(sourceConsumer);
+      sourceConsumer.setMessageListener(isA(MessageListener.class));
+      expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+      expect(targetCF.createConnection()).andReturn(targetConn);
+      targetConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+            targetSession);
+      expect(targetSession.createProducer(null)).andReturn(targetProducer);
+      sourceConn.start();
+
+      //after failure detection, we clean up...
+      // and it is stopped
+      sourceConn.close();
+      targetConn.close();
+      // ...retry to start the bridge but it fails...
+      expect(sourceDF.createDestination()).andReturn(sourceDest);
+      expect(targetDF.createDestination()).andReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+      expect(sourceCF.createConnection()).andThrow(new JMSException("exception while retrying to connect"));
+      // ... so we clean up again...
+      sourceConn.close();
+      targetConn.close();
+      // ... and finally stop the bridge
+      sourceConn.close();
+      targetConn.close();
+      
+      replay(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      replay(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      replay(tm);
+      replay(message);
+
+      BridgeImpl bridge = new BridgeImpl();
+      assertNotNull(bridge);
+
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      bridge.setFailureRetryInterval(10);
+      bridge.setMaxRetries(1);
+      bridge.setMaxBatchSize(1);
+      bridge.setMaxBatchTime(-1);
+      bridge.setTransactionManager(tm);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+
+      assertFalse(bridge.isStarted());
+      bridge.start();
+      assertTrue(bridge.isStarted());
+      
+      exceptionListenerAnswer.listener.onException(new JMSException("exception on the source"));
+      Thread.sleep(4 * bridge.getFailureRetryInterval());
+      // reconnection must have failed
+      assertFalse(bridge.isStarted());
+      
+      verify(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      verify(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      verify(tm);
+      verify(message);
+   }
+   
+   public void testAddMessageIDInHeader() throws Exception
+   {
+      String messageID = randomString();
+      
+      ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+      Connection sourceConn = createStrictMock(Connection.class);
+      Session sourceSession = createStrictMock(Session.class);
+      MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+      DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+      Destination sourceDest = createStrictMock(Destination.class);
+      ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+      Connection targetConn = createStrictMock(Connection.class);
+      Session targetSession = createStrictMock(Session.class);
+      MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+      DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+      Destination targetDest = createStrictMock(Destination.class);
+      TransactionManager tm = createStrictMock(TransactionManager.class);
+      Message message = createNiceMock(Message.class);
+      Enumeration propsEnum = createStrictMock(Enumeration.class);
+
+      expect(tm.suspend()).andReturn(null);
+      expect(sourceDF.createDestination()).andReturn(sourceDest);
+      expect(targetDF.createDestination()).andReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+      expect(sourceCF.createConnection()).andReturn(sourceConn);
+      sourceConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+            sourceSession);
+      expect(sourceSession.createConsumer(sourceDest))
+            .andReturn(sourceConsumer);
+      SetMessageListenerAnswer answer = new SetMessageListenerAnswer();
+      sourceConsumer.setMessageListener(isA(MessageListener.class));
+      expectLastCall().andAnswer(answer);
+      expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+      expect(targetCF.createConnection()).andReturn(targetConn);
+      targetConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+            targetSession);
+      expect(targetSession.createProducer(null)).andReturn(targetProducer);
+      sourceConn.start();
+
+      expect(message.getJMSMessageID()).andReturn(messageID);
+      expect(message.getPropertyNames()).andReturn(propsEnum);
+      expect(propsEnum.hasMoreElements()).andReturn(false);
+      message.setStringProperty(JBossMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST, messageID);
+      targetProducer.send(targetDest, message, 0, 0, 0);
+
+      replay(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      replay(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      replay(tm);
+      replay(message, propsEnum);
+
+      BridgeImpl bridge = new BridgeImpl();
+      assertNotNull(bridge);
+
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      bridge.setFailureRetryInterval(-1);
+      bridge.setMaxRetries(-1);
+      bridge.setMaxBatchSize(1);
+      bridge.setMaxBatchTime(-1);
+      bridge.setTransactionManager(tm);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+      bridge.setAddMessageIDInHeader(true);
+      
+      assertFalse(bridge.isStarted());
+      bridge.start();
+      assertTrue(bridge.isStarted());
+
+      answer.listener.onMessage(message);
+
+      verify(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      verify(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      verify(tm);
+      verify(message, propsEnum);
+   }
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -1146,6 +1471,17 @@
       }
    }
    
+   class SetExceptionListenerAnswer implements IAnswer
+   {
+      ExceptionListener listener = null;
+
+      public Object answer() throws Throwable
+      {
+         listener = (ExceptionListener) getCurrentArguments()[0];
+         return null;
+      }
+   }
+   
    /*
     * A stub which implements both XAConnectionFactory & ConnectionFactory
     */




More information about the jboss-cvs-commits mailing list