Author: borges
Date: 2011-07-27 13:25:49 -0400 (Wed, 27 Jul 2011)
New Revision: 11054
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
Log:
clean up
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-27
17:24:19 UTC (rev 11053)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-27
17:25:49 UTC (rev 11054)
@@ -370,9 +370,6 @@
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
- */
public void compareJournals(final JournalLoadInformation[] journalInfo) throws
HornetQException
{
replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-27
17:24:19 UTC (rev 11053)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-27
17:25:49 UTC (rev 11054)
@@ -153,7 +153,7 @@
public static final String GENERIC_IGNORED_FILTER = "__HQX=-1";
// Static
- //
---------------------------------------------------------------------------------------
+ //
-----------------------------------------------------------------------------------
// Attributes
//
-----------------------------------------------------------------------------------
@@ -2006,6 +2006,7 @@
return;
JournalStorageManager journalStorageManager =
(JournalStorageManager)storageManager;
+
replicationManager = new ReplicationManagerImpl(rc, executorFactory);
replicationManager.start();
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-07-27
17:24:19 UTC (rev 11053)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-07-27
17:25:49 UTC (rev 11054)
@@ -54,17 +54,7 @@
@Override
protected void tearDown() throws Exception
{
- if (locator != null)
- {
- try
- {
- locator.close();
- }
- catch (Exception e)
- {
- //
- }
- }
+ closeServerLocator(locator);
super.tearDown();
}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-27
17:24:19 UTC (rev 11053)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-07-27
17:25:49 UTC (rev 11054)
@@ -102,7 +102,7 @@
protected void tearDown() throws Exception
{
closeSessionFactory();
- locator.close();
+ closeServerLocator(locator);
super.tearDown();
}
@@ -215,7 +215,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
crash(session);
@@ -346,7 +346,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
crash(session);
@@ -390,7 +390,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
crash(session);
@@ -442,7 +442,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
session.commit();
@@ -484,7 +484,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
// messages will be delivered to the consumer when the session is committed
session.commit();
@@ -524,7 +524,7 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session1, producer);
+ sendMessagesSomeDurable(session1, producer);
session1.commit();
@@ -652,7 +652,7 @@
session.start(xid, XAResource.TMNOFLAGS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
crash(session);
@@ -696,7 +696,7 @@
session.start(xid, XAResource.TMNOFLAGS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
session.end(xid, XAResource.TMSUCCESS);
@@ -746,7 +746,7 @@
session.start(xid, XAResource.TMNOFLAGS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
session.end(xid, XAResource.TMSUCCESS);
@@ -790,7 +790,7 @@
session.start(xid, XAResource.TMNOFLAGS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
session.end(xid, XAResource.TMSUCCESS);
@@ -831,7 +831,7 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session1, producer);
+ sendMessagesSomeDurable(session1, producer);
session1.commit();
@@ -877,7 +877,7 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session1, producer);
+ sendMessagesSomeDurable(session1, producer);
session1.commit();
@@ -925,7 +925,7 @@
ClientProducer producer = session1.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session1, producer);
+ sendMessagesSomeDurable(session1, producer);
session1.commit();
@@ -1067,7 +1067,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS, true);
@@ -1093,11 +1093,13 @@
closeSessionFactory();
}
- private void sendMessages(ClientSession session, ClientProducer producer) throws
Exception, HornetQException
+ private void sendMessagesSomeDurable(ClientSession session, ClientProducer producer)
throws Exception, HornetQException
{
for (int i = 0; i < NUM_MESSAGES; i++)
{
- ClientMessage message = session.createMessage(isDurable(i));
+ // some are durable, some are not!
+ boolean durable = isDurable(i);
+ ClientMessage message = session.createMessage(durable);
setBody(i, message);
message.putIntProperty("counter", i);
producer.send(message);
@@ -1114,7 +1116,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -1179,7 +1181,7 @@
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
@@ -1203,20 +1205,8 @@
}
// Should get the same ones after failover since we didn't ack
+ receiveMessagesAndAck(consumer, NUM_MESSAGES, NUM_MESSAGES * 2);
- for (int i = NUM_MESSAGES; i < NUM_MESSAGES * 2; i++)
- {
- ClientMessage message = consumer.receive(1000);
-
- Assert.assertNotNull(message);
-
- assertMessageBody(i, message);
-
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
-
- message.acknowledge();
- }
-
session.close();
closeSessionFactory();
@@ -1224,14 +1214,7 @@
private void receiveMessages(ClientConsumer consumer) throws HornetQException
{
- for (int i = 0; i < NUM_MESSAGES; i++)
- {
- ClientMessage message = consumer.receive(1000);
- Assert.assertNotNull(message);
- assertMessageBody(i, message);
- Assert.assertEquals(i, message.getIntProperty("counter").intValue());
- message.acknowledge();
- }
+ receiveMessagesAndAck(consumer, 0, NUM_MESSAGES);
}
public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
@@ -1281,7 +1264,7 @@
crash(session);
- sendMessages(session, producer);
+ sendMessagesSomeDurable(session, producer);
receiveMessages(consumer);
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-27
17:24:19 UTC (rev 11053)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-07-27
17:25:49 UTC (rev 11054)
@@ -182,8 +182,8 @@
@Override
protected void tearDown() throws Exception
{
- backupServer.stop();
- liveServer.stop();
+ stopComponent(backupServer);
+ stopComponent(liveServer);
Assert.assertEquals(0, InVMRegistry.instance.size());
@@ -230,11 +230,11 @@
return sf;
}
- protected static void waitForBackup(ClientSessionFactoryInternal sf, long seconds)
throws Exception
+ protected static void waitForBackup(ClientSessionFactoryInternal sessionFactory, long
seconds) throws Exception
{
- long time = System.currentTimeMillis();
- long toWait = seconds * 1000;
- while (sf.getBackupConnector() == null)
+ final long toWait = seconds * 1000;
+ final long time = System.currentTimeMillis();
+ while (sessionFactory.getBackupConnector() == null)
{
try
{
@@ -244,7 +244,7 @@
{
//ignore
}
- if (sf.getBackupConnector() != null)
+ if (sessionFactory.getBackupConnector() != null)
{
break;
}
@@ -253,7 +253,7 @@
fail("backup server never started");
}
}
- System.out.println("sf.getBackupConnector() = " +
sf.getBackupConnector());
+ System.out.println("sf.getBackupConnector() = " +
sessionFactory.getBackupConnector());
}
protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean
live)
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java 2011-07-27
17:24:19 UTC (rev 11053)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/TestableServer.java 2011-07-27
17:25:49 UTC (rev 11054)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQServer;
/**
@@ -24,7 +25,7 @@
*
*
*/
-public interface TestableServer
+public interface TestableServer extends HornetQComponent
{
HornetQServer getServer();
Modified:
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-07-27
17:24:19 UTC (rev 11053)
+++
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/util/ServiceTestBase.java 2011-07-27
17:25:49 UTC (rev 11054)
@@ -24,7 +24,9 @@
import junit.framework.Assert;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
@@ -33,7 +35,6 @@
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
@@ -70,14 +71,7 @@
{
for (ServerLocator locator : locators)
{
- try
- {
- locator.close();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ closeServerLocator(locator);
}
locators.clear();
super.tearDown();
@@ -90,6 +84,20 @@
}
}
+ public static final void closeServerLocator(ServerLocator locator)
+ {
+ if (locator == null)
+ return;
+ try
+ {
+ locator.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
protected final static void waitForComponent(final HornetQComponent component, final
long seconds) throws Exception
{
long time = System.currentTimeMillis();
@@ -111,12 +119,19 @@
}
}
- protected final void stopComponent(HornetQComponent component) throws Exception
+ protected static final void stopComponent(HornetQComponent component)
{
if (component == null)
return;
if (component.isStarted())
- component.stop();
+ try
+ {
+ component.stop();
+ }
+ catch (Exception e)
+ {
+ // no-op
+ }
}
protected static Map<String, Object> generateParams(final int node, final
boolean netty)
@@ -166,9 +181,6 @@
return new TransportConfiguration(className, params);
}
- // Static --------------------------------------------------------
- private final Logger log = Logger.getLogger(this.getClass());
-
// Constructors --------------------------------------------------
public ServiceTestBase()
@@ -507,7 +519,7 @@
* @param numMessages
* @throws Exception
*/
- public void sendMessages(ClientSession session, ClientProducer producer, int
numMessages) throws Exception
+ public final void sendMessages(ClientSession session, ClientProducer producer, int
numMessages) throws Exception
{
for (int i = 0; i < numMessages; i++)
{
@@ -519,6 +531,19 @@
}
+ protected final
+ void receiveMessagesAndAck(ClientConsumer consumer, int start, int msgCount)
throws HornetQException
+ {
+ for (int i = start; i < msgCount; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+ Assert.assertNotNull(message);
+ assertMessageBody(i, message);
+ Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+ message.acknowledge();
+ }
+ }
+
/**
* Deleting a file on LargeDire is an asynchronous process. We need to keep looking
for a while
* if the file hasn't been deleted yet.