[jbosscache-commits] JBoss Cache SVN: r4553 - in core/trunk/src: main/java/org/jboss/cache/marshall and 1 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Fri Oct 5 08:21:47 EDT 2007


Author: manik.surtani at jboss.com
Date: 2007-10-05 08:21:47 -0400 (Fri, 05 Oct 2007)
New Revision: 4553

Modified:
   core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
   core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
   core/trunk/src/test/java/org/jboss/cache/invalidation/VersionInconsistencyTest.java
Log:
Fixed invalidation issues and updated tests

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2007-10-05 09:52:00 UTC (rev 4552)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2007-10-05 12:21:47 UTC (rev 4553)
@@ -13,7 +13,10 @@
 import org.jboss.cache.marshall.MethodCall;
 import org.jboss.cache.marshall.MethodCallFactory;
 import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.optimistic.DataVersion;
 import org.jboss.cache.optimistic.TransactionWorkspace;
+import org.jboss.cache.optimistic.WorkspaceNode;
+import org.jboss.cache.optimistic.DefaultDataVersion;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.OptimisticTransactionEntry;
 import org.jboss.cache.transaction.TransactionEntry;
@@ -27,6 +30,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * This interceptor acts as a replacement to the replication interceptor when
@@ -44,11 +48,15 @@
 {
    private long m_invalidations = 0;
    protected TransactionTable txTable;
+   protected Map<GlobalTransaction, List<MethodCall>> txMods;
+   protected boolean optimistic;
 
    public void setCache(CacheSPI cache)
    {
       super.setCache(cache);
       txTable = cache.getTransactionTable();
+      optimistic=cache.getConfiguration().isNodeLockingOptimistic();
+      if (optimistic) txMods = new ConcurrentHashMap<GlobalTransaction, List<MethodCall>>();
    }
 
    public Object invoke(InvocationContext ctx) throws Throwable
@@ -92,49 +100,27 @@
       else
       {
          // not a CRUD method - lets see if it is a tx lifecycle method.
-         if (tx != null && isValid(tx))
+         if (tx != null)
          {
+
+            GlobalTransaction gtx;
+            TransactionEntry entry;
+            List<MethodCall> modifications;
             // lets see if we are in the prepare phase (as this is the only time we actually do anything)
             switch (m.getMethodId())
             {
+
                case MethodDeclarations.prepareMethod_id:
-               case MethodDeclarations.optimisticPrepareMethod_id:
                   log.debug("Entering InvalidationInterceptor's prepare phase");
                   // fetch the modifications before the transaction is committed (and thus removed from the txTable)
-                  GlobalTransaction gtx = ctx.getGlobalTransaction();
-                  TransactionEntry entry = txTable.get(gtx);
+                  gtx = ctx.getGlobalTransaction();
+                  entry = txTable.get(gtx);
                   if (entry == null) throw new IllegalStateException("cannot find transaction entry for " + gtx);
-                  List<MethodCall> modifications = new LinkedList<MethodCall>(entry.getModifications());
+                  modifications = new LinkedList<MethodCall>(entry.getModifications());
 
                   if (modifications.size() > 0)
                   {
-                     if (containsPutForExternalRead(modifications))
-                     {
-                        log.debug("Modification list contains a putForExternalRead operation.  Not invalidating.");
-                     }
-                     else
-                     {
-                        try
-                        {
-                           invalidateModifications(modifications, configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null, defaultSynchronous, ctx);
-                        }
-                        catch (Throwable t)
-                        {
-                           log.warn("Unable to broadcast evicts as a part of the prepare phase.  Rolling back.", t);
-                           try
-                           {
-                              tx.setRollbackOnly();
-                           }
-                           catch (SystemException se)
-                           {
-                              throw new RuntimeException("setting tx rollback failed ", se);
-                           }
-                           if (t instanceof RuntimeException)
-                              throw t;
-                           else
-                              throw new RuntimeException("Unable to broadcast invalidation messages", t);
-                        }
-                     }
+                     broadcastInvalidate(modifications, gtx, tx, ctx);
                   }
                   else
                   {
@@ -142,6 +128,35 @@
                   }
 
                   break;
+               case MethodDeclarations.optimisticPrepareMethod_id:
+                  // here we just record the modifications but actually do the invalidate in commit.
+                  gtx = ctx.getGlobalTransaction();
+                  entry = txTable.get(gtx);
+                  if (entry == null) throw new IllegalStateException("cannot find transaction entry for " + gtx);
+                  modifications = new LinkedList<MethodCall>(entry.getModifications());
+
+                  if (modifications.size() > 0)
+                  {
+                     txMods.put(gtx, modifications);
+                  }
+                  break;
+               case MethodDeclarations.commitMethod_id:
+                  if (optimistic)
+                  {
+                     gtx = ctx.getGlobalTransaction();
+                     modifications = txMods.remove(gtx);
+                     broadcastInvalidate(modifications, gtx, tx, ctx);
+                     log.debug("Committing.  Broadcasting invalidations.");
+                  }
+                  break;
+               case MethodDeclarations.rollbackMethod_id:
+                  if (optimistic)
+                  {
+                     gtx = ctx.getGlobalTransaction();
+                     txMods.remove(gtx);
+                     log.debug("Caught a rollback.  Clearing modification in txMods");
+                  }
+                  break;
             }
          }
 
@@ -149,8 +164,41 @@
       return retval;
    }
 
+   private void broadcastInvalidate(List<MethodCall> modifications, GlobalTransaction gtx, Transaction tx, InvocationContext ctx)
+   {
+      if (containsPutForExternalRead(modifications))
+      {
+         log.debug("Modification list contains a putForExternalRead operation.  Not invalidating.");
+      }
+      else
+      {
+         try
+         {
+            invalidateModifications(modifications, configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null, defaultSynchronous, ctx);
+         }
+         catch (Throwable t)
+         {
+            log.warn("Unable to broadcast evicts as a part of the prepare phase.  Rolling back.", t);
+            try
+            {
+               tx.setRollbackOnly();
+            }
+            catch (SystemException se)
+            {
+               throw new RuntimeException("setting tx rollback failed ", se);
+            }
+            if (t instanceof RuntimeException)
+               throw (RuntimeException) t;
+            else
+               throw new RuntimeException("Unable to broadcast invalidation messages", t);
+         }
+      }
+   }
+
    private boolean containsPutForExternalRead(List<MethodCall> l)
    {
+      if (l == null) return false;
+      
       for (MethodCall m : l)
          if (m.getMethodId() == MethodDeclarations.putForExternalReadMethodLocal_id || m.getMethodId() == MethodDeclarations.putForExternalReadVersionedMethodLocal_id)
             return true;
@@ -196,13 +244,28 @@
               MethodCallFactory.create(MethodDeclarations.evictVersionedNodeMethodLocal, fqn, workspace.getNode(fqn).getVersion()) :
               MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, fqn);
               */
-      MethodCall call = MethodCallFactory.create(MethodDeclarations.invalidateMethodLocal, fqn, (workspace == null ? null : workspace.getNode(fqn).getVersion()));
+      MethodCall call = MethodCallFactory.create(MethodDeclarations.invalidateMethodLocal, fqn, getNodeVersion(workspace, fqn));
 
       if (log.isDebugEnabled()) log.debug("Cache [" + cache.getLocalAddress() + "] replicating " + call);
       // voila, invalidated!
       replicateCall(call, synchronous, ctx.getOptionOverrides());
    }
 
