[jboss-cvs] JBoss Messaging SVN: r4645 - in trunk/tests/src/org/jboss/messaging/tests: timing/core and 8 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jul 7 09:11:47 EDT 2008


Author: ataylor
Date: 2008-07-07 09:11:47 -0400 (Mon, 07 Jul 2008)
New Revision: 4645

Added:
   trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/
   trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/
   trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/FakeJournalImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/ResponseHandlerImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/timing/jms/
   trunk/tests/src/org/jboss/messaging/tests/timing/jms/bridge/
   trunk/tests/src/org/jboss/messaging/tests/timing/jms/bridge/impl/
   trunk/tests/src/org/jboss/messaging/tests/timing/jms/bridge/impl/BridgeImplTest.java
Removed:
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/
Modified:
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/ResponseHandlerImplTest.java
Log:
moved tests to timing

Copied: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/FakeJournalImplTest.java (from rev 4641, trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/FakeJournalImplTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/FakeJournalImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/FakeJournalImplTest.java	2008-07-07 13:11:47 UTC (rev 4645)
@@ -0,0 +1,42 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.timing.core.journal.impl;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+
+/**
+ * A FakeJournalImplTest: runs the tests inherited from JournalImplTestUnit with
+ * getFileFactory() returning a FakeSequentialFileFactory.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class FakeJournalImplTest extends JournalImplTestUnit
+{
+	protected SequentialFileFactory getFileFactory() throws Exception
+	{
+		return new FakeSequentialFileFactory(); // a new factory instance on every call
+	}
+}
+

Copied: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java (from rev 4641, trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/JournalImplTestUnit.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java	2008-07-07 13:11:47 UTC (rev 4645)
@@ -0,0 +1,191 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.timing.core.journal.impl;
+
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
+
+import java.util.ArrayList;
+
+/**
+ * A JournalImplTestUnit: abstract journal tests in the timing package, shared by
+ * its subclasses FakeJournalImplTest, RealJournalImplAIOTest and RealJournalImplTest.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public abstract class JournalImplTestUnit extends JournalImplTestBase
+{
+   private static final Logger log = Logger.getLogger(JournalImplTestUnit.class);
+   
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      // NOTE(review): presumably checks that no asynchronous file I/O is still accounted for after a test - confirm
+      assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
+   }
+   // Passes ids 0..9999 to add(), 0..4999 to update() and 0..2999 to delete(), then restarts the journal and runs loadAndCheck().
+   public void testAddUpdateDeleteManyLargeFileSize() throws Exception
+   {
+      final int numberAdds = 10000;
+      
+      final int numberUpdates = 5000;
+      
+      final int numberDeletes = 3000;
+                  
+      long[] adds = new long[numberAdds];
+      
+      for (int i = 0; i < numberAdds; i++)
+      {
+         adds[i] = i;
+      }
+      
+      long[] updates = new long[numberUpdates];
+      
+      for (int i = 0; i < numberUpdates; i++)
+      {
+         updates[i] = i;
+      }
+      
+      long[] deletes = new long[numberDeletes];
+      
+      for (int i = 0; i < numberDeletes; i++)
+      {
+         deletes[i] = i;
+      }
+      // NOTE(review): the second argument is presumably the journal file size (10 * 1024 * 1024 here) - confirm against JournalImplTestBase.setup
+      setup(10, 10 * 1024 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+      add(adds);
+      update(updates);
+      delete(deletes);
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+      
+   }
+   // Same id ranges as testAddUpdateDeleteManyLargeFileSize, but setup() is called with 10 * 1024 instead of 10 * 1024 * 1024.
+   public void testAddUpdateDeleteManySmallFileSize() throws Exception
+   {
+      final int numberAdds = 10000;
+      
+      final int numberUpdates = 5000;
+      
+      final int numberDeletes = 3000;
+                  
+      long[] adds = new long[numberAdds];
+      
+      for (int i = 0; i < numberAdds; i++)
+      {
+         adds[i] = i;
+      }
+      
+      long[] updates = new long[numberUpdates];
+      
+      for (int i = 0; i < numberUpdates; i++)
+      {
+         updates[i] = i;
+      }
+      
+      long[] deletes = new long[numberDeletes];
+      
+      for (int i = 0; i < numberDeletes; i++)
+      {
+         deletes[i] = i;
+      }
+      
+      setup(10, 10 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+      add(adds);
+      update(updates);
+      delete(deletes);
+      // NOTE(review): unlike the large-file test, the journal is stopped with stopJournal(false); the flag is defined in JournalImplTestBase
+      stopJournal(false);
+      createJournal();
+      startJournal();
+      loadAndCheck();
+      
+   }
+   // Appends NUMBER_OF_RECORDS add records (deleting the older half as it goes) with the reclaimer started, then reloads and checks the id count.
+   public void testReclaimAndReload() throws Exception
+   {
+      setup(2, 10 * 1024 * 1024, false);
+      createJournal();
+      startJournal();
+      load();
+      
+      journal.startReclaimer();
+      
+      long start = System.currentTimeMillis();
+      
+                  
+      byte[] record = generateRecord(recordLength);
+      
+      int NUMBER_OF_RECORDS = 1000;
+
+      for (int count = 0; count < NUMBER_OF_RECORDS; count++)
+      {
+         journal.appendAddRecord(count, (byte)0, record);
+         
+         if (count >= NUMBER_OF_RECORDS / 2)
+         {
+            journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2); // id added NUMBER_OF_RECORDS / 2 iterations earlier
+         }
+         
+         if (count % 100 == 0)
+         {
+            log.info("Done: " + count);
+         }
+      }
+      
+      long end = System.currentTimeMillis();
+      
+      double rate = 1000 * ((double)NUMBER_OF_RECORDS) / (end - start); // (end - start) is in milliseconds, so this is records per second
+      
+      log.info("Rate of " + rate + " adds/removes per sec");
+      
+      log.info("Reclaim status = " + debugJournal());
+               
+      stopJournal();
+      createJournal();
+      startJournal();
+      journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>());
+      // ids 0..(NUMBER_OF_RECORDS / 2 - 1) were deleted in the loop above, so half of the ids are expected after the reload
+      assertEquals(NUMBER_OF_RECORDS / 2, journal.getIDMapSize());
+      
+      stopJournal();
+   }
+   
+   
+}
+
+

Copied: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java (from rev 4641, trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/RealJournalImplAIOTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplAIOTest.java	2008-07-07 13:11:47 UTC (rev 4645)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.timing.core.journal.impl;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
+import java.io.File;
+
+/**
+ * A RealJournalImplAIOTest: runs the tests inherited from JournalImplTestUnit with
+ * an AIOSequentialFileFactory created over journalDir.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealJournalImplAIOTest extends JournalImplTestUnit
+{
+   private static final Logger log = Logger.getLogger(RealJournalImplAIOTest.class);
+   
+   // Need to run the test over a local disk (no NFS); the path is <java.io.tmpdir>/journal-test, with /tmp as the fallback
+   protected String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/journal-test";
+   
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+      // remove the directory that getFileFactory() created
+      deleteDirectory(new File(journalDir));
+   }
+
+   // Calls deleteDirectory() on journalDir, recreates it with mkdir() and returns an AIOSequentialFileFactory for that path.
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      File file = new File(journalDir);
+      
+      log.info("deleting directory " + journalDir);
+      
+      deleteDirectory(file);
+      
+      file.mkdir();     
+      
+      return new AIOSequentialFileFactory(journalDir);
+   }
+   
+}
+

Copied: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplTest.java (from rev 4641, trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/RealJournalImplTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/RealJournalImplTest.java	2008-07-07 13:11:47 UTC (rev 4645)
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.timing.core.journal.impl;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
+import java.io.File;
+
+/**
+ * A RealJournalImplTest: runs the tests inherited from JournalImplTestUnit with
+ * an NIOSequentialFileFactory created over journalDir.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public class RealJournalImplTest extends JournalImplTestUnit
+{
+	private static final Logger log = Logger.getLogger(RealJournalImplTest.class);
+	// The journal directory is <user.home>/journal-test.
+	protected String journalDir = System.getProperty("user.home") + "/journal-test";
+	// Calls deleteDirectory() on journalDir, recreates it with mkdir() and returns an NIOSequentialFileFactory for that path.
+	protected SequentialFileFactory getFileFactory() throws Exception
+	{
+		File file = new File(journalDir);
+		
+		log.info("deleting directory " + journalDir);
+		
+		deleteDirectory(file);
+		
+		file.mkdir();		
+		
+		return new NIOSequentialFileFactory(journalDir);
+	}
+	// NOTE(review): unlike RealJournalImplAIOTest, this class has no tearDown() that deletes journalDir after a test.
+	
+}
+

Added: trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/ResponseHandlerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/ResponseHandlerImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/remoting/impl/ResponseHandlerImplTest.java	2008-07-07 13:11:47 UTC (rev 4645)
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.timing.core.remoting.impl;
+
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.ResponseHandler;
+import org.jboss.messaging.core.remoting.impl.ResponseHandlerImpl;
+import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import static org.jboss.messaging.tests.util.RandomUtil.randomLong;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Timing test for ResponseHandlerImpl: a packet is handled only after the waiter's timeout has passed.
+ * @author <a href="mailto:ataylor at redhat.com">Andy Taylor</a>
+ */
+public class ResponseHandlerImplTest extends UnitTestCase
+{
+   protected static final long TIMEOUT = 500; // passed to waitForResponse() and (doubled) to Thread.sleep(), so presumably milliseconds
+
+   public void testReceiveResponseTooLate() throws Exception
+   {
+      final ResponseHandler handler = new ResponseHandlerImpl(randomLong());
+      final AtomicReference<Packet> receivedPacket = new AtomicReference<Packet>();
+      // wait for the response on a separate thread; NOTE(review): this executor is never shut down
+      Executors.newSingleThreadExecutor().execute(new Runnable() {
+
+         public void run()
+         {
+            Packet response = handler.waitForResponse(TIMEOUT);
+            receivedPacket.set(response);
+         }
+      });
+      // pause for twice the timeout before handling the packet
+      Thread.sleep(TIMEOUT * 2);
+      handler.handle(new Ping(handler.getID()), null);
+      // NOTE(review): receivedPacket starts out null, so this also passes if the waiting thread never stored a result
+      assertNull(receivedPacket.get());
+   }
+}

Copied: trunk/tests/src/org/jboss/messaging/tests/timing/jms/bridge/impl/BridgeImplTest.java (from rev 4641, trunk/tests/src/org/jboss/messaging/tests/unit/jms/bridge/impl/timing/BridgeImplTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/jms/bridge/impl/BridgeImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/jms/bridge/impl/BridgeImplTest.java	2008-07-07 13:11:47 UTC (rev 4645)
@@ -0,0 +1,500 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.timing.jms.bridge.impl;
+
+import junit.framework.TestCase;
+import static org.easymock.EasyMock.*;
+import org.easymock.IAnswer;
+import org.jboss.messaging.jms.bridge.ConnectionFactoryFactory;
+import org.jboss.messaging.jms.bridge.DestinationFactory;
+import org.jboss.messaging.jms.bridge.QualityOfServiceMode;
+import org.jboss.messaging.jms.bridge.impl.BridgeImpl;
+
+import javax.jms.*;
+import javax.transaction.TransactionManager;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class BridgeImplTest extends TestCase
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testStartWithRepeatedFailure() throws Exception // the bridge must end up failed, not started, when the source connection is never created
+   {
+      ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+      Connection sourceConn = createStrictMock(Connection.class);
+      Session sourceSession = createStrictMock(Session.class);
+      MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+      DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+      Destination sourceDest = createStrictMock(Destination.class);
+      ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+      Connection targetConn = createStrictMock(Connection.class);
+      Session targetSession = createStrictMock(Session.class);
+      MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+      DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+      Destination targetDest = createStrictMock(Destination.class);
+      TransactionManager tm = createStrictMock(TransactionManager.class);
+      // stubbed calls (andStub...) may happen any number of times; no expectations are recorded on the target connection side
+      expect(tm.suspend()).andReturn(null);
+      expect(sourceDF.createDestination()).andStubReturn(sourceDest);
+      expect(targetDF.createDestination()).andStubReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andStubReturn(sourceCF);
+      // the source connection cannot be created: every createConnection() call throws
+      expect(sourceCF.createConnection()).andStubThrow(
+            new JMSException("unable to create a conn"));
+
+      replay(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      replay(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      replay(tm);
+
+      BridgeImpl bridge = new BridgeImpl();
+
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      // retry after 10 ms
+      bridge.setFailureRetryInterval(10);
+      // retry only once
+      bridge.setMaxRetries(1);
+      bridge.setMaxBatchSize(1);
+      bridge.setMaxBatchTime(-1);
+      bridge.setTransactionManager(tm);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+
+      assertFalse(bridge.isStarted());
+      bridge.start();
+
+      Thread.sleep(50); // presumably leaves time for the single 10 ms retry to be attempted before the state is checked
+      assertFalse(bridge.isStarted());
+      assertTrue(bridge.isFailed());
+
+      verify(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      verify(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      verify(tm);
+   }
+
+   public void testStartWithFailureThenSuccess() throws Exception // the first createConnection() throws, the second succeeds, and the bridge must end up started
+   {
+      ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+      Connection sourceConn = createStrictMock(Connection.class);
+      Session sourceSession = createStrictMock(Session.class);
+      MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+      DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+      Destination sourceDest = createStrictMock(Destination.class);
+      ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+      Connection targetConn = createStrictMock(Connection.class);
+      Session targetSession = createStrictMock(Session.class);
+      MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+      DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+      Destination targetDest = createStrictMock(Destination.class);
+      TransactionManager tm = createStrictMock(TransactionManager.class);
+      // expectations recorded on a strict mock must be met in the recorded order
+      expect(tm.suspend()).andReturn(null);
+      expect(sourceDF.createDestination()).andStubReturn(sourceDest);
+      expect(targetDF.createDestination()).andStubReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andStubReturn(sourceCF);
+      // the source connection cannot be created the 1st time...
+      expect(sourceCF.createConnection()).andThrow(
+            new JMSException("unable to create a conn"));
+      // ... and it succeeds the 2nd time
+      expect(sourceCF.createConnection()).andReturn(sourceConn);
+      sourceConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+            sourceSession);
+      expect(sourceSession.createConsumer(sourceDest))
+            .andReturn(sourceConsumer);
+      sourceConsumer.setMessageListener(isA(MessageListener.class));
+      expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+      expect(targetCF.createConnection()).andReturn(targetConn);
+      targetConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+            targetSession);
+      expect(targetSession.createProducer(null)).andReturn(targetProducer);
+      sourceConn.start();
+
+      replay(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      replay(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      replay(tm);
+
+      BridgeImpl bridge = new BridgeImpl();
+
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      // retry after 10 ms
+      bridge.setFailureRetryInterval(10);
+      // retry only once
+      bridge.setMaxRetries(1);
+      bridge.setMaxBatchSize(1);
+      bridge.setMaxBatchTime(-1);
+      bridge.setTransactionManager(tm);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+
+      assertFalse(bridge.isStarted());
+      bridge.start();
+
+      Thread.sleep(50); // presumably leaves time for the single 10 ms retry to succeed before the state is checked
+      assertTrue(bridge.isStarted());
+      assertFalse(bridge.isFailed());
+
+      verify(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      verify(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      verify(tm);
+   }
+
+   /*
+    * Only 1 message is received while maxBatchSize is 2. The message must still
+    * be sent once maxBatchTime expires, even though maxBatchSize is not reached.
+    */
+   public void testSendMessagesWhenMaxBatchTimeExpires() throws Exception
+   {
+      int maxBatchSize = 2;
+      long maxBatchTime = 500;
+
+      ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+      Connection sourceConn = createStrictMock(Connection.class);
+      Session sourceSession = createStrictMock(Session.class);
+      MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+      DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+      Destination sourceDest = createStrictMock(Destination.class);
+      ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+      Connection targetConn = createStrictMock(Connection.class);
+      Session targetSession = createStrictMock(Session.class);
+      MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+      DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+      Destination targetDest = createStrictMock(Destination.class);
+      TransactionManager tm = createStrictMock(TransactionManager.class);
+      Message message = createNiceMock(Message.class);
+      // calls expected while the bridge starts
+      expect(tm.suspend()).andReturn(null);
+      expect(sourceDF.createDestination()).andReturn(sourceDest);
+      expect(targetDF.createDestination()).andReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+      expect(sourceCF.createConnection()).andReturn(sourceConn);
+      sourceConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+            sourceSession);
+      expect(sourceSession.createConsumer(sourceDest))
+            .andReturn(sourceConsumer);
+      SetMessageListenerAnswer answer = new SetMessageListenerAnswer(); // presumably stores the listener passed to setMessageListener() in answer.listener - confirm
+      sourceConsumer.setMessageListener(isA(MessageListener.class));
+      expectLastCall().andAnswer(answer);
+      expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+      expect(targetCF.createConnection()).andReturn(targetConn);
+      targetConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+            targetSession);
+      expect(targetSession.createProducer(null)).andReturn(targetProducer);
+      sourceConn.start();
+      // the single message must be sent to the target and the target session committed
+      targetProducer.send(targetDest, message, 0, 0, 0);
+      targetSession.commit();
+
+      replay(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      replay(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      replay(tm);
+      replay(message);
+
+      BridgeImpl bridge = new BridgeImpl();
+      assertNotNull(bridge);
+
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      bridge.setFailureRetryInterval(-1);
+      bridge.setMaxRetries(-1);
+      bridge.setMaxBatchSize(maxBatchSize);
+      bridge.setMaxBatchTime(maxBatchTime);
+      bridge.setTransactionManager(tm);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+
+      assertFalse(bridge.isStarted());
+      bridge.start();
+      assertTrue(bridge.isStarted());
+      // deliver exactly one message, i.e. fewer than maxBatchSize
+      answer.listener.onMessage(message);
+
+      Thread.sleep(3 * maxBatchTime); // wait well past maxBatchTime before verifying the expected send() and commit()
+
+      verify(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      verify(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      verify(tm);
+      verify(message);
+   }
+
+   public void testExceptionOnSourceAndRetrySucceeds() throws Exception
+   {
+      ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+      Connection sourceConn = createStrictMock(Connection.class);
+      Session sourceSession = createStrictMock(Session.class);
+      MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+      DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+      Destination sourceDest = createStrictMock(Destination.class);
+      ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+      Connection targetConn = createStrictMock(Connection.class);
+      Session targetSession = createStrictMock(Session.class);
+      MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+      DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+      Destination targetDest = createStrictMock(Destination.class);
+      TransactionManager tm = createStrictMock(TransactionManager.class);
+      Message message = createNiceMock(Message.class);
+
+      expect(tm.suspend()).andReturn(null);
+      expect(sourceDF.createDestination()).andReturn(sourceDest);
+      expect(targetDF.createDestination()).andReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+      expect(sourceCF.createConnection()).andReturn(sourceConn);
+      SetExceptionListenerAnswer exceptionListenerAnswer = new SetExceptionListenerAnswer();
+      sourceConn.setExceptionListener(isA(ExceptionListener.class));
+      expectLastCall().andAnswer(exceptionListenerAnswer);
+      expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+            sourceSession);
+      expect(sourceSession.createConsumer(sourceDest))
+            .andReturn(sourceConsumer);
+      sourceConsumer.setMessageListener(isA(MessageListener.class));
+      expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+      expect(targetCF.createConnection()).andReturn(targetConn);
+      targetConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+            targetSession);
+      expect(targetSession.createProducer(null)).andReturn(targetProducer);
+      sourceConn.start();
+
+      //after failure detection, we retry to start the bridge:
+      expect(sourceDF.createDestination()).andReturn(sourceDest);
+      expect(targetDF.createDestination()).andReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+      expect(sourceCF.createConnection()).andReturn(sourceConn);
+      sourceConn.setExceptionListener(isA(ExceptionListener.class));
+      expectLastCall().andAnswer(exceptionListenerAnswer);
+      expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+            sourceSession);
+      expect(sourceSession.createConsumer(sourceDest))
+            .andReturn(sourceConsumer);
+      sourceConsumer.setMessageListener(isA(MessageListener.class));
+      expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+      expect(targetCF.createConnection()).andReturn(targetConn);
+      targetConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+            targetSession);
+      expect(targetSession.createProducer(null)).andReturn(targetProducer);
+      sourceConn.start();
+      
+      
+      replay(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      replay(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      replay(tm);
+      replay(message);
+
+      BridgeImpl bridge = new BridgeImpl();
+      assertNotNull(bridge);
+
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      bridge.setFailureRetryInterval(10);
+      bridge.setMaxRetries(2);
+      bridge.setMaxBatchSize(1);
+      bridge.setMaxBatchTime(-1);
+      bridge.setTransactionManager(tm);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+
+      assertFalse(bridge.isStarted());
+      bridge.start();
+      assertTrue(bridge.isStarted());
+      
+      exceptionListenerAnswer.listener.onException(new JMSException("exception on the source"));
+      Thread.sleep(4 * bridge.getFailureRetryInterval());
+      // reconnection must have succeeded
+      assertTrue(bridge.isStarted());
+      
+      verify(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      verify(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      verify(tm);
+      verify(message);
+   }
+   
+   public void testExceptionOnSourceAndRetryFails() throws Exception
+   {
+      ConnectionFactoryFactory sourceCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory sourceCF = createStrictMock(ConnectionFactory.class);
+      Connection sourceConn = createStrictMock(Connection.class);
+      Session sourceSession = createStrictMock(Session.class);
+      MessageConsumer sourceConsumer = createStrictMock(MessageConsumer.class);
+      DestinationFactory sourceDF = createStrictMock(DestinationFactory.class);
+      Destination sourceDest = createStrictMock(Destination.class);
+      ConnectionFactoryFactory targetCFF = createStrictMock(ConnectionFactoryFactory.class);
+      ConnectionFactory targetCF = createStrictMock(ConnectionFactory.class);
+      Connection targetConn = createStrictMock(Connection.class);
+      Session targetSession = createStrictMock(Session.class);
+      MessageProducer targetProducer = createStrictMock(MessageProducer.class);
+      DestinationFactory targetDF = createStrictMock(DestinationFactory.class);
+      Destination targetDest = createStrictMock(Destination.class);
+      TransactionManager tm = createStrictMock(TransactionManager.class);
+      Message message = createNiceMock(Message.class);
+
+      expect(tm.suspend()).andReturn(null);
+      expect(sourceDF.createDestination()).andReturn(sourceDest);
+      expect(targetDF.createDestination()).andReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+      expect(sourceCF.createConnection()).andReturn(sourceConn);
+      SetExceptionListenerAnswer exceptionListenerAnswer = new SetExceptionListenerAnswer();
+      sourceConn.setExceptionListener(isA(ExceptionListener.class));
+      expectLastCall().andAnswer(exceptionListenerAnswer);
+      expect(sourceConn.createSession(anyBoolean(), anyInt())).andReturn(
+            sourceSession);
+      expect(sourceSession.createConsumer(sourceDest))
+            .andReturn(sourceConsumer);
+      sourceConsumer.setMessageListener(isA(MessageListener.class));
+      expect(targetCFF.createConnectionFactory()).andReturn(targetCF);
+      expect(targetCF.createConnection()).andReturn(targetConn);
+      targetConn.setExceptionListener(isA(ExceptionListener.class));
+      expect(targetConn.createSession(anyBoolean(), anyInt())).andReturn(
+            targetSession);
+      expect(targetSession.createProducer(null)).andReturn(targetProducer);
+      sourceConn.start();
+
+      //after failure detection, we clean up...
+      // and it is stopped
+      sourceConn.close();
+      targetConn.close();
+      // ...retry to start the bridge but it fails...
+      expect(sourceDF.createDestination()).andReturn(sourceDest);
+      expect(targetDF.createDestination()).andReturn(targetDest);
+      expect(sourceCFF.createConnectionFactory()).andReturn(sourceCF);
+      expect(sourceCF.createConnection()).andThrow(new JMSException("exception while retrying to connect"));
+      // ... so we clean up again...
+      sourceConn.close();
+      targetConn.close();
+      // ... and finally stop the bridge
+      sourceConn.close();
+      targetConn.close();
+      
+      replay(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      replay(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      replay(tm);
+      replay(message);
+
+      BridgeImpl bridge = new BridgeImpl();
+      assertNotNull(bridge);
+
+      bridge.setSourceConnectionFactoryFactory(sourceCFF);
+      bridge.setSourceDestinationFactory(sourceDF);
+      bridge.setTargetConnectionFactoryFactory(targetCFF);
+      bridge.setTargetDestinationFactory(targetDF);
+      bridge.setFailureRetryInterval(10);
+      bridge.setMaxRetries(1);
+      bridge.setMaxBatchSize(1);
+      bridge.setMaxBatchTime(-1);
+      bridge.setTransactionManager(tm);
+      bridge.setQualityOfServiceMode(QualityOfServiceMode.AT_MOST_ONCE);
+
+      assertFalse(bridge.isStarted());
+      bridge.start();
+      assertTrue(bridge.isStarted());
+      
+      exceptionListenerAnswer.listener.onException(new JMSException("exception on the source"));
+      Thread.sleep(4 * bridge.getFailureRetryInterval());
+      // reconnection must have failed
+      assertFalse(bridge.isStarted());
+      
+      verify(sourceCFF, sourceCF, sourceConn, sourceSession, sourceConsumer,
+            sourceDF, sourceDest);
+      verify(targetCFF, targetCF, targetConn, targetSession, targetProducer,
+            targetDF, targetDest);
+      verify(tm);
+      verify(message);
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+   
+   class SetExceptionListenerAnswer implements IAnswer
+   {
+      ExceptionListener listener = null;
+
+      public Object answer() throws Throwable
+      {
+         listener = (ExceptionListener) getCurrentArguments()[0];
+         return null;
+      }
+   }
+   
+   class SetMessageListenerAnswer implements IAnswer
+   {
+      MessageListener listener = null;
+
+      public Object answer() throws Throwable
+      {
+         listener = (MessageListener) getCurrentArguments()[0];
+         return null;
+      }
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/ResponseHandlerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/ResponseHandlerImplTest.java	2008-07-07 12:53:29 UTC (rev 4644)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/ResponseHandlerImplTest.java	2008-07-07 13:11:47 UTC (rev 4645)
@@ -80,26 +80,8 @@
       assertNotNull(receivedPacket.get());
    }
 
-   public void testReceiveResponseTooLate() throws Exception
-   {
-      final ResponseHandler handler = new ResponseHandlerImpl(randomLong());
-      final AtomicReference<Packet> receivedPacket = new AtomicReference<Packet>();
 
-      Executors.newSingleThreadExecutor().execute(new Runnable() {
 
-         public void run()
-         {
-            Packet response = handler.waitForResponse(TIMEOUT);
-            receivedPacket.set(response);
-         }         
-      });
-      // pause for twice the timeout before handling the packet
-      Thread.sleep(TIMEOUT * 2);
-      handler.handle(new Ping(handler.getID()), null);
-
-      assertNull(receivedPacket.get());
-   }
-
    public void testSetFailed() throws Exception
    {
       ResponseHandler handler = new ResponseHandlerImpl(randomLong());




More information about the jboss-cvs-commits mailing list