[hornetq-commits] JBoss hornetq SVN: r8019 - in trunk: src/main/org/hornetq/core/management/impl and 11 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Oct 1 03:47:34 EDT 2009


Author: jmesnil
Date: 2009-10-01 03:47:34 -0400 (Thu, 01 Oct 2009)
New Revision: 8019

Modified:
   trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
   trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
   trunk/src/main/org/hornetq/core/persistence/StorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/core/transaction/ResourceManager.java
   trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java
   trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
   trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
HORNETQ-33: Record list of heuristically committed/rolledback transaction branches

* store heuristic completion in the journal
* added management methods to list heuristically completed transactions
* fixed XAResource.recover() implementation to return them in addition to prepared branches
* implemented XAResource.forget() to delete the heuristic completions from the journal

Modified: trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/HornetQServerControl.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/management/HornetQServerControl.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -168,6 +168,10 @@
    @Operation(desc = "List all the prepared transaction, sorted by date, oldest first")
    String[] listPreparedTransactions() throws Exception;
 
+   String[] listHeuristicCommittedTransactions() throws Exception;
+
+   String[] listHeuristicRolledBackTransactions() throws Exception;
+
    @Operation(desc = "Commit a prepared transaction")
    boolean commitPreparedTransaction(@Parameter(desc = "the Base64 representation of a transaction", name = "transactionAsBase64") String transactionAsBase64) throws Exception;
 

Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -394,6 +394,30 @@
       }
       return s;
    }
+   
+   public String[] listHeuristicCommittedTransactions()
+   {
+      List<Xid> xids = resourceManager.getHeuristicCommittedTransactions();
+      String[] s = new String[xids.size()];
+      int i = 0;
+      for (Xid xid : xids)
+      {
+         s[i++] = XidImpl.toBase64String(xid);
+      }
+      return s;
+   }
+   
+   public String[] listHeuristicRolledBackTransactions()
+   {
+      List<Xid> xids = resourceManager.getHeuristicRolledbackTransactions();
+      String[] s = new String[xids.size()];
+      int i = 0;
+      for (Xid xid : xids)
+      {
+         s[i++] = XidImpl.toBase64String(xid);
+      }
+      return s;
+   }
 
    public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception
    {
@@ -405,6 +429,8 @@
          {
             Transaction transaction = resourceManager.removeTransaction(xid);
             transaction.commit();
+            long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
+            resourceManager.putHeuristicCompletion(recordID, xid, true);
             return true;
          }
       }
@@ -421,6 +447,8 @@
          {
             Transaction transaction = resourceManager.removeTransaction(xid);
             transaction.rollback();
+            long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
+            resourceManager.putHeuristicCompletion(recordID, xid, false);
             return true;
          }
       }

Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -96,6 +96,10 @@
 
    void deletePageTransactional(long txID, long recordID) throws Exception;
 
+   long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception;
+   
+   void deleteHeuristicCompletion(long id) throws Exception;
+   
    void loadMessageJournal(PagingManager pagingManager,
                            ResourceManager resourceManager,
                            Map<Long, Queue> queues,

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -112,6 +112,8 @@
 
    public static final byte DUPLICATE_ID = 37;
 
+   public static final byte HEURISTIC_COMPLETION = 38;
+
    private UUID persistentID;
 
    private final BatchingIDGenerator idGenerator;
@@ -378,6 +380,18 @@
       messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID));
    }
 
+   public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
+   {
+      long id = generateUniqueID();
+      messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true);
+      return id;
+   }
+   
+   public void deleteHeuristicCompletion(long id) throws Exception
+   {
+      messageJournal.appendDeleteRecord(id, true);
+   }
+   
    public void deletePageTransactional(final long txID, final long recordID) throws Exception
    {
       messageJournal.appendDeleteRecordTransactional(txID, recordID);
@@ -686,6 +700,13 @@
 
                break;
             }
+            case HEURISTIC_COMPLETION:
+            {
+               HeuristicCompletionEncoding encoding = new HeuristicCompletionEncoding();
+               encoding.decode(buff);
+               resourceManager.putHeuristicCompletion(record.id, encoding.xid, encoding.isCommit);
+               break;
+            }
             default:
             {
                throw new IllegalStateException("Invalid record type " + recordType);
@@ -1208,7 +1229,40 @@
          return XidCodecSupport.getXidEncodeLength(xid);
       }
    }
