[hornetq-commits] JBoss hornetq SVN: r8019 - in trunk: src/main/org/hornetq/core/management/impl and 11 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Oct 1 03:47:34 EDT 2009
Author: jmesnil
Date: 2009-10-01 03:47:34 -0400 (Thu, 01 Oct 2009)
New Revision: 8019
Modified:
trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/transaction/ResourceManager.java
trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
HORNETQ-33: Record list of heuristically committed/rolledback transaction branches
* store heuristic completion in the journal
* added management methods to list heuristically completed transactions
* fixed XAResource.recover() implementation to return them in addition to prepared branches
* implemented XAResource.forget() to delete the heuristic completions from the journal
Modified: trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -168,6 +168,10 @@
@Operation(desc = "List all the prepared transaction, sorted by date, oldest first")
String[] listPreparedTransactions() throws Exception;
+ String[] listHeuristicCommittedTransactions() throws Exception;
+
+ String[] listHeuristicRolledBackTransactions() throws Exception;
+
@Operation(desc = "Commit a prepared transaction")
boolean commitPreparedTransaction(@Parameter(desc = "the Base64 representation of a transaction", name = "transactionAsBase64") String transactionAsBase64) throws Exception;
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -394,6 +394,30 @@
}
return s;
}
+
+ public String[] listHeuristicCommittedTransactions()
+ {
+ List<Xid> xids = resourceManager.getHeuristicCommittedTransactions();
+ String[] s = new String[xids.size()];
+ int i = 0;
+ for (Xid xid : xids)
+ {
+ s[i++] = XidImpl.toBase64String(xid);
+ }
+ return s;
+ }
+
+ public String[] listHeuristicRolledBackTransactions()
+ {
+ List<Xid> xids = resourceManager.getHeuristicRolledbackTransactions();
+ String[] s = new String[xids.size()];
+ int i = 0;
+ for (Xid xid : xids)
+ {
+ s[i++] = XidImpl.toBase64String(xid);
+ }
+ return s;
+ }
public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception
{
@@ -405,6 +429,8 @@
{
Transaction transaction = resourceManager.removeTransaction(xid);
transaction.commit();
+ long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
+ resourceManager.putHeuristicCompletion(recordID, xid, true);
return true;
}
}
@@ -421,6 +447,8 @@
{
Transaction transaction = resourceManager.removeTransaction(xid);
transaction.rollback();
+ long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
+ resourceManager.putHeuristicCompletion(recordID, xid, false);
return true;
}
}
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -96,6 +96,10 @@
void deletePageTransactional(long txID, long recordID) throws Exception;
+ long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;
+
+ void deleteHeuristicCompletion(long id) throws Exception;
+
void loadMessageJournal(PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -112,6 +112,8 @@
public static final byte DUPLICATE_ID = 37;
+ public static final byte HEURISTIC_COMPLETION = 38;
+
private UUID persistentID;
private final BatchingIDGenerator idGenerator;
@@ -378,6 +380,18 @@
messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID));
}
+ public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
+ {
+ long id = generateUniqueID();
+ messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true);
+ return id;
+ }
+
+ public void deleteHeuristicCompletion(long id) throws Exception
+ {
+ messageJournal.appendDeleteRecord(id, true);
+ }
+
public void deletePageTransactional(final long txID, final long recordID) throws Exception
{
messageJournal.appendDeleteRecordTransactional(txID, recordID);
@@ -686,6 +700,13 @@
break;
}
+ case HEURISTIC_COMPLETION:
+ {
+ HeuristicCompletionEncoding encoding = new HeuristicCompletionEncoding();
+ encoding.decode(buff);
+ resourceManager.putHeuristicCompletion(record.id, encoding.xid, encoding.isCommit);
+ break;
+ }
default:
{
throw new IllegalStateException("Invalid record type " + recordType);
@@ -1208,7 +1229,40 @@
return XidCodecSupport.getXidEncodeLength(xid);
}
}
+
+ private static class HeuristicCompletionEncoding implements EncodingSupport
+ {
+ Xid xid;
+ boolean isCommit;
+
+ HeuristicCompletionEncoding(final Xid xid, final boolean isCommit)
+ {
+ this.xid = xid;
+ this.isCommit = isCommit;
+ }
+ HeuristicCompletionEncoding()
+ {
+ }
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ xid = XidCodecSupport.decodeXid(buffer);
+ isCommit = buffer.readBoolean();
+ }
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ XidCodecSupport.encodeXid(xid, buffer);
+ buffer.writeBoolean(isCommit);
+ }
+
+ public int getEncodeSize()
+ {
+ return XidCodecSupport.getXidEncodeLength(xid) + DataConstants.SIZE_BOOLEAN;
+ }
+ }
+
private static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
{
long persistenceID;
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -164,6 +164,15 @@
final long recordID) throws Exception
{
}
+
+ public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
+ {
+ return generateUniqueID();
+ }
+
+ public void deleteHeuristicCompletion(final long txID) throws Exception
+ {
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#createLargeMessageStorage(long, int, int)
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -782,9 +782,20 @@
if (theTx == null)
{
- final String msg = "Cannot find xid in resource manager: " + xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ // check whether the transaction has been heuristically committed
+ if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
+ {
+ response = new SessionXAResponseMessage(true, XAException.XA_HEURCOM, "transaction has been heuristically committed: " + xid);
+ }
+ // check whether the transaction has been heuristically rolled back
+ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+ {
+ response = new SessionXAResponseMessage(true, XAException.XA_HEURRB, "transaction has been heuristically rolled back: " + xid);
+ }
+ else
+ {
+ response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, "Cannot find xid in resource manager: " + xid);
+ }
}
else
{
@@ -901,11 +912,26 @@
public void handleXAForget(final SessionXAForgetMessage packet)
{
- // Do nothing since we don't support heuristic commits / rollback from the
- // resource manager
+ long id = resourceManager.removeHeuristicCompletion(packet.getXid());
+ int code = XAResource.XA_OK;
+ if (id != -1)
+ {
+ try
+ {
+ storageManager.deleteHeuristicCompletion(id);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ code = XAException.XAER_RMERR;
+ }
+ } else
+ {
+ code = XAException.XAER_NOTA;
+ }
+
+ Packet response = new SessionXAResponseMessage((code != XAResource.XA_OK), code, null);
- Packet response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-
channel.confirm(packet);
channel.send(response);
@@ -1044,9 +1070,20 @@
if (theTx == null)
{
- final String msg = "Cannot find xid in resource manager: " + xid;
-
- response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+ // check whether the transaction has been heuristically committed
+ if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
+ {
+ response = new SessionXAResponseMessage(true, XAException.XA_HEURCOM, "transaction has been heuristically committed: " + xid);
+ }
+ // check whether the transaction has been heuristically rolled back
+ else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+ {
+ response = new SessionXAResponseMessage(true, XAException.XA_HEURRB, "transaction has been heuristically rolled back: " + xid);
+ }
+ else
+ {
+ response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, "Cannot find xid in resource manager: " + xid);
+ }
}
else
{
@@ -1249,8 +1286,12 @@
public void handleGetInDoubtXids(final Packet packet)
{
- Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
-
+ List<Xid> indoubtsXids = new ArrayList<Xid>();
+ indoubtsXids.addAll(resourceManager.getPreparedTransactions());
+ indoubtsXids.addAll(resourceManager.getHeuristicCommittedTransactions());
+ indoubtsXids.addAll(resourceManager.getHeuristicRolledbackTransactions());
+ Packet response = new SessionXAGetInDoubtXidsResponseMessage(indoubtsXids);
+
channel.confirm(packet);
channel.send(response);
Modified: trunk/src/main/org/hornetq/core/transaction/ResourceManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/ResourceManager.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/transaction/ResourceManager.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -43,4 +43,13 @@
List<Xid> getPreparedTransactions();
Map<Xid, Long> getPreparedTransactionsWithCreationTime();
+
+ void putHeuristicCompletion(long txid, Xid xid, boolean b);
+
+ long removeHeuristicCompletion(Xid xid);
+
+ List<Xid> getHeuristicCommittedTransactions();
+
+ List<Xid> getHeuristicRolledbackTransactions();
+
}
Modified: trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -23,7 +24,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import javax.transaction.xa.Xid;
@@ -44,6 +44,8 @@
private final ConcurrentMap<Xid, Transaction> transactions = new ConcurrentHashMap<Xid, Transaction>();
+ private List<HeuristicCompletionHolder> heuristicCompletions = new ArrayList<HeuristicCompletionHolder>();
+
private final int defaultTimeoutSeconds;
private volatile int timeoutSeconds;
@@ -163,7 +165,50 @@
}
return xidsWithCreationTime;
}
+
+ public void putHeuristicCompletion(final long recordID, final Xid xid, final boolean isCommit)
+ {
+ heuristicCompletions.add(new HeuristicCompletionHolder(recordID, xid, isCommit));
+ }
+
+ public List<Xid>getHeuristicCommittedTransactions()
+ {
+ return getHeuristicCompletedTransactions(true);
+ }
+
+ public List<Xid>getHeuristicRolledbackTransactions()
+ {
+ return getHeuristicCompletedTransactions(false);
+ }
+ public long removeHeuristicCompletion(Xid xid)
+ {
+ Iterator<HeuristicCompletionHolder> iterator = heuristicCompletions.iterator();
+ while (iterator.hasNext())
+ {
+ ResourceManagerImpl.HeuristicCompletionHolder holder = (ResourceManagerImpl.HeuristicCompletionHolder)iterator.next();
+ if (holder.xid.equals(xid))
+ {
+ iterator.remove();
+ return holder.recordID;
+ }
+ }
+ return -1;
+ }
+
+ private List<Xid>getHeuristicCompletedTransactions(boolean isCommit)
+ {
+ List<Xid> xids = new ArrayList<Xid>();
+ for (HeuristicCompletionHolder holder : heuristicCompletions)
+ {
+ if (holder.isCommit == isCommit)
+ {
+ xids.add(holder.xid);
+ }
+ }
+ return xids;
+ }
+
class TxTimeoutHandler implements Runnable
{
private boolean closed = false;
@@ -220,4 +265,18 @@
}
}
+
+ private class HeuristicCompletionHolder
+ {
+ public final boolean isCommit;
+ public final Xid xid;
+ public final long recordID;
+
+ public HeuristicCompletionHolder(long recordID, Xid xid, boolean isCommit)
+ {
+ this.recordID = recordID;
+ this.xid = xid;
+ this.isCommit = isCommit;
+ }
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -138,6 +138,9 @@
System.out.println(preparedTransactions[0]);
+ assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);
+ assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);
+
if (isCommit)
{
jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
@@ -147,6 +150,109 @@
jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
}
+ assertEquals(0, jmxServer.listPreparedTransactions().length);
+ if (isCommit)
+ {
+ assertEquals(1, jmxServer.listHeuristicCommittedTransactions().length);
+ assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);
+ }
+ else
+ {
+ assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);
+ assertEquals(1, jmxServer.listHeuristicRolledBackTransactions().length);
+ }
+
+ if (isCommit)
+ {
+ assertEquals(1, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertEquals(123, msg.getBodySize());
+
+ session.commit();
+ session.close();
+ }
+
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ }
+ finally
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+
+
+
+ }
+
+ public void testHeuristicCommitWithRestart() throws Exception
+ {
+ doHeuristicCompletionWithRestart(true);
+ }
+
+ public void testHeuristicRollbackWithRestart() throws Exception
+ {
+ doHeuristicCompletionWithRestart(false);
+ }
+
+ private void doHeuristicCompletionWithRestart(final boolean isCommit) throws Exception
+ {
+ Configuration configuration = createDefaultConfig();
+ configuration.setJMXManagementEnabled(true);
+
+ HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>());
+ try
+ {
+ server.start();
+ Xid xid = newXID();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(true, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ msg.getBody().writeBytes(new byte[123]);
+
+ producer.send(msg);
+
+ session.end(xid, XAResource.TMSUCCESS);
+
+ session.prepare(xid);
+
+ session.close();
+
+ HornetQServerControl jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+
+ String preparedTransactions[] = jmxServer.listPreparedTransactions();
+
+ assertEquals(1, preparedTransactions.length);
+ System.out.println(preparedTransactions[0]);
+
+ if (isCommit)
+ {
+ jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+ else
+ {
+ jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+
preparedTransactions = jmxServer.listPreparedTransactions();
assertEquals(0, preparedTransactions.length);
@@ -169,6 +275,22 @@
assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+ server.stop();
+
+ server.start();
+
+ jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+ if (isCommit)
+ {
+ String[] listHeuristicCommittedTransactions = jmxServer.listHeuristicCommittedTransactions();
+ assertEquals(1, listHeuristicCommittedTransactions.length);
+ System.out.println(listHeuristicCommittedTransactions[0]);
+ } else
+ {
+ String[] listHeuristicRolledBackTransactions = jmxServer.listHeuristicRolledBackTransactions();
+ assertEquals(1, listHeuristicRolledBackTransactions.length);
+ System.out.println(listHeuristicRolledBackTransactions[0]);
+ }
}
finally
{
@@ -177,9 +299,222 @@
server.stop();
}
}
+ }
+
+ public void testRecoverHeuristicCommitWithRestart() throws Exception
+ {
+ doRecoverHeuristicCompletedTxWithRestart(true);
+ }
+
+ public void testRecoverHeuristicRollbackWithRestart() throws Exception
+ {
+ doRecoverHeuristicCompletedTxWithRestart(false);
+ }
+ private void doRecoverHeuristicCompletedTxWithRestart(boolean heuristicCommit) throws Exception
+ {
+ Configuration configuration = createDefaultConfig();
+ configuration.setJMXManagementEnabled(true);
+
+ HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>());
+ try
+ {
+ server.start();
+ Xid xid = newXID();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(true, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ msg.getBody().writeBytes(new byte[123]);
+
+ producer.send(msg);
+
+ session.end(xid, XAResource.TMSUCCESS);
+
+ session.prepare(xid);
+
+ session.close();
+
+ HornetQServerControl jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+
+ String preparedTransactions[] = jmxServer.listPreparedTransactions();
+
+ assertEquals(1, preparedTransactions.length);
+ System.out.println(preparedTransactions[0]);
+
+ if (heuristicCommit)
+ {
+ jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+ else
+ {
+ jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+
+ preparedTransactions = jmxServer.listPreparedTransactions();
+ assertEquals(0, preparedTransactions.length);
+
+ if (heuristicCommit)
+ {
+ assertEquals(1, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ assertEquals(123, msg.getBodySize());
+
+ session.commit();
+ session.close();
+ }
+
+ assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+ server.stop();
+
+ server.start();
+
+ jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+ if (heuristicCommit)
+ {
+ String[] listHeuristicCommittedTransactions = jmxServer.listHeuristicCommittedTransactions();
+ assertEquals(1, listHeuristicCommittedTransactions.length);
+ System.out.println(listHeuristicCommittedTransactions[0]);
+ } else
+ {
+ String[] listHeuristicRolledBackTransactions = jmxServer.listHeuristicRolledBackTransactions();
+ assertEquals(1, listHeuristicRolledBackTransactions.length);
+ System.out.println(listHeuristicRolledBackTransactions[0]);
+ }
+
+ session = sf.createSession(true, false, false);
+ Xid[] recoveredXids = session.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(1, recoveredXids.length);
+ assertEquals(xid, recoveredXids[0]);
+ assertEquals(0, session.recover(XAResource.TMENDRSCAN).length);
+ }
+ finally
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
}
+
+ public void testForgetHeuristicCommitAndRestart() throws Exception
+ {
+ doForgetHeuristicCompletedTxAndRestart(true);
+ }
+ public void testForgetHeuristicRollbackAndRestart() throws Exception
+ {
+ doForgetHeuristicCompletedTxAndRestart(false);
+ }
+
+ private void doForgetHeuristicCompletedTxAndRestart(boolean heuristicCommit) throws Exception
+ {
+ Configuration configuration = createDefaultConfig();
+ configuration.setJMXManagementEnabled(true);
+
+ HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>());
+ try
+ {
+ server.start();
+ Xid xid = newXID();
+
+ ClientSessionFactory sf = createInVMFactory();
+
+ ClientSession session = sf.createSession(true, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ msg.getBody().writeBytes(new byte[123]);
+
+ producer.send(msg);
+
+ session.end(xid, XAResource.TMSUCCESS);
+
+ session.prepare(xid);
+
+ HornetQServerControl jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+
+ String preparedTransactions[] = jmxServer.listPreparedTransactions();
+
+ assertEquals(1, preparedTransactions.length);
+ System.out.println(preparedTransactions[0]);
+
+ if (heuristicCommit)
+ {
+ jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+ else
+ {
+ jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
+ }
+
+ preparedTransactions = jmxServer.listPreparedTransactions();
+ assertEquals(0, preparedTransactions.length);
+
+ session.forget(xid);
+
+ session.close();
+
+ if (heuristicCommit)
+ {
+ assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);
+ }
+ else
+ {
+ assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);
+ }
+
+
+ server.stop();
+
+ server.start();
+
+ session = sf.createSession(true, false, false);
+ Xid[] recoveredXids = session.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(0, recoveredXids.length);
+ jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+ if (heuristicCommit)
+ {
+ assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);
+ }
+ else
+ {
+ assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);
+ }
+
+ }
+ finally
+ {
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -362,6 +362,16 @@
{
return (String[])proxy.invokeOperation("listPreparedTransactions");
}
+
+ public String[] listHeuristicCommittedTransactions() throws Exception
+ {
+ return (String[])proxy.invokeOperation("listHeuristicCommittedTransactions");
+ }
+
+ public String[] listHeuristicRolledBackTransactions() throws Exception
+ {
+ return (String[])proxy.invokeOperation("listHeuristicRolledBackTransactions");
+ }
public String[] listRemoteAddresses() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -397,11 +397,76 @@
}
- public void testForget() throws Exception
+ public void testForgetUnknownXID() throws Exception
{
- clientSession.forget(newXID());
+ try
+ {
+ clientSession.forget(newXID());
+ fail("should throw a XAER_NOTA XAException");
+ }
+ catch (XAException e)
+ {
+ assertEquals(XAException.XAER_NOTA, e.errorCode);
+ }
}
+ public void testForgetHeuristicallyCommittedXID() throws Exception
+ {
+ Xid xid = newXID();
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
+ assertEquals(1, preparedTransactions.length);
+ System.out.println(preparedTransactions[0]);
+ assertTrue(messagingService.getHornetQServerControl().commitPreparedTransaction(XidImpl.toBase64String(xid)));
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+
+ clientSession.forget(xid);
+
+ assertEquals(0, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+ }
+
+ public void testForgetHeuristicallyRolledBackXID() throws Exception
+ {
+ Xid xid = newXID();
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
+ assertEquals(1, preparedTransactions.length);
+ System.out.println(preparedTransactions[0]);
+
+ assertTrue(messagingService.getHornetQServerControl().rollbackPreparedTransaction(XidImpl.toBase64String(xid)));
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
+
+ clientSession.forget(xid);
+
+ assertEquals(0, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
+ }
+
+ public void testCommitHeuristicallyCommittedXID() throws Exception
+ {
+ doCompleteHeuristicallyCompletedXID(true, true);
+ }
+
+ public void testCommitHeuristicallyRolledBackXID() throws Exception
+ {
+ doCompleteHeuristicallyCompletedXID(true, false);
+ }
+
+ public void testRollbacktHeuristicallyCommittedXID() throws Exception
+ {
+ doCompleteHeuristicallyCompletedXID(false, true);
+ }
+
+ public void testRollbackHeuristicallyRolledBackXID() throws Exception
+ {
+ doCompleteHeuristicallyCompletedXID(false, false);
+ }
+
public void testSimpleJoin() throws Exception
{
SimpleString ADDRESS1 = new SimpleString("Address-1");
@@ -666,6 +731,62 @@
}
}
+ private void doCompleteHeuristicallyCompletedXID(boolean isCommit, boolean heuristicCommit) throws Exception
+ {
+ Xid xid = newXID();
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
+ assertEquals(1, preparedTransactions.length);
+
+ if (heuristicCommit)
+ {
+ assertTrue(messagingService.getHornetQServerControl().commitPreparedTransaction(XidImpl.toBase64String(xid)));
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+ }
+ else
+ {
+ assertTrue(messagingService.getHornetQServerControl().rollbackPreparedTransaction(XidImpl.toBase64String(xid)));
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
+ }
+ assertEquals(0, messagingService.getHornetQServerControl().listPreparedTransactions().length);
+
+ try
+ {
+ if (isCommit)
+ {
+ clientSession.commit(xid, false);
+ } else
+ {
+ clientSession.rollback(xid);
+ }
+ fail("neither commit nor rollback must succeed on a heuristically completed tx");
+ }
+
+ catch (XAException e)
+ {
+ if (heuristicCommit)
+ {
+ assertEquals(XAException.XA_HEURCOM, e.errorCode);
+ }
+ else
+ {
+ assertEquals(XAException.XA_HEURRB, e.errorCode);
+ }
+ }
+
+ if (heuristicCommit)
+ {
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+ }
+ else
+ {
+ assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
+ }
+ }
+
class TxMessageHandler implements MessageHandler
{
boolean failedToAck = false;
Modified: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -461,7 +461,17 @@
return null;
}
-
+
+ public String[] listHeuristicCommittedTransactions() throws Exception
+ {
+ return null;
+ }
+
+ public String[] listHeuristicRolledBackTransactions() throws Exception
+ {
+ return null;
+ }
+
public String[] listRemoteAddresses() throws Exception
{
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-01 07:47:34 UTC (rev 8019)
@@ -1034,6 +1034,15 @@
{
}
+ public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
+ {
+ return -1;
+ }
+
+ public void deleteHeuristicCompletion(long txID) throws Exception
+ {
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#updateDeliveryCount(org.hornetq.core.server.MessageReference)
*/
More information about the hornetq-commits
mailing list