[jbosscache-commits] JBoss Cache SVN: r4553 - in core/trunk/src: main/java/org/jboss/cache/marshall and 1 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Fri Oct 5 08:21:47 EDT 2007
Author: manik.surtani at jboss.com
Date: 2007-10-05 08:21:47 -0400 (Fri, 05 Oct 2007)
New Revision: 4553
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
core/trunk/src/test/java/org/jboss/cache/invalidation/VersionInconsistencyTest.java
Log:
Fixed invalidation issues and updated tests
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2007-10-05 09:52:00 UTC (rev 4552)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2007-10-05 12:21:47 UTC (rev 4553)
@@ -13,7 +13,10 @@
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.optimistic.DataVersion;
import org.jboss.cache.optimistic.TransactionWorkspace;
+import org.jboss.cache.optimistic.WorkspaceNode;
+import org.jboss.cache.optimistic.DefaultDataVersion;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionEntry;
@@ -27,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
/**
* This interceptor acts as a replacement to the replication interceptor when
@@ -44,11 +48,15 @@
{
private long m_invalidations = 0;
protected TransactionTable txTable;
+ protected Map<GlobalTransaction, List<MethodCall>> txMods;
+ protected boolean optimistic;
public void setCache(CacheSPI cache)
{
super.setCache(cache);
txTable = cache.getTransactionTable();
+ optimistic=cache.getConfiguration().isNodeLockingOptimistic();
+ if (optimistic) txMods = new ConcurrentHashMap<GlobalTransaction, List<MethodCall>>();
}
public Object invoke(InvocationContext ctx) throws Throwable
@@ -92,49 +100,27 @@
else
{
// not a CRUD method - lets see if it is a tx lifecycle method.
- if (tx != null && isValid(tx))
+ if (tx != null)
{
+
+ GlobalTransaction gtx;
+ TransactionEntry entry;
+ List<MethodCall> modifications;
// lets see if we are in the prepare phase (as this is the only time we actually do anything)
switch (m.getMethodId())
{
+
case MethodDeclarations.prepareMethod_id:
- case MethodDeclarations.optimisticPrepareMethod_id:
log.debug("Entering InvalidationInterceptor's prepare phase");
// fetch the modifications before the transaction is committed (and thus removed from the txTable)
- GlobalTransaction gtx = ctx.getGlobalTransaction();
- TransactionEntry entry = txTable.get(gtx);
+ gtx = ctx.getGlobalTransaction();
+ entry = txTable.get(gtx);
if (entry == null) throw new IllegalStateException("cannot find transaction entry for " + gtx);
- List<MethodCall> modifications = new LinkedList<MethodCall>(entry.getModifications());
+ modifications = new LinkedList<MethodCall>(entry.getModifications());
if (modifications.size() > 0)
{
- if (containsPutForExternalRead(modifications))
- {
- log.debug("Modification list contains a putForExternalRead operation. Not invalidating.");
- }
- else
- {
- try
- {
- invalidateModifications(modifications, configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null, defaultSynchronous, ctx);
- }
- catch (Throwable t)
- {
- log.warn("Unable to broadcast evicts as a part of the prepare phase. Rolling back.", t);
- try
- {
- tx.setRollbackOnly();
- }
- catch (SystemException se)
- {
- throw new RuntimeException("setting tx rollback failed ", se);
- }
- if (t instanceof RuntimeException)
- throw t;
- else
- throw new RuntimeException("Unable to broadcast invalidation messages", t);
- }
- }
+ broadcastInvalidate(modifications, gtx, tx, ctx);
}
else
{
@@ -142,6 +128,35 @@
}
break;
+ case MethodDeclarations.optimisticPrepareMethod_id:
+ // here we just record the modifications but actually do the invalidate in commit.
+ gtx = ctx.getGlobalTransaction();
+ entry = txTable.get(gtx);
+ if (entry == null) throw new IllegalStateException("cannot find transaction entry for " + gtx);
+ modifications = new LinkedList<MethodCall>(entry.getModifications());
+
+ if (modifications.size() > 0)
+ {
+ txMods.put(gtx, modifications);
+ }
+ break;
+ case MethodDeclarations.commitMethod_id:
+ if (optimistic)
+ {
+ gtx = ctx.getGlobalTransaction();
+ modifications = txMods.remove(gtx);
+ broadcastInvalidate(modifications, gtx, tx, ctx);
+ log.debug("Committing. Broadcasting invalidations.");
+ }
+ break;
+ case MethodDeclarations.rollbackMethod_id:
+ if (optimistic)
+ {
+ gtx = ctx.getGlobalTransaction();
+ txMods.remove(gtx);
+ log.debug("Caught a rollback. Clearing modification in txMods");
+ }
+ break;
}
}
@@ -149,8 +164,41 @@
return retval;
}
+ private void broadcastInvalidate(List<MethodCall> modifications, GlobalTransaction gtx, Transaction tx, InvocationContext ctx)
+ {
+ if (containsPutForExternalRead(modifications))
+ {
+ log.debug("Modification list contains a putForExternalRead operation. Not invalidating.");
+ }
+ else
+ {
+ try
+ {
+ invalidateModifications(modifications, configuration.isNodeLockingOptimistic() ? getWorkspace(gtx) : null, defaultSynchronous, ctx);
+ }
+ catch (Throwable t)
+ {
+ log.warn("Unable to broadcast evicts as a part of the prepare phase. Rolling back.", t);
+ try
+ {
+ tx.setRollbackOnly();
+ }
+ catch (SystemException se)
+ {
+ throw new RuntimeException("setting tx rollback failed ", se);
+ }
+ if (t instanceof RuntimeException)
+ throw (RuntimeException) t;
+ else
+ throw new RuntimeException("Unable to broadcast invalidation messages", t);
+ }
+ }
+ }
+
private boolean containsPutForExternalRead(List<MethodCall> l)
{
+ if (l == null) return false;
+
for (MethodCall m : l)
if (m.getMethodId() == MethodDeclarations.putForExternalReadMethodLocal_id || m.getMethodId() == MethodDeclarations.putForExternalReadVersionedMethodLocal_id)
return true;
@@ -196,13 +244,28 @@
MethodCallFactory.create(MethodDeclarations.evictVersionedNodeMethodLocal, fqn, workspace.getNode(fqn).getVersion()) :
MethodCallFactory.create(MethodDeclarations.evictNodeMethodLocal, fqn);
*/
- MethodCall call = MethodCallFactory.create(MethodDeclarations.invalidateMethodLocal, fqn, (workspace == null ? null : workspace.getNode(fqn).getVersion()));
+ MethodCall call = MethodCallFactory.create(MethodDeclarations.invalidateMethodLocal, fqn, getNodeVersion(workspace, fqn));
if (log.isDebugEnabled()) log.debug("Cache [" + cache.getLocalAddress() + "] replicating " + call);
// voila, invalidated!
replicateCall(call, synchronous, ctx.getOptionOverrides());
}
+ protected DataVersion getNodeVersion(TransactionWorkspace w, Fqn f)
+ {
+ if (w == null) return null;
+ WorkspaceNode wn = w.getNode(f);
+ DataVersion v = wn.getVersion();
+
+ if (wn.isVersioningImplicit())
+ {
+ // then send back an incremented version
+ v = ((DefaultDataVersion) v).increment();
+ }
+
+ return v;
+ }
+
/**
* Same as <code>invalidateModifications(modifications, workspace, defaultSynchronous)</code>
*
@@ -243,11 +306,14 @@
protected Set<Fqn> optimisedIterator(List<MethodCall> list)
{
Set<Fqn> fqns = new HashSet<Fqn>();
- for (MethodCall mc : list)
+ if (list != null)
{
- if (MethodDeclarations.isCrudMethod(mc.getMethodId()))
+ for (MethodCall mc : list)
{
- fqns.add(findFqn(mc.getArgs()));
+ if (MethodDeclarations.isCrudMethod(mc.getMethodId()))
+ {
+ fqns.add(findFqn(mc.getArgs()));
+ }
}
}
return fqns;
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2007-10-05 09:52:00 UTC (rev 4552)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2007-10-05 12:21:47 UTC (rev 4553)
@@ -135,6 +135,7 @@
case MethodDeclarations.dataGravitationMethod_id:
case MethodDeclarations.evictNodeMethodLocal_id:
case MethodDeclarations.evictVersionedNodeMethodLocal_id:
+ case MethodDeclarations.invalidateMethodLocal_id:
case MethodDeclarations.getChildrenNamesMethodLocal_id:
case MethodDeclarations.getDataMapMethodLocal_id:
case MethodDeclarations.getKeysMethodLocal_id:
Modified: core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java 2007-10-05 09:52:00 UTC (rev 4552)
+++ core/trunk/src/test/java/org/jboss/cache/invalidation/InvalidationInterceptorTest.java 2007-10-05 12:21:47 UTC (rev 4553)
@@ -6,37 +6,38 @@
*/
package org.jboss.cache.invalidation;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertNull;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.transaction.RollbackException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
import junit.framework.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.Cache;
import org.jboss.cache.CacheImpl;
+import org.jboss.cache.CacheSPI;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
+import org.jboss.cache.Node;
+import org.jboss.cache.NodeSPI;
import org.jboss.cache.config.CacheLoaderConfig;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.XmlConfigurationParser;
import org.jboss.cache.misc.TestingUtil;
import org.jboss.cache.xml.XmlHelper;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertNull;
import org.testng.annotations.Test;
import org.w3c.dom.Element;
+import javax.transaction.RollbackException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Tests the async interceptor
*
* @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
*/
-@Test(groups = {"functional"})
+@Test(groups = {"functional", "jgroups"})
public class InvalidationInterceptorTest
{
private static Log log = LogFactory.getLog(InvalidationInterceptorTest.class);
@@ -57,13 +58,19 @@
// now make sure cache2 is in sync with cache1:
cache2.put(fqn, "key", "value");
- Assert.assertNull("Should be null", cache1.get(fqn));
+
+ // since the node already exists even PL will not remove it - but will invalidate it's data
+ Node n = cache1.get(fqn);
+ assert n != null : "Should not be null";
+ assert n.getKeys().isEmpty() : "Should not contain any data";
Assert.assertEquals("value", cache2.get(fqn, "key"));
// now test the invalidation:
cache1.put(fqn, "key2", "value2");
Assert.assertEquals("value2", cache1.get(fqn, "key2"));
- Assert.assertNull("Should have been invalidated!", cache2.get(fqn));
+ n = cache2.get(fqn);
+ assert n != null : "Should not be null";
+ assert n.getKeys().isEmpty() : "Should not contain any data";
// clean up.
cache1.stop();
@@ -126,14 +133,22 @@
// now make sure cache2 is in sync with cache1:
cache2.put(fqn, "key", "value");
TestingUtil.sleepThread(500);// give it time to broadcast the evict call
- Assert.assertNull("Should be null", cache1.get(fqn));
+
+ // since the node already exists even PL will not remove it - but will invalidate it's data
+ Node n = cache1.get(fqn);
+ assert n != null : "Should not be null";
+ assert n.getKeys().isEmpty() : "Should not contain any data";
Assert.assertEquals("value", cache2.get(fqn, "key"));
// now test the invalidation:
cache1.put(fqn, "key2", "value2");
Assert.assertEquals("value2", cache1.get(fqn, "key2"));
TestingUtil.sleepThread(500);// give it time to broadcast the evict call
- Assert.assertNull("Should have been invalidated!", cache2.get(fqn));
+
+ // since the node already exists even PL will not remove it - but will invalidate it's data
+ n = cache2.get(fqn);
+ assert n != null : "Should not be null";
+ assert n.getKeys().isEmpty() : "Should not contain any data";
// clean up.
cache1.stop();
@@ -167,7 +182,10 @@
Assert.assertEquals("value", cache2.get(fqn, "key"));
txm.commit();
- Assert.assertNull("Should be null", cache1.get(fqn));
+ // since the node already exists even PL will not remove it - but will invalidate it's data
+ Node n = cache1.get(fqn);
+ assert n != null : "Should not be null";
+ assert n.getKeys().isEmpty() : "Should not contain any data";
Assert.assertEquals("value", cache2.get(fqn, "key"));
// now test the invalidation again
@@ -180,7 +198,10 @@
txm.commit();
Assert.assertEquals("value2", cache1.get(fqn, "key2"));
- Assert.assertNull("Should have been invalidated!", cache2.get(fqn));
+ // since the node already exists even PL will not remove it - but will invalidate it's data
+ n = cache2.get(fqn);
+ assert n != null : "Should not be null";
+ assert n.getKeys().isEmpty() : "Should not contain any data";
// test a rollback
txm = cache2.getTransactionManager();
@@ -192,7 +213,9 @@
txm.rollback();
Assert.assertEquals("value2", cache1.get(fqn, "key2"));
- Assert.assertNull("Should not have committed", cache2.get(fqn));
+ n = cache2.get(fqn);
+ assert n != null : "Should not be null";
+ assert n.getKeys().isEmpty() : "Should not contain any data";
// clean up.
cache1.stop();
@@ -212,7 +235,9 @@
cache2.put(fqn, "key", "value");
Assert.assertEquals("value", cache2.get(fqn, "key"));
- Assert.assertNull(cache1.get(fqn));
+ Node n = cache1.get(fqn);
+ assert n != null : "Should not be null";
+ assert n.getKeys().isEmpty() : "But should not contain any data";
// start a tx that cache1 will have to send out an evict ...
TransactionManager mgr1 = cache1.getTransactionManager();
@@ -357,7 +382,17 @@
cache2 = null;
}
+ private void dumpVersionInfo(CacheSPI c1, CacheSPI c2, Fqn fqn)
+ {
+ System.out.println("**** Versin Info for Fqn ["+fqn+"] ****");
+ NodeSPI n1 = c1.getRoot().getChildDirect(fqn);
+ System.out.println(" Cache 1: " + n1.getVersion() + " dataLoaded? " + n1.isDataLoaded());
+ NodeSPI n2 = c2.getRoot().getChildDirect(fqn);
+ System.out.println(" Cache 2: " + n2.getVersion() + " dataLoaded? " + n2.isDataLoaded());
+ }
+
+
public void testOptimistic() throws Exception
{
CacheImpl<Object, Object> cache1 = createCache(true);
@@ -366,21 +401,35 @@
Fqn fqn = Fqn.fromString("/a/b");
cache1.put(fqn, "key", "value");
+ dumpVersionInfo(cache1, cache2, fqn);
+
// test that this has NOT replicated, but rather has been invalidated:
Assert.assertEquals("value", cache1.get(fqn, "key"));
- Assert.assertNull("Should NOT have replicated!", cache2.get(fqn));
+ Node n2 = cache2.get(fqn);
+ assert n2 != null : "Should NOT be null; we need to have version info on all instances.";
+ assert n2.get("key") == null : "Data should not have replicated!";
- log.info("***** Node not replicated, as expected.");
-
// now make sure cache2 is in sync with cache1:
cache2.put(fqn, "key", "value");
- Assert.assertNull("Should be null", cache1.get(fqn));
+
+ dumpVersionInfo(cache1, cache2, fqn);
+
+ Node n1 = cache1.get(fqn);
+ assert n1 != null : "Should NOT be null; we need to have version info on all instances.";
+ assert n1.get("key") == null : "Data should not have replicated!";
+
Assert.assertEquals("value", cache2.get(fqn, "key"));
// now test the invalidation:
cache1.put(fqn, "key2", "value2");
+
+ dumpVersionInfo(cache1, cache2, fqn);
+
Assert.assertEquals("value2", cache1.get(fqn, "key2"));
- Assert.assertNull("Should have been invalidated!", cache2.get(fqn));
+ n2 = cache2.get(fqn);
+ assert n2 != null : "Should NOT be null; we need to have version info on all instances.";
+ assert n2.get("key") == null : "Data should have invalidated!";
+ assert n2.get("key2") == null : "Data should have invalidated!";
// with tx's
TransactionManager txm = cache2.getTransactionManager();
@@ -391,7 +440,9 @@
Assert.assertEquals("value2", cache1.get(fqn, "key2"));
txm.commit();
- Assert.assertNull("Should be null", cache1.get(fqn));
+ n1 = cache1.get(fqn);
+ assert n1 != null : "Should NOT be null; we need to have version info on all instances.";
+ assert n1.get("key") == null : "Data should be null!";
Assert.assertEquals("value", cache2.get(fqn, "key"));
// now test the invalidation again
@@ -404,7 +455,9 @@
txm.commit();
Assert.assertEquals("value2", cache1.get(fqn, "key2"));
- Assert.assertNull("Should have been invalidated!", cache2.get(fqn));
+ n2 = cache2.get(fqn);
+ assert n2 != null : "Should NOT be null; we need to have version info on all instances.";
+ assert n2.get("key2") == null : "Data should have invalidated!";
// test a rollback
txm = cache2.getTransactionManager();
@@ -416,7 +469,9 @@
txm.rollback();
Assert.assertEquals("value2", cache1.get(fqn, "key2"));
- Assert.assertNull("Should not have committed", cache2.get(fqn));
+ n2 = cache2.get(fqn);
+ assert n2 != null : "Should NOT be null; we need to have version info on all instances.";
+ assert n2.get("key2") == null : "Should not have committed!";
// clean up.
cache1.stop();
@@ -500,7 +555,7 @@
Assert.assertNull("Should be null", caches.get(1).get(fqn, "key"));
mgr.begin();
caches.get(0).put(fqn, "key", "value");
- Assert.assertEquals("value", caches.get(0).get(fqn, "key"));
+ Assert.assertEquals("value", caches.get(0).get(fqn, "key"));
Assert.assertNull("Should be null", caches.get(1).get(fqn, "key"));
mgr.commit();
Assert.assertEquals("value", caches.get(1).get(fqn, "key"));
@@ -562,12 +617,24 @@
caches.get(0).put(fqn, "key", "value");
assertEquals("expecting value", "value", caches.get(0).get(fqn, "key"));
- assertNull("Should be null", caches.get(1).get(fqn));
+ Node n = caches.get(1).get(fqn);
+ if (optimistic)
+ {
+ assert n != null : "Should NOT be null";
+ assert n.getKeys().isEmpty() : "but should be empty";
+ }
+ else
+ {
+ // only opt locking requires a stub node created on invalidation to hold the data version
+ assert n == null : "Should be null!";
+ }
// now put in caches.get(1), should fire an eviction
caches.get(1).put(fqn, "key", "value2");
assertEquals("expecting value2", "value2", caches.get(1).get(fqn, "key"));
- assertNull("Should be null", caches.get(0).get(fqn));
+ n = caches.get(0).get(fqn);
+ assert n != null : "Should NOT be null";
+ assert n.getKeys().isEmpty() : "but should be empty";
// clean up.
caches.get(0).remove(fqn);
Modified: core/trunk/src/test/java/org/jboss/cache/invalidation/VersionInconsistencyTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/invalidation/VersionInconsistencyTest.java 2007-10-05 09:52:00 UTC (rev 4552)
+++ core/trunk/src/test/java/org/jboss/cache/invalidation/VersionInconsistencyTest.java 2007-10-05 12:21:47 UTC (rev 4553)
@@ -22,7 +22,7 @@
* @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
* @since 2.1.0
*/
-@Test(groups = {"functional"})
+@Test(groups = {"functional", "jgroups"})
public class VersionInconsistencyTest
{
private Cache cache1, cache2;
More information about the jbosscache-commits
mailing list