[jbosscache-commits] JBoss Cache SVN: r5400 - in core/trunk/src: main/java/org/jboss/cache/interceptors and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Fri Mar 7 12:05:23 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-03-07 12:05:23 -0500 (Fri, 07 Mar 2008)
New Revision: 5400

Modified:
   core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OrderedSynchronizationHandler.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/transaction/OptimisticTransactionEntry.java
   core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java
   core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
   core/trunk/src/test/java/org/jboss/cache/transaction/AbortionTest.java
   core/trunk/src/test/java/org/jboss/cache/transaction/InvocationContextCleanupTest.java
   core/trunk/src/test/java/org/jboss/cache/transaction/NotifyingTransactionManager.java
Log:
JBCACHE-1304 - OrderedSynchronizationHandler thread safety and scoping issues

Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java	2008-03-07 15:48:54 UTC (rev 5399)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java	2008-03-07 17:05:23 UTC (rev 5400)
@@ -2115,8 +2115,16 @@
          Address addr = rpcManager.getLocalAddress();
          gtx = GlobalTransaction.create(addr);
          transactionTable.put(tx, gtx);
-         TransactionEntry ent = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry() : new TransactionEntry();
-         ent.setTransaction(tx);
+         TransactionEntry ent = null;
+         try
+         {
+            ent = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry(tx) : new TransactionEntry(tx);
+         }
+         catch (Exception e)
+         {
+            throw new CacheException("Unable to create a transaction entry!", e);
+         }
+
          transactionTable.put(gtx, ent);
          if (trace)
          {

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OrderedSynchronizationHandler.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OrderedSynchronizationHandler.java	2008-03-07 15:48:54 UTC (rev 5399)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OrderedSynchronizationHandler.java	2008-03-07 17:05:23 UTC (rev 5400)
@@ -7,9 +7,7 @@
 import javax.transaction.Synchronization;
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
-import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.Map;
 
 /**
  * Maintains a list of Synchronization handlers. Reason is that we have to
@@ -26,36 +24,15 @@
    private Transaction tx = null;
    private LinkedList<Synchronization> handlers = new LinkedList<Synchronization>();
 
-   /**
-    * Map<Transaction,OrderedSynchronizationHandler>
-    */
-   static Map instances = new HashMap();
-
    static Log log = LogFactory.getLog(OrderedSynchronizationHandler.class);
 
 
-   private OrderedSynchronizationHandler(Transaction tx)
+   public OrderedSynchronizationHandler(Transaction tx) throws SystemException, RollbackException
    {
       this.tx = tx;
+      tx.registerSynchronization(this);
    }
 
-   /**
-    * Creates a new instance of OrderedSynchronizationHandler, or fetches an existing instance. Key is the local
-    * transaction (tx). This instance registers with the TransactionManager automatically
-    *
-    * @param tx
-    */
-   public static OrderedSynchronizationHandler getInstance(Transaction tx) throws SystemException, RollbackException
-   {
-      OrderedSynchronizationHandler retval = (OrderedSynchronizationHandler) instances.get(tx);
-      if (retval != null) return retval;
-      retval = new OrderedSynchronizationHandler(tx);
-      tx.registerSynchronization(retval);
-      instances.put(tx, retval);
-      return retval;
-   }
-
-
    public void registerAtHead(Synchronization handler)
    {
       register(handler, true);
@@ -101,9 +78,6 @@
          }
       }
 
-      // finally unregister us from the hashmap
-      instances.remove(tx);
-
       // throw the exception so the TM can deal with it.
       if (exceptionInAfterCompletion != null) throw exceptionInAfterCompletion;
    }
@@ -114,13 +88,13 @@
       sb.append("tx=" + getTxAsString() + ", handlers=" + handlers);
       return sb.toString();
    }
-   
+
    private String getTxAsString()
    {
       // JBCACHE-1114 -- don't call toString() on tx or it can lead to stack overflow
       if (tx == null)
          return null;
-      
+
       return tx.getClass().getName() + "@" + System.identityHashCode(tx);
    }
 }

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2008-03-07 15:48:54 UTC (rev 5399)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2008-03-07 17:05:23 UTC (rev 5400)
@@ -383,20 +383,24 @@
          // entry for TX in txTable, the modifications
          // below will need this entry to add their modifications
          // under the GlobalTx key