+   
+   private static class HeuristicCompletionEncoding implements EncodingSupport
+   {
+      Xid xid;
+      boolean isCommit;
+      
+      HeuristicCompletionEncoding(final Xid xid, final boolean isCommit)
+      {
+         this.xid = xid;
+         this.isCommit = isCommit;
+      }
 
+      HeuristicCompletionEncoding()
+      {
+      }
+
+      public void decode(final HornetQBuffer buffer)
+      {
+         xid = XidCodecSupport.decodeXid(buffer);
+         isCommit = buffer.readBoolean();
+      }
+
+      public void encode(final HornetQBuffer buffer)
+      {
+         XidCodecSupport.encodeXid(xid, buffer);
+         buffer.writeBoolean(isCommit);
+      }
+
+      public int getEncodeSize()
+      {
+         return XidCodecSupport.getXidEncodeLength(xid) + DataConstants.SIZE_BOOLEAN;
+      }
+   }
+
    private static class PersistentQueueBindingEncoding implements EncodingSupport, QueueBindingInfo
    {
       long persistenceID;

Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -164,6 +164,15 @@
                                               final long recordID) throws Exception
    {
    }
+   
+   public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
+   {
+      return generateUniqueID();
+   }
+   
+   public void deleteHeuristicCompletion(final long txID) throws Exception
+   {
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#createLargeMessageStorage(long, int, int)

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -782,9 +782,20 @@
 
             if (theTx == null)
             {
-               final String msg = "Cannot find xid in resource manager: " + xid;
-
-               response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+               // checked heuristic committed transactions
+               if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
+               {
+                  response = new SessionXAResponseMessage(true, XAException.XA_HEURCOM, "transaction has been heuristically committed: " + xid);
+               }
+               // checked heuristic rolled back transactions
+               else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+               {
+                  response = new SessionXAResponseMessage(true, XAException.XA_HEURRB, "transaction has been heuristically rolled back: " + xid);
+               } 
+               else
+               {
+                  response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, "Cannot find xid in resource manager: " + xid);
+               }
             }
             else
             {
@@ -901,11 +912,26 @@
 
    public void handleXAForget(final SessionXAForgetMessage packet)
    {
-      // Do nothing since we don't support heuristic commits / rollback from the
-      // resource manager
+      long id = resourceManager.removeHeuristicCompletion(packet.getXid());
+      int code = XAResource.XA_OK;
+      if (id != -1)
+      {
+         try
+         {
+            storageManager.deleteHeuristicCompletion(id);
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+            code = XAException.XAER_RMERR;
+         }
+      } else
+      {
+         code = XAException.XAER_NOTA;
+      }
+      
+      Packet response = new SessionXAResponseMessage((code != XAResource.XA_OK), code, null);
 
-      Packet response = new SessionXAResponseMessage(false, XAResource.XA_OK, null);
-
       channel.confirm(packet);
 
       channel.send(response);
@@ -1044,9 +1070,20 @@
 
             if (theTx == null)
             {
-               final String msg = "Cannot find xid in resource manager: " + xid;
-
-               response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, msg);
+               // checked heuristic committed transactions
+               if (resourceManager.getHeuristicCommittedTransactions().contains(xid))
+               {
+                  response = new SessionXAResponseMessage(true, XAException.XA_HEURCOM, "transaction has ben heuristically committed: " + xid);
+               }
+               // checked heuristic rolled back transactions
+               else if (resourceManager.getHeuristicRolledbackTransactions().contains(xid))
+               {
+                  response = new SessionXAResponseMessage(true, XAException.XA_HEURRB, "transaction has ben heuristically rolled back: " + xid);
+               } 
+               else
+               {
+                  response = new SessionXAResponseMessage(true, XAException.XAER_NOTA, "Cannot find xid in resource manager: " + xid);
+               }
             }
             else
             {
@@ -1249,8 +1286,12 @@
 
    public void handleGetInDoubtXids(final Packet packet)
    {
-      Packet response = new SessionXAGetInDoubtXidsResponseMessage(resourceManager.getPreparedTransactions());
-
+      List<Xid> indoubtsXids = new ArrayList<Xid>();
+      indoubtsXids.addAll(resourceManager.getPreparedTransactions());
+      indoubtsXids.addAll(resourceManager.getHeuristicCommittedTransactions());
+      indoubtsXids.addAll(resourceManager.getHeuristicRolledbackTransactions());
+      Packet response = new SessionXAGetInDoubtXidsResponseMessage(indoubtsXids);
+      
       channel.confirm(packet);
 
       channel.send(response);

Modified: trunk/src/main/org/hornetq/core/transaction/ResourceManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/ResourceManager.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/transaction/ResourceManager.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -43,4 +43,13 @@
    List<Xid> getPreparedTransactions();
 
    Map<Xid, Long> getPreparedTransactionsWithCreationTime();
+
+   void putHeuristicCompletion(long txid, Xid xid, boolean b);
+
+   long removeHeuristicCompletion(Xid xid);
+
+   List<Xid> getHeuristicCommittedTransactions();
+
+   List<Xid> getHeuristicRolledbackTransactions();
+
 }

Modified: trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -16,6 +16,7 @@
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -23,7 +24,6 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import javax.transaction.xa.Xid;
 
@@ -44,6 +44,8 @@
 
    private final ConcurrentMap<Xid, Transaction> transactions = new ConcurrentHashMap<Xid, Transaction>();
 
+   private List<HeuristicCompletionHolder> heuristicCompletions = new ArrayList<HeuristicCompletionHolder>();
+
    private final int defaultTimeoutSeconds;
 
    private volatile int timeoutSeconds;
@@ -163,7 +165,50 @@
       }
       return xidsWithCreationTime;
    }
