[hornetq-commits] JBoss hornetq SVN: r11054 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/server/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jul 27 13:25:49 EDT 2011


Author: borges
Date: 2011-07-27 13:25:49 -0400 (Wed, 27 Jul 2011)
New Revision: 11054

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java
   branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
clean up

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-27 17:25:49 UTC (rev 11054)
@@ -370,9 +370,6 @@
 
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
-    */
    public void compareJournals(final JournalLoadInformation[] journalInfo) throws HornetQException
    {
       replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-27 17:25:49 UTC (rev 11054)
@@ -153,7 +153,7 @@
    public static final String GENERIC_IGNORED_FILTER = "__HQX=-1";
 
    // Static
-   // ---------------------------------------------------------------------------------------
+   // -----------------------------------------------------------------------------------
 
    // Attributes
    // -----------------------------------------------------------------------------------
@@ -2006,6 +2006,7 @@
          return;
       JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
 
+
       replicationManager = new ReplicationManagerImpl(rc, executorFactory);
       replicationManager.start();
 

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java	2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java	2011-07-27 17:25:49 UTC (rev 11054)
@@ -54,17 +54,7 @@
    @Override
    protected void tearDown() throws Exception
    {
-      if (locator != null)
-      {
-         try
-         {
-            locator.close();
-         }
-         catch (Exception e)
-         {
-            //
-         }
-      }
+      closeServerLocator(locator);
       super.tearDown();
    }
 

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2011-07-27 17:25:49 UTC (rev 11054)
@@ -102,7 +102,7 @@
    protected void tearDown() throws Exception
    {
       closeSessionFactory();
-      locator.close();
+      closeServerLocator(locator);
       super.tearDown();
    }
 
@@ -215,7 +215,7 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       crash(session);
 
@@ -346,7 +346,7 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       crash(session);
 
@@ -390,7 +390,7 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       crash(session);
 
@@ -442,7 +442,7 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       session.commit();
 
@@ -484,7 +484,7 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       // messages will be delivered to the consumer when the session is committed
       session.commit();
@@ -524,7 +524,7 @@
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session1, producer);
+      sendMessagesSomeDurable(session1, producer);
 
       session1.commit();
 
@@ -652,7 +652,7 @@
 
       session.start(xid, XAResource.TMNOFLAGS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       crash(session);
 
@@ -696,7 +696,7 @@
 
       session.start(xid, XAResource.TMNOFLAGS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       session.end(xid, XAResource.TMSUCCESS);
 
@@ -746,7 +746,7 @@
 
       session.start(xid, XAResource.TMNOFLAGS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       session.end(xid, XAResource.TMSUCCESS);
 
@@ -790,7 +790,7 @@
 
       session.start(xid, XAResource.TMNOFLAGS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       session.end(xid, XAResource.TMSUCCESS);
 
@@ -831,7 +831,7 @@
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session1, producer);
+      sendMessagesSomeDurable(session1, producer);
 
       session1.commit();
 
@@ -877,7 +877,7 @@
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session1, producer);
+      sendMessagesSomeDurable(session1, producer);
 
       session1.commit();
 
@@ -925,7 +925,7 @@
 
       ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session1, producer);
+      sendMessagesSomeDurable(session1, producer);
 
       session1.commit();
 
@@ -1067,7 +1067,7 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true);
 
@@ -1093,11 +1093,13 @@
       closeSessionFactory();
    }
 
-   private void sendMessages(ClientSession session, ClientProducer producer) throws Exception, HornetQException
+   private void sendMessagesSomeDurable(ClientSession session, ClientProducer producer) throws Exception, HornetQException
    {
       for (int i = 0; i < NUM_MESSAGES; i++)
       {
-         ClientMessage message = session.createMessage(isDurable(i));
+         // some are durable, some are not!
+         boolean durable = isDurable(i);
+         ClientMessage message = session.createMessage(durable);
          setBody(i, message);
          message.putIntProperty("counter", i);
          producer.send(message);
@@ -1114,7 +1116,7 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
@@ -1179,7 +1181,7 @@
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
 
@@ -1203,20 +1205,8 @@
       }
 
       // Should get the same ones after failover since we didn't ack
+      receiveMessagesAndAck(consumer, NUM_MESSAGES, NUM_MESSAGES * 2);
 
-      for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
-
-         Assert.assertNotNull(message);
-
-         assertMessageBody(i, message);
-
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
-         message.acknowledge();
-      }
-
       session.close();
 
       closeSessionFactory();
@@ -1224,14 +1214,7 @@
 
    private void receiveMessages(ClientConsumer consumer) throws HornetQException
    {
-      for (int i = 0; i < NUM_MESSAGES; i++)
-      {
-         ClientMessage message = consumer.receive(1000);
-         Assert.assertNotNull(message);
-         assertMessageBody(i, message);
-         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-         message.acknowledge();
-      }
+      receiveMessagesAndAck(consumer, 0, NUM_MESSAGES);
    }
 
    public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