+         TransactionEntry entry;
          if (txTable.get(gtx) == null)
          {
             // create a new transaction entry
 
-            TransactionEntry entry = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry() : new TransactionEntry();
-            entry.setTransaction(ltx);
+            entry = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry(ltx) : new TransactionEntry(ltx);
             log.debug("creating new tx entry");
             txTable.put(gtx, entry);
             if (trace) log.trace("TxTable contents: " + txTable);
          }
+         else
+         {
+            entry = txTable.get(gtx);
+         }
 
          setTransactionalContext(ltx, gtx, ctx);
          // register a sync handler for this tx.
-         registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx), ctx);
+         registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx), ctx, entry);
 
          if (configuration.isNodeLockingOptimistic())
          {
@@ -1054,7 +1058,7 @@
             }
             // see the comment in the LocalSyncHandler for the last isOriginLocal param.
             LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, !ctx.isOriginLocal());
-            registerHandler(tx, myHandler, ctx);
+            registerHandler(tx, myHandler, ctx, txTable.get(gtx));
          }
       }
       else if ((gtx = (GlobalTransaction) rollbackTransactions.get(tx)) != null)
@@ -1076,9 +1080,9 @@
     * @param handler
     * @throws Exception
     */
-   private void registerHandler(Transaction tx, Synchronization handler, InvocationContext ctx) throws Exception
+   private void registerHandler(Transaction tx, Synchronization handler, InvocationContext ctx, TransactionEntry entry) throws Exception
    {
-      OrderedSynchronizationHandler orderedHandler = OrderedSynchronizationHandler.getInstance(tx);
+      OrderedSynchronizationHandler orderedHandler = entry.getOrderedSynchronizationHandler(); //OrderedSynchronizationHandler.getInstance(tx);
 
       if (trace) log.trace("registering for TX completion: SynchronizationHandler(" + handler + ")");
 

Modified: core/trunk/src/main/java/org/jboss/cache/transaction/OptimisticTransactionEntry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/OptimisticTransactionEntry.java	2008-03-07 15:48:54 UTC (rev 5399)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/OptimisticTransactionEntry.java	2008-03-07 17:05:23 UTC (rev 5400)
@@ -9,6 +9,10 @@
 import org.jboss.cache.optimistic.TransactionWorkspace;
 import org.jboss.cache.optimistic.TransactionWorkspaceImpl;
 
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
 /**
  * Subclasses the {@link TransactionEntry} class to add a {@link TransactionWorkspace}.  Used with optimistic locking
  * where each call is assigned a trasnaction and a transaction workspace.
@@ -21,6 +25,11 @@
 {
    private TransactionWorkspace transactionWorkSpace = new TransactionWorkspaceImpl();
 
+   public OptimisticTransactionEntry(Transaction tx) throws SystemException, RollbackException
+   {
+      super(tx);
+   }
+
    public String toString()
    {
       StringBuffer sb = new StringBuffer(super.toString());

Modified: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java	2008-03-07 15:48:54 UTC (rev 5399)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java	2008-03-07 17:05:23 UTC (rev 5400)
@@ -15,11 +15,14 @@
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Modification;
 import org.jboss.cache.config.Option;
+import org.jboss.cache.interceptors.OrderedSynchronizationHandler;
 import org.jboss.cache.invocation.CacheInvocationDelegate;
 import org.jboss.cache.lock.IdentityLock;
 import org.jboss.cache.lock.NodeLock;
 import org.jboss.cache.marshall.MethodCall;
 
+import javax.transaction.RollbackException;
+import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -57,6 +60,7 @@
     */
    private Transaction ltx = null;
    private Option option;
+   private OrderedSynchronizationHandler orderedSynchronizationHandler;
 
    private boolean forceAsyncReplication = false;
    private boolean forceSyncReplication = false;
@@ -97,6 +101,12 @@
     */
    private List<Fqn> removedNodes = new LinkedList<Fqn>();
 
+   public TransactionEntry(Transaction tx) throws SystemException, RollbackException
+   {
+      ltx = tx;
+      orderedSynchronizationHandler = new OrderedSynchronizationHandler(tx);
+   }
+
    /**
     * Adds a modification to the modification list.
     */
@@ -412,4 +422,14 @@
    {
       return this.option;
    }
+
+   public OrderedSynchronizationHandler getOrderedSynchronizationHandler()
+   {
+      return orderedSynchronizationHandler;
+   }
+
+   public void setOrderedSynchronizationHandler(OrderedSynchronizationHandler orderedSynchronizationHandler)
+   {
+      this.orderedSynchronizationHandler = orderedSynchronizationHandler;
+   }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java	2008-03-07 15:48:54 UTC (rev 5399)
+++ core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java	2008-03-07 17:05:23 UTC (rev 5400)
@@ -1,8 +1,8 @@
 package org.jboss.cache.loader;
 
 import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
 import org.jboss.cache.DefaultCacheFactory;
-import org.jboss.cache.CacheSPI;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.jboss.cache.interceptors.OrderedSynchronizationHandler;
@@ -11,6 +11,7 @@
 import org.jboss.cache.notifications.annotation.CacheListener;
 import org.jboss.cache.notifications.annotation.NodeCreated;
 import org.jboss.cache.notifications.event.Event;
+import org.jboss.cache.transaction.GlobalTransaction;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -184,7 +185,15 @@
    {
       int oldStartCount = START_COUNT;
       cache.getTransactionManager().begin();
-      OrderedSynchronizationHandler.getInstance(cache.getTransactionManager().getTransaction()).registerAtTail(
+
+      cache.put(FQN, "key", "value");
+      cache.put(FQN, "key2", "value2");
+
+      GlobalTransaction gtx = cache.getTransactionTable().get(cache.getTransactionManager().getTransaction());
+      OrderedSynchronizationHandler osh = cache.getTransactionTable().get(gtx).getOrderedSynchronizationHandler();
+
+//      OrderedSynchronizationHandler.getInstance(cache.getTransactionManager().getTransaction()).registerAtTail(
+      osh.registerAtTail(
             new Synchronization()
             {
 
@@ -201,8 +210,6 @@
             }
       );
 
-      cache.put(FQN, "key", "value");
-      cache.put(FQN, "key2", "value2");
       cache.getTransactionManager().commit();
 
       Map m = new HashMap();

Modified: core/trunk/src/test/java/org/jboss/cache/transaction/AbortionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/AbortionTest.java	2008-03-07 15:48:54 UTC (rev 5399)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/AbortionTest.java	2008-03-07 17:05:23 UTC (rev 5400)
@@ -34,25 +34,16 @@
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception
    {
-      System.out.println("********* START: SET UP *************");
       cache1 = initCache(false);
       TestingUtil.sleepThread(1500); // to ensure cache1 is the coordinator
       cache2 = initCache(false);
       cache3 = initCache(true);
-      System.out.println("********* END: SET UP *************");
    }
 
    @AfterMethod(alwaysRun = true)
    public void tearDown() throws Exception
    {
-      System.out.println("********* START: TEAR DOWN *************");
-      destroyCache(cache3);
-      destroyCache(cache2);
-      destroyCache(cache1);
-      cache1 = null;
-      cache2 = null;
-      cache3 = null;
-      System.out.println("********* END: TEAR DOWN *************");
+      TestingUtil.killCaches(cache3, cache2, cache1);
    }
 
    private CacheSPI initCache(boolean notifying)
@@ -121,6 +112,7 @@
       TransactionManager mgr2 = cache2.getTransactionManager();
       assertTrue(cache3.getTransactionManager() instanceof NotifyingTransactionManager);
       NotifyingTransactionManager mgr3 = (NotifyingTransactionManager) cache3.getTransactionManager();
+      mgr3.setCache(cache3);
 
       assertSame(mgr1, mgr2);
       assertNotSame(mgr1, mgr3);
@@ -160,8 +152,10 @@
          this.abortBeforeCompletion = abortBeforeCompletion;
       }
 
-      public void notify(Transaction tx) throws SystemException, RollbackException
+      public void notify(Transaction tx, TransactionEntry entry) throws SystemException, RollbackException
       {
+         OrderedSynchronizationHandler osh = entry.getOrderedSynchronizationHandler();
+
          final Transaction finalTx = tx;
          System.out.println("Notify called.");
          // add an aborting sync handler.
@@ -197,7 +191,6 @@
             }
          };
 
-         OrderedSynchronizationHandler osh = OrderedSynchronizationHandler.getInstance(tx);
          osh.registerAtHead(abort);
          System.out.println("Added sync handler.");
       }