+   
+   public void putHeuristicCompletion(final long recordID, final Xid xid, final boolean isCommit)
+   {
+      heuristicCompletions.add(new HeuristicCompletionHolder(recordID, xid, isCommit));
+   }
+   
+   public List<Xid>getHeuristicCommittedTransactions()
+   {
+      return getHeuristicCompletedTransactions(true);
+   }
+   
+   public List<Xid>getHeuristicRolledbackTransactions()
+   {
+      return getHeuristicCompletedTransactions(false);
+   }
 
+   public long removeHeuristicCompletion(Xid xid)
+   {
+      Iterator<HeuristicCompletionHolder> iterator = heuristicCompletions.iterator();
+      while (iterator.hasNext())
+      {
+         ResourceManagerImpl.HeuristicCompletionHolder holder = (ResourceManagerImpl.HeuristicCompletionHolder)iterator.next();
+         if (holder.xid.equals(xid))
+         {
+            iterator.remove();
+            return holder.recordID;
+         }
+      }
+      return -1;
+   }
+   
+   private List<Xid>getHeuristicCompletedTransactions(boolean isCommit)
+   {
+      List<Xid> xids = new ArrayList<Xid>();
+      for (HeuristicCompletionHolder holder : heuristicCompletions)
+      {
+         if (holder.isCommit == isCommit)
+         {
+            xids.add(holder.xid);
+         }
+      }
+      return xids;
+   }
+
    class TxTimeoutHandler implements Runnable
    {
       private boolean closed = false;
@@ -220,4 +265,18 @@
       }
 
    }
+   
+   private class HeuristicCompletionHolder
+   {
+      public final boolean isCommit;
+      public final Xid xid;
+      public final long recordID;
+
+      public HeuristicCompletionHolder(long recordID, Xid xid, boolean isCommit)
+      {
+         this.recordID = recordID;
+         this.xid = xid;
+         this.isCommit = isCommit;
+      }
+   }
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/integration/client/HeuristicXATest.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -138,6 +138,9 @@
 
          System.out.println(preparedTransactions[0]);
 
+         assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);
+         assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);
+
          if (isCommit)
          {
             jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
@@ -147,6 +150,109 @@
             jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
          }
 