+   protected DataVersion getNodeVersion(TransactionWorkspace w, Fqn f)
+   {
+      if (w == null) return null;
+      WorkspaceNode wn = w.getNode(f);
+      DataVersion v = wn.getVersion();
+
+      if (wn.isVersioningImplicit())
+      {
+         // then send back an incremented version
+         v = ((DefaultDataVersion) v).increment();
+      }
+
+      return v;
+   }
+
    /**
     * Same as <code>invalidateModifications(modifications, workspace, defaultSynchronous)</code>
     *
@@ -243,11 +306,14 @@
    protected Set<Fqn> optimisedIterator(List<MethodCall> list)
    {
       Set<Fqn> fqns = new HashSet<Fqn>();
-      for (MethodCall mc : list)
+      if (list != null)
       {
-         if (MethodDeclarations.isCrudMethod(mc.getMethodId()))
+         for (MethodCall mc : list)
          {
-            fqns.add(findFqn(mc.getArgs()));
+            if (MethodDeclarations.isCrudMethod(mc.getMethodId()))
+            {
+               fqns.add(findFqn(mc.getArgs()));
+            }
          }
       }
       return fqns;

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java	2007-10-05 09:52:00 UTC (rev 4552)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java	2007-10-05 12:21:47 UTC (rev 4553)
@@ -135,6 +135,7 @@
          case MethodDeclarations.dataGravitationMethod_id:
          case MethodDeclarations.evictNodeMethodLocal_id:
          case MethodDeclarations.evictVersionedNodeMethodLocal_id:
+         case MethodDeclarations.invalidateMethodLocal_id:   
          case MethodDeclarations.getChildrenNamesMethodLocal_id:
          case MethodDeclarations.getDataMapMethodLocal_id:
          case MethodDeclarations.getKeysMethodLocal_id:

Modified: core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java	2007-10-05 09:52:00 UTC (rev 4552)
+++ core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java	2007-10-05 12:21:47 UTC (rev 4553)
@@ -6,37 +6,38 @@
  */
 package org.jboss.cache.invalidation;
 
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertNull;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.transaction.RollbackException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
 import junit.framework.Assert;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.Cache;
 import org.jboss.cache.CacheImpl;