Modified: core/trunk/src/test/java/org/jboss/cache/transaction/InvocationContextCleanupTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/InvocationContextCleanupTest.java	2008-03-07 15:48:54 UTC (rev 5399)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/InvocationContextCleanupTest.java	2008-03-07 17:05:23 UTC (rev 5400)
@@ -81,10 +81,13 @@
 
       mgr.begin();
 
-      OrderedSynchronizationHandler orderedHandler = OrderedSynchronizationHandler.getInstance(mgr.getTransaction());
+      cache0.put("/test", "x", "y");
+
+      GlobalTransaction gtx = cache0.getTransactionTable().get(mgr.getTransaction());
+      OrderedSynchronizationHandler orderedHandler = cache0.getTransactionTable().get(gtx).getOrderedSynchronizationHandler();
+//      OrderedSynchronizationHandler orderedHandler = OrderedSynchronizationHandler.getInstance(mgr.getTransaction());
       orderedHandler.registerAtTail(new DummySynchronization(cache0, mgr));
 
-      cache0.put("/test", "x", "y");
       try
       {
          mgr.commit();

Modified: core/trunk/src/test/java/org/jboss/cache/transaction/NotifyingTransactionManager.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/NotifyingTransactionManager.java	2008-03-07 15:48:54 UTC (rev 5399)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/NotifyingTransactionManager.java	2008-03-07 17:05:23 UTC (rev 5400)
@@ -7,7 +7,10 @@
 package org.jboss.cache.transaction;
 
 
-import javax.transaction.NotSupportedException;
+import org.jboss.cache.CacheSPI;
+
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
 import javax.transaction.RollbackException;
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
@@ -24,18 +27,35 @@
    private static final long serialVersionUID = -2994163352889758708L;
 
    private Notification notification;
+   private CacheSPI cache;
 
-   public void begin() throws SystemException, NotSupportedException
+   @Override
+   public void commit() throws HeuristicMixedException, SystemException, HeuristicRollbackException, RollbackException
    {
-      super.begin();
+      notifyListeners();
+      super.commit();
+   }
+
+   @Override
+   public void rollback() throws SystemException
+   {
+      notifyListeners();
+      super.rollback();
+   }
+
+   private void notifyListeners()
+   {
       try
       {
-         System.out.println("Calling notification.notify()");
-         notification.notify(getTransaction());
+         log.debug("Calling notification.notify()");
+         TransactionTable txTable = cache.getTransactionTable();
+         Transaction tx = getTransaction();
+         GlobalTransaction gtx = txTable.get(tx);
+         notification.notify(tx, txTable.get(gtx));
       }
-      catch (RollbackException e)
+      catch (Exception e)
       {
-         e.printStackTrace();
+         log.debug(e);
       }
    }
 
@@ -46,15 +66,23 @@
 
    public interface Notification
    {
-      public void notify(Transaction tx) throws SystemException, RollbackException;
+      public void notify(Transaction tx, TransactionEntry entry) throws SystemException, RollbackException;
    }
-   
 
+   public CacheSPI getCache()
+   {
+      return cache;
+   }
+
+   public void setCache(CacheSPI cache)
+   {
+      this.cache = cache;
+   }
+
    public Notification getNotification()
    {
       return notification;
    }
-   
 
    public void setNotification(Notification notification)
    {




More information about the jbosscache-commits mailing list