@@ -1281,7 +1264,7 @@
 
       crash(session);
 
-      sendMessages(session, producer);
+      sendMessagesSomeDurable(session, producer);
 
       receiveMessages(consumer);
 

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-07-27 17:25:49 UTC (rev 11054)
@@ -182,8 +182,8 @@
    @Override
    protected void tearDown() throws Exception
    {
-      backupServer.stop();
-      liveServer.stop();
+      stopComponent(backupServer);
+      stopComponent(liveServer);
 
       Assert.assertEquals(0, InVMRegistry.instance.size());
 
@@ -230,11 +230,11 @@
       return sf;
    }
 
-   protected static void waitForBackup(ClientSessionFactoryInternal sf, long seconds) throws Exception
+   protected static void waitForBackup(ClientSessionFactoryInternal sessionFactory, long seconds) throws Exception
    {
-      long time = System.currentTimeMillis();
-      long toWait = seconds * 1000;
-      while (sf.getBackupConnector() == null)
+      final long toWait = seconds * 1000;
+      final long time = System.currentTimeMillis();
+      while (sessionFactory.getBackupConnector() == null)
       {
          try
          {
@@ -244,7 +244,7 @@
          {
             //ignore
          }
-         if (sf.getBackupConnector() != null)
+         if (sessionFactory.getBackupConnector() != null)
          {
             break;
          }
@@ -253,7 +253,7 @@
             fail("backup server never started");
          }
       }
-      System.out.println("sf.getBackupConnector() = " + sf.getBackupConnector());
+      System.out.println("sf.getBackupConnector() = " + sessionFactory.getBackupConnector());
    }
 
    protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean live)

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java	2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java	2011-07-27 17:25:49 UTC (rev 11054)
@@ -15,6 +15,7 @@
 
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.HornetQServer;
 
 /**
@@ -24,7 +25,7 @@
  *
  *
  */
-public interface TestableServer
+public interface TestableServer extends HornetQComponent
 {
 
    HornetQServer getServer();

Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java	2011-07-27 17:24:19 UTC (rev 11053)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java	2011-07-27 17:25:49 UTC (rev 11054)
@@ -24,7 +24,9 @@
 
 import junit.framework.Assert;
 
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientProducer;
 import org.hornetq.api.core.client.ClientSession;
@@ -33,7 +35,6 @@
 import org.hornetq.api.core.client.ServerLocator;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.config.Configuration;
-import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.impl.invm.InVMRegistry;
@@ -70,14 +71,7 @@
    {
       for (ServerLocator locator : locators)
       {
-         try
-         {
-            locator.close();
-         }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-         }
+         closeServerLocator(locator);
       }
       locators.clear();
       super.tearDown();
@@ -90,6 +84,20 @@
       }
    }
 
+   public static final void closeServerLocator(ServerLocator locator)
+   {
+      if (locator == null)
+         return;
+      try
+      {
+         locator.close();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+      }
+   }
+
    protected final static void waitForComponent(final HornetQComponent component, final long seconds) throws Exception
    {
       long time = System.currentTimeMillis();
@@ -111,12 +119,19 @@
       }
    }
 
-   protected final void stopComponent(HornetQComponent component) throws Exception
+   protected static final void stopComponent(HornetQComponent component)
    {
       if (component == null)
          return;
       if (component.isStarted())
-         component.stop();
+         try
+         {
+            component.stop();
+         }
+         catch (Exception e)
+         {
+            // no-op
+         }
    }
 
    protected static Map<String, Object> generateParams(final int node, final boolean netty)
@@ -166,9 +181,6 @@
       return new TransportConfiguration(className, params);
    }
 
-   // Static --------------------------------------------------------
-   private final Logger log = Logger.getLogger(this.getClass());
-
    // Constructors --------------------------------------------------
 
    public ServiceTestBase()
@@ -507,7 +519,7 @@
     * @param numMessages
     * @throws Exception
     */
-   public void sendMessages(ClientSession session, ClientProducer producer, int numMessages) throws Exception
+   public final void sendMessages(ClientSession session, ClientProducer producer, int numMessages) throws Exception
    {
       for (int i = 0; i < numMessages; i++)
       {
@@ -519,6 +531,19 @@
    }
 
 
+   protected final
+            void receiveMessagesAndAck(ClientConsumer consumer, int start, int msgCount) throws HornetQException
+   {
+      for (int i = start; i < msgCount; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+         Assert.assertNotNull(message);
+         assertMessageBody(i, message);
+         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+         message.acknowledge();
+      }
+   }
+
    /**
     * Deleting a file on LargeDire is an asynchronous process. We need to keep looking for a while
     * if the file hasn't been deleted yet.



More information about the hornetq-commits mailing list