+import org.jboss.cache.CacheSPI;
 import org.jboss.cache.DefaultCacheFactory;
 import org.jboss.cache.Fqn;
+import org.jboss.cache.Node;
+import org.jboss.cache.NodeSPI;
 import org.jboss.cache.config.CacheLoaderConfig;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.XmlConfigurationParser;
 import org.jboss.cache.misc.TestingUtil;
 import org.jboss.cache.xml.XmlHelper;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNull;
 import org.testng.annotations.Test;
 import org.w3c.dom.Element;
 
+import javax.transaction.RollbackException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Tests the async interceptor
  *
  * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
  */
-@Test(groups = {"functional"})
+@Test(groups = {"functional", "jgroups"})
 public class InvalidationInterceptorTest
 {
    private static Log log = LogFactory.getLog(InvalidationInterceptorTest.class);
@@ -57,13 +58,19 @@
 
       // now make sure cache2 is in sync with cache1:
       cache2.put(fqn, "key", "value");
-      Assert.assertNull("Should be null", cache1.get(fqn));
+
+      // since the node already exists even PL will not remove it - but will invalidate it's data
+      Node n = cache1.get(fqn);
+      assert n != null : "Should not be null";
+      assert n.getKeys().isEmpty() : "Should not contain any data";
       Assert.assertEquals("value", cache2.get(fqn, "key"));
 
       // now test the invalidation:
       cache1.put(fqn, "key2", "value2");
       Assert.assertEquals("value2", cache1.get(fqn, "key2"));
-      Assert.assertNull("Should have been invalidated!", cache2.get(fqn));
+      n = cache2.get(fqn);
+      assert n != null : "Should not be null";
+      assert n.getKeys().isEmpty() : "Should not contain any data";
 
       // clean up.
       cache1.stop();
@@ -126,14 +133,22 @@
       // now make sure cache2 is in sync with cache1:
       cache2.put(fqn, "key", "value");
       TestingUtil.sleepThread(500);// give it time to broadcast the evict call
-      Assert.assertNull("Should be null", cache1.get(fqn));
+
+            // since the node already exists even PL will not remove it - but will invalidate it's data
+      Node n = cache1.get(fqn);
+      assert n != null : "Should not be null";
+      assert n.getKeys().isEmpty() : "Should not contain any data";
       Assert.assertEquals("value", cache2.get(fqn, "key"));
 
       // now test the invalidation:
       cache1.put(fqn, "key2", "value2");
       Assert.assertEquals("value2", cache1.get(fqn, "key2"));
       TestingUtil.sleepThread(500);// give it time to broadcast the evict call
-      Assert.assertNull("Should have been invalidated!", cache2.get(fqn));
+      
+      // since the node already exists even PL will not remove it - but will invalidate it's data
+      n = cache2.get(fqn);
+      assert n != null : "Should not be null";
+      assert n.getKeys().isEmpty() : "Should not contain any data";
 
       // clean up.
       cache1.stop();
@@ -167,7 +182,10 @@
       Assert.assertEquals("value", cache2.get(fqn, "key"));
       txm.commit();
 
-      Assert.assertNull("Should be null", cache1.get(fqn));
+            // since the node already exists even PL will not remove it - but will invalidate it's data
+      Node n = cache1.get(fqn);
+      assert n != null : "Should not be null";
+      assert n.getKeys().isEmpty() : "Should not contain any data";
       Assert.assertEquals("value", cache2.get(fqn, "key"));
 
       // now test the invalidation again
@@ -180,7 +198,10 @@
       txm.commit();
 
       Assert.assertEquals("value2", cache1.get(fqn, "key2"));
-      Assert.assertNull("Should have been invalidated!", cache2.get(fqn));
+      // since the node already exists even PL will not remove it - but will invalidate it's data
+      n = cache2.get(fqn);
+      assert n != null : "Should not be null";
+      assert n.getKeys().isEmpty() : "Should not contain any data";
 
       // test a rollback
       txm = cache2.getTransactionManager();
@@ -192,7 +213,9 @@
       txm.rollback();
 
       Assert.assertEquals("value2", cache1.get(fqn, "key2"));
-      Assert.assertNull("Should not have committed", cache2.get(fqn));
+      n = cache2.get(fqn);
+      assert n != null : "Should not be null";
+      assert n.getKeys().isEmpty() : "Should not contain any data";
 
       // clean up.
       cache1.stop();
@@ -212,7 +235,9 @@
 
       cache2.put(fqn, "key", "value");
       Assert.assertEquals("value", cache2.get(fqn, "key"));
-      Assert.assertNull(cache1.get(fqn));
+      Node n = cache1.get(fqn);
+      assert n != null : "Should not be null";
+      assert n.getKeys().isEmpty() : "But should not contain any data";
 
       // start a tx that cache1 will have to send out an evict ...
       TransactionManager mgr1 = cache1.getTransactionManager();
@@ -357,7 +382,17 @@
       cache2 = null;
    }
 
