[jboss-cvs] JBoss Messaging SVN: r8595 - in branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940: src/main/org/jboss/jms/server and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Mar 8 03:28:01 EST 2013
Author: raggz
Date: 2013-03-08 03:28:00 -0500 (Fri, 08 Mar 2013)
New Revision: 8595
Modified:
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/jms/server/ServerPeer.java
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
Log:
JBMessaging-1940
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/db2-persistence-service.xml 2013-03-08 08:28:00 UTC (rev 8595)
@@ -84,6 +84,7 @@
INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+ MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mssql-persistence-service.xml 2013-03-08 08:28:00 UTC (rev 8595)
@@ -80,6 +80,7 @@
INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+ MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/mysql-persistence-service.xml 2013-03-08 08:28:00 UTC (rev 8595)
@@ -80,6 +80,7 @@
INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+ MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/ndb-persistence-service.xml 2013-03-08 08:28:00 UTC (rev 8595)
@@ -80,6 +80,7 @@
INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+ MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/oracle-persistence-service.xml 2013-03-08 08:28:00 UTC (rev 8595)
@@ -84,6 +84,7 @@
INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+ MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/postgresql-persistence-service.xml 2013-03-08 08:28:00 UTC (rev 8595)
@@ -80,6 +80,7 @@
INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+ MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/integration/EAP5/etc/server/default/deploy/sybase-persistence-service.xml 2013-03-08 08:28:00 UTC (rev 8595)
@@ -85,6 +85,7 @@
INSERT_COUNTER=INSERT INTO JBM_COUNTER (NAME, NEXT_ID) VALUES (?, ?)
SELECT_ALL_CHANNELS=SELECT DISTINCT(CHANNEL_ID) FROM JBM_MSG_REF
UPDATE_TX=UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?
+ MOVE_TX_NODE=UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?
UPDATE_ID_IN_CACHE=UPDATE JBM_ID_CACHE SET JBM_ID = ? WHERE NODE_ID = ? AND CNTR = ?
INSERT_ID_IN_CACHE=INSERT INTO JBM_ID_CACHE (NODE_ID, CNTR, JBM_ID) VALUES (?, ?, ?)
LOAD_ID_CACHE=SELECT CNTR, JBM_ID FROM JBM_ID_CACHE WHERE NODE_ID = ?
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/jms/server/ServerPeer.java 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/jms/server/ServerPeer.java 2013-03-08 08:28:00 UTC (rev 8595)
@@ -274,6 +274,7 @@
{
((JDBCPersistenceManager)persistenceManager).injectNodeID(serverPeerID);
((JDBCPersistenceManager)persistenceManager).setStopServerPeerOnDBFailure(this.stopServerPeerOnDBFailure);
+ ((JDBCPersistenceManager)persistenceManager).setServerPeer(this);
}
else if (persistenceManager instanceof NullPersistenceManager)
{
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2013-03-08 08:28:00 UTC (rev 8595)
@@ -22,11 +22,14 @@
package org.jboss.messaging.core.impl;
import org.jboss.jms.message.JBossMessage;
+import org.jboss.jms.server.ServerPeer;
import org.jboss.jms.tx.MessagingXid;
import org.jboss.logging.Logger;
+import org.jboss.messaging.core.contract.Binding;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.core.contract.MessageReference;
import org.jboss.messaging.core.contract.PersistenceManager;
+import org.jboss.messaging.core.contract.PostOffice;
import org.jboss.messaging.core.impl.message.MessageFactory;
import org.jboss.messaging.core.impl.message.MessageSupport;
import org.jboss.messaging.core.impl.tx.PreparedTxInfo;
@@ -106,16 +109,12 @@
private boolean supportsTxAge;
- private int maxRetry = 25;
-
- private int retryInterval = 1000;
-
- private boolean retryOnConnectionFailure = false;
-
private volatile boolean stopServerPeerOnDBFailure = false;
private Object moveLock = new Object();
+ private ServerPeer server;
+
// Constructors --------------------------------------------------
public JDBCPersistenceManager(DataSource ds, TransactionManager tm,
@@ -1192,6 +1191,14 @@
public Object doTransaction() throws Exception
{
PreparedStatement statement = null;
+ PreparedStatement statement1 = null;
+ PreparedStatement statement2 = null;
+ PreparedStatement statement3 = null;
+ PreparedStatement statement4 = null;
+ ResultSet rs = null;
+ ResultSet rs1 = null;
+ ResultSet rs2 = null;
+
try
{
statement = conn.prepareStatement(getSQLStatement("UPDATE_TX"));
@@ -1199,14 +1206,106 @@
statement.setInt(2, fromNodeID);
int affected = statement.executeUpdate();
+ statement.close();
+ statement = null;
+
log.debug("Merged " + affected + " transactions from channel "
+ fromNodeID + " into node " + toNodeID);
+
+ //The following code is for JBMESSAGING-1940
+ if (affected > 0)
+ {
+ //check non-movable transactions
+ statement1 = conn.prepareStatement(getSQLStatement("SELECT_PREPARED_TRANSACTIONS"));
+ statement1.setInt(1, nodeID);
+
+ rs = statement1.executeQuery();
+
+ List<Long> nonMovableTxs = new ArrayList<Long>();
+
+ //check '+' messages, if binding is missing, don't merge this tx
+ String sql = getSQLStatement("SELECT_MESSAGE_ID_FOR_REF");
+ statement2 = conn.prepareStatement(sql);
+
+ //check '-' messages, if binding is missing, don't merge this tx
+ sql = getSQLStatement("SELECT_MESSAGE_ID_FOR_ACK");
+ statement3 = conn.prepareStatement(sql);
+
+ PostOffice postOffice = server.getPostOfficeInstance();
+
+ while (rs.next())
+ {
+ boolean found = false;
+
+ long txId = rs.getLong(1);
+
+ statement2.setLong(1, txId);
+
+ rs1 = statement2.executeQuery();
+
+ while (rs1.next())
+ {
+ long channelId = rs1.getLong(2);
+ Binding binding = postOffice.getBindingForChannelID(channelId);
+ if (binding == null)
+ {
+ nonMovableTxs.add(txId);
+ found = true;
+ break;
+ }
+ }
+
+ rs1.close();
+ rs1 = null;
+ if (found) continue;
+
+ statement3.setLong(1, txId);
+
+ rs2 = statement3.executeQuery();
+
+ while (rs2.next())
+ {
+ long channelId = rs2.getLong(2);
+ Binding binding = postOffice.getBindingForChannelID(channelId);
+ if (binding == null)
+ {
+ nonMovableTxs.add(txId);
+ break;
+ }
+ }
+ rs2.close();
+ rs2 = null;
+ }
+
+ rs.close();
+ rs = null;
+ statement2.close();
+ statement2 = null;
+ statement3.close();
+ statement3 = null;
+
+ if (nonMovableTxs.size() > 0)
+ {
+ //put non-movable tx back
+ statement4 = conn.prepareStatement(getSQLStatement("MOVE_TX_NODE"));
+ for (Long nonMTxId : nonMovableTxs)
+ {
+ statement4.setInt(1, fromNodeID);
+ statement4.setLong(2, nonMTxId);
+ statement4.executeUpdate();
+ }
+ statement4.close();
+ statement4 = null;
+ log.debug("Moved transactions back: " + nonMovableTxs);
+ }
+ }
return null;
}
finally
{
- closeStatement(statement);
+ closeResultSet(rs, rs1, rs2);
+ closeStatement(statement, statement1, statement2, statement3, statement4);
}
}
}
@@ -3091,6 +3190,7 @@
map.put("SELECT_MESSAGE_ID_FOR_ACK",
"SELECT MESSAGE_ID, CHANNEL_ID FROM JBM_MSG_REF WHERE TRANSACTION_ID = ? AND STATE = '-' ORDER BY ORD");
map.put("UPDATE_TX", "UPDATE JBM_TX SET NODE_ID=? WHERE NODE_ID=?");
+ map.put("MOVE_TX_NODE", "UPDATE JBM_TX SET NODE_ID=? WHERE TRANSACTION_ID=?");
// Counter
map.put("UPDATE_COUNTER",
@@ -3613,4 +3713,9 @@
this.stopServerPeerOnDBFailure = crashServerOnDBFailure;
}
+ public void setServerPeer(ServerPeer server)
+ {
+ this.server = server;
+ }
+
}
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/src/main/org/jboss/messaging/core/impl/JDBCSupport.java 2013-03-08 08:28:00 UTC (rev 8595)
@@ -252,39 +252,45 @@
return false;
}
- protected void closeResultSet(ResultSet rs)
+ protected void closeResultSet(ResultSet... rss)
{
- if (rs != null)
+ for (ResultSet rs : rss)
{
- try
+ if (rs != null)
{
- rs.close();
+ try
+ {
+ rs.close();
+ }
+ catch (Throwable e)
+ {
+ if (trace)
+ {
+ log.trace("Failed to close result set", e);
+ }
+ }
}
- catch (Throwable e)
- {
- if (trace)
- {
- log.trace("Failed to close result set", e);
- }
- }
}
}
- protected void closeStatement(Statement st)
+ protected void closeStatement(Statement... sts)
{
- if (st != null)
+ for (Statement st : sts)
{
- try
+ if (st != null)
{
- st.close();
+ try
+ {
+ st.close();
+ }
+ catch (Throwable e)
+ {
+ if (trace)
+ {
+ log.trace("Failed to close statement", e);
+ }
+ }
}
- catch (Throwable e)
- {
- if (trace)
- {
- log.trace("Failed to close statement", e);
- }
- }
}
}
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/jms/clustering/XAFailoverTest.java 2013-03-08 08:28:00 UTC (rev 8595)
@@ -17,6 +17,7 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
@@ -1185,6 +1186,187 @@
}
}
+ /**
+ * Deploy a non-clustered queue on only one node and send some messages,
+ * receive the messages within an XA transaction, then crash the node after
+ * the tx is prepared. Then simulate XA recovery on the other node. The expected
+ * behavior is that the recovery won't pick up the transactions whose messages
+ * can only be recovered on the original node.
+ * https://jira.jboss.org/jira/browse/JBMESSAGING-1940
+ */
+ public void testRecoveryWithSingleDistributedTargetWithMessage() throws Exception
+ {
+ Connection conn = null;
+ XAConnection xaconn = null;
+
+ try
+ {
+ ServerManagement.deployQueue(false, "singleDeployedNonDistributedQueue", 1);
+ Queue singleNonClusteredQueue = (Queue)ic[1].lookup("queue/singleDeployedNonDistributedQueue");
+
+ conn = createConnectionOnServer(cf, 1);
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = session.createProducer(singleNonClusteredQueue);
+
+ int num = 10;
+ for (int i = 0; i < num; i++)
+ {
+ TextMessage m = session.createTextMessage("TX1-" + i);
+ prod.send(m);
+ }
+
+ xaconn = this.createXAConnectionOnServer(cf, 1);
+ xaconn.start();
+
+ XASession xasess = xaconn.createXASession();
+ XAResource res = xasess.getXAResource();
+
+ //tx - receive - prepared
+ Xid xid1 = new MessagingXid("bq1".getBytes(), 42, "eemeli".getBytes());
+
+ res.start(xid1, XAResource.TMNOFLAGS);
+
+ MessageConsumer consumer = xasess.createConsumer(singleNonClusteredQueue);
+ for (int i = 0; i < num; i++)
+ {
+ Message m = consumer.receive(5000);
+ assertNotNull(m);
+ }
+
+ res.end(xid1, XAResource.TMSUCCESS);
+
+ res.prepare(xid1);
+
+ //tx - send - prepared
+ Xid xid2 = new MessagingXid("bq2".getBytes(), 42, "eemeli2".getBytes());
+
+ res.start(xid2, XAResource.TMNOFLAGS);
+
+ MessageProducer producer = xasess.createProducer(singleNonClusteredQueue);
+ for (int i = 0; i < num; i++)
+ {
+ Message m = xasess.createTextMessage("TX2-" + i);
+ producer.send(m);
+ }
+
+ res.end(xid2, XAResource.TMSUCCESS);
+
+ res.prepare(xid2);
+
+ //normal tx on a distributed queue
+ Xid xid3 = new MessagingXid("bq3".getBytes(), 42, "eemeli3".getBytes());
+
+ res.start(xid3, XAResource.TMNOFLAGS);
+
+ producer = xasess.createProducer(queue[1]);
+ for (int i = 0; i < num; i++)
+ {
+ Message m = xasess.createTextMessage("TX3-" + i);
+ producer.send(m);
+ }
+
+ res.end(xid3, XAResource.TMSUCCESS);
+
+ res.prepare(xid3);
+
+ xaconn.close();
+
+ // register a failover listener
+ SimpleFailoverListener failoverListener = new SimpleFailoverListener();
+ ((JBossConnection)conn).registerFailoverListener(failoverListener);
+
+ ServerManagement.kill(1);
+
+ // wait for the client-side failover to complete
+
+ while(true)
+ {
+ FailoverEvent event = failoverListener.getEvent(30000);
+ if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
+ {
+ break;
+ }
+ if (event == null)
+ {
+ fail("Did not get expected FAILOVER_COMPLETED event");
+ }
+ }
+
+ xaconn = createXAConnectionOnServer(cf, 0);
+ xaconn.start();
+
+ XAResource recoveryRes = xaconn.createXASession().getXAResource();
+ Xid[] txs = recoveryRes.recover(XAResource.TMSTARTRSCAN);
+
+ //the transactions xid1 and xid2 shouldn't be recovered on node 0, because
+ //their messages stay at node 1.
+ assertEquals(1, txs.length);
+
+ assertTrue(txs[0].equals(xid3));
+
+ //commit
+ recoveryRes.commit(txs[0], false);
+
+ txs = recoveryRes.recover(XAResource.TMENDRSCAN);
+ assertEquals(0, txs.length);
+
+ //now receive messages
+ Session sess = xaconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = sess.createConsumer(queue[0]);
+ for (int i = 0; i < num; i++)
+ {
+ Message m = consumer.receive(5000);
+ assertNotNull(m);
+ }
+
+ xaconn.close();
+
+ //simulate restarting node 1, do not clean db
+ ServerManagement.start(1, config, overrides, false);
+
+ ServerManagement.deployQueue(false, "singleDeployedNonDistributedQueue", 1);
+ ic[1] = new InitialContext(ServerManagement.getJNDIEnvironment(1));
+
+ singleNonClusteredQueue = (Queue)ic[1].lookup("queue/singleDeployedNonDistributedQueue");
+
+ xaconn = createXAConnectionOnServer(cf, 1);
+ xaconn.start();
+
+ recoveryRes = xaconn.createXASession().getXAResource();
+ txs = recoveryRes.recover(XAResource.TMSTARTRSCAN);
+
+ //now the two remaining txs should get recovered.
+ assertEquals(2, txs.length);
+
+ recoveryRes.commit(txs[0], false);
+ recoveryRes.commit(txs[1], false);
+
+ txs = recoveryRes.recover(XAResource.TMENDRSCAN);
+ assertEquals(0, txs.length);
+
+ //receive the messages sent in xid2
+ sess = xaconn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = sess.createConsumer(singleNonClusteredQueue);
+ for (int i = 0; i < num; i++)
+ {
+ Message m = consumer.receive(5000);
+ assertNotNull(m);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ if (xaconn != null)
+ {
+ xaconn.close();
+ }
+ }
+ }
+
// Inner classes --------------------------------------------------------------------------------
Modified: branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java
===================================================================
--- branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2013-03-08 08:06:59 UTC (rev 8594)
+++ branches/JBossMessaging_1_4_8_SP8_JBossMessaging-1940/tests/src/org/jboss/test/messaging/tools/ServerManagement.java 2013-03-08 08:28:00 UTC (rev 8595)
@@ -1004,6 +1004,15 @@
}
/**
+ * Simulates a queue deployment
+ */
+ public static void deployQueue(boolean clustered, String name, int serverIndex) throws Exception
+ {
+ insureStarted(serverIndex);
+ servers[serverIndex].getServer().deployQueue(name, null, clustered);
+ }
+
+ /**
* Simulates a queue deployment (copying the queue descriptor in the deploy directory).
*/
public static void deployQueue(String name) throws Exception
More information about the jboss-cvs-commits
mailing list