[hornetq-commits] JBoss hornetq SVN: r9144 - in trunk: src/main/org/hornetq/core/server and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Apr 21 12:35:33 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-04-21 12:35:31 -0400 (Wed, 21 Apr 2010)
New Revision: 9144
Modified:
trunk/build-maven.xml
trunk/src/main/org/hornetq/core/server/ActivateCallback.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
Changes to Asynchronous Failover Test
Modified: trunk/build-maven.xml
===================================================================
--- trunk/build-maven.xml 2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/build-maven.xml 2010-04-21 16:35:31 UTC (rev 9144)
@@ -13,7 +13,7 @@
-->
<project default="upload" name="HornetQ">
- <property name="hornetq.version" value="2.1.0.BETA2"/>
+ <property name="hornetq.version" value="2.1.0.BETA3"/>
<property name="build.dir" value="build"/>
<property name="jars.dir" value="${build.dir}/jars"/>
Modified: trunk/src/main/org/hornetq/core/server/ActivateCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ActivateCallback.java 2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/src/main/org/hornetq/core/server/ActivateCallback.java 2010-04-21 16:35:31 UTC (rev 9144)
@@ -22,5 +22,7 @@
*/
public interface ActivateCallback
{
+ void preActivate();
+
void activated();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-04-21 16:35:31 UTC (rev 9144)
@@ -860,6 +860,15 @@
}
}
+
+ private void callPreActiveCallbacks()
+ {
+ for (ActivateCallback callback : activateCallbacks)
+ {
+ callback.preActivate();
+ }
+ }
+
public synchronized boolean checkActivate() throws Exception
{
if (configuration.isBackup())
@@ -938,6 +947,8 @@
{
deploymentManager = new FileDeploymentManager(configuration.getFileDeployerScanPeriod());
}
+
+ callPreActiveCallbacks();
startReplication();
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-04-21 16:35:31 UTC (rev 9144)
@@ -166,6 +166,11 @@
// ActivateCallback implementation -------------------------------------
+ public void preActivate()
+ {
+
+ }
+
public synchronized void activated()
{
active = true;
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/AsynchronousFailoverTest.java 2010-04-21 16:35:31 UTC (rev 9144)
@@ -21,6 +21,8 @@
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
@@ -50,6 +52,8 @@
private volatile ClientSessionFactoryInternal sf;
+ private Object lockFail = new Object();
+
class MyListener implements SessionFailureListener
{
CountDownLatch latch = new CountDownLatch(1);
@@ -87,11 +91,22 @@
{
runTest(new TestRunner()
{
+ volatile boolean running = false;
+
public void run()
{
try
{
- doTestTransactional(this);
+ assertFalse(running);
+ running = true;
+ try
+ {
+ doTestTransactional(this);
+ }
+ finally
+ {
+ running = false;
+ }
}
catch (Throwable e)
{
@@ -107,7 +122,7 @@
volatile boolean failed;
ArrayList<Throwable> errors = new ArrayList<Throwable>();
-
+
boolean isFailed()
{
return failed;
@@ -122,30 +137,27 @@
{
failed = false;
}
-
+
synchronized void addException(Throwable e)
{
errors.add(e);
}
-
+
void checkForExceptions() throws Throwable
{
if (errors.size() > 0)
{
log.warn("Exceptions on test:");
- for (Throwable e: errors)
+ for (Throwable e : errors)
{
log.warn(e.getMessage(), e);
}
// throwing the first error that happened on the Runnable
throw errors.get(0);
}
-
-
}
-
-
+
}
private void runTest(final TestRunner runnable) throws Throwable
@@ -173,6 +185,8 @@
Thread t = new Thread(runnable);
+ t.setName("MainTEST");
+
t.start();
long randomDelay = (long)(2000 * Math.random());
@@ -186,7 +200,10 @@
MyListener listener = this.listener;
// Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+ synchronized (lockFail)
+ {
+ conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+ }
if (listener != null)
{
@@ -200,7 +217,7 @@
AsynchronousFailoverTest.log.info("Fail complete");
t.join();
-
+
runnable.checkForExceptions();
createSession.close();
@@ -242,24 +259,24 @@
session.addFailureListener(listener);
this.listener = listener;
-
+
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final int numMessages = 1000;
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = session.createMessage(true);
-
- message.getBodyBuffer().writeString("message" + i);
-
- message.putIntProperty("counter", i);
-
boolean retry = false;
do
{
try
{
+ ClientMessage message = session.createMessage(true);
+
+ message.getBodyBuffer().writeString("message" + i);
+
+ message.putIntProperty("counter", i);
+
producer.send(message);
retry = false;
@@ -343,69 +360,59 @@
private void doTestTransactional(final TestRunner runner) throws Exception
{
+ // For duplication detection
+ int executionId = 0;
+
while (!runner.isFailed())
{
ClientSession session = null;
+ executionId++;
+
try
{
- session = sf.createSession(false, false);
MyListener listener = new MyListener();
- session.addFailureListener(listener);
-
this.listener = listener;
+ boolean retry = false;
- ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
-
final int numMessages = 1000;
- for (int i = 0; i < numMessages; i++)
+ session = sf.createSession(false, false);
+
+ session.addFailureListener(listener);
+
+ do
{
- ClientMessage message = session.createMessage(true);
+ try
+ {
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
- message.getBodyBuffer().writeString("message" + i);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createMessage(true);
- message.putIntProperty("counter", i);
+ message.getBodyBuffer().writeString("message" + i);
- boolean retry = false;
- do
- {
- try
- {
+ message.putIntProperty("counter", i);
+
+ message.putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, new SimpleString("id:" + i +
+ ",exec:" +
+ executionId));
+
producer.send(message);
}
- catch (HornetQException e)
- {
- Assert.assertEquals(e.getCode(), HornetQException.UNBLOCKED);
- retry = true;
- }
- }
- while (retry);
- }
-
- boolean retry = false;
- while (retry)
- {
- try
- {
session.commit();
retry = false;
}
catch (HornetQException e)
{
- if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
+ if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK || e.getCode() == HornetQException.UNBLOCKED)
{
// OK
- session.close();
-
- continue;
- }
- else if (e.getCode() == HornetQException.UNBLOCKED)
- {
retry = true;
}
else
@@ -414,70 +421,79 @@
}
}
}
+ while (retry);
- ClientConsumer consumer = null;
- while (true)
+
+ boolean blocked = false;
+
+ retry = false;
+
+ ClientConsumer consumer = null;
+ do
{
+ ArrayList<Integer> msgs = new ArrayList<Integer>();
try
{
- consumer = session.createConsumer(FailoverTestBase.ADDRESS);
-
- break;
- }
- catch (HornetQException e)
- {
- if (e.getCode() == HornetQException.UNBLOCKED)
+ if (consumer == null)
{
- continue;
+ consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
}
- throw e;
- }
- }
-
- session.start();
- int lastCount = -1;
- while (true)
- {
- ClientMessage message = consumer.receive(500);
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = consumer.receive(500);
+ if (message == null)
+ {
+ break;
+ }
- if (message == null)
- {
- break;
- }
+ int count = message.getIntProperty("counter");
- // There may be some missing or duplicate messages - but the order should be correct
+ msgs.add(count);
- int count = message.getIntProperty("counter");
+ message.acknowledge();
+ }
- Assert.assertTrue("count:" + count + " last count:" + lastCount, count >= lastCount);
+ session.commit();
+
+ if (blocked)
+ {
+ assertTrue("msgs.size is expected to be 0 or " + numMessages + " but it was " + msgs.size(), msgs.size() == 0 || msgs.size() == numMessages);
+ }
+ else
+ {
+ assertTrue("msgs.size is expected to be " + numMessages + " but it was " + msgs.size(), msgs.size() == numMessages);
+ }
- lastCount = count;
+ int i = 0;
+ for (Integer msg : msgs)
+ {
+ assertEquals(i++, (int)msg);
+ }
- message.acknowledge();
- }
-
- retry = false;
- while (retry)
- {
- try
- {
- session.commit();
-
retry = false;
+ blocked = false;
}
catch (HornetQException e)
{
if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
{
- // OK
- session.close();
-
- continue;
+ // TODO: https://jira.jboss.org/jira/browse/HORNETQ-369
+ // ATM RolledBack exception is being called when the transaction is committed.
+ // the test will fail if you remove this next line
+ blocked = true;
}
else if (e.getCode() == HornetQException.UNBLOCKED)
{
+ // TODO: https://jira.jboss.org/jira/browse/HORNETQ-369
+ // This part of the test is never being called.
+ blocked = true;
+ }
+
+ if (e.getCode() == HornetQException.UNBLOCKED || e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
+ {
retry = true;
}
else
@@ -486,6 +502,7 @@
}
}
}
+ while (retry);
}
finally
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-04-21 13:10:13 UTC (rev 9143)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2010-04-21 16:35:31 UTC (rev 9144)
@@ -27,6 +27,7 @@
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
+import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -98,6 +99,26 @@
config1.setSharedStore(true);
config1.setBackup(true);
server1Service = createServer(true, config1);
+
+ server1Service.registerActivateCallback(new ActivateCallback()
+ {
+
+ public void preActivate()
+ {
+ // To avoid two servers using the same journal at the same time
+ try
+ {
+ server0Service.getStorageManager().stop();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+
+ public void activated()
+ {
+ }
+ });
Configuration config0 = super.createDefaultConfig();
config0.getAcceptorConfigurations().clear();
More information about the hornetq-commits
mailing list