+   private void dumpVersionInfo(CacheSPI c1, CacheSPI c2, Fqn fqn)
+   {
+      System.out.println("**** Versin Info for Fqn ["+fqn+"] ****");
+      NodeSPI n1 = c1.getRoot().getChildDirect(fqn);
+      System.out.println("  Cache 1: " + n1.getVersion() + " dataLoaded? " + n1.isDataLoaded());
 
+      NodeSPI n2 = c2.getRoot().getChildDirect(fqn);
+      System.out.println("  Cache 2: " + n2.getVersion() + " dataLoaded? " + n2.isDataLoaded());
+   }
+
+
    public void testOptimistic() throws Exception
    {
       CacheImpl<Object, Object> cache1 = createCache(true);
@@ -366,21 +401,35 @@
       Fqn fqn = Fqn.fromString("/a/b");
       cache1.put(fqn, "key", "value");
 
+      dumpVersionInfo(cache1, cache2, fqn);
+
       // test that this has NOT replicated, but rather has been invalidated:
       Assert.assertEquals("value", cache1.get(fqn, "key"));
-      Assert.assertNull("Should NOT have replicated!", cache2.get(fqn));
+      Node n2 = cache2.get(fqn);
+      assert n2 != null : "Should NOT be null; we need to have version info on all instances.";
+      assert n2.get("key") == null : "Data should not have replicated!";
 
-      log.info("***** Node not replicated, as expected.");
-
       // now make sure cache2 is in sync with cache1:
       cache2.put(fqn, "key", "value");
-      Assert.assertNull("Should be null", cache1.get(fqn));
+
+      dumpVersionInfo(cache1, cache2, fqn);
+
+      Node n1 = cache1.get(fqn);
+      assert n1 != null : "Should NOT be null; we need to have version info on all instances.";
+      assert n1.get("key") == null : "Data should not have replicated!";
+
       Assert.assertEquals("value", cache2.get(fqn, "key"));
 
       // now test the invalidation:
       cache1.put(fqn, "key2", "value2");
+
+      dumpVersionInfo(cache1, cache2, fqn);
+
       Assert.assertEquals("value2", cache1.get(fqn, "key2"));
-      Assert.assertNull("Should have been invalidated!", cache2.get(fqn));
+      n2 = cache2.get(fqn);
+      assert n2 != null : "Should NOT be null; we need to have version info on all instances.";
+      assert n2.get("key") == null : "Data should have invalidated!";
+      assert n2.get("key2") == null : "Data should have invalidated!";
 
       // with tx's
       TransactionManager txm = cache2.getTransactionManager();
@@ -391,7 +440,9 @@
       Assert.assertEquals("value2", cache1.get(fqn, "key2"));
       txm.commit();
 
-      Assert.assertNull("Should be null", cache1.get(fqn));
+      n1 = cache1.get(fqn);
+      assert n1 != null : "Should NOT be null; we need to have version info on all instances.";
+      assert n1.get("key") == null : "Data should be null!";
       Assert.assertEquals("value", cache2.get(fqn, "key"));
 
       // now test the invalidation again
@@ -404,7 +455,9 @@
       txm.commit();
 
       Assert.assertEquals("value2", cache1.get(fqn, "key2"));
-      Assert.assertNull("Should have been invalidated!", cache2.get(fqn));
+      n2 = cache2.get(fqn);
+      assert n2 != null : "Should NOT be null; we need to have version info on all instances.";
+      assert n2.get("key2") == null : "Data should have invalidated!";
 
       // test a rollback
       txm = cache2.getTransactionManager();
@@ -416,7 +469,9 @@
       txm.rollback();
 
       Assert.assertEquals("value2", cache1.get(fqn, "key2"));
-      Assert.assertNull("Should not have committed", cache2.get(fqn));
+      n2 = cache2.get(fqn);
+      assert n2 != null : "Should NOT be null; we need to have version info on all instances.";
+      assert n2.get("key2") == null : "Should not have committed!";
 
       // clean up.
       cache1.stop();
@@ -500,7 +555,7 @@
       Assert.assertNull("Should be null", caches.get(1).get(fqn, "key"));
       mgr.begin();
       caches.get(0).put(fqn, "key", "value");
-      Assert.assertEquals("value", caches.get(0).get(fqn, "key"));
+      Assert.assertEquals("value", caches.get(0).get(fqn, "key"));      
       Assert.assertNull("Should be null", caches.get(1).get(fqn, "key"));
       mgr.commit();
       Assert.assertEquals("value", caches.get(1).get(fqn, "key"));
@@ -562,12 +617,24 @@
 
       caches.get(0).put(fqn, "key", "value");
       assertEquals("expecting value", "value", caches.get(0).get(fqn, "key"));
-      assertNull("Should be null", caches.get(1).get(fqn));
+      Node n = caches.get(1).get(fqn);
+      if (optimistic)
+      {
+         assert n != null : "Should NOT be null";
+         assert n.getKeys().isEmpty() : "but should be empty";
+      }
+      else
+      {
+         // only opt locking requires a stub node created on invalidation to hold the data version
+         assert n == null : "Should be null!";
+      }
 
       // now put in caches.get(1), should fire an eviction
       caches.get(1).put(fqn, "key", "value2");
       assertEquals("expecting value2", "value2", caches.get(1).get(fqn, "key"));
-      assertNull("Should be null", caches.get(0).get(fqn));
+      n = caches.get(0).get(fqn);
+      assert n != null : "Should NOT be null";
+      assert n.getKeys().isEmpty() : "but should be empty";
 
       // clean up.
       caches.get(0).remove(fqn);

Modified: core/trunk/src/test/java/org/jboss/cache/invalidation/VersionInconsistencyTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/invalidation/VersionInconsistencyTest.java	2007-10-05 09:52:00 UTC (rev 4552)
+++ core/trunk/src/test/java/org/jboss/cache/invalidation/VersionInconsistencyTest.java	2007-10-05 12:21:47 UTC (rev 4553)
@@ -22,7 +22,7 @@
  * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
  * @since 2.1.0
  */
-@Test(groups = {"functional"})
+@Test(groups = {"functional", "jgroups"})
 public class VersionInconsistencyTest
 {
    private Cache cache1, cache2;




More information about the jbosscache-commits mailing list