[jboss-cvs] JBoss Messaging SVN: r2313 - in trunk: src/main/org/jboss/jms/tx and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Feb 14 15:25:03 EST 2007
Author: timfox
Date: 2007-02-14 15:25:03 -0500 (Wed, 14 Feb 2007)
New Revision: 2313
Added:
trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
Modified:
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/tx/ResourceManager.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java
trunk/src/main/org/jboss/messaging/core/tx/Transaction.java
trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-734
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2007-02-14 20:25:03 UTC (rev 2313)
@@ -1444,15 +1444,32 @@
{
if (key.equals(DefaultClusteredPostOffice.FAILED_OVER_FOR_KEY))
{
- // We have a failover status change - notify anyone waiting
-
- log.debug(ServerPeer.this + ".FailoverListener got failover event, notifying those waiting on lock");
-
+ if (updatedReplicantMap != null && originatingNodeId == serverPeerID)
+ {
+ FailoverStatus status = (FailoverStatus)updatedReplicantMap.get(new Integer(serverPeerID));
+
+ if (status != null && status.isFailingOver())
+ {
+ //We prompt txRepository to load any prepared txs - so we can take over responsibility for
+ //in doubt transactions from other nodes
+ try
+ {
+ txRepository.loadPreparedTransactions();
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to load prepared transactions", e);
+ }
+ }
+ }
+
synchronized (failoverStatusLock)
{
+ log.debug(ServerPeer.this + ".FailoverListener got failover event, notifying those waiting on lock");
+
failoverStatusLock.notifyAll();
}
- }
+ }
}
}
}
Modified: trunk/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/src/main/org/jboss/jms/tx/ResourceManager.java 2007-02-14 20:25:03 UTC (rev 2313)
@@ -533,6 +533,8 @@
{
Xid[] txs = conn.getPreparedTransactions();
+ if (trace) { log.trace("Got " + txs.length + " transactions from server"); }
+
//populate with TxState --MK
for (int i = 0; i < txs.length;i++)
{
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java 2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/FailoverStatus.java 2007-02-14 20:25:03 UTC (rev 2313)
@@ -102,6 +102,11 @@
{
return failingOver && currentlyFailingOverForNode == nodeId;
}
+
+ public boolean isFailingOver()
+ {
+ return failingOver;
+ }
public String toString()
{
Modified: trunk/src/main/org/jboss/messaging/core/tx/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/tx/Transaction.java 2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/src/main/org/jboss/messaging/core/tx/Transaction.java 2007-02-14 20:25:03 UTC (rev 2313)
@@ -62,9 +62,8 @@
private Map callbackMap;
- private boolean loadedAtStartup;
-
-
+ private boolean recoveredFromStorage;
+
/**
* If this is a XA transaction, when a commit is executed the transaction has to be removed from the transaction repository.
* This reference will guarantee the reference back to the repository where the transaction was created
@@ -182,6 +181,14 @@
{
throw new TransactionException("Transaction already rolled back, cannot commit");
}
+
+ if (recoveredFromStorage)
+ {
+ //Commit can come in for an in doubt tx that has been recovered from storage
+ //but for which recover() has not yet been called
+ //therefore we might need to load its state
+ loadState();
+ }
if (trace) { log.trace(this + " executing before commit hooks"); }
@@ -292,6 +299,14 @@
throw new TransactionException("Transaction already rolled back, cannot rollback");
}
+ if (recoveredFromStorage)
+ {
+ //Rollback can come in for an in doubt tx that has been recovered from storage
+ //but for which recover() has not yet been called
+ //therefore we might need to load its state
+ loadState();
+ }
+
if (trace) { log.trace(this + " executing before rollback hooks"); }
boolean onePhase = state != STATE_PREPARED;
@@ -335,6 +350,15 @@
if (trace) { log.trace(this + " rollback process complete"); }
}
+ public void loadState() throws Exception
+ {
+ repository.handleReferences(this);
+
+ repository.handleAcks(this);
+
+ recoveredFromStorage = false;
+ }
+
public synchronized void setRollbackOnly() throws Exception
{
if (trace) { log.trace("setting ROLLBACK_ONLY on " + this); }
@@ -347,14 +371,14 @@
return id;
}
- public boolean isLoadedAtStartup()
+ public boolean isRecoveredFromStorage()
{
- return this.loadedAtStartup;
+ return this.recoveredFromStorage;
}
- public void setLoadedAtStartup(boolean loadedAtStartup)
+ public void setRecoveredFromStorage(boolean recoveredFromStorage)
{
- this.loadedAtStartup = loadedAtStartup;
+ this.recoveredFromStorage = recoveredFromStorage;
}
public void setState(int state)
Modified: trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/src/main/org/jboss/messaging/core/tx/TransactionRepository.java 2007-02-14 20:25:03 UTC (rev 2313)
@@ -128,11 +128,11 @@
ArrayList prepared = new ArrayList();
Iterator iter = globalToLocalMap.values().iterator();
-
+
while (iter.hasNext())
{
Transaction tx = (Transaction) iter.next();
-
+
if (tx.getXid() != null && tx.getState() == Transaction.STATE_PREPARED)
{
try
@@ -143,13 +143,10 @@
//in which case the tx will already have the references and acks in them
//in this case we DO NOT want to replay them again, since they will end up in the transaction state
//twice
- //In other words we only want to replay acks and sends if this tx was loaded at startup
- if (tx.isLoadedAtStartup())
+ //In other words we only want to replay acks and sends if this tx was recovered from the db
+ if (tx.isRecoveredFromStorage())
{
- handleReferences(tx);
- handleAcks(tx);
-
- tx.setLoadedAtStartup(false);
+ tx.loadState();
}
}
catch (Exception e)
@@ -160,6 +157,8 @@
prepared.add(tx.getXid());
}
}
+
+ if (trace) { log.trace("Returning " + prepared.size() + " transactions"); }
return prepared;
}
@@ -186,16 +185,26 @@
{
PreparedTxInfo txInfo = (PreparedTxInfo) iter.next();
- if (trace) log.trace("reinstating TX(XID: " + txInfo.getXid() + ", LocalId " + txInfo.getTxId() +")");
+ //This method may be called more than once - e.g. when failover occurs so we don't want to add the
+ //prepared tx if it is already in memory
- Transaction tx = createTransaction(txInfo);
-
- tx.setState(Transaction.STATE_PREPARED);
-
- tx.setLoadedAtStartup(true);
-
+ if (!globalToLocalMap.containsKey(txInfo.getXid()))
+ {
+ Transaction tx = createTransaction(txInfo);
+
+ tx.setState(Transaction.STATE_PREPARED);
+
+ tx.setRecoveredFromStorage(true);
+
+ if (trace) log.trace("reinstating TX(XID: " + txInfo.getXid() + ", LocalId " + txInfo.getTxId() +")");
+
+ }
+ else
+ {
+ if (trace) log.trace("Not adding to map since it's already in map");
+ }
}
- }
+ }
}
public List getPreparedTransactions()
@@ -272,22 +281,20 @@
return this.globalToLocalMap.size();
}
+
+
// Package protected ---------------------------------------------
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- /**
- * Load the references and invoke the channel to handle those refs
- */
- private void handleReferences(Transaction tx) throws Exception
+ /**
+ * Load the references and invoke the channel to handle those refs
+ */
+ void handleReferences(Transaction tx) throws Exception
{
if (trace) log.trace("Handle references for TX(XID: " + tx.getXid() + ", LocalID: " + tx.getId()+ "):");
long txId = tx.getId();
- List pairs = persistenceManager.getMessageChannelPairRefsForTx(txId);
+ List pairs = persistenceManager.getMessageChannelPairRefsForTx(txId);
if (trace) log.trace("Found " + pairs.size() + " unhandled references.");
@@ -327,13 +334,13 @@
ref.releaseMemoryReference();
}
}
- }
- }
+ }
+ }
- /**
- * Load the acks and acknowledge them
- */
- private void handleAcks(Transaction tx) throws Exception
+ /**
+ * Load the acks and acknowledge them
+ */
+ void handleAcks(Transaction tx) throws Exception
{
long txId = tx.getId();
@@ -401,7 +408,13 @@
tx.addCallback(new CancelCallback(dels), this);
}
- }
+ }
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+
/**
* Creates a prepared transaction
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-02-14 20:25:03 UTC (rev 2313)
@@ -1926,36 +1926,5 @@
}
// Inner classes --------------------------------------------------------------------------------
-
- private class SimpleFailoverListener implements FailoverListener
- {
- private LinkedQueue buffer;
-
- public SimpleFailoverListener()
- {
- buffer = new LinkedQueue();
- }
-
- public void failoverEventOccured(FailoverEvent event)
- {
- try
- {
- buffer.put(event);
- }
- catch(InterruptedException e)
- {
- throw new RuntimeException("Putting thread interrupted while trying to add event " +
- "to buffer", e);
- }
- }
-
- /**
- * Blocks until a FailoverEvent is available or timeout occurs, in which case returns null.
- */
- public FailoverEvent getEvent(long timeout) throws InterruptedException
- {
- return (FailoverEvent)buffer.poll(timeout);
- }
- }
-
+
}
Added: trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2007-02-14 20:25:03 UTC (rev 2313)
@@ -0,0 +1,949 @@
+/**
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.test.messaging.jms.clustering;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.JBossConnection;
+import org.jboss.test.messaging.jms.clustering.base.ClusteringTestBase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.test.messaging.tools.aop.PoisonInterceptor;
+import org.jboss.test.messaging.tools.jmx.ServiceContainer;
+import org.jboss.test.messaging.tools.jndi.InVMInitialContextFactory;
+
+/**
+ *
+ * A XAFailoverTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class XAFailoverTest extends ClusteringTestBase
+{
+ // Constants ------------------------------------------------------------------------------------
+
+ // Static ---------------------------------------------------------------------------------------
+
+ // Attributes -----------------------------------------------------------------------------------
+
+ private ServiceContainer sc;
+
+ private TransactionManager tm;
+
+ private Transaction suspended;
+
+ // Constructors ---------------------------------------------------------------------------------
+
+ public XAFailoverTest(String name)
+ {
+ super(name);
+ }
+
+ // Public ---------------------------------------------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ nodeCount = 2;
+
+ super.setUp();
+
+ sc = new ServiceContainer("transaction");
+
+ //Don't drop the tables again!
+ sc.start(false);
+
+ InitialContext localIc = new InitialContext(InVMInitialContextFactory.getJNDIEnvironment());
+
+ tm = (TransactionManager)localIc.lookup(ServiceContainer.TRANSACTION_MANAGER_JNDI_NAME);
+
+ suspended = tm.suspend();
+
+ log.debug("setup done");
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ sc.stop();
+
+ if (suspended != null)
+ {
+ tm.resume(suspended);
+ }
+ }
+
+ public void testSimpleXAConnectionFailover() throws Exception
+ {
+ XAConnection conn = null;
+
+ XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+
+ try
+ {
+ // skip connection to node 0
+ conn = xaCF.createXAConnection();
+ conn.close();
+
+ // create a connection to node 1
+ conn = xaCF.createXAConnection();
+ conn.start();
+
+ assertEquals(1, ((JBossConnection)conn).getServerID());
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ // failover complete
+ log.info("failover completed");
+
+ assertEquals(0, ((JBossConnection)conn).getServerID());
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+
+ public void testSendFailBeforePrepare() throws Exception
+ {
+ XAConnection xaConn = null;
+
+ XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+
+ Connection conn = null;
+
+ try
+ {
+ // skip connection to node 0
+ xaConn = xaCF.createXAConnection();
+ xaConn.close();
+
+ // create a connection to node 1
+ xaConn = xaCF.createXAConnection();
+
+ assertEquals(1, ((JBossConnection)xaConn).getServerID());
+
+ conn = cf.createConnection();
+ conn.close();
+ conn = cf.createConnection();
+
+ assertEquals(1, ((JBossConnection)conn).getServerID());
+
+ conn.start();
+
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)xaConn).registerFailoverListener(failoverListener);
+
+ // Create a normal consumer on the queue
+ Session sessRec = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = sessRec.createConsumer(queue[1]);
+
+ // Create an XA session
+
+ XASession sess = xaConn.createXASession();
+
+ XAResource res = sess.getXAResource();
+
+ MessageProducer prod = sess.createProducer(queue[1]);
+
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res);
+
+ //Enlist a dummy XAResource to force 2pc
+ XAResource dummy = new DummyXAResource();
+
+ tx.enlistResource(dummy);
+
+ //Send a message
+
+ TextMessage msg = sess.createTextMessage("Cupid stunt");
+
+ prod.send(msg);
+
+ //Make sure message can't be received
+
+ Message m = cons.receive(2000);
+
+ assertNull(m);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tx.delistResource(dummy, XAResource.TMSUCCESS);
+
+ //Now kill node 1
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ // failover complete
+ log.info("failover completed");
+
+ //Now commit the transaction
+
+ tm.commit();
+
+ // Message should now be receivable
+
+ TextMessage mrec = (TextMessage)cons.receive(2000);
+
+ assertNotNull(mrec);
+
+ assertEquals(msg.getText(), mrec.getText());
+
+ m = cons.receive(2000);
+
+ assertNull(m);
+
+ assertEquals(0, ((JBossConnection)xaConn).getServerID());
+
+ }
+ finally
+ {
+ if (xaConn != null)
+ {
+ xaConn.close();
+ }
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+
+ public void testSendAndReceiveFailBeforePrepare() throws Exception
+ {
+ XAConnection xaConn = null;
+
+ XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+
+ Connection conn = null;
+
+ try
+ {
+ // skip connection to node 0
+ xaConn = xaCF.createXAConnection();
+ xaConn.close();
+
+ // create a connection to node 1
+ xaConn = xaCF.createXAConnection();
+
+ assertEquals(1, ((JBossConnection)xaConn).getServerID());
+
+ conn = cf.createConnection();
+ conn.close();
+ conn = cf.createConnection();
+
+ assertEquals(1, ((JBossConnection)conn).getServerID());
+
+ conn.start();
+
+ xaConn.start();
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)xaConn).registerFailoverListener(failoverListener);
+
+ // Create a normal consumer on the queue
+ Session sessRec = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Send a message to the queue
+ MessageProducer prod = sessRec.createProducer(queue[1]);
+
+ TextMessage sent = sessRec.createTextMessage("plop");
+
+ prod.send(sent);
+
+ // Create an XA session
+
+ XASession sess = xaConn.createXASession();
+
+ XAResource res = sess.getXAResource();
+
+ MessageProducer prod2 = sess.createProducer(queue[1]);
+
+ MessageConsumer cons2 = sess.createConsumer(queue[1]);
+
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res);
+
+ //Enlist a dummy XAResource to force 2pc
+ XAResource dummy = new DummyXAResource();
+
+ tx.enlistResource(dummy);
+
+ //receive a message
+
+ TextMessage received = (TextMessage)cons2.receive(2000);
+
+ assertNotNull(received);
+
+ assertEquals(sent.getText(), received.getText());
+
+ //Send a message
+
+ TextMessage msg = sess.createTextMessage("Cupid stunt");
+
+ prod2.send(msg);
+
+ // Make sure the message can't be received
+
+ MessageConsumer cons = sessRec.createConsumer(queue[1]);
+
+ Message m = cons.receive(2000);
+
+ assertNull(m);
+
+ tx.delistResource(res, XAResource.TMSUCCESS);
+
+ tx.delistResource(dummy, XAResource.TMSUCCESS);
+
+ //Now kill node 1
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ // failover complete
+ log.info("failover completed");
+
+ //Now commit the transaction
+
+ tm.commit();
+
+ // Message should now be receivable
+
+ cons2.close();
+
+ TextMessage mrec = (TextMessage)cons.receive(2000);
+
+ assertNotNull(mrec);
+
+ assertEquals(msg.getText(), mrec.getText());
+
+ m = cons.receive(2000);
+
+ //And the other message should be acked
+ assertNull(m);
+
+ assertEquals(0, ((JBossConnection)xaConn).getServerID());
+
+ }
+ finally
+ {
+ if (xaConn != null)
+ {
+ xaConn.close();
+ }
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+
+ public void testSendAndReceiveTwoConnectionsFailBeforePrepare() throws Exception
+ {
+ XAConnection xaConn0 = null;
+
+ XAConnection xaConn1 = null;
+
+ XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+
+ Connection conn0 = null;
+
+ Connection conn1 = null;
+
+ try
+ {
+ xaConn0 = xaCF.createXAConnection();
+
+ assertEquals(0, ((JBossConnection)xaConn0).getServerID());
+
+ xaConn1 = xaCF.createXAConnection();
+
+ assertEquals(1, ((JBossConnection)xaConn1).getServerID());
+
+ conn0 = cf.createConnection();
+
+ assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+ conn1 = cf.createConnection();
+
+ assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+ xaConn0.start();
+
+ xaConn1.start();
+
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)xaConn1).registerFailoverListener(failoverListener);
+
+ //Send a message to each queue
+
+ Session sess = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue[0]);
+
+ TextMessage sent0 = sess.createTextMessage("plop0");
+
+ prod.send(sent0);
+
+ sess.close();
+
+ sess = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ prod = sess.createProducer(queue[1]);
+
+ TextMessage sent1 = sess.createTextMessage("plop1");
+
+ prod.send(sent1);
+
+ sess.close();
+
+
+
+
+ XASession sess0 = xaConn0.createXASession();
+
+ XAResource res0 = sess0.getXAResource();
+
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+
+
+ XASession sess1 = xaConn1.createXASession();
+
+ XAResource res1 = sess1.getXAResource();
+
+ MessageProducer prod1 = sess1.createProducer(queue[1]);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+
+
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res0);
+
+ tx.enlistResource(res1);
+
+ //receive a message
+
+ TextMessage received = (TextMessage)cons0.receive(2000);
+
+ assertNotNull(received);
+
+ assertEquals(sent0.getText(), received.getText());
+
+
+ received = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(received);
+
+ assertEquals(sent1.getText(), received.getText());
+
+
+
+ //Send a message
+
+ TextMessage msg0 = sess0.createTextMessage("Cupid stunt0");
+
+ prod0.send(msg0);
+
+ TextMessage msg1 = sess1.createTextMessage("Cupid stunt1");
+
+ prod1.send(msg1);
+
+
+
+ tx.delistResource(res0, XAResource.TMSUCCESS);
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+
+ //Now kill node 1
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ // failover complete
+ log.info("failover completed");
+
+ //Now commit the transaction
+
+ tm.commit();
+
+ cons0.close();
+
+ cons1.close();
+
+ // Message should now be receivable
+
+ conn0.start();
+
+ Session sessRec0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ cons0 = sessRec0.createConsumer(queue[0]);
+
+ TextMessage mrec = (TextMessage)cons0.receive(2000);
+
+ assertNotNull(mrec);
+
+ assertEquals(msg0.getText(), mrec.getText());
+
+ Message m = cons0.receive(2000);
+
+ //And the other message should be acked
+ assertNull(m);
+
+
+ conn1.start();
+
+ Session sessRec1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ cons1 = sessRec1.createConsumer(queue[0]);
+
+ mrec = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(mrec);
+
+ assertEquals(msg1.getText(), mrec.getText());
+
+ m = cons1.receive(2000);
+
+ //And the other message should be acked
+ assertNull(m);
+
+ assertEquals(0, ((JBossConnection)xaConn1).getServerID());
+
+ }
+ finally
+ {
+ if (xaConn1 != null)
+ {
+ xaConn1.close();
+ }
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ }
+ }
+
+
+
+
+ public void testSendAndReceiveTwoConnectionsFailAfterPrepareAndRecover() throws Exception
+ {
+ XAConnection xaConn0 = null;
+
+ XAConnection xaConn1 = null;
+
+ XAConnectionFactory xaCF = (XAConnectionFactory)cf;
+
+ Connection conn0 = null;
+
+ Connection conn1 = null;
+
+ try
+ {
+ xaConn0 = xaCF.createXAConnection();
+
+ assertEquals(0, ((JBossConnection)xaConn0).getServerID());
+
+ xaConn1 = xaCF.createXAConnection();
+
+ assertEquals(1, ((JBossConnection)xaConn1).getServerID());
+
+ conn0 = cf.createConnection();
+
+ assertEquals(0, ((JBossConnection)conn0).getServerID());
+
+ conn1 = cf.createConnection();
+
+ assertEquals(1, ((JBossConnection)conn1).getServerID());
+
+ xaConn0.start();
+
+ xaConn1.start();
+
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)xaConn1).registerFailoverListener(failoverListener);
+
+ //Send a message to each queue
+
+ Session sess = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sess.createProducer(queue[0]);
+
+ TextMessage sent0 = sess.createTextMessage("plop0");
+
+ prod.send(sent0);
+
+ sess.close();
+
+ sess = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ prod = sess.createProducer(queue[1]);
+
+ TextMessage sent1 = sess.createTextMessage("plop1");
+
+ prod.send(sent1);
+
+ sess.close();
+
+
+
+
+ XASession sess0 = xaConn0.createXASession();
+
+ XAResource res0 = sess0.getXAResource();
+
+ MessageProducer prod0 = sess0.createProducer(queue[0]);
+
+ MessageConsumer cons0 = sess0.createConsumer(queue[0]);
+
+
+ XASession sess1 = xaConn1.createXASession();
+
+ XAResource res1 = sess1.getXAResource();
+
+ MessageProducer prod1 = sess1.createProducer(queue[1]);
+
+ MessageConsumer cons1 = sess1.createConsumer(queue[1]);
+
+
+ tm.begin();
+
+ Transaction tx = tm.getTransaction();
+
+ tx.enlistResource(res0);
+
+ tx.enlistResource(res1);
+
+ //receive a message
+
+ TextMessage received = (TextMessage)cons0.receive(2000);
+
+ assertNotNull(received);
+
+ assertEquals(sent0.getText(), received.getText());
+
+
+ received = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(received);
+
+ assertEquals(sent1.getText(), received.getText());
+
+
+
+ //Send a message
+
+ TextMessage msg0 = sess0.createTextMessage("Cupid stunt0");
+
+ prod0.send(msg0);
+
+ TextMessage msg1 = sess1.createTextMessage("Cupid stunt1");
+
+ prod1.send(msg1);
+
+ tx.delistResource(res0, XAResource.TMSUCCESS);
+
+ tx.delistResource(res1, XAResource.TMSUCCESS);
+
+ // We poison node 1 so that it crashes after prepare but before commit is processed
+
+ ServerManagement.poisonTheServer(1, PoisonInterceptor.TYPE_2PC_COMMIT);
+
+ tm.commit();
+
+ //Now kill node 1
+
+ log.debug("killing node 1 ....");
+
+ ServerManagement.kill(1);
+
+ log.info("########");
+ log.info("######## KILLED NODE 1");
+ log.info("########");
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(120000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ //When the node comes back up, the invocation to commit() will be retried on the new node.
+ //The new node will by then already have loaded into memory the prepared transactions from
+ //the failed node so this should complete ok
+
+ // failover complete
+ log.info("failover completed");
+
+ cons0.close();
+
+ cons1.close();
+
+
+ // Message should now be receivable
+
+ conn0.start();
+
+ Session sessRec0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ cons0 = sessRec0.createConsumer(queue[0]);
+
+ TextMessage mrec = (TextMessage)cons0.receive(2000);
+
+ assertNotNull(mrec);
+
+ assertEquals(msg0.getText(), mrec.getText());
+
+ Message m = cons0.receive(2000);
+
+ //And the other message should be acked
+ assertNull(m);
+
+
+ conn1.start();
+
+ Session sessRec1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ cons1 = sessRec1.createConsumer(queue[0]);
+
+ mrec = (TextMessage)cons1.receive(2000);
+
+ assertNotNull(mrec);
+
+ assertEquals(msg1.getText(), mrec.getText());
+
+ m = cons1.receive(2000);
+
+ //And the other message should be acked
+ assertNull(m);
+
+ assertEquals(0, ((JBossConnection)xaConn1).getServerID());
+
+ }
+ finally
+ {
+ if (xaConn1 != null)
+ {
+ xaConn1.close();
+ }
+ if (conn0 != null)
+ {
+ conn0.close();
+ }
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ }
+ }
+
+
+
+
+
+ // Inner classes --------------------------------------------------------------------------------
+
+ static class DummyXAResource implements XAResource
+ {
+ boolean failOnPrepare;
+
+ DummyXAResource()
+ {
+ }
+
+ public void commit(Xid arg0, boolean arg1) throws XAException
+ {
+ }
+
+ public void end(Xid arg0, int arg1) throws XAException
+ {
+ }
+
+ public void forget(Xid arg0) throws XAException
+ {
+ }
+
+ public int getTransactionTimeout() throws XAException
+ {
+ return 0;
+ }
+
+ public boolean isSameRM(XAResource arg0) throws XAException
+ {
+ return false;
+ }
+
+ public int prepare(Xid arg0) throws XAException
+ {
+ return XAResource.XA_OK;
+ }
+
+ public Xid[] recover(int arg0) throws XAException
+ {
+ return null;
+ }
+
+ public void rollback(Xid arg0) throws XAException
+ {
+ }
+
+ public boolean setTransactionTimeout(int arg0) throws XAException
+ {
+ return false;
+ }
+
+ public void start(Xid arg0, int arg1) throws XAException
+ {
+
+ }
+
+ }
+
+
+}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java 2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/base/ClusteringTestBase.java 2007-02-14 20:25:03 UTC (rev 2313)
@@ -31,12 +31,17 @@
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
+
+import org.jboss.jms.client.FailoverEvent;
+import org.jboss.jms.client.FailoverListener;
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
+
/**
* @author <a href="mailto:tim.fox at jboss.org">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -276,5 +281,36 @@
}
// Inner classes --------------------------------------------------------------------------------
+
+ protected class SimpleFailoverListener implements FailoverListener
+ {
+ private LinkedQueue buffer;
+ public SimpleFailoverListener()
+ {
+ buffer = new LinkedQueue();
+ }
+
+ public void failoverEventOccured(FailoverEvent event)
+ {
+ try
+ {
+ buffer.put(event);
+ }
+ catch(InterruptedException e)
+ {
+ throw new RuntimeException("Putting thread interrupted while trying to add event " +
+ "to buffer", e);
+ }
+ }
+
+ /**
+ * Blocks until a FailoverEvent is available or timeout occurs, in which case returns null.
+ */
+ public FailoverEvent getEvent(long timeout) throws InterruptedException
+ {
+ return (FailoverEvent)buffer.poll(timeout);
+ }
+ }
+
}
Modified: trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java 2007-02-14 20:02:30 UTC (rev 2312)
+++ trunk/tests/src/org/jboss/test/messaging/tools/aop/PoisonInterceptor.java 2007-02-14 20:25:03 UTC (rev 2313)
@@ -94,14 +94,13 @@
if (request.getRequestType() == TransactionRequest.TWO_PHASE_COMMIT_REQUEST
&& type == TYPE_2PC_COMMIT)
{
- //Crash on 2pc commit - used in message bridge tests
+ //Crash before 2pc commit (after prepare) - used in message bridge tests
log.info("##### Crashing on 2PC commit!!");
crash(target);
}
- else
- if (request.getRequestType() == TransactionRequest.ONE_PHASE_COMMIT_REQUEST &&
+ else if (request.getRequestType() == TransactionRequest.ONE_PHASE_COMMIT_REQUEST &&
type == FAIL_AFTER_SENDTRANSACTION)
{
invocation.invokeNext();
More information about the jboss-cvs-commits
mailing list