+         assertEquals(0, jmxServer.listPreparedTransactions().length);
+         if (isCommit)
+         {
+            assertEquals(1, jmxServer.listHeuristicCommittedTransactions().length);
+            assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);
+         }
+         else
+         {
+            assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);
+            assertEquals(1, jmxServer.listHeuristicRolledBackTransactions().length);
+         }
+
+         if (isCommit)
+         {
+            assertEquals(1, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+            session = sf.createSession(false, false, false);
+
+            session.start();
+            ClientConsumer consumer = session.createConsumer(ADDRESS);
+            msg = consumer.receive(1000);
+            assertNotNull(msg);
+            msg.acknowledge();
+            assertEquals(123, msg.getBodySize());
+
+            session.commit();
+            session.close();
+         }
+
+         assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+      }
+      finally
+      {
+         if (server.isStarted())
+         {            
+            server.stop();
+         }
+      }
+      
+      
+
+   }
+   
+   public void testHeuristicCommitWithRestart() throws Exception
+   {
+      doHeuristicCompletionWithRestart(true);
+   }
+   
+   public void testHeuristicRollbackWithRestart() throws Exception
+   {
+      doHeuristicCompletionWithRestart(false);
+   }
+
+   private void doHeuristicCompletionWithRestart(final boolean isCommit) throws Exception
+   {
+      Configuration configuration = createDefaultConfig();
+      configuration.setJMXManagementEnabled(true);
+
+      HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>());
+      try
+      {
+         server.start();
+         Xid xid = newXID();
+
+         ClientSessionFactory sf = createInVMFactory();
+
+         ClientSession session = sf.createSession(true, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         session.start(xid, XAResource.TMNOFLAGS);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         ClientMessage msg = session.createClientMessage(true);
+
+         msg.getBody().writeBytes(new byte[123]);
+
+         producer.send(msg);
+
+         session.end(xid, XAResource.TMSUCCESS);
+
+         session.prepare(xid);
+
+         session.close();
+
+         HornetQServerControl jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+
+         String preparedTransactions[] = jmxServer.listPreparedTransactions();
+
+         assertEquals(1, preparedTransactions.length);
+         System.out.println(preparedTransactions[0]);
+
+         if (isCommit)
+         {
+            jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
+         }
+         else
+         {
+            jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
+         }
+
          preparedTransactions = jmxServer.listPreparedTransactions();
          assertEquals(0, preparedTransactions.length);
 
@@ -169,6 +275,22 @@
 
          assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
 
+         server.stop();
+         
+         server.start();
+
+         jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+         if (isCommit)
+         {
+            String[] listHeuristicCommittedTransactions = jmxServer.listHeuristicCommittedTransactions();
+            assertEquals(1, listHeuristicCommittedTransactions.length);
+            System.out.println(listHeuristicCommittedTransactions[0]);
+         } else
+         {
+            String[] listHeuristicRolledBackTransactions = jmxServer.listHeuristicRolledBackTransactions();
+            assertEquals(1, listHeuristicRolledBackTransactions.length);
+            System.out.println(listHeuristicRolledBackTransactions[0]);
+         }
       }
       finally
       {
@@ -177,9 +299,222 @@
             server.stop();
          }
       }
+   }
+   
+   public void testRecoverHeuristicCommitWithRestart() throws Exception
+   {
+      doRecoverHeuristicCompletedTxWithRestart(true);
+   }
+   
+   public void testRecoverHeuristicRollbackWithRestart() throws Exception
+   {
+      doRecoverHeuristicCompletedTxWithRestart(false);
+   }
 
+   private void doRecoverHeuristicCompletedTxWithRestart(boolean heuristicCommit) throws Exception
+   {
+      Configuration configuration = createDefaultConfig();
+      configuration.setJMXManagementEnabled(true);
+
+      HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>());
+      try
+      {
+         server.start();
+         Xid xid = newXID();
+
+         ClientSessionFactory sf = createInVMFactory();
+
+         ClientSession session = sf.createSession(true, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         session.start(xid, XAResource.TMNOFLAGS);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         ClientMessage msg = session.createClientMessage(true);
+
+         msg.getBody().writeBytes(new byte[123]);
+
+         producer.send(msg);
+
+         session.end(xid, XAResource.TMSUCCESS);
+
+         session.prepare(xid);
+
+         session.close();
+
+         HornetQServerControl jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+
+         String preparedTransactions[] = jmxServer.listPreparedTransactions();
+
+         assertEquals(1, preparedTransactions.length);
+         System.out.println(preparedTransactions[0]);
+
+         if (heuristicCommit)
+         {
+            jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
+         }
+         else
+         {
+            jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
+         }
+
+         preparedTransactions = jmxServer.listPreparedTransactions();
+         assertEquals(0, preparedTransactions.length);
+
+         if (heuristicCommit)
+         {
+            assertEquals(1, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+            session = sf.createSession(false, false, false);
+
+            session.start();
+            ClientConsumer consumer = session.createConsumer(ADDRESS);
+            msg = consumer.receive(1000);
+            assertNotNull(msg);
+            msg.acknowledge();
+            assertEquals(123, msg.getBodySize());
+
+            session.commit();
+            session.close();
+         }
+
+         assertEquals(0, ((Queue)server.getPostOffice().getBinding(ADDRESS).getBindable()).getMessageCount());
+
+         server.stop();
+         
+         server.start();
+
+         jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+         if (heuristicCommit)
+         {
+            String[] listHeuristicCommittedTransactions = jmxServer.listHeuristicCommittedTransactions();
+            assertEquals(1, listHeuristicCommittedTransactions.length);
+            System.out.println(listHeuristicCommittedTransactions[0]);
+         } else
+         {
+            String[] listHeuristicRolledBackTransactions = jmxServer.listHeuristicRolledBackTransactions();
+            assertEquals(1, listHeuristicRolledBackTransactions.length);
+            System.out.println(listHeuristicRolledBackTransactions[0]);
+         }
+         
+         session = sf.createSession(true, false, false);
+         Xid[] recoveredXids = session.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(1, recoveredXids.length);
+         assertEquals(xid, recoveredXids[0]);         
+         assertEquals(0, session.recover(XAResource.TMENDRSCAN).length);
+      }
+      finally
+      {
+         if (server.isStarted())
+         {            
+            server.stop();
+         }
+      }
    }
+   
+   public void testForgetHeuristicCommitAndRestart() throws Exception
+   {
+      doForgetHeuristicCompletedTxAndRestart(true);
+   }
 
+   public void testForgetHeuristicRollbackAndRestart() throws Exception
+   {
+      doForgetHeuristicCompletedTxAndRestart(false);
+   }
+
+   private void doForgetHeuristicCompletedTxAndRestart(boolean heuristicCommit) throws Exception
+   {
+      Configuration configuration = createDefaultConfig();
+      configuration.setJMXManagementEnabled(true);
+
+      HornetQServer server = createServer(true, configuration, mbeanServer, new HashMap<String, AddressSettings>());
+      try
+      {
+         server.start();
+         Xid xid = newXID();
+
+         ClientSessionFactory sf = createInVMFactory();
+
+         ClientSession session = sf.createSession(true, false, false);
+
+         session.createQueue(ADDRESS, ADDRESS, true);
+
+         session.start(xid, XAResource.TMNOFLAGS);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         ClientMessage msg = session.createClientMessage(true);
+
+         msg.getBody().writeBytes(new byte[123]);
+
+         producer.send(msg);
+
+         session.end(xid, XAResource.TMSUCCESS);
+
+         session.prepare(xid);
+
+         HornetQServerControl jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+
+         String preparedTransactions[] = jmxServer.listPreparedTransactions();
+
+         assertEquals(1, preparedTransactions.length);
+         System.out.println(preparedTransactions[0]);
+
+         if (heuristicCommit)
+         {
+            jmxServer.commitPreparedTransaction(XidImpl.toBase64String(xid));
+         }
+         else
+         {
+            jmxServer.rollbackPreparedTransaction(XidImpl.toBase64String(xid));
+         }
+
+         preparedTransactions = jmxServer.listPreparedTransactions();
+         assertEquals(0, preparedTransactions.length);
+
+         session.forget(xid);
+
+         session.close();
+
+         if (heuristicCommit)
+         {
+          assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);  
+         }
+         else
+         {
+            assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);              
+         }
+         
+
+         server.stop();
+         
+         server.start();
+
+         session = sf.createSession(true, false, false);
+         Xid[] recoveredXids = session.recover(XAResource.TMSTARTRSCAN);
+         assertEquals(0, recoveredXids.length);
+         jmxServer = ManagementControlHelper.createHornetQServerControl(mbeanServer);
+         if (heuristicCommit)
+         {
+          assertEquals(0, jmxServer.listHeuristicCommittedTransactions().length);  
+         }
+         else
+         {
+            assertEquals(0, jmxServer.listHeuristicRolledBackTransactions().length);              
+         }
+         
+      }
+      finally
+      {
+         if (server.isStarted())
+         {            
+            server.stop();
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -362,6 +362,16 @@
          {
             return (String[])proxy.invokeOperation("listPreparedTransactions");
          }
+         
+         public String[] listHeuristicCommittedTransactions() throws Exception
+         {
+            return (String[])proxy.invokeOperation("listHeuristicCommittedTransactions");
+         }
+         
+         public String[] listHeuristicRolledBackTransactions() throws Exception
+         {
+            return (String[])proxy.invokeOperation("listHeuristicRolledBackTransactions");
+         }
 
          public String[] listRemoteAddresses() throws Exception
          {

Modified: trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/BasicXaTest.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -397,11 +397,76 @@
 
    }
 
-   public void testForget() throws Exception
+   public void testForgetUnknownXID() throws Exception
    {
-      clientSession.forget(newXID());
+      try
+      {
+         clientSession.forget(newXID());
+         fail("should throw a XAERR_NOTA XAException");
+      }
+      catch (XAException e)
+      {
+         assertEquals(XAException.XAER_NOTA, e.errorCode);
+      }
    }
 
+   public void testForgetHeuristicallyCommittedXID() throws Exception
+   {
+      Xid xid = newXID();
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+
+      String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
+      assertEquals(1, preparedTransactions.length);
+      System.out.println(preparedTransactions[0]);
+      assertTrue(messagingService.getHornetQServerControl().commitPreparedTransaction(XidImpl.toBase64String(xid)));
+      assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+
+      clientSession.forget(xid);
+
+      assertEquals(0, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+   }
+
+   public void testForgetHeuristicallyRolledBackXID() throws Exception
+   {
+      Xid xid = newXID();
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+
+      String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
+      assertEquals(1, preparedTransactions.length);
+      System.out.println(preparedTransactions[0]);
+
+      assertTrue(messagingService.getHornetQServerControl().rollbackPreparedTransaction(XidImpl.toBase64String(xid)));      
+      assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
+
+      clientSession.forget(xid);
+
+      assertEquals(0, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
+   }
+
+   public void testCommitHeuristicallyCommittedXID() throws Exception
+   {
+      doCompleteHeuristicallyCompletedXID(true, true);
+   }
+
+   public void testCommitHeuristicallyRolledBackXID() throws Exception
+   {
+      doCompleteHeuristicallyCompletedXID(true, false);
+   }
+
+   public void testRollbacktHeuristicallyCommittedXID() throws Exception
+   {
+      doCompleteHeuristicallyCompletedXID(false, true);
+   }
+
+   public void testRollbackHeuristicallyRolledBackXID() throws Exception
+   {
+      doCompleteHeuristicallyCompletedXID(false, false);
+   }
+
    public void testSimpleJoin() throws Exception
    {
       SimpleString ADDRESS1 = new SimpleString("Address-1");
@@ -666,6 +731,62 @@
       }
    }
 
+   private void doCompleteHeuristicallyCompletedXID(boolean isCommit, boolean heuristicCommit) throws Exception
+   {
+      Xid xid = newXID();
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+
+      String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
+      assertEquals(1, preparedTransactions.length);
+
+      if (heuristicCommit)
+      {
+         assertTrue(messagingService.getHornetQServerControl().commitPreparedTransaction(XidImpl.toBase64String(xid)));      
+         assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+      }
+      else
+      {
+         assertTrue(messagingService.getHornetQServerControl().rollbackPreparedTransaction(XidImpl.toBase64String(xid)));      
+         assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);         
+      }
+      assertEquals(0, messagingService.getHornetQServerControl().listPreparedTransactions().length);
+
+      try
+      {
+         if (isCommit)
+         {
+            clientSession.commit(xid, false);
+         } else
+         {
+            clientSession.rollback(xid);
+         }
+         fail("neither commit not rollback must succeed on a heuristically completed tx");
+      }
+
+      catch (XAException e)
+      {
+         if (heuristicCommit)
+         {
+            assertEquals(XAException.XA_HEURCOM, e.errorCode);
+         }
+         else
+         {
+            assertEquals(XAException.XA_HEURRB, e.errorCode);
+         }
+      }
+
+      if (heuristicCommit)
+      {
+         assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
+      }
+      else
+      {
+         assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
+      }
+   }
+   
    class TxMessageHandler implements MessageHandler
    {
       boolean failedToAck = false;

Modified: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -461,7 +461,17 @@
 
          return null;
       }
-
+      
+      public String[] listHeuristicCommittedTransactions() throws Exception
+      {
+         return null;
+      }
+      
+      public String[] listHeuristicRolledBackTransactions() throws Exception
+      {
+         return null;
+      }
+      
       public String[] listRemoteAddresses() throws Exception
       {
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-10-01 02:50:16 UTC (rev 8018)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-10-01 07:47:34 UTC (rev 8019)
@@ -1034,6 +1034,15 @@
       {
       }
 
+      public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
+      {
+         return -1;
+      }
+      
+      public void deleteHeuristicCompletion(long txID) throws Exception
+      {
+      }
+
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#updateDeliveryCount(org.hornetq.core.server.MessageReference)
        */



More information about the hornetq-commits mailing list