Author: borges
Date: 2011-07-22 10:03:31 -0400 (Fri, 22 Jul 2011)
New Revision: 11032
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/CountDownSessionFailureListener.java
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
Log:
Remove code duplication
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2011-07-22 14:02:49 UTC (rev 11031)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/ReplicatedDistributionTest.java 2011-07-22 14:03:31 UTC (rev 11032)
@@ -24,11 +24,11 @@
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
-import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.tests.util.CountDownSessionFailureListener;
/**
* A SymmetricFailoverTest
@@ -42,6 +42,10 @@
// Constants -----------------------------------------------------
private static final SimpleString ADDRESS = new SimpleString("test.SomeAddress");
+ private ClientSession sessionOne;
+ private ClientSession sessionThree;
+ private ClientConsumer consThree;
+ private ClientProducer producer;
// Attributes ----------------------------------------------------
@@ -53,38 +57,10 @@
public void testRedistribution() throws Exception
{
- setupSessionFactory(1, 0, true, true);
- setupSessionFactory(3, 2, true, true);
+ commonTestCode();
- ClientSession sessionOne = sfs[1].createSession(true, true);
+ sessionOne.commit();
- ClientSession sessionThree = sfs[3].createSession(false, false);
-
- sessionOne.createQueue(ReplicatedDistributionTest.ADDRESS, ReplicatedDistributionTest.ADDRESS, true);
-
- sessionThree.createQueue(ReplicatedDistributionTest.ADDRESS, ReplicatedDistributionTest.ADDRESS, true);
-
- ClientConsumer consThree = sessionThree.createConsumer(ReplicatedDistributionTest.ADDRESS);
-
- sessionThree.start();
-
- waitForBindings(3, "test.SomeAddress", 1, 1, true);
- waitForBindings(1, "test.SomeAddress", 1, 1, false);
- try
- {
- ClientProducer producer = sessionOne.createProducer(ReplicatedDistributionTest.ADDRESS);
-
- for (int i = 0; i < 100; i++)
- {
- ClientMessage msg = sessionOne.createMessage(true);
-
- msg.putIntProperty(new SimpleString("key"), i);
-
- producer.send(msg);
- }
-
- sessionOne.commit();
-
for (int i = 0; i < 50; i++)
{
ClientMessage msg = consThree.receive(15000);
@@ -144,50 +120,16 @@
ClientConsumer consOne = sessionOne.createConsumer(ReplicatedDistributionTest.ADDRESS);
Assert.assertNull(consOne.receiveImmediate());
-
- }
- finally
- {
- sessionOne.close();
- sessionThree.close();
- }
}
public void testSimpleRedistribution() throws Exception
{
- setupSessionFactory(1, 0, true, true);
- setupSessionFactory(3, 2, true, true);
+ commonTestCode();
- ClientSession sessionOne = sfs[1].createSession(true, true);
+ sessionOne.commit();
- ClientSession sessionThree = sfs[3].createSession(false, false);
-
- sessionOne.createQueue(ReplicatedDistributionTest.ADDRESS, ReplicatedDistributionTest.ADDRESS, true);
-
- sessionThree.createQueue(ReplicatedDistributionTest.ADDRESS, ReplicatedDistributionTest.ADDRESS, true);
-
- ClientConsumer consThree = sessionThree.createConsumer(ReplicatedDistributionTest.ADDRESS);
-
- sessionThree.start();
-
- waitForBindings(3, "test.SomeAddress", 1, 1, true);
- waitForBindings(1, "test.SomeAddress", 1, 1, false);
-
- try
- {
- ClientProducer producer = sessionOne.createProducer(ReplicatedDistributionTest.ADDRESS);
-
for (int i = 0; i < 100; i++)
{
- ClientMessage msg = sessionOne.createMessage(true);
- msg.putIntProperty(new SimpleString("key"), i);
- producer.send(msg);
- }
-
- sessionOne.commit();
-
- for (int i = 0; i < 100; i++)
- {
ClientMessage msg = consThree.receive(15000);
Assert.assertNotNull(msg);
@@ -211,12 +153,20 @@
ClientConsumer consOne = sessionOne.createConsumer(ReplicatedDistributionTest.ADDRESS);
Assert.assertNull(consOne.receiveImmediate());
+ }
- }
- finally
+ private void commonTestCode() throws Exception, HornetQException
+ {
+ waitForBindings(3, "test.SomeAddress", 1, 1, true);
+ waitForBindings(1, "test.SomeAddress", 1, 1, false);
+
+ producer = sessionOne.createProducer(ReplicatedDistributionTest.ADDRESS);
+
+ for (int i = 0; i < 100; i++)
{
- sessionOne.close();
- sessionThree.close();
+ ClientMessage msg = sessionOne.createMessage(true);
+ msg.putIntProperty(new SimpleString("key"), i);
+ producer.send(msg);
}
}
@@ -233,23 +183,8 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener implements SessionFailureListener
- {
- public void connectionFailed(final HornetQException me, boolean failedOver)
- {
- latch.countDown();
- }
+ session.addFailureListener(new CountDownSessionFailureListener(latch));
- /* (non-Javadoc)
- * @see org.hornetq.api.core.client.SessionFailureListener#beforeReconnect(org.hornetq.api.core.exception.HornetQException)
- */
- public void beforeReconnect(final HornetQException exception)
- {
- }
- }
-
- session.addFailureListener(new MyListener());
-
RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
// Simulate failure on connection
@@ -280,11 +215,35 @@
getServer(2).getAddressSettingsRepository().addMatch("test.*", as);
getServer(2).getAddressSettingsRepository().addMatch("test.*", as);
- servers[2].start();
servers[1].start();
servers[3].start();
+ servers[2].start();
+
+ setupSessionFactory(1, 0, true, true);
+ setupSessionFactory(3, 2, true, true);
+
+ sessionOne = sfs[1].createSession(true, true);
+ sessionThree = sfs[3].createSession(false, false);
+
+ sessionOne.createQueue(ReplicatedDistributionTest.ADDRESS, ReplicatedDistributionTest.ADDRESS, true);
+ sessionThree.createQueue(ReplicatedDistributionTest.ADDRESS, ReplicatedDistributionTest.ADDRESS, true);
+
+ consThree = sessionThree.createConsumer(ReplicatedDistributionTest.ADDRESS);
+
+ sessionThree.start();
}
+ @Override
+ protected void tearDown() throws Exception
+ {
+ if (sessionOne != null)
+ sessionOne.close();
+ if (sessionThree != null)
+ sessionThree.close();
+
+ super.tearDown();
+ }
+
protected boolean isShared()
{
return false;
Added:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/CountDownSessionFailureListener.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/CountDownSessionFailureListener.java (rev 0)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/util/CountDownSessionFailureListener.java 2011-07-22 14:03:31 UTC (rev 11032)
@@ -0,0 +1,29 @@
+package org.hornetq.tests.util;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.SessionFailureListener;
+
+public final class CountDownSessionFailureListener implements SessionFailureListener
+{
+ private final CountDownLatch latch;
+
+ public CountDownSessionFailureListener(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+ @Override
+ public void connectionFailed(HornetQException exception, boolean failedOver)
+ {
+ latch.countDown();
+ }
+
+ @Override
+ public void beforeReconnect(HornetQException exception)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+}