JBoss Cache SVN: r5840 - support/trunk/common.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-14 07:24:44 -0400 (Wed, 14 May 2008)
New Revision: 5840
Modified:
support/trunk/common/pom.xml
Log:
Modified: support/trunk/common/pom.xml
===================================================================
--- support/trunk/common/pom.xml 2008-05-14 10:51:05 UTC (rev 5839)
+++ support/trunk/common/pom.xml 2008-05-14 11:24:44 UTC (rev 5840)
@@ -100,7 +100,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
- <version>2.0.2</version>
+ <version>2.0.1</version>
<configuration>
<source>1.5</source>
<target>1.5</target>
@@ -159,7 +159,7 @@
<value>true</value>
</property>
</systemProperties>
- <groups>functional</groups>
+ <groups>${defaultTestGroup}</groups>
<forkMode>always</forkMode>
<!-- increasing JVM heap size -->
<argLine>-Xmx1024M</argLine>
@@ -294,6 +294,7 @@
<properties>
<!-- for now, at least, lets aggregate them -->
<jbosscache.reports.aggregate>true</jbosscache.reports.aggregate>
+ <defaultTestGroup>functional</defaultTestGroup>
</properties>
<repositories>
<repository>
16 years, 7 months
JBoss Cache SVN: r5839 - in core/trunk/src: test/java/org/jboss/cache/optimistic and 1 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-14 06:51:05 -0400 (Wed, 14 May 2008)
New Revision: 5839
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/trunk/src/test/java/org/jboss/cache/optimistic/DataVersionPersistenceTest.java
core/trunk/src/test/java/org/jboss/cache/transaction/ConcurrentBankTest.java
Log:
One-phase commit and optimistic locking
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2008-05-14 10:24:03 UTC (rev 5838)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2008-05-14 10:51:05 UTC (rev 5839)
@@ -176,12 +176,7 @@
@Override
public Object handleOptimisticPrepareCommand(InvocationContext ctx, OptimisticPrepareCommand command) throws Throwable
{
- if (inTransaction())
- {
- if (trace) log.trace("transactional so don't put stuff in the cloader yet.");
- prepareCacheLoader(command.getGlobalTransaction(), ctx.getTransactionEntry(), command.isOnePhaseCommit());
- }
- return invokeNextInterceptor(ctx, command);
+ return handlePrepareCommand(ctx, command);
}
@Override
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2008-05-14 10:24:03 UTC (rev 5838)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2008-05-14 10:51:05 UTC (rev 5839)
@@ -92,7 +92,7 @@
}
finally
{
- if (!succeeded || command.isOnePhaseCommit()) unlock(ctx, gtx);
+ if (!succeeded) unlock(ctx, gtx);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java 2008-05-14 10:24:03 UTC (rev 5838)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java 2008-05-14 10:51:05 UTC (rev 5839)
@@ -29,6 +29,11 @@
{
protected final ModificationsReplayVisitor replayVisitor = new ModificationsReplayVisitor();
+ public OptimisticTxInterceptor()
+ {
+ optimistic = true;
+ }
+
@Override
public Object visitOptimisticPrepareCommand(InvocationContext ctx, OptimisticPrepareCommand command) throws Throwable
{
@@ -108,7 +113,8 @@
@Override
protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
{
- return commandsFactory.buildOptimisticPrepareCommand(gtx, modifications, null, rpcManager.getLocalAddress(), onePhaseCommit);
+ // optimistic locking NEVER does one-phase prepares.
+ return commandsFactory.buildOptimisticPrepareCommand(gtx, modifications, null, rpcManager.getLocalAddress(), false);
}
/**
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java 2008-05-14 10:24:03 UTC (rev 5838)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java 2008-05-14 10:51:05 UTC (rev 5839)
@@ -136,13 +136,7 @@
}
}
log.debug("Successfully validated nodes");
- Object retval = invokeNextInterceptor(ctx, command);
- if (command.isOnePhaseCommit())
- {
- // do a comit-phase
- commitTransaction(ctx);
- }
- return retval;
+ return invokeNextInterceptor(ctx, command);
}
private void commitTransaction(InvocationContext ctx)
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-05-14 10:24:03 UTC (rev 5838)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-05-14 10:51:05 UTC (rev 5839)
@@ -74,6 +74,7 @@
private long prepares = 0;
private long commits = 0;
private long rollbacks = 0;
+ protected boolean optimistic = false;
@Inject
public void intialize(RPCManager rpcManager,
@@ -646,7 +647,7 @@
private boolean isOnePhaseCommit()
{
- if (!configuration.getCacheMode().isSynchronous())
+ if (!configuration.getCacheMode().isSynchronous() && !optimistic)
{
// this is a REPL_ASYNC call - do 1-phase commit. break!
if (trace) log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()");
@@ -961,7 +962,7 @@
switch (status)
{
case Status.STATUS_COMMITTED:
- boolean onePhaseCommit = !configuration.getCacheMode().isSynchronous();
+ boolean onePhaseCommit = isOnePhaseCommit();
if (log.isDebugEnabled()) log.debug("Running commit phase. One phase? " + onePhaseCommit);
runCommitPhase(ctx, gtx, modifications, onePhaseCommit);
log.debug("Finished commit phase");
Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/DataVersionPersistenceTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/DataVersionPersistenceTest.java 2008-05-14 10:24:03 UTC (rev 5838)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/DataVersionPersistenceTest.java 2008-05-14 10:51:05 UTC (rev 5839)
@@ -14,6 +14,8 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.io.IOException;
+
/**
* Tests whether data versions are transferred along with state
*
@@ -27,7 +29,7 @@
private CacheLoader loader;
@BeforeMethod
- public void setUp()
+ public void setUp() throws IOException
{
cache = new DefaultCacheFactory().createCache(false);
@@ -36,6 +38,7 @@
CacheLoaderConfig clc = new CacheLoaderConfig();
CacheLoaderConfig.IndividualCacheLoaderConfig iclc = new CacheLoaderConfig.IndividualCacheLoaderConfig();
+ iclc.setProperties("debug=true");
iclc.setClassName(DummySharedInMemoryCacheLoader.class.getName());
clc.addIndividualCacheLoaderConfig(iclc);
Modified: core/trunk/src/test/java/org/jboss/cache/transaction/ConcurrentBankTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/ConcurrentBankTest.java 2008-05-14 10:24:03 UTC (rev 5838)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/ConcurrentBankTest.java 2008-05-14 10:51:05 UTC (rev 5839)
@@ -36,17 +36,17 @@
public class ConcurrentBankTest
{
private CacheSPI<Object, Integer> cache;
- private static Log logger_ = LogFactory.getLog(ConcurrentBankTest.class);
+ private static Log log = LogFactory.getLog(ConcurrentBankTest.class);
private final Fqn NODE = Fqn.fromString("/cachetest");
private final int ROLLBACK_CHANCE = 100;
private static String customer[] = {"cu1", "cu2", "cu3"};
private static final int BOOKINGS = 1000;
- private static boolean _testFailedinThread = false;
+ private static boolean testFailedinThread = false;
private void failMain()
{
- _testFailedinThread = true;
+ testFailedinThread = true;
}
@BeforeMethod(alwaysRun = true)
@@ -69,53 +69,33 @@
TransactionSetup.cleanup();
}
- public void testConcurrentBooking()
+ public void testConcurrentBooking() throws Exception
{
Teller one, two;
- try
+ if (cache.getRoot().get(NODE) == null)
{
- if (cache.getRoot().get(NODE) == null)
- {
- cache.put(NODE, "cu1", 1000);
- cache.put(NODE, "cu2", 1000);
- cache.put(NODE, "cu3", 1000);
- }
+ cache.put(NODE, "cu1", 1000);
+ cache.put(NODE, "cu2", 1000);
+ cache.put(NODE, "cu3", 1000);
+ }
- one = new Teller("one", cache);
- two = new Teller("two", cache);
+ one = new Teller("one", cache);
+ two = new Teller("two", cache);
- one.start();
- TestingUtil.sleepThread((long) 100);
- two.start();
- one.join();
- two.join();
+ one.start();
+ TestingUtil.sleepThread((long) 100);
+ two.start();
+ one.join();
+ two.join();
- log("lock info:\n" + CachePrinter.printCacheLockingInfo(cache) + _testFailedinThread);
- if (_testFailedinThread)
- fail();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- fail(e.toString());
- }
- finally
- {
- /*
- try {
- cache.remove(NODE);
- } catch (Exception e) {
- e.printStackTrace();
- fail();
- }
- */
- }
+ log("lock info:\n" + CachePrinter.printCacheLockingInfo(cache) + testFailedinThread);
+ assert !testFailedinThread;
}
private static void log(String msg)
{
// System.out.println("-- [" + Thread.currentThread() + "]: " + msg);
- logger_.info("-- [" + Thread.currentThread() + "]: " + msg);
+ log.info("-- [" + Thread.currentThread() + "]: " + msg);
}
private class Teller extends Thread
16 years, 7 months
JBoss Cache SVN: r5838 - core/trunk/src/main/java/org/jboss/cache.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-14 06:24:03 -0400 (Wed, 14 May 2008)
New Revision: 5838
Modified:
core/trunk/src/main/java/org/jboss/cache/DataContainer.java
Log:
Updated
Modified: core/trunk/src/main/java/org/jboss/cache/DataContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/DataContainer.java 2008-05-14 09:59:57 UTC (rev 5837)
+++ core/trunk/src/main/java/org/jboss/cache/DataContainer.java 2008-05-14 10:24:03 UTC (rev 5838)
@@ -52,11 +52,17 @@
{
this.configuration = configuration;
this.nodeFactory = nodeFactory;
+
+ // We need to create a root node even at this stage since certain components rely on this being available before
+ // start() is called.
+ // TODO: Investigate which components rely on this being available before start(), and why!
+ createRootNode();
}
@Start(priority = 12)
public void createRootNode()
{
+ if (trace) log.trace("Starting data container");
// create a new root temporarily.
NodeSPI tempRoot = nodeFactory.createRootDataNode();
// if we don't already have a root or the new (temp) root is of a different class (optimistic vs pessimistic) to
@@ -65,7 +71,11 @@
Class currentRootType = root == null ? null : ((NodeInvocationDelegate) root).getDelegationTarget().getClass();
Class tempRootType = ((NodeInvocationDelegate) tempRoot).getDelegationTarget().getClass();
- if (!tempRootType.equals(currentRootType)) setRoot(tempRoot);
+ if (!tempRootType.equals(currentRootType))
+ {
+ if (trace) log.trace("Setting root node to an instance of " + tempRootType);
+ setRoot(tempRoot);
+ }
}
@Stop(priority = 100)
16 years, 7 months
JBoss Cache SVN: r5837 - in core/trunk/src: main/java/org/jboss/cache/commands/tx and 5 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-14 05:59:57 -0400 (Wed, 14 May 2008)
New Revision: 5837
Modified:
core/trunk/src/main/java/org/jboss/cache/InvocationContext.java
core/trunk/src/main/java/org/jboss/cache/UnversionedNode.java
core/trunk/src/main/java/org/jboss/cache/commands/tx/OptimisticPrepareCommand.java
core/trunk/src/main/java/org/jboss/cache/commands/tx/PrepareCommand.java
core/trunk/src/main/java/org/jboss/cache/interceptors/ActivationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java
core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
core/trunk/src/test/java/org/jboss/cache/invocationcontext/TransactionTest.java
core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java
Log:
JBCACHE-1345: Invocations using LOCAL mode override cannot be rolled back
Modified: core/trunk/src/main/java/org/jboss/cache/InvocationContext.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/InvocationContext.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/InvocationContext.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -287,19 +287,6 @@
txHasMods = b;
}
- /**
- * Cache loader might have mods which are different from TX's mods; e.g. when cache is local and passivation is on.
- */
- public boolean isCacheLoaderHasMods()
- {
- return cacheLoaderHasMods;
- }
-
- public void setCacheLoaderHasMods(boolean cacheLoaderHasMods)
- {
- this.cacheLoaderHasMods = cacheLoaderHasMods;
- }
-
public boolean isLocalRollbackOnly()
{
return localRollbackOnly;
@@ -346,7 +333,6 @@
this.setOriginLocal(template.isOriginLocal());
this.setTransaction(template.getTransaction());
this.setTxHasMods(template.isTxHasMods());
- this.setCacheLoaderHasMods(template.isCacheLoaderHasMods());
}
@Override
Modified: core/trunk/src/main/java/org/jboss/cache/UnversionedNode.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/UnversionedNode.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/UnversionedNode.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -246,7 +246,6 @@
private NodeSPI getOrCreateChild(Object child_name, GlobalTransaction gtx, boolean createIfNotExists, boolean notify)
{
-
NodeSPI child;
if (child_name == null)
{
@@ -279,7 +278,7 @@
if (gtx != null)
{
CreateNodeCommand createNodeCommand = commandsFactory.buildCreateNodeCommand(child_fqn);
- transactionTable.addModification(gtx, createNodeCommand);
+ ctx.getTransactionEntry().addModification(createNodeCommand);
}
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/commands/tx/OptimisticPrepareCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/tx/OptimisticPrepareCommand.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/commands/tx/OptimisticPrepareCommand.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -54,6 +54,12 @@
}
@Override
+ public OptimisticPrepareCommand clone() throws CloneNotSupportedException
+ {
+ return (OptimisticPrepareCommand) super.clone();
+ }
+
+ @Override
@SuppressWarnings("unchecked")
public void setParameters(int commandId, Object[] args)
{
Modified: core/trunk/src/main/java/org/jboss/cache/commands/tx/PrepareCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/tx/PrepareCommand.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/commands/tx/PrepareCommand.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -6,6 +6,7 @@
import org.jboss.cache.transaction.GlobalTransaction;
import org.jgroups.Address;
+import java.util.Collection;
import java.util.List;
/**
@@ -30,6 +31,11 @@
this.onePhaseCommit = onePhaseCommit;
}
+ public void removeModifications(Collection<ReversibleCommand> modificationsToRemove)
+ {
+ if (modifications != null) modifications.removeAll(modificationsToRemove);
+ }
+
public PrepareCommand()
{
}
@@ -111,6 +117,11 @@
return result;
}
+ @Override
+ public PrepareCommand clone() throws CloneNotSupportedException
+ {
+ return (PrepareCommand) super.clone();
+ }
@Override
public String toString()
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/ActivationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/ActivationInterceptor.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/ActivationInterceptor.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -292,7 +292,7 @@
}
List<Modification> cacheLoaderModifications = new ArrayList<Modification>();
- builder.visitCollection(null, entry.getCacheLoaderModifications());
+ builder.visitCollection(null, entry.getModifications());
if (cacheLoaderModifications.size() > 0)
{
loader.prepare(gtx, cacheLoaderModifications, false);
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/CacheLoaderInterceptor.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -441,7 +441,7 @@
TransactionEntry entry = ctx.getTransactionEntry();
if (entry == null) return false;
- for (ReversibleCommand txCacheCommand : entry.getCacheLoaderModifications())
+ for (ReversibleCommand txCacheCommand : entry.getModifications())
{
if (txCacheCommand instanceof RemoveNodeCommand && fqn.isChildOrEquals(txCacheCommand.getFqn())) return true;
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -105,7 +105,7 @@
if (inTransaction())
{
if (trace) log.trace("transactional so don't put stuff in the cloader yet.");
- if (ctx.isCacheLoaderHasMods())
+ if (ctx.isTxHasMods())
{
// this is a commit call.
GlobalTransaction gtx = command.getGlobalTransaction();
@@ -154,7 +154,7 @@
if (inTransaction())
{
if (trace) log.trace("transactional so don't put stuff in the cloader yet.");
- if (ctx.isCacheLoaderHasMods())
+ if (ctx.isTxHasMods())
{
GlobalTransaction gtx = command.getGlobalTransaction();
// this is a rollback method
@@ -352,7 +352,7 @@
{
throw new Exception("entry for transaction " + gtx + " not found in transaction table");
}
- List<ReversibleCommand> modifications = entry.getCacheLoaderModifications();
+ List<ReversibleCommand> modifications = entry.getModifications();
if (modifications.size() == 0)
{
if (trace) log.trace("Transaction has not logged any modifications!");
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/CallInterceptor.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -14,7 +14,6 @@
import org.jboss.cache.commands.write.RemoveDataCommand;
import org.jboss.cache.commands.write.RemoveKeyCommand;
import org.jboss.cache.commands.write.RemoveNodeCommand;
-import org.jboss.cache.config.Option;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.interceptors.base.CommandInterceptor;
@@ -165,20 +164,7 @@
}
else
{
- Option o = ctx.getOptionOverrides();
- if (o != null && o.isCacheModeLocal())
- {
- log.debug("Not adding method to modification list since cache mode local is set.");
- }
- else
- {
- // TODO: 2.2.0: Revisit this, this is a bug if a local rollback occurs!!
- transactionTable.addModification(gtx, command);
- }
-
- // TODO: 2.2.0: consolidate cache loader and regular modification lists!!
-// if (cacheLoaderManager != null)
- if (cacheLoadingEnabled) transactionTable.addCacheLoaderModification(gtx, command);
+ ctx.getTransactionEntry().addModification(command);
}
}
return result;
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -37,9 +37,9 @@
import javax.transaction.SystemException;
import javax.transaction.Transaction;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -125,17 +125,23 @@
{
Object retval = invokeNextInterceptor(ctx, command);
Transaction tx = ctx.getTransaction();
- if (tx != null && !optimistic)
+ if (tx != null)
{
if (trace) log.trace("Entering InvalidationInterceptor's prepare phase");
// fetch the modifications before the transaction is committed (and thus removed from the txTable)
GlobalTransaction gtx = ctx.getGlobalTransaction();
TransactionEntry entry = ctx.getTransactionEntry();
if (entry == null) throw new IllegalStateException("cannot find transaction entry for " + gtx);
- List<ReversibleCommand> modifications = new LinkedList<ReversibleCommand>(command.getModifications());
- if (modifications.size() > 0)
+
+ if (entry.hasModifications())
{
- broadcastInvalidate(modifications, gtx, tx, ctx);
+ if (entry.hasLocalModifications())
+ {
+ PrepareCommand clone = command.clone();
+ clone.removeModifications(entry.getLocalModifications());
+ command = clone;
+ }
+ broadcastInvalidate(command.getModifications(), gtx, tx, ctx);
}
else
{
@@ -156,10 +162,12 @@
GlobalTransaction gtx = ctx.getGlobalTransaction();
TransactionEntry entry = ctx.getTransactionEntry();
if (entry == null) throw new IllegalStateException("cannot find transaction entry for " + gtx);
- List<ReversibleCommand> modifications = new LinkedList<ReversibleCommand>(command.getModifications());
- if (modifications.size() > 0)
+
+ if (entry.hasModifications())
{
- txMods.put(gtx, modifications);
+ List<ReversibleCommand> mods = new ArrayList<ReversibleCommand>(entry.getModifications());
+ if (entry.hasLocalModifications()) mods.removeAll(entry.getLocalModifications());
+ txMods.put(gtx, mods);
}
}
return retval;
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticNodeInterceptor.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -29,7 +29,6 @@
import org.jboss.cache.config.Option;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
-import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.notifications.Notifier;
import static org.jboss.cache.notifications.event.NodeModifiedEvent.ModificationType.*;
import org.jboss.cache.optimistic.DataVersion;
@@ -57,17 +56,14 @@
*/
private NodeFactory nodeFactory;
private Notifier notifier;
- private CacheLoaderManager cacheLoaderManager;
private DataContainer dataContainer;
private long lockAcquisitionTimeout;
@Inject
- protected void injectDependencies(Notifier notifier, NodeFactory nodeFactory, CacheLoaderManager cacheLoaderManager,
- DataContainer dataContainer)
+ protected void injectDependencies(Notifier notifier, NodeFactory nodeFactory, DataContainer dataContainer)
{
this.notifier = notifier;
this.nodeFactory = nodeFactory;
- this.cacheLoaderManager = cacheLoaderManager;
this.dataContainer = dataContainer;
}
@@ -94,7 +90,7 @@
setVersioning(ctx, workspace, workspaceNode);
}
Object result = removeNode(workspace, workspaceNode, true, ctx);
- addToModificationList(gtx, command, ctx);
+ addToModificationList(command, ctx);
return result;
}
@@ -117,7 +113,7 @@
}
}
Object result = putDataKeyValueAndNotify(command.getKey(), command.getValue(), workspace, workspaceNode, ctx);
- addToModificationList(gtx, command, ctx);
+ addToModificationList(command, ctx);
return result;
}
@@ -140,7 +136,7 @@
}
}
putDataMapAndNotify(command.getData(), workspace, workspaceNode, ctx);
- addToModificationList(gtx, command, ctx);
+ addToModificationList(command, ctx);
return null;
}
@@ -160,7 +156,7 @@
setVersioning(ctx, workspace, workspaceNode);
}
moveNodeAndNotify(command.getTo(), workspaceNode, workspace, ctx);
- addToModificationList(gtx, command, ctx);
+ addToModificationList(command, ctx);
return null;
}
@@ -176,7 +172,7 @@
setVersioning(ctx, workspace, workspaceNode);
}
Object result = removeKeyAndNotify(command.getKey(), workspace, workspaceNode, ctx);
- addToModificationList(gtx, command, ctx);
+ addToModificationList(command, ctx);
return result;
}
@@ -191,7 +187,7 @@
setVersioning(ctx, workspace, workspaceNode);
}
removeDataAndNotify(workspace, workspaceNode, ctx);
- addToModificationList(gtx, command, ctx);
+ addToModificationList(command, ctx);
return null;
}
@@ -345,18 +341,15 @@
/**
* Adds a method call to the modification list of a given transaction's transaction entry
- *
- * @param gtx transaction
*/
- private void addToModificationList(GlobalTransaction gtx, ReversibleCommand command, InvocationContext ctx)
+ private void addToModificationList(ReversibleCommand command, InvocationContext ctx)
{
Option opt = ctx.getOptionOverrides();
if (opt == null || !opt.isCacheModeLocal())
{
- txTable.addModification(gtx, command);
+ ctx.getTransactionEntry().addModification(command);
if (log.isDebugEnabled()) log.debug("Adding command " + command + " to modification list");
}
- if (cacheLoaderManager != null) txTable.addCacheLoaderModification(gtx, command);
}
// -----------------------------------------------------------------
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -35,6 +35,7 @@
import org.jboss.cache.optimistic.WorkspaceNode;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
+import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.util.concurrent.ConcurrentHashSet;
import java.util.ArrayList;
@@ -87,12 +88,16 @@
if (!skipReplicationOfTransactionMethod(ctx))
{
GlobalTransaction gtx = getGlobalTransaction(ctx);
-
- if (!gtx.isRemote() && ctx.isOriginLocal())
+ TransactionEntry te = ctx.getTransactionEntry();
+ if (te.hasLocalModifications())
{
- // replicate the prepare call.
- broadcastPrepare(command, gtx, ctx);
+ OptimisticPrepareCommand replicablePrepareCommand = command.clone(); // make sure we remove any "local" transactions
+ replicablePrepareCommand.removeModifications(te.getLocalModifications());
+ command = replicablePrepareCommand;
}
+
+ // replicate the prepare call.
+ broadcastPrepare(command, gtx, ctx);
}
return retval;
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -1,6 +1,7 @@
package org.jboss.cache.interceptors;
import org.jboss.cache.InvocationContext;
+import org.jboss.cache.commands.ReversibleCommand;
import org.jboss.cache.commands.VisitableCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
@@ -14,6 +15,7 @@
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option;
import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.transaction.TransactionEntry;
/**
* Takes care of replicating modifications to other nodes in a cluster. Also
@@ -49,6 +51,14 @@
public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
{
Object retVal = invokeNextInterceptor(ctx, command);
+ TransactionEntry te = ctx.getTransactionEntry();
+ if (te.hasLocalModifications())
+ {
+ PrepareCommand replicablePrepareCommand = command.clone(); // make sure we remove any "local" transactions
+ replicablePrepareCommand.removeModifications(te.getLocalModifications());
+ command = replicablePrepareCommand;
+ }
+
if (!skipReplicationOfTransactionMethod(ctx)) runPreparePhase(command, command.getGlobalTransaction(), ctx);
return retVal;
}
@@ -74,6 +84,13 @@
{
ctx.getTransactionEntry().setForceAsyncReplication(true);
}
+
+ if (ctx.getOptionOverrides().isCacheModeLocal())
+ {
+ if (log.isDebugEnabled()) log.debug("Local mode override detected, will not replicate this command.");
+ ctx.getTransactionEntry().addLocalModification(command);
+ }
+
return returnValue;
}
else
@@ -130,17 +147,16 @@
configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
configuration.getSyncReplTimeout());
}
- if (!isSynchronous(ctx.getOptionOverrides()) || forceAsync)
+
+ replicateCall(ctx, command, !forceAsync && isSynchronous(ctx.getOptionOverrides()), ctx.getOptionOverrides());
+ }
+ else
+ {
+ if (ctx.getOptionOverrides().isCacheModeLocal())
{
- // 2. Replicate change to all *other* members (exclude self !)
- replicateCall(ctx, command, false, ctx.getOptionOverrides());
+ if (log.isDebugEnabled()) log.debug("Local mode override detected, will not replicate this command.");
+ ctx.getTransactionEntry().addLocalModification((ReversibleCommand) command);
}
- else
- {
- // REVISIT Needs to exclude itself and apply the local change manually.
- // This is needed such that transient field is modified properly in-VM.
- replicateCall(ctx, command, true, ctx.getOptionOverrides());
- }
}
return returnValue;
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -554,11 +554,10 @@
/**
* creates a commit()
*/
- protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List modifications, List clModifications, boolean onePhaseCommit)
+ protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
{
// set the hasMods flag in the invocation ctx. This should not be replicated, just used locally by the interceptors.
ctx.setTxHasMods(modifications != null && modifications.size() > 0);
- ctx.setCacheLoaderHasMods(clModifications != null && clModifications.size() > 0);
try
{
VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx, modifications, true) : commandsFactory.buildCommitCommand(gtx);
@@ -950,13 +949,11 @@
if (trace) log.trace("calling aftercompletion for " + gtx);
- List cacheLoaderModifications = null;
// set any transaction wide options as current for this thread.
if (entry != null)
{
// this should ideally be set in beforeCompletion(), after compacting the list.
if (modifications == null) modifications = entry.getModifications();
- cacheLoaderModifications = entry.getCacheLoaderModifications();
ctx.setOptionOverrides(entry.getOption());
}
if (tx != null) transactions.remove(tx);
@@ -966,7 +963,7 @@
case Status.STATUS_COMMITTED:
boolean onePhaseCommit = !configuration.getCacheMode().isSynchronous();
if (log.isDebugEnabled()) log.debug("Running commit phase. One phase? " + onePhaseCommit);
- runCommitPhase(ctx, gtx, modifications, cacheLoaderModifications, onePhaseCommit);
+ runCommitPhase(ctx, gtx, modifications, onePhaseCommit);
log.debug("Finished commit phase");
break;
case Status.STATUS_UNKNOWN:
Modified: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionEntry.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -28,7 +28,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
-import java.util.concurrent.CopyOnWriteArrayList;
/**
* Information associated with a {@link GlobalTransaction} about the transaction state.
@@ -66,12 +65,12 @@
/**
* List<ReversibleCommand> of modifications ({@link ReversibleCommand}). They will be replicated on TX commit
*/
- private final List<ReversibleCommand> modificationList = new LinkedList<ReversibleCommand>();
+ private List<ReversibleCommand> modificationList;
+ /**
+ * A list of modifications that have been encountered with a LOCAL mode option. These will be removed from the modification list during replication.
+ */
+ private List<ReversibleCommand> localModifications;
- // For some reason we see multiple threads accessing this list - even within the same tx. Could be due to reuse of
- // tx identifiers in the DummyTM, which is where we see this problem.
- private final List<ReversibleCommand> classLoadeModList = new CopyOnWriteArrayList<ReversibleCommand>();
-
/**
* LinkedHashSet<IdentityLock> of locks acquired by the transaction. We use
* a LinkedHashSet because we need efficient Set semantics (same lock can
@@ -103,29 +102,40 @@
public void addModification(ReversibleCommand command)
{
if (command == null) return;
+ if (modificationList == null) modificationList = new LinkedList<ReversibleCommand>();
modificationList.add(command);
}
- public void addCacheLoaderModification(ReversibleCommand command)
- {
- if (command != null) classLoadeModList.add(command);
- }
-
/**
* Returns all modifications.
*/
public List<ReversibleCommand> getModifications()
{
+ if (modificationList == null) return Collections.emptyList();
return modificationList;
}
- public List<ReversibleCommand> getCacheLoaderModifications()
+ /**
+ * Adds a modification to the local modification list.
+ */
+ public void addLocalModification(ReversibleCommand command)
{
- // make sure this isn't modified externally
- return Collections.unmodifiableList(classLoadeModList);
+ if (command == null) return;
+ if (localModifications == null) localModifications = new LinkedList<ReversibleCommand>();
+ localModifications.add(command);
}
/**
+ * Returns all modifications that have been invoked with the LOCAL cache mode option. These will also be in the standard modification list.
+ */
+ public List<ReversibleCommand> getLocalModifications()
+ {
+ if (localModifications == null) return Collections.emptyList();
+ return localModifications;
+ }
+
+
+ /**
* Adds the node that has been removed.
*
* @param fqn
@@ -391,17 +401,26 @@
*/
public boolean hasModifications()
{
- return !modificationList.isEmpty() || !classLoadeModList.isEmpty();
+ return modificationList != null && !modificationList.isEmpty();
}
/**
+ * @return true if any modifications have been invoked with cache mode being LOCAL.
+ */
+ public boolean hasLocalModifications()
+ {
+ return localModifications != null && !localModifications.isEmpty();
+ }
+
+
+ /**
* Cleans up internal state
*/
public void reset()
{
orderedSynchronizationHandler = null;
- modificationList.clear();
- classLoadeModList.clear();
+ if (modificationList != null) modificationList.clear();
+ if (localModifications != null) localModifications.clear();
option = null;
locks.clear();
if (dummyNodesCreatedByCacheLoader != null) dummyNodesCreatedByCacheLoader.clear();
Modified: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionTable.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -11,7 +11,6 @@
import org.jboss.cache.CacheException;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.RPCManager;
-import org.jboss.cache.commands.ReversibleCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.NonVolatile;
@@ -201,34 +200,6 @@
}
/**
- * Adds a motification to the global transaction.
- */
- public void addModification(GlobalTransaction gtx, ReversibleCommand m)
- {
- TransactionEntry entry = get(gtx);
- if (entry == null)
- {
- log.error("transaction not found (globalTransaction=" + gtx + ")");
- return;
- }
- entry.addModification(m);
- }
-
- public void addCacheLoaderModification(GlobalTransaction gtx, ReversibleCommand m)
- {
- if (m != null)
- {
- TransactionEntry entry = get(gtx);
- if (entry == null)
- {
- log.error("transaction not found (globalTransaction=" + gtx + ")");
- return;
- }
- entry.addCacheLoaderModification(m);
- }
- }
-
- /**
* Adds a lock to the global transaction.
*/
public void addLock(GlobalTransaction gtx, NodeLock l)
Modified: core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -54,8 +54,6 @@
cache1 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
cache1.getConfiguration().setNodeLockingScheme(optimistic ? Configuration.NodeLockingScheme.OPTIMISTIC : Configuration.NodeLockingScheme.PESSIMISTIC);
-// cache1.getConfiguration().setSyncCommitPhase(optimistic);
-// cache1.getConfiguration().setSyncRollbackPhase(optimistic);
cache1.start();
tm1 = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
@@ -63,8 +61,6 @@
cache2 = (CacheSPI<String, String>) cf.createCache(UnitTestCacheConfigurationFactory.createConfiguration(cacheMode), false);
cache2.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
cache2.getConfiguration().setNodeLockingScheme(optimistic ? Configuration.NodeLockingScheme.OPTIMISTIC : Configuration.NodeLockingScheme.PESSIMISTIC);
-// cache2.getConfiguration().setSyncCommitPhase(optimistic);
-// cache2.getConfiguration().setSyncRollbackPhase(optimistic);
cache2.start();
tm2 = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
Modified: core/trunk/src/test/java/org/jboss/cache/invocationcontext/TransactionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/invocationcontext/TransactionTest.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/test/java/org/jboss/cache/invocationcontext/TransactionTest.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -141,7 +141,7 @@
// check that the transaction entry hasn't leaked stuff.
assert entry.getModifications().isEmpty() : "Should have scrubbed modifications in transaction entry";
- assert entry.getCacheLoaderModifications().isEmpty() : "Should have scrubbed modifications in transaction entry";
+ assert entry.getLocks().isEmpty() : "Should have scrubbed modifications in transaction entry";
assert entry.getOrderedSynchronizationHandler() == null : "Should have removed the ordered sync handler";
}
Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java 2008-05-14 01:36:56 UTC (rev 5836)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java 2008-05-14 09:59:57 UTC (rev 5837)
@@ -536,7 +536,7 @@
assertNull("should be null", cache2.get(fqn, key));
}
- public void testTransactionalBehaviour() throws Exception
+ public void testTransactionalBehaviourCommit() throws Exception
{
TransactionManager mgr = cache1.getTransactionManager();
mgr.begin();
@@ -604,13 +604,13 @@
cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
cache2.put(fqn, key, "value2");
cache2.getInvocationContext().getOptionOverrides().reset();
- cache2.put(fqn, key, "value2");
+ cache2.put(fqn, key, "value4");
mgr.commit();
delay();
- assertEquals("value2", cache2.get(fqn, key));
+ assertEquals("value4", cache2.get(fqn, key));
if (!isInvalidation)
{
- assertEquals("value2", cache1.get(fqn, key));
+ assertEquals("value4", cache1.get(fqn, key));
}
else
{
@@ -619,6 +619,37 @@
}
+ public void testTransactionalBehaviourRollback() throws Exception
+ {
+ TransactionManager mgr = cache1.getTransactionManager();
+
+ // create these first ...
+ cache1.put("/a", key, "old");
+ cache1.put("/b", key, "old");
+ delay();
+ mgr.begin();
+ cache1.getInvocationContext().getOptionOverrides().reset();
+ cache1.put("/a", key, "value1");
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.put("/b", key, "value2");
+ mgr.rollback();
+ delay();
+ // cache1 should NOT have this
+ assert cache1.get("/a", key).equals("old");
+ assert cache1.get("/b", key).equals("old");
+
+ if (isInvalidation)
+ {
+ assert cache2.get("/a", key) == null;
+ assert cache2.get("/b", key) == null;
+ }
+ else
+ {
+ assert cache2.get("/a", key).equals("old");
+ assert cache2.get("/b", key).equals("old");
+ }
+ }
+
public void testTransactionalBehaviourViaNodeAPI() throws Exception
{
Node node1 = cache1.getRoot().addChild(fqn);
@@ -689,13 +720,13 @@
cache2.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
node2.put(key, "value2");
cache2.getInvocationContext().getOptionOverrides().reset();
- node2.put(key, "value2");
+ node2.put(key, "value4");
mgr.commit();
delay();
- assertEquals("value2", cache2.get(fqn, key));
+ assertEquals("value4", cache2.get(fqn, key));
if (!isInvalidation)
{
- assertEquals("value2", cache1.get(fqn, key));
+ assertEquals("value4", cache1.get(fqn, key));
}
else
{
16 years, 7 months
JBoss Cache SVN: r5836 - in core/trunk/src: main/java/org/jboss/cache/buddyreplication and 11 other directories.
by jbosscache-commits@lists.jboss.org
Author: genman
Date: 2008-05-13 21:36:56 -0400 (Tue, 13 May 2008)
New Revision: 5836
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/StringFqn.java
core/trunk/src/main/java/org/jboss/cache/VersionedNode.java
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
core/trunk/src/main/java/org/jboss/cache/config/CacheLoaderConfig.java
core/trunk/src/main/java/org/jboss/cache/config/ConfigurationComponent.java
core/trunk/src/main/java/org/jboss/cache/eviction/EvictionQueueList.java
core/trunk/src/main/java/org/jboss/cache/factories/BootstrapFactory.java
core/trunk/src/main/java/org/jboss/cache/factories/ComponentFactory.java
core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java
core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/loader/JDBCCacheLoaderOld.java
core/trunk/src/main/java/org/jboss/cache/loader/LocalDelegatingCacheLoaderConfig.java
core/trunk/src/main/java/org/jboss/cache/loader/s3/S3LoaderConfig.java
core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java
core/trunk/src/main/java/org/jboss/cache/marshall/MarshalledValue.java
core/trunk/src/test/java/org/jboss/cache/eviction/DummyEvictionConfiguration.java
core/trunk/src/test/java/org/jboss/cache/options/TestVersion.java
Log:
Miscellaneous fixes pointed out by "findbugs"
Most to do with serialization
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -261,9 +261,9 @@
channel = new JChannel(configuration.getClusterConfig());
}
}
- catch (ChannelException el)
+ catch (ChannelException e)
{
- el.printStackTrace();
+ throw new CacheException(e);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/StringFqn.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/StringFqn.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/StringFqn.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -19,7 +19,7 @@
*/
// TODO: 3.0.0: Implement proper String escaping.
@Experimental
-public class StringFqn extends Fqn
+public final class StringFqn extends Fqn
{
// Needs to be public because of NodeData serialization.
// TODO: Remove in 3.0.0 once we refactor NodeData to go through a cache marshaller instead of it's current serialization.
Modified: core/trunk/src/main/java/org/jboss/cache/VersionedNode.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/VersionedNode.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/VersionedNode.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -41,7 +41,7 @@
super(fqn.getLastElement(), fqn, data, false, cache);
if (parent == null && !fqn.isRoot()) throw new NullPointerException("parent");
this.parent = parent;
- if (this.version == null) this.version = DefaultDataVersion.ZERO;
+ this.version = DefaultDataVersion.ZERO;
log = LogFactory.getLog(VersionedNode.class);
}
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -504,7 +504,15 @@
}
if (log.isInfoEnabled()) log.info("Removing self from buddy group " + groupName);
- buddyGroupsIParticipateIn.remove(groupName);
+
+ for (Map.Entry<Address, String> me : buddyPool.entrySet())
+ {
+ if (me.getValue().equals(groupName))
+ {
+ buddyGroupsIParticipateIn.remove(me.getKey());
+ break;
+ }
+ }
// remove backup data for this group
if (log.isInfoEnabled()) log.info("Removing backup data for group " + groupName);
Modified: core/trunk/src/main/java/org/jboss/cache/config/CacheLoaderConfig.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/CacheLoaderConfig.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/config/CacheLoaderConfig.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -184,7 +184,7 @@
private Properties properties;
private SingletonStoreConfig singletonStoreConfig;
- private CacheLoader cacheLoader;
+ private transient CacheLoader cacheLoader;
protected void populateFromBaseConfig(IndividualCacheLoaderConfig base)
{
Modified: core/trunk/src/main/java/org/jboss/cache/config/ConfigurationComponent.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/ConfigurationComponent.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/config/ConfigurationComponent.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -35,7 +35,7 @@
protected transient Log log = LogFactory.getLog(getClass());
private transient CacheSPI cache; // back-reference to test whether the cache is running.
private final Set<ConfigurationComponent> children = Collections.synchronizedSet(new HashSet<ConfigurationComponent>());
- private ComponentRegistry cr;
+ private transient ComponentRegistry cr;
// a workaround to get over immutability checks
private boolean accessible;
Modified: core/trunk/src/main/java/org/jboss/cache/eviction/EvictionQueueList.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/eviction/EvictionQueueList.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/eviction/EvictionQueueList.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -351,6 +351,8 @@
@Override
public boolean equals(Object o)
{
+ if (!(o instanceof EvictionListEntry))
+ return false;
EvictionListEntry entry = (EvictionListEntry) o;
return this.node.getFqn().equals(entry.node.getFqn());
}
Modified: core/trunk/src/main/java/org/jboss/cache/factories/BootstrapFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/BootstrapFactory.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/factories/BootstrapFactory.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -17,25 +17,23 @@
public class BootstrapFactory extends ComponentFactory
{
CacheSPI cacheSPI;
- Configuration configuration;
- ComponentRegistry componentRegistry;
public BootstrapFactory(CacheSPI cacheSPI, Configuration configuration, ComponentRegistry componentRegistry)
{
+ super(componentRegistry, configuration);
this.cacheSPI = cacheSPI;
- this.configuration = configuration;
- this.componentRegistry = componentRegistry;
}
- @SuppressWarnings("unchecked")
+ @Override
protected <T> T construct(Class<T> componentType)
{
- if (componentType.equals(CacheSPI.class)) return (T) cacheSPI;
+ if (componentType.isAssignableFrom(CacheSPI.class) ||
+ componentType.isAssignableFrom(Configuration.class) ||
+ componentType.isAssignableFrom(ComponentRegistry.class))
+ {
+ return componentType.cast(cacheSPI);
+ }
- if (componentType.equals(Configuration.class)) return (T) configuration;
-
- if (componentType.equals(ComponentRegistry.class)) return (T) componentRegistry;
-
throw new CacheException("Don't know how to handle type " + componentType);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentFactory.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentFactory.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -25,6 +25,22 @@
protected ComponentRegistry componentRegistry;
protected Configuration configuration;
+ /**
+ * Constructs a new ComponentFactory.
+ */
+ public ComponentFactory(ComponentRegistry componentRegistry, Configuration configuration)
+ {
+ this.componentRegistry = componentRegistry;
+ this.configuration = configuration;
+ }
+
+ /**
+ * Constructs a new ComponentFactory.
+ */
+ public ComponentFactory()
+ {
+ }
+
@Inject
private void injectDependencies(Configuration configuration, ComponentRegistry componentRegistry)
{
@@ -49,4 +65,5 @@
}
if (!canConstruct) throw new ConfigurationException("Don't know how to construct " + requestedType);
}
+
}
Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -321,16 +321,12 @@
{
// hasn't yet been created. Create and put in registry
cf = instantiateFactory(cfClass);
- if (cf != null)
- {
- // we simply register this factory. Registration will take care of constructing any dependencies.
- registerComponent(cf, cfClass);
- }
+ if (cf == null)
+ throw new ConfigurationException("Unable to locate component factory for component " + componentClass);
+ // we simply register this factory. Registration will take care of constructing any dependencies.
+ registerComponent(cf, cfClass);
}
- if (cf == null)
- throw new ConfigurationException("Unable to locate component factory for component " + componentClass);
-
// ensure the component factory is in the STARTED state!
Component c = componentLookup.get(cfClass.getName());
if (c.instance != cf)
@@ -921,7 +917,7 @@
/**
* Wrapper to encapsulate a method along with a priority
*/
- class PrioritizedMethod implements Comparable<PrioritizedMethod>
+ static class PrioritizedMethod implements Comparable<PrioritizedMethod>
{
Method method;
Component component;
@@ -937,7 +933,7 @@
ReflectionUtil.invokeAccessibly(component.instance, method, null);
}
-
+ @Override
public String toString()
{
return "PrioritizedMethod{" +
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/Interceptor.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -42,7 +42,6 @@
public abstract class Interceptor extends CommandInterceptor
{
protected CacheSPI<?, ?> cache;
- protected Log log = null;
protected boolean trace;
public void setCache(CacheSPI cache)
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -263,7 +263,7 @@
}
}
- public class InvalidationFilterVisitor extends AbstractVisitor
+ public static class InvalidationFilterVisitor extends AbstractVisitor
{
Set<Fqn> result;
public boolean containsPutForExternalRead;
Modified: core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -498,6 +498,6 @@
@Override
public String toString()
{
- return node == null ? null : node.toString();
+ return node == null ? "null" : node.toString();
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -79,8 +79,8 @@
*/
public static final Pattern FQN_PATTERN = Pattern.compile("[\\\\\\/:*<>|\"?]");
private static boolean isOldWindows;
-
- public FileCacheLoader()
+
+ static
{
float osVersion = -1;
try
@@ -95,6 +95,10 @@
isOldWindows = System.getProperty("os.name").toLowerCase().startsWith("windows") && osVersion < 4;
}
+ public FileCacheLoader()
+ {
+ }
+
public void setConfig(IndividualCacheLoaderConfig base)
{
if (base instanceof FileCacheLoaderConfig)
@@ -331,14 +335,16 @@
/* ----------------------- Private methods ------------------------ */
- File getDirectory(Fqn fqn, boolean create)
+ File getDirectory(Fqn fqn, boolean create) throws IOException
{
File f = new File(getFullPath(fqn));
if (!f.exists())
{
if (create)
{
- f.mkdirs();
+ boolean make = f.mkdirs();
+ if (!make)
+ throw new IOException("Unable to mkdirs " + f);
}
else
{
Modified: core/trunk/src/main/java/org/jboss/cache/loader/JDBCCacheLoaderOld.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/JDBCCacheLoaderOld.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/loader/JDBCCacheLoaderOld.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -76,7 +76,7 @@
@Override
public AdjListJDBCCacheLoaderConfig processConfig(IndividualCacheLoaderConfig base)
{
- if (config instanceof JDBCCacheLoaderOldConfig)
+ if (base instanceof JDBCCacheLoaderOldConfig)
{
config = (JDBCCacheLoaderOldConfig) base;
}
Modified: core/trunk/src/main/java/org/jboss/cache/loader/LocalDelegatingCacheLoaderConfig.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/LocalDelegatingCacheLoaderConfig.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/loader/LocalDelegatingCacheLoaderConfig.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -9,7 +9,7 @@
{
private static final long serialVersionUID = 4626734068542420865L;
- private Cache delegate;
+ private transient Cache delegate;
public LocalDelegatingCacheLoaderConfig()
{
Modified: core/trunk/src/main/java/org/jboss/cache/loader/s3/S3LoaderConfig.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/s3/S3LoaderConfig.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/loader/s3/S3LoaderConfig.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -30,9 +30,9 @@
private int port;
- private Bucket bucket = new Bucket("jboss-cache");
+ private transient Bucket bucket = new Bucket("jboss-cache");
- private CallingFormat callingFormat = CallingFormat.SUBDOMAIN;
+ private transient CallingFormat callingFormat = CallingFormat.SUBDOMAIN;
private String location = Connection.LOCATION_DEFAULT;
Modified: core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -129,7 +129,7 @@
else
{
if (trace)
- log.trace("failed to find or create child " + childName + " of node " + currentNode);
+ log.trace("failed to find or create child " + childName + " of node " + parent);
return false;
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/MarshalledValue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/MarshalledValue.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/MarshalledValue.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -138,7 +138,7 @@
int size = in.readInt();
raw = new byte[size];
cachedHashCode = 0;
- in.read(raw);
+ in.readFully(raw);
cachedHashCode = in.readInt();
}
Modified: core/trunk/src/test/java/org/jboss/cache/eviction/DummyEvictionConfiguration.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/eviction/DummyEvictionConfiguration.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/test/java/org/jboss/cache/eviction/DummyEvictionConfiguration.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -14,7 +14,7 @@
* @author Daniel Huang (dhuang(a)jboss.org)
* @version $Revision$
*/
-public class DummyEvictionConfiguration implements EvictionPolicyConfig
+public class DummyEvictionConfiguration implements EvictionPolicyConfig, Cloneable
{
public String getEvictionPolicyClass()
{
Modified: core/trunk/src/test/java/org/jboss/cache/options/TestVersion.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/TestVersion.java 2008-05-13 22:04:27 UTC (rev 5835)
+++ core/trunk/src/test/java/org/jboss/cache/options/TestVersion.java 2008-05-14 01:36:56 UTC (rev 5836)
@@ -38,12 +38,13 @@
}
}
-
+ @Override
public String toString()
{
return "TestVersion-" + myVersion;
}
+ @Override
public boolean equals(Object other)
{
if (other instanceof TestVersion)
@@ -54,4 +55,11 @@
}
return false;
}
+
+ @Override
+ public int hashCode()
+ {
+ return myVersion.hashCode();
+ }
+
}
\ No newline at end of file
16 years, 7 months
JBoss Cache SVN: r5835 - in core/trunk/src: main/java/org/jboss/cache/commands/write and 4 other directories.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-05-13 18:04:27 -0400 (Tue, 13 May 2008)
New Revision: 5835
Added:
core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java
core/trunk/src/test/java/org/jboss/cache/mock/
core/trunk/src/test/java/org/jboss/cache/mock/NodeSpiMock.java
Modified:
core/trunk/src/main/java/org/jboss/cache/DataContainer.java
core/trunk/src/main/java/org/jboss/cache/Node.java
core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java
core/trunk/src/main/java/org/jboss/cache/commands/write/PutDataMapCommand.java
core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveDataCommand.java
core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveNodeCommand.java
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java
core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java
Log:
JBCACHE-1338 - added unit test for DataContainer
Modified: core/trunk/src/main/java/org/jboss/cache/DataContainer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/DataContainer.java 2008-05-13 14:10:18 UTC (rev 5834)
+++ core/trunk/src/main/java/org/jboss/cache/DataContainer.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -52,11 +52,10 @@
{
this.configuration = configuration;
this.nodeFactory = nodeFactory;
- createRootNode();
}
@Start(priority = 12)
- private void createRootNode()
+ public void createRootNode()
{
// create a new root temporarily.
NodeSPI tempRoot = nodeFactory.createRootDataNode();
@@ -120,7 +119,7 @@
{
try
{
- return peek(fqn, null);
+ return peekVersioned(fqn, null);
}
catch (CacheException e)
{
@@ -138,13 +137,13 @@
* @param version version of the node to find
* @return a node, if found, or null if not.
*/
- public NodeSPI peek(Fqn fqn, DataVersion version)
+ public NodeSPI peekVersioned(Fqn fqn, DataVersion version)
{
- return peek(fqn, version, false);
+ return peekVersioned(fqn, version, false);
}
/**
- * Similar to {@link #peek(Fqn, org.jboss.cache.optimistic.DataVersion)} except that it throws a {@link org.jboss.cache.NodeNotExistsException}
+ * Similar to {@link #peekVersioned(Fqn, org.jboss.cache.optimistic.DataVersion)} except that it throws a {@link org.jboss.cache.NodeNotExistsException}
* if the node cannot be found.
*
* @param gtx global transaction
@@ -154,7 +153,7 @@
*/
public NodeSPI peekStrict(GlobalTransaction gtx, Fqn fqn, boolean includeInvalid)
{
- NodeSPI n = peek(fqn, null, includeInvalid);
+ NodeSPI n = peekVersioned(fqn, null, includeInvalid);
if (n == null)
{
StringBuilder builder = new StringBuilder();
@@ -176,7 +175,7 @@
* @param includeInvalidNodes if true, invalid nodes are considered
* @return the node, if found, or null otherwise.
*/
- public NodeSPI peek(Fqn fqn, DataVersion version, boolean includeInvalidNodes)
+ public NodeSPI peekVersioned(Fqn fqn, DataVersion version, boolean includeInvalidNodes)
{
if (fqn == null) return null;
@@ -311,7 +310,17 @@
result.add(fqn);
return result;
}
- buildNodesForEviction(node, result);
+ if (fqn.isRoot())
+ {
+ for (Object childName : node.getChildrenNamesDirect())
+ {
+ if (!node.isResident()) result.add(Fqn.fromRelativeElements(fqn, childName));
+ }
+ }
+ else if (!node.isResident())
+ {
+ result.add(fqn);
+ }
}
return result;
}
@@ -322,27 +331,11 @@
{
recursiveAddEvictionNodes(child, result);
}
- buildNodesForEviction(node, result);
- }
-
- private void buildNodesForEviction(NodeSPI node, List<Fqn> nodes)
- {
- if (node == null || node.isResident())
- {
- return;
- }
Fqn fqn = node.getFqn();
- if (fqn.isRoot())
+ if (node != null && !fqn.isRoot() && !node.isResident())
{
- for (Object childName : node.getChildrenNamesDirect())
- {
- if (!node.isResident()) nodes.add(Fqn.fromRelativeElements(fqn, childName));
- }
+ result.add(fqn);
}
- else
- {
- nodes.add(fqn);
- }
}
@Override
@@ -518,7 +511,6 @@
return false;
}
-
if (trace) log.trace("Performing a real remove for node " + f + ", marked for removal.");
if (skipMarkerCheck || n.isDeleted())
{
@@ -526,13 +518,14 @@
{
// do not actually delete; just remove deletion marker
n.markAsDeleted(true);
- // but now remove all children, since the call has been to remove("/")
- n.removeChildrenDirect();
// mark the node to be removed (and all children) as invalid so anyone holding a direct reference to it will
// be aware that it is no longer valid.
n.setValid(false, true);
n.setValid(true, false);
+
+ // but now remove all children, since the call has been to remove("/")
+ n.removeChildrenDirect();
return true;
}
else
@@ -575,8 +568,7 @@
}
else
{
- if (trace)
- log.trace("removing NODE as it is a leaf: evict(" + fqn + ")");
+ if (trace) log.trace("removing NODE as it is a leaf: evict(" + fqn + ")");
removeNode(fqn);
return true;
}
@@ -584,9 +576,10 @@
private void removeNode(Fqn fqn)
{
- NodeSPI targetNode = peek(fqn, null, true);
+ NodeSPI targetNode = peekVersioned(fqn, null, true);
if (targetNode == null) return;
NodeSPI parentNode = targetNode.getParent();
+ targetNode.setValid(false, false);
if (parentNode != null)
{
parentNode.removeChildDirect(fqn.getLastElement());
@@ -596,7 +589,7 @@
protected void removeData(Fqn fqn)
{
- NodeSPI n = peek(fqn, null);
+ NodeSPI n = peekVersioned(fqn, null);
if (n == null)
{
log.warn("node " + fqn + " not found");
@@ -647,30 +640,4 @@
}
return new Object[]{result, n};
}
-
- public void createNodesLocally(Fqn<?> fqn, Map<?, ?> data) throws CacheException
- {
- int treeNodeSize;
- if ((treeNodeSize = fqn.size()) == 0) return;
- NodeSPI n = root;
- for (int i = 0; i < treeNodeSize; i++)
- {
- Object childName = fqn.get(i);
- NodeSPI childNode = n.addChildDirect(Fqn.fromElements(childName));
- if (childNode == null)
- {
- if (trace)
- {
- log.trace("failed to find or create child " + childName + " of node " + n.getFqn());
- }
- return;
- }
- if (i == treeNodeSize - 1)
- {
- // set data
- childNode.putAllDirect(data);
- }
- n = childNode;
- }
- }
}
Modified: core/trunk/src/main/java/org/jboss/cache/Node.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/Node.java 2008-05-13 14:10:18 UTC (rev 5834)
+++ core/trunk/src/main/java/org/jboss/cache/Node.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -182,7 +182,7 @@
* <pre>
* if (node.get(key).equals(oldValue))
* {
- * node.putAll(key, newValue);
+ * node.put(key, newValue);
* return true;
* }
* else
Modified: core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java 2008-05-13 14:10:18 UTC (rev 5834)
+++ core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -140,7 +140,7 @@
{
// Find the node. This will lock it (if <tt>locking</tt> is true) and
// add the temporarily created parent nodes to the TX's node list if tx != null)
- NodeSPI n = dataContainer.peek(fqn, dataVersion);
+ NodeSPI n = dataContainer.peekVersioned(fqn, dataVersion);
if (n == null)
{
log.warn("node " + fqn + " not found");
Modified: core/trunk/src/main/java/org/jboss/cache/commands/write/PutDataMapCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/write/PutDataMapCommand.java 2008-05-13 14:10:18 UTC (rev 5834)
+++ core/trunk/src/main/java/org/jboss/cache/commands/write/PutDataMapCommand.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -80,7 +80,7 @@
{
if (trace) log.trace("rollback(" + globalTransaction + ", " + fqn + ", " + data + ")");
- NodeSPI n = dataContainer.peek(fqn, null, true);
+ NodeSPI n = dataContainer.peekVersioned(fqn, null, true);
if (n != null)
{
n.clearDataDirect();
Modified: core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveDataCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveDataCommand.java 2008-05-13 14:10:18 UTC (rev 5834)
+++ core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveDataCommand.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -55,7 +55,7 @@
public Object perform(InvocationContext ctx)
{
if (trace) log.trace("perform(" + globalTransaction + ", \"" + fqn + "\")");
- NodeSPI targetNode = dataContainer.peek(fqn, dataVersion);
+ NodeSPI targetNode = dataContainer.peekVersioned(fqn, dataVersion);
if (targetNode == null)
{
log.warn("node " + fqn + " not found");
Modified: core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveNodeCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveNodeCommand.java 2008-05-13 14:10:18 UTC (rev 5834)
+++ core/trunk/src/main/java/org/jboss/cache/commands/write/RemoveNodeCommand.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -59,7 +59,7 @@
log.trace("perform(" + globalTransaction + ", \"" + fqn + "\", undo=" + createUndoOps + ")");
// Find the node. This will add the temporarily created parent nodes to the TX's node list if globalTransaction != null)
- targetNode = dataContainer.peek(fqn, dataVersion, true);
+ targetNode = dataContainer.peekVersioned(fqn, dataVersion, true);
if (targetNode == null)
{
if (trace) log.trace("node " + fqn + " not found");
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -263,7 +263,7 @@
{
loader.removeData(command.getFqn());
// if we are erasing all the data then consider this node loaded
- NodeSPI n = dataContainer.peek(command.getFqn(), false, false);//cache.peek(fqn, false);
+ NodeSPI n = dataContainer.peek(command.getFqn(), false, false);
n.setDataLoaded(true);
return returnValue;
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -231,7 +231,7 @@
return;
}
- NodeSPI<?, ?> nodeSPI = dataContainer.peek(event.getFqn(), false, false);//cache.peek(event.getFqn(), false);
+ NodeSPI<?, ?> nodeSPI = dataContainer.peek(event.getFqn(), false, false);
//we do not trigger eviction events for resident nodes
if (nodeSPI != null && nodeSPI.isResident())
{
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticCreateIfNotExistsInterceptor.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -103,7 +103,6 @@
*/
private void createNode(InvocationContext ctx, Fqn targetFqn, boolean suppressNotification) throws CacheException
{
-// if (cache.peek(targetFqn, false) != null) return;
if (dataContainer.peek(targetFqn, false, false) != null) return;
// we do nothing if targetFqn is null
if (targetFqn == null) return;
Modified: core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java 2008-05-13 14:10:18 UTC (rev 5834)
+++ core/trunk/src/main/java/org/jboss/cache/lock/LockManager.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -226,8 +226,6 @@
return true;// we're doing a remove and we've reached the PARENT node of the target to be removed.
}
if (!isTargetNode && dataContainer.peek(targetFqn.getAncestor(currentNodeIndex + 2), false, false) == null)
- //if (!isTargetNode && cache.peek(targetFqn.getAncestor(currentNodeIndex + 2), false) == null)
- //if (!isTargetNode && cache.peek(new Fqn(currentNode.getFqn(), targetFqn.get(currentNodeIndex + 1)), false) == null)
{
return createIfNotExists;// we're at a node in the tree, not yet at the target node, and we need to create the next node. So we need a WL here.
}
Added: core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/DataContainerTest.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -0,0 +1,332 @@
+package org.jboss.cache;
+
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.marshall.NodeData;
+import org.jboss.cache.mock.NodeSpiMock;
+import org.jboss.cache.optimistic.DataVersion;
+import org.jboss.cache.optimistic.DefaultDataVersion;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests functionality from DataContainer.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ * @since 2.2
+ */
+@Test(groups = "unit")
+public class DataContainerTest
+{
+ private DataContainer container;
+
+ //node structure on which all tests are being run.
+ private NodeSpiMock root;
+ private Fqn a = Fqn.fromString("/a");
+ private NodeSpiMock aNode;
+ private Fqn ab = Fqn.fromString("/a/b");
+ private NodeSpiMock abNode;
+ private Fqn abc = Fqn.fromString("/a/b/c");
+ private NodeSpiMock abcNode;
+ private Fqn ad = Fqn.fromString("/a/d");
+ private NodeSpiMock adNode;
+ private Fqn ade = Fqn.fromString("/a/d/e");
+ private NodeSpiMock adeNode;
+ private Fqn adf = Fqn.fromString("/a/d/f");
+ private NodeSpiMock adfNode;
+ private Fqn adfh = Fqn.fromString("/a/d/f/h");
+ private NodeSpiMock adfhNode;
+ private Fqn adfg = Fqn.fromString("/a/d/f/g");
+ private NodeSpiMock adfgNode;
+ private Fqn notExistent = Fqn.fromString("aaa" + System.currentTimeMillis());
+ //end of node structure.
+
+ @BeforeMethod
+ public void setUp()
+ {
+ root = new NodeSpiMock(Fqn.ROOT);
+ container = new DataContainer();
+ container.setRoot(root);
+ aNode = (NodeSpiMock) root.addChild(a);
+ abNode = (NodeSpiMock) root.addChild(ab);
+ abcNode = (NodeSpiMock) root.addChild(abc);
+ adNode = (NodeSpiMock) root.addChild(ad);
+ adeNode = (NodeSpiMock) root.addChild(ade);
+ adfNode = (NodeSpiMock) root.addChild(adf);
+ adfhNode = (NodeSpiMock) root.addChild(adfh);
+ adfgNode = (NodeSpiMock) root.addChild(adfg);
+ }
+
+ /**
+ * tests {@link org.jboss.cache.DataContainer#peek(Fqn, boolean, boolean)} method
+ */
+ public void testPeekNodesSimple()
+ {
+ assert root == container.peek(Fqn.ROOT, true, true);
+ assert adfgNode == container.peek(adfg, false, false);
+ assert adfgNode == container.peek(adfg, false, true);
+ assert adfgNode == container.peek(adfg, true, true);
+ }
+
+ /**
+ * tests {@link org.jboss.cache.DataContainer#peek(Fqn, boolean, boolean)} for invalid nodes.
+ */
+ public void testPeekInvalidNodes()
+ {
+ adfgNode.setValid(false, false);
+ assert null == container.peek(adfg, true, false);
+ assert adfgNode == container.peek(adfg, true, true);
+ }
+
+ /**
+ * tests {@link org.jboss.cache.DataContainer#peek(Fqn, boolean, boolean)} method for deleted nodes.
+ */
+ public void testPeekDeletedNodes()
+ {
+ adfgNode.markAsDeleted(true);
+ assert null == container.peek(adfg, false, false);
+ assert adfgNode == container.peek(adfg, true, false);
+ }
+
+   /**
+    * tests {@link org.jboss.cache.DataContainer#peekVersioned(Fqn, org.jboss.cache.optimistic.DataVersion, boolean)} method.
+    * <p/>
+    * Covers four cases in order: a null data version, a non-null version with optimistic locking switched off,
+    * a matching version with optimistic locking switched on, and a different version with optimistic locking
+    * switched on (which is expected to raise a {@link CacheException}).
+    */
+   public void testPeekVersioned()
+   {
+      assert adfgNode == container.peekVersioned(adfg, null, true) : "if data version is null this returns same value as peek(boolean, boolean)";
+
+      //test pessimistic locking
+      Configuration config = new Configuration();
+      config.setNodeLockingOptimistic(false);
+      DataVersion dataVersion = new DefaultDataVersion(2);
+      container.injectDependencies(config, null);
+      assert adfgNode == container.peekVersioned(adfg, dataVersion, true) : "if NOT opt locking same value as peek(boolean, boolean) expected";
+
+      //test optimistic locking with same version (the same Configuration instance is flipped to optimistic)
+      config.setNodeLockingOptimistic(true);
+      DataVersion adfgDataVersion = new DefaultDataVersion(2);
+      adfgNode.setVersion(adfgDataVersion);
+      assert adfgNode == container.peekVersioned(adfg, adfgDataVersion, true) : "same version, expecting node to be returned";
+
+      //test optimistic locking with an older version
+      try
+      {
+         container.peekVersioned(adfg, new DefaultDataVersion(1), true);
+         // AssertionError is not a CacheException, so a missing exception is not swallowed by the catch below
+         assert false : "exception expected as version changed.";
+      }
+      catch (CacheException e)
+      {
+         //expected
+      }
+   }
+
+   /**
+    * tests {@link DataContainer#peekStrict(org.jboss.cache.transaction.GlobalTransaction, Fqn, boolean)}.
+    * <p/>
+    * An existing node must be returned as is; a lookup of a node that does not exist must fail with an exception.
+    */
+   public void testPeekStrict()
+   {
+      assert adfgNode == container.peekStrict(null, adfg, true) : "existing node expected to be returned";
+
+      try
+      {
+         container.peekStrict(null, notExistent, true);
+         // AssertionError is an Error, so it is not caught by the Exception handler below
+         assert false : "exception expected as node does not exist";
+      }
+      catch (Exception e)
+      {
+         //expected
+      }
+   }
+
+ /**
+ * tests {@link org.jboss.cache.DataContainer#exists(Fqn)}
+ */
+ public void testsExists()
+ {
+ assert container.exists(ab) : "ab exists";
+ abNode.markAsDeleted(true);
+ assert !container.exists(ab) : "ab marked as deleted";
+ assert container.exists(ad);
+ adNode.setValid(false, false);
+ assert !container.exists(ade) : "its parent was marked as invalid";
+ }
+
+ /**
+ * tests {@link org.jboss.cache.DataContainer#hasChildren(Fqn)}
+ */
+ public void testHasChildren()
+ {
+ assert container.hasChildren(ad) : " ade is a child of ad";
+ assert !container.hasChildren(notExistent) : " this one does not exist";
+ assert !container.hasChildren(adfg) : "this one exists but does not have children";
+ adNode.setValid(false, false);
+ assert !container.hasChildren(ad) : "ad exists and has children but is invalid";
+ }
+
+ /**
+ * test {@link DataContainer#buildNodeData(java.util.List, NodeSPI)}
+ */
+ public void testBuildNodeData()
+ {
+ abNode.put("ab", "ab");
+ abcNode.put("abc", "abc");
+ List<NodeData> result = new ArrayList<NodeData>();
+ container.buildNodeData(result, abNode);
+ assert result.size() == 2;
+ assert result.contains(new NodeData(ab, abNode.getData()));
+ assert result.contains(new NodeData(abc, abcNode.getData()));
+ }
+
+ /**
+ * tests {@link org.jboss.cache.DataContainer#getNodesForEviction(Fqn, boolean)} in a nonrecursive scenario.
+ */
+ public void testGetNodesForEvictionNonrecursive()
+ {
+ //check for root first
+ List<Fqn> result = container.getNodesForEviction(Fqn.ROOT, false);
+ assert result.size() == 1 : "for root the direct children are considered for eviction";
+ assert result.contains(a);
+
+ //check normal
+ result = container.getNodesForEviction(ad, false);
+ assert result.size() == 1 : "one child expected";
+ assert result.contains(ad);
+
+ //check resident scenario
+ adNode.setResident(true);
+ result = container.getNodesForEviction(ad, false);
+ assert result.size() == 0 : "no children expected";
+ }
+
+ /**
+ * tests {@link org.jboss.cache.DataContainer#getNodesForEviction(Fqn, boolean)} in a recursive scenario.
+ */
+ public void testGetNodesForEvictionRecursive()
+ {
+ //check for root first
+ List<Fqn> result = container.getNodesForEviction(Fqn.ROOT, true);
+ assert result.size() == 8 : "all children are considered for eviction";
+
+ //check normal
+ result = container.getNodesForEviction(ad, true);
+ assert result.size() == 5 : "five childrens expected";
+ assert result.contains(ad);
+ assert result.contains(ade);
+ assert result.contains(adf);
+ assert result.contains(adfh);
+ assert result.contains(adfg);
+
+ //check resident scenario
+ adNode.setResident(true);
+ result = container.getNodesForEviction(ad, true);
+ assert result.size() == 4 : "only children expected";
+ assert result.contains(ade);
+ assert result.contains(adf);
+ assert result.contains(adfh);
+ assert result.contains(adfg);
+ }
+
+   /**
+    * tests {@link DataContainer#getNumberOfNodes()} against the tree built in {@link #setUp()}:
+    * a, a/b, a/b/c, a/d, a/d/e, a/d/f, a/d/f/h and a/d/f/g. The expected count of eight does not include the root.
+    */
+   public void testGetNumberOfNodes()
+   {
+      assert container.getNumberOfNodes() == 8 : "eight nodes expected";
+   }
+
+ /**
+ * tests {@link DataContainer#removeFromDataStructure(Fqn, boolean)} having skipMarkerCheck set to false.
+ */
+ public void removeFromDataStructureNoSkip1()
+ {
+ //check inexisten node
+ assert !container.removeFromDataStructure(notExistent, false);
+
+ //check root - all the subnodes should be deleted and marked as invalid, but the root itself
+ root.markAsDeleted(true);
+ assert container.removeFromDataStructure(Fqn.ROOT, false);
+ assert !aNode.isValid();
+ assert !abNode.isValid();
+ assert !abcNode.isValid();
+ assert !adNode.isValid();
+ assert !adeNode.isValid();
+ assert !adfNode.isValid();
+ assert !adfgNode.isValid();
+ assert !adfhNode.isValid();
+ assert root.isValid();
+ }
+
+   /**
+    * tests {@link DataContainer#removeFromDataStructure(Fqn, boolean)} having skipMarkerCheck set to false.
+    * <p/>
+    * First checks that a root which is not marked as deleted is left alone, then that removing a regular node
+    * which is marked as deleted invalidates every node below it.
+    */
+   public void removeFromDataStructureNoSkip2()
+   {
+      //check root - it is not marked as deleted and the marker check is not skipped, so nothing is removed
+      root.markAsDeleted(false);
+      assert !container.removeFromDataStructure(Fqn.ROOT, false);
+
+      //check a normal node
+      adNode.markAsDeleted(true);
+      assert container.removeFromDataStructure(ad, false);
+      assert !adeNode.isValid();
+      assert !adfNode.isValid();
+      assert !adfhNode.isValid();
+      // both leaves under /a/d/f must be covered
+      assert !adfgNode.isValid();
+   }
+
+   /**
+    * tests {@link DataContainer#removeFromDataStructure(Fqn, boolean)} having skipMarkerCheck set to true.
+    * <p/>
+    * Unlike the NoSkip tests, no node is marked as deleted beforehand: the removal of the root is expected to
+    * succeed purely because the marker check is skipped.
+    */
+   public void removeFromDataStructureWithSkip()
+   {
+      //check nonexistent node - exercised with skipMarkerCheck == true, the flag this test is about
+      assert !container.removeFromDataStructure(notExistent, true);
+
+      //check root - all the subnodes should be deleted and marked as invalid, but the root itself
+      assert container.removeFromDataStructure(Fqn.ROOT, true);
+      assert !aNode.isValid();
+      assert !abNode.isValid();
+      assert !abcNode.isValid();
+      assert !adNode.isValid();
+      assert !adeNode.isValid();
+      assert !adfNode.isValid();
+      assert !adfgNode.isValid();
+      assert !adfhNode.isValid();
+      assert root.isValid();
+   }
+
+ /**
+ * tests {@link org.jboss.cache.DataContainer#evict(Fqn)}
+ */
+ public void testEvict()
+ {
+ //tests eviction of leaf nodes
+ assert container.evict(abc);
+ assert !abcNode.isValid();
+ assert !abNode.hasChild("c");
+
+ //test eviction of intermediate nodes
+ adNode.put("key", "value");
+ assert !container.evict(ad);
+ assert adNode.isValid();
+ assert adNode.getData().isEmpty();
+ assert aNode.hasChild("d");
+ assert adNode.hasChild("e");
+ }
+
+ /**
+ * test {@link org.jboss.cache.DataContainer#createNodes(Fqn)}
+ */
+ public void testCreateNodes()
+ {
+ Object[] objects = container.createNodes(Fqn.fromString("/a/x/y/z"));
+ List result = (List) objects[0];
+ assert result.size() == 3;
+ assert ((NodeSPI)result.get(0)).getFqn().equals(Fqn.fromString("/a/x"));
+ assert ((NodeSPI)result.get(1)).getFqn().equals(Fqn.fromString("/a/x/y"));
+ assert ((NodeSPI)result.get(2)).getFqn().equals(Fqn.fromString("/a/x/y/z"));
+ NodeSPI target = (NodeSPI) objects[1];
+ assert target != null;
+ assert target.getFqn().toString().equals("/a/x/y/z");
+ }
+}
Added: core/trunk/src/test/java/org/jboss/cache/mock/NodeSpiMock.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/mock/NodeSpiMock.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/mock/NodeSpiMock.java 2008-05-13 22:04:27 UTC (rev 5835)
@@ -0,0 +1,424 @@
+package org.jboss.cache.mock;
+
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.Node;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.optimistic.DataVersion;
+import org.jboss.cache.lock.NodeLock;
+import org.jboss.cache.transaction.GlobalTransaction;
+
+import java.util.*;
+
+/**
+ * @author Mircea.Markus(a)jboss.com
+ * @since 2.2
+ */
+public class NodeSpiMock implements NodeSPI
+{
+
+ boolean isChildrenLoaded;
+ boolean isDataLoaded;
+ Map<Object, NodeSpiMock> children = new HashMap<Object, NodeSpiMock>();
+ boolean isDeleted = false;
+ boolean isValid = true;
+ DataVersion version = null;
+ Map data = new HashMap();
+ NodeSpiMock parent;
+ Fqn fqn;
+ private boolean isResident = false;
+
+ public NodeSpiMock(Fqn fqn)
+ {
+ this.fqn = fqn;
+ }
+
+ public boolean isChildrenLoaded()
+ {
+ return isChildrenLoaded;
+ }
+
+ public void setChildrenLoaded(boolean loaded)
+ {
+ this.isChildrenLoaded = loaded;
+ }
+
+ public boolean isDataLoaded()
+ {
+ return isDataLoaded;
+ }
+
+ public void setDataLoaded(boolean dataLoaded)
+ {
+ this.isDataLoaded = dataLoaded;
+ }
+
+ public Map getChildrenMapDirect()
+ {
+ return children;
+ }
+
+ public void setChildrenMapDirect(Map children)
+ {
+ this.children = new HashMap(children);
+ }
+
+ public NodeSPI getOrCreateChild(Object name, GlobalTransaction tx)
+ {
+ if (children.containsKey(name)) return children.get(name);
+ NodeSpiMock child = newChild(name);
+ return child;
+ }
+
+ private NodeSpiMock newChild(Object name)
+ {
+ NodeSpiMock child = new NodeSpiMock(Fqn.fromRelativeElements(fqn, name));
+ child.parent = this;
+ children.put(name, child);
+ return child;
+ }
+
+ public NodeLock getLock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setFqn(Fqn f)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isDeleted()
+ {
+ return isDeleted;
+ }
+
+ public void markAsDeleted(boolean marker)
+ {
+ this.isDeleted = marker;
+ }
+
+ public void markAsDeleted(boolean marker, boolean recursive)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void addChild(Object nodeName, Node nodeToAdd)
+ {
+ children.put(nodeName, (NodeSpiMock) nodeToAdd);
+ ((NodeSpiMock) nodeToAdd).parent = this;
+ ((NodeSpiMock) nodeToAdd).fqn = Fqn.fromRelativeElements(fqn, nodeName);
+ }
+
+ public void printDetails(StringBuffer sb, int indent)
+ {
+ //skip
+ }
+
+ public void print(StringBuffer sb, int indent)
+ {
+ //skip
+ }
+
+ public void setVersion(DataVersion version)
+ {
+ this.version = version;
+ }
+
+ public DataVersion getVersion()
+ {
+ return version;
+ }
+
+ public Set getChildrenDirect()
+ {
+ return new HashSet(children.values());
+ }
+
+ public void removeChildrenDirect()
+ {
+ children.clear();
+ }
+
+ public Set getChildrenDirect(boolean includeMarkedAsDeleted)
+ {
+ Set result = new HashSet();
+ for (NodeSpiMock child : children.values())
+ {
+ if (!includeMarkedAsDeleted && child.isDeleted()) continue;
+ result.add(child);
+ }
+ return result;
+ }
+
+ public NodeSPI getChildDirect(Object childName)
+ {
+ return children.get(childName);
+ }
+
+ public NodeSPI addChildDirect(Fqn childName)
+ {
+ if (childName.size() == 0) return this;
+ Object directChildName = childName.get(0);
+ NodeSpiMock directChild = children.get(directChildName);
+ Fqn subFqn = childName.getSubFqn(1, childName.size());
+ if(directChild == null)
+ {
+ directChild = newChild(directChildName);
+ }
+ return directChild.addChildDirect(subFqn);
+ }
+
+ public NodeSPI addChildDirect(Fqn f, boolean notify)
+ {
+ return addChildDirect(f);
+ }
+
+   /**
+    * Returns the direct child registered under the given name, creating it when it does not exist yet.
+    *
+    * @param childName name of the direct child
+    * @param notify    ignored by this mock
+    * @return the existing or newly created child
+    */
+   public NodeSPI addChildDirect(Object childName, boolean notify)
+   {
+      // reuse an existing child, as addChildDirect(Fqn) and getOrCreateChild do; creating a fresh node
+      // unconditionally would silently drop the existing child together with its subtree
+      NodeSpiMock existing = children.get(childName);
+      return existing != null ? existing : newChild(childName);
+   }
+
+ public void addChildDirect(NodeSPI child)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+   /**
+    * Resolves a descendant by an Fqn that is interpreted relative to this node.
+    *
+    * @param childName relative Fqn; an empty Fqn resolves to this node itself, mirroring addChildDirect(Fqn)
+    * @return the descendant, or null if any element along the path has no matching child
+    */
+   public NodeSPI getChildDirect(Fqn childName)
+   {
+      // walk down one element at a time; looking up only the last element among the direct children
+      // would resolve multi-element Fqns against the wrong level of the tree
+      NodeSpiMock current = this;
+      for (int i = 0; i < childName.size() && current != null; i++)
+      {
+         current = current.children.get(childName.get(i));
+      }
+      return current;
+   }
+
+ public boolean removeChildDirect(Fqn fqn)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean removeChildDirect(Object childName)
+ {
+ return children.remove(childName) != null;
+ }
+
+ public Object removeDirect(Object key)
+ {
+ return data.remove(key);
+ }
+
+ public Object putDirect(Object key, Object value)
+ {
+ return data.put(key, value);
+ }
+
+   /**
+    * Copies every entry of the supplied map into this mock node's data map.
+    *
+    * @param data entries to add to the node
+    */
+   public void putAllDirect(Map data)
+   {
+      // the parameter hides the field of the same name: without "this." the argument would be copied
+      // onto itself and the node's own data would never change
+      this.data.putAll(data);
+   }
+
+ public Map getDataDirect()
+ {
+ return data;
+ }
+
+ public Object getDirect(Object key)
+ {
+ return data.get(key);
+ }
+
+ public void clearDataDirect()
+ {
+ data.clear();
+ }
+
+ public Set getKeysDirect()
+ {
+ return data.keySet();
+ }
+
+ public Set getChildrenNamesDirect()
+ {
+ return new HashSet(children.keySet());
+ }
+
+ public CacheSPI getCache()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public NodeSPI getParent()
+ {
+ return parent;
+ }
+
+ public boolean hasChildrenDirect()
+ {
+ return !children.isEmpty();
+ }
+
+ public Map getInternalState(boolean onlyInternalState)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setInternalState(Map state)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void setValid(boolean valid, boolean recursive)
+ {
+ this.isValid = valid;
+ if (recursive)
+ {
+ for (NodeSpiMock child : children.values())
+ {
+ child.setValid(valid, true);
+ child.isValid = valid;
+ }
+ }
+ }
+
+ public Set getChildren()
+ {
+ return getChildrenDirect();
+ }
+
+ public Set getChildrenNames()
+ {
+ return getChildrenNamesDirect();
+ }
+
+ public Map getData()
+ {
+ return getDataDirect();
+ }
+
+ public Set getKeys()
+ {
+ return getKeysDirect();
+ }
+
+ public Fqn getFqn()
+ {
+ return fqn;
+ }
+
+ public Node addChild(Fqn f)
+ {
+ return addChildDirect(f);
+ }
+
+ public boolean removeChild(Fqn f)
+ {
+ return removeChildDirect(f);
+ }
+
+ public boolean removeChild(Object childName)
+ {
+ return removeChildDirect(childName);
+ }
+
+ public Node getChild(Fqn f)
+ {
+ return getChildDirect(f);
+ }
+
+ public Node getChild(Object name)
+ {
+ return getChildDirect(name);
+ }
+
+ public Object put(Object key, Object value)
+ {
+ return putDirect(key, value);
+ }
+
+ public Object putIfAbsent(Object key, Object value)
+ {
+ if (data.containsKey(key)) return data.get(key);
+ return data.put(key, value);
+ }
+
+   /**
+    * Replaces the value stored under the given key, but only if the key is currently mapped.
+    *
+    * @param key   key whose value is to be replaced
+    * @param value new value
+    * @return the previous value, or null if the key was not mapped (in which case nothing is stored)
+    */
+   public Object replace(Object key, Object value)
+   {
+      // a replace must not create a mapping for an absent key
+      if (!data.containsKey(key)) return null;
+      return data.put(key, value);
+   }
+
+   /**
+    * Replaces the value stored under the given key with newValue, but only if the current value equals oldValue.
+    *
+    * @param key      key whose value is to be replaced
+    * @param oldValue value the key is expected to be mapped to
+    * @param newValue value to store when the expectation holds
+    * @return true if the value was replaced, false otherwise (including when the key is not mapped)
+    */
+   public boolean replace(Object key, Object oldValue, Object newValue)
+   {
+      Object current = data.get(key);
+      // an absent key yields null here; treat it as "no match" instead of failing with a NullPointerException
+      if (current != null && current.equals(oldValue))
+      {
+         data.put(key, newValue);
+         return true;
+      }
+      return false;
+   }
+
+   /**
+    * Adds all entries of the given map to this node's data.
+    *
+    * @param map entries to add
+    */
+   public void putAll(Map map)
+   {
+      // copy from the argument; handing the node's own data field on would ignore the caller's map entirely
+      data.putAll(map);
+   }
+
+ public void replaceAll(Map map)
+ {
+ data = map;
+ }
+
+ public Object get(Object key)
+ {
+ return getDirect(key);
+ }
+
+ public Object remove(Object key)
+ {
+ return removeDirect(key);
+ }
+
+ public void clearData()
+ {
+ clearDataDirect();
+ }
+
+ public int dataSize()
+ {
+ return data.size();
+ }
+
+   /**
+    * Tells whether a descendant exists at the given Fqn, interpreted relative to this node.
+    *
+    * @param f relative Fqn of the descendant to look for
+    * @return true if every element of f resolves to a child when walking down from this node
+    */
+   public boolean hasChild(Fqn f)
+   {
+      // an empty Fqn does not name any child
+      if (f.size() == 0) return false;
+      // resolve the first element of the argument among the direct children; reading this node's own
+      // fqn field here would make the answer independent of what the caller asked for
+      NodeSpiMock directChild = children.get(f.get(0));
+      return directChild != null && (f.size() == 1 || directChild.hasChild(f.getSubFqn(1, f.size())));
+   }
+
+ public boolean hasChild(Object o)
+ {
+ return children.containsKey(o);
+ }
+
+ public boolean isValid()
+ {
+ return isValid;
+ }
+
+ public boolean isResident()
+ {
+ return isResident;
+ }
+
+ public void setResident(boolean resident)
+ {
+ this.isResident = resident;
+ }
+
+ public boolean isLockForChildInsertRemove()
+ {
+ return false;
+ }
+
+ public void setLockForChildInsertRemove(boolean lockForChildInsertRemove)
+ {
+ }
+
+ public void releaseObjectReferences(boolean recursive)
+ {
+ }
+}
16 years, 7 months
JBoss Cache SVN: r5834 - in core/trunk/src: main/java/org/jboss/cache/factories and 2 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-13 10:10:18 -0400 (Tue, 13 May 2008)
New Revision: 5834
Added:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java
Modified:
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java
core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
Log:
JBCACHE-1339: Refactor and simplify TxInterceptor, and other enhancements
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -93,9 +93,14 @@
*/
public boolean isInvalidation()
{
- return this.equals(INVALIDATION_SYNC) || this.equals(INVALIDATION_SYNC);
+ return this == INVALIDATION_SYNC || this == INVALIDATION_ASYNC; // both invalidation modes; SYNC used to be tested twice, so ASYNC was missed
}
+ public boolean isSynchronous()
+ {
+ return this == REPL_SYNC || this == INVALIDATION_SYNC; // true only for the two *_SYNC modes
+ }
+
}
public static CacheMode legacyModeToCacheMode(int legacyMode)
Modified: core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/factories/CommandsFactory.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -254,13 +254,6 @@
return new OptimisticPrepareCommand(gtx, modifications, data, address, onePhaseCommit);
}
-
- public OptimisticPrepareCommand buildOptimisticPrepareCommand(GlobalTransaction gtx, ReversibleCommand command)
- {
- List<ReversibleCommand> list = (List<ReversibleCommand>) (command != null ? Collections.singletonList(command) : Collections.emptyList());
- return buildOptimisticPrepareCommand(gtx, list, null, null, false);
- }
-
public AnnounceBuddyPoolNameCommand buildAnnounceBuddyPoolNameCommand(Address address, String buddyPoolName)
{
AnnounceBuddyPoolNameCommand command = new AnnounceBuddyPoolNameCommand(address, buddyPoolName);
Modified: core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -34,12 +34,10 @@
private CommandInterceptor createInterceptor(Class<? extends CommandInterceptor> clazz) throws IllegalAccessException, InstantiationException
{
-// CommandInterceptor chainedInterceptor = componentRegistry.getComponent(clazz.getName(), clazz);
CommandInterceptor chainedInterceptor = componentRegistry.getComponent(clazz);
if (chainedInterceptor == null)
{
chainedInterceptor = clazz.newInstance();
-// componentRegistry.registerComponent(clazz.getName(), chainedInterceptor, clazz);
componentRegistry.registerComponent(chainedInterceptor, clazz);
}
else
@@ -66,7 +64,8 @@
interceptorChain.appendIntereceptor(createInterceptor(CacheMgmtInterceptor.class));
// load the tx interceptor
- interceptorChain.appendIntereceptor(createInterceptor(TxInterceptor.class));
+ interceptorChain.appendIntereceptor(createInterceptor(optimistic ? OptimisticTxInterceptor.class : TxInterceptor.class));
+
if (configuration.isUseLazyDeserialization())
interceptorChain.appendIntereceptor(createInterceptor(MarshalledValueInterceptor.class));
interceptorChain.appendIntereceptor(createInterceptor(NotificationInterceptor.class));
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -9,7 +9,6 @@
import org.jboss.cache.cluster.ReplicationQueue;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.VisitableCommand;
-import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.config.Option;
import org.jboss.cache.factories.CommandsFactory;
import org.jboss.cache.factories.annotations.Inject;
@@ -56,8 +55,7 @@
private void init()
{
usingBuddyReplication = configuration.getBuddyReplicationConfig() != null && configuration.getBuddyReplicationConfig().isEnabled();
- CacheMode mode = configuration.getCacheMode();
- defaultSynchronous = (mode == CacheMode.REPL_SYNC || mode == CacheMode.INVALIDATION_SYNC);
+ defaultSynchronous = configuration.getCacheMode().isSynchronous();
}
/**
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -43,7 +43,7 @@
}
}
- protected void setTransactionalContext(Transaction tx, GlobalTransaction gtx, InvocationContext ctx)
+ protected void setTransactionalContext(Transaction tx, GlobalTransaction gtx, TransactionEntry entry, InvocationContext ctx)
{
if (trace)
{
@@ -52,7 +52,14 @@
}
ctx.setTransaction(tx);
ctx.setGlobalTransaction(gtx);
- if (gtx != null) ctx.setTransactionEntry(txTable.get(gtx));
+ if (entry == null)
+ {
+ if (gtx != null) ctx.setTransactionEntry(txTable.get(gtx));
+ }
+ else
+ {
+ ctx.setTransactionEntry(entry);
+ }
}
/**
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -26,10 +26,8 @@
import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.RollbackCommand;
-import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.CommandsFactory;
import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.marshall.NodeData;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jgroups.Address;
@@ -66,7 +64,6 @@
public class DataGravitatorInterceptor extends BaseRpcInterceptor
{
private BuddyManager buddyManager;
- private boolean syncCommunications = false;
/**
* Map that contains commands that need cleaning up. This is keyed on global transaction, and contains a list of
* cleanup commands corresponding to all gravitate calls made during the course of the transaction in question.
@@ -85,12 +82,6 @@
this.cacheSPI = cacheSPI;
}
- @Start
- public void startInterceptor()
- {
- syncCommunications = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC || configuration.getCacheMode() == Configuration.CacheMode.INVALIDATION_SYNC;
- }
-
@Override
public Object visitGetChildrenNamesCommand(InvocationContext ctx, GetChildrenNamesCommand command) throws Throwable
{
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -44,65 +44,65 @@
@Override
public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), false);
}
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), false);
}
@Override
public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command) throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), false);
}
@Override
public Object visitRemoveDataCommand(InvocationContext ctx, RemoveDataCommand command) throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), false);
}
@Override
public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), false);
}
@Override
public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), true);
}
@Override
public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), true);
}
@Override
public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), true);
}
@Override
public Object visitOptimisticPrepareCommand(InvocationContext ctx, OptimisticPrepareCommand command) throws Throwable
{
- return handleAll(ctx, command, command.getGlobalTransaction());
+ return handleAll(ctx, command, command.getGlobalTransaction(), true);
}
@Override
public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
{
- return handleAll(ctx, command, null);
+ return handleAll(ctx, command, null, false);
}
@SuppressWarnings("deprecation")
- public Object handleAll(InvocationContext ctx, VisitableCommand command, GlobalTransaction gtx) throws Throwable
+ private Object handleAll(InvocationContext ctx, VisitableCommand command, GlobalTransaction gtx, boolean scrubContextOnCompletion) throws Throwable
{
Option optionOverride = ctx.getOptionOverrides();
boolean suppressExceptions = false;
@@ -118,11 +118,11 @@
Transaction tx = getTransaction();
GlobalTransaction realGtx = getGlobalTransaction(tx, gtx);
if (tx == null && realGtx != null && realGtx.isRemote()) tx = txTable.getLocalTransaction(gtx);
- setTransactionalContext(tx, realGtx, ctx);
+ setTransactionalContext(tx, realGtx, null, ctx);
}
else
{
- setTransactionalContext(null, null, ctx);
+ setTransactionalContext(null, null, null, ctx);
}
if (optionOverride != null)
@@ -134,7 +134,7 @@
if (ctx.getTransaction() != null)
{
suspendedTransaction = txManager.suspend();
- setTransactionalContext(null, null, ctx);
+ setTransactionalContext(null, null, null, ctx);
if (trace) log.trace("Suspending transaction " + suspendedTransaction);
resumeSuspended = true;
}
@@ -169,10 +169,19 @@
}
finally
{
+ /*
+ * we should scrub txs after every call to prevent race conditions
+ * basically any other call coming in on the same thread and hijacking any running tx's
+ * was highlighted in JBCACHE-606
+ */
+ if (scrubContextOnCompletion) setTransactionalContext(null, null, null, ctx);
+
// clean up any invocation-scope options set up
if (trace) log.trace("Resetting invocation-scope options");
ctx.getOptionOverrides().reset();
+ // (for prepare, optimistic prepare, commit and rollback commands the transactional context has already been scrubbed above)
+
if (resumeSuspended)
{
txManager.resume(suspendedTransaction);
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -51,6 +51,8 @@
{
timeout = ctx.getOptionOverrides().getLockAcquisitionTimeout();
}
+
+ boolean succeeded = false;
try
{
TransactionWorkspace<?, ?> workspace = getTransactionWorkspace(ctx);
@@ -75,25 +77,23 @@
}
}
+
+ // locks have been acquired, so let's pass on up
+ Object retval = invokeNextInterceptor(ctx, command);
+ succeeded = true;
+ return retval;
}
catch (Throwable e)
{
+ succeeded = false;
log.debug("Caught exception attempting to lock nodes ", e);
//we have failed - set to rollback and throw exception
- try
- {
- unlock(ctx, gtx);
- }
- catch (Throwable t)
- {
- // we have failed to unlock - now what?
- log.error("Failed to unlock nodes, after failing to lock nodes during a prepare! Locks are possibly in a very inconsistent state now!", t);
- }
throw e;
}
-
- // locks have acquired so lets pass on up
- return invokeNextInterceptor(ctx, command);
+ finally
+ {
+ if (!succeeded || command.isOnePhaseCommit()) unlock(ctx, gtx);
+ }
}
@Override
@@ -119,15 +119,7 @@
}
finally
{
- try
- {
- unlock(ctx, getGlobalTransaction(ctx));
- }
- catch (Exception e)
- {
- // we have failed to unlock - now what?
- log.error("Failed to unlock nodes after a commit or rollback! Locks are possibly in a very inconsistent state now!", e);
- }
+ unlock(ctx, getGlobalTransaction(ctx));
}
return retval;
}
@@ -139,8 +131,16 @@
*/
private void unlock(InvocationContext ctx, GlobalTransaction gtx)
{
- TransactionEntry entry = ctx.getTransactionEntry();
- entry.releaseAllLocksFIFO(gtx);
+ try
+ {
+ TransactionEntry entry = ctx.getTransactionEntry();
+ entry.releaseAllLocksFIFO(gtx);
+ }
+ catch (Exception e)
+ {
+ // we have failed to unlock - now what?
+ log.error("Failed to unlock nodes after a commit or rollback! Locks are possibly in a very inconsistent state now!", e);
+ }
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -26,7 +26,6 @@
import org.jboss.cache.commands.write.RemoveDataCommand;
import org.jboss.cache.commands.write.RemoveKeyCommand;
import org.jboss.cache.commands.write.RemoveNodeCommand;
-import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option;
import org.jboss.cache.factories.CommandsFactory;
import org.jboss.cache.factories.annotations.Inject;
@@ -187,8 +186,6 @@
protected void broadcastPrepare(OptimisticPrepareCommand command, GlobalTransaction gtx, InvocationContext ctx) throws Throwable
{
- boolean remoteCallSync = configuration.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
-
// this method will return immediately if we're the only member
if (rpcManager.getMembers() != null && rpcManager.getMembers().size() > 1)
{
@@ -210,7 +207,7 @@
{
log.debug("(" + rpcManager.getLocalAddress() + "): broadcasting prepare for " + gtx + " (" + command.getModificationsCount() + " modifications");
}
- replicateCall(ctx, toBroadcast, remoteCallSync, ctx.getOptionOverrides());
+ replicateCall(ctx, toBroadcast, defaultSynchronous, ctx.getOptionOverrides());
}
else
{
Added: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -0,0 +1,222 @@
+package org.jboss.cache.interceptors;
+
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.commands.AbstractVisitor;
+import org.jboss.cache.commands.VersionedDataCommand;
+import org.jboss.cache.commands.VisitableCommand;
+import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.commands.write.PutDataMapCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
+import org.jboss.cache.commands.write.RemoveDataCommand;
+import org.jboss.cache.commands.write.RemoveKeyCommand;
+import org.jboss.cache.commands.write.RemoveNodeCommand;
+import org.jboss.cache.config.Option;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.transaction.OptimisticTransactionEntry;
+import org.jboss.cache.transaction.TransactionEntry;
+
+import javax.transaction.Transaction;
+import java.util.List;
+
+/**
+ * Optimistic-locking counterpart of {@link org.jboss.cache.interceptors.TxInterceptor}, split out to simplify that class.
+ * Calls that arrive without a transaction on the invocation context are run inside an implicit local transaction.
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 2.2.0
+ */
+public class OptimisticTxInterceptor extends TxInterceptor
+{
+   protected final ModificationsReplayVisitor replayVisitor = new ModificationsReplayVisitor();
+
+   @Override
+   public Object visitOptimisticPrepareCommand(InvocationContext ctx, OptimisticPrepareCommand command) throws Throwable
+   {
+      // nothing really different from a pessimistic prepare command.
+      return visitPrepareCommand(ctx, command);
+   }
+
+   @Override
+   public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+   {
+      try
+      {
+         Transaction tx = ctx.getTransaction();
+         boolean implicitTransaction = tx == null; // no tx on the context: wrap this call in one of our own
+         if (implicitTransaction)
+         {
+            tx = createLocalTx();
+            // we need to attach this tx to the InvocationContext.
+            ctx.setTransaction(tx);
+         }
+
+         try
+         {
+            Object retval = attachGtxAndPassUpChain(ctx, command);
+            if (implicitTransaction)
+            {
+               copyInvocationScopeOptionsToTxScope(ctx);
+               copyForcedCacheModeToTxScope(ctx);
+               txManager.commit(); // only a tx created here is committed here; caller-managed txs are left alone
+            }
+            return retval;
+         }
+         catch (Throwable t)
+         {
+            if (implicitTransaction)
+            {
+               log.warn("Rolling back, exception encountered", t);
+               try
+               {
+                  copyInvocationScopeOptionsToTxScope(ctx);
+                  copyForcedCacheModeToTxScope(ctx);
+                  txManager.rollback();
+               }
+               catch (Throwable th)
+               {
+                  log.warn("Roll back failed encountered", th); // best effort: the original failure is what gets rethrown
+               }
+            }
+            throw t; // always rethrow: a failure inside a caller-managed tx must not be swallowed either
+         }
+      }
+      catch (Throwable th)
+      {
+         ctx.throwIfNeeded(th);
+      }
+
+      return null; // reached only when throwIfNeeded() returned without rethrowing
+   }
+
+   private void copyForcedCacheModeToTxScope(InvocationContext ctx)
+   {
+      Option optionOverride = ctx.getOptionOverrides();
+      if (optionOverride != null
+            && (optionOverride.isForceAsynchronous() || optionOverride.isForceSynchronous()))
+      {
+         TransactionEntry entry = ctx.getTransactionEntry();
+         if (entry != null)
+         {
+            if (optionOverride.isForceAsynchronous()) // if both flags are set, async is the one applied
+               entry.setForceAsyncReplication(true);
+            else
+               entry.setForceSyncReplication(true);
+         }
+      }
+   }
+
+   @Override
+   protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
+   {
+      return commandsFactory.buildOptimisticPrepareCommand(gtx, modifications, null, rpcManager.getLocalAddress(), onePhaseCommit);
+   }
+
+   /**
+    * Replays modifications by passing them up the interceptor chain.
+    * @return true if every modification was replayed, false if replaying any of them threw (the failure is logged)
+    * @throws Throwable
+    */
+   @Override
+   protected boolean replayModifications(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Throwable
+   {
+      if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare " + ctx.getGlobalTransaction());
+
+      // invoke all modifications by passing them up the chain, setting data versions first.
+      try
+      {
+         replayVisitor.visitCollection(ctx, command.getModifications());
+      }
+      catch (Throwable t)
+      {
+         log.error("Prepare failed!", t);
+         return false;
+      }
+      return true;
+   }
+
+   @Override
+   protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable
+   {
+      TransactionEntry entry = ctx.getTransactionEntry();
+      if (entry != null)
+      {
+         entry.releaseAllLocksLIFO(ctx.getGlobalTransaction());
+         ((OptimisticTransactionEntry) entry).getTransactionWorkSpace().clearNodes(); // entries are created by createNewTransactionEntry() below
+      }
+   }
+
+   @Override
+   protected TransactionEntry createNewTransactionEntry(Transaction tx) throws Exception
+   {
+      return new OptimisticTransactionEntry(tx);
+   }
+
+   private class ModificationsReplayVisitor extends AbstractVisitor
+   {
+      @Override
+      public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+      {
+         Object result = invokeNextInterceptor(ctx, command);
+         assertTxIsStillValid(ctx.getTransaction());
+         return result;
+      }
+
+      @Override
+      public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
+      {
+         return handleDataVersionCommand(ctx, command);
+      }
+
+      @Override
+      public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+      {
+         return handleDataVersionCommand(ctx, command);
+      }
+
+      @Override
+      public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command) throws Throwable
+      {
+         return handleDataVersionCommand(ctx, command);
+      }
+
+      @Override
+      public Object visitRemoveDataCommand(InvocationContext ctx, RemoveDataCommand command) throws Throwable
+      {
+         return handleDataVersionCommand(ctx, command);
+      }
+
+      @Override
+      public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
+      {
+         return handleDataVersionCommand(ctx, command);
+      }
+
+      private Object handleDataVersionCommand(InvocationContext ctx, VersionedDataCommand command) throws Throwable
+      {
+         Option originalOption = ctx.getOptionOverrides(); // remembered so it can be restored in the finally block
+         if (command.isVersioned())
+         {
+            Option option = new Option();
+            option.setDataVersion(command.getDataVersion());
+            ctx.setOptionOverrides(option);
+         }
+         Object retval;
+         try
+         {
+            retval = invokeNextInterceptor(ctx, command);
+            assertTxIsStillValid(ctx.getTransaction());
+         }
+         catch (Throwable t)
+         {
+            log.error("method invocation failed", t);
+            throw t;
+         }
+         finally
+         {
+            ctx.setOptionOverrides(originalOption);
+         }
+         return retval;
+      }
+   }
+
+}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -136,23 +136,21 @@
}
}
log.debug("Successfully validated nodes");
- return invokeNextInterceptor(ctx, command);
+ Object retval = invokeNextInterceptor(ctx, command);
+ if (command.isOnePhaseCommit())
+ {
+ // do a commit phase
+ commitTransaction(ctx);
+ }
+ return retval;
}
- @Override
- public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+ private void commitTransaction(InvocationContext ctx)
{
GlobalTransaction gtx = getGlobalTransaction(ctx);
TransactionWorkspace workspace;
- try
- {
- workspace = getTransactionWorkspace(ctx);
- }
- catch (CacheException e)
- {
- log.warn("we can't rollback", e);
- return invokeNextInterceptor(ctx, command);
- }
+ workspace = getTransactionWorkspace(ctx);
+
if (log.isDebugEnabled()) log.debug("Commiting successfully validated changes for GlobalTransaction " + gtx);
Collection<WorkspaceNode> workspaceNodes = workspace.getNodes().values();
for (WorkspaceNode workspaceNode : workspaceNodes)
@@ -253,6 +251,12 @@
}
}
}
+ }
+
+ @Override
+ public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+ {
+ commitTransaction(ctx);
return invokeNextInterceptor(ctx, command);
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -13,7 +13,6 @@
import org.jboss.cache.commands.AbstractVisitor;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.ReversibleCommand;
-import org.jboss.cache.commands.VersionedDataCommand;
import org.jboss.cache.commands.VisitableCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
@@ -26,7 +25,6 @@
import org.jboss.cache.commands.write.RemoveDataCommand;
import org.jboss.cache.commands.write.RemoveKeyCommand;
import org.jboss.cache.commands.write.RemoveNodeCommand;
-import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option;
import org.jboss.cache.factories.CommandsFactory;
import org.jboss.cache.factories.ComponentRegistry;
@@ -34,9 +32,9 @@
import org.jboss.cache.invocation.InvocationContextContainer;
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.cache.util.concurrent.ConcurrentHashSet;
import javax.transaction.InvalidTransactionException;
import javax.transaction.Status;
@@ -48,6 +46,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -61,22 +60,17 @@
*/
public class TxInterceptor extends BaseTransactionalContextInterceptor implements TxInterceptorMBean
{
- private final static Object NULL = new Object();
-
- private CommandsFactory commandsFactory;
- private RPCManager rpcManager;
+ protected CommandsFactory commandsFactory;
+ protected RPCManager rpcManager;
private Notifier notifier;
private InvocationContextContainer invocationContextContainer;
private ComponentRegistry componentRegistry;
- private final ModificationsReplayVisitor replayVisitorNoInject = new ModificationsReplayVisitor(false);
- private final ModificationsReplayVisitor replayVisitorWithInject = new ModificationsReplayVisitor(true);
-
/**
* Set of <Transaction> that we have registered for
*/
- private final Map transactions = new ConcurrentHashMap(16);
- private final Map rollbackTransactions = new ConcurrentHashMap(16);
+ private final Set<Transaction> transactions = new ConcurrentHashSet<Transaction>();
+ private final Map<Transaction, GlobalTransaction> rollbackTransactions = new ConcurrentHashMap<Transaction, GlobalTransaction>(16);
private long prepares = 0;
private long commits = 0;
private long rollbacks = 0;
@@ -94,17 +88,9 @@
}
@Override
- @SuppressWarnings("unchecked")
- public Object visitOptimisticPrepareCommand(InvocationContext ctx, OptimisticPrepareCommand command) throws Throwable
- {
- return visitPrepareCommand(ctx, command);
- }
-
- @Override
public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
{
Object result = null;
- boolean scrubTxsOnExit = false;
// this is a prepare, commit, or rollback.
if (log.isDebugEnabled()) log.debug("Got gtx from invocation context " + ctx.getGlobalTransaction());
@@ -113,8 +99,7 @@
if (ctx.getGlobalTransaction().isRemote())
{
result = handleRemotePrepare(ctx, command);
- scrubTxsOnExit = true;
- increasePrepares();
+ if (getStatisticsEnabled()) prepares++;
}
else
{
@@ -126,18 +111,10 @@
{
ctx.throwIfNeeded(e);
}
- finally
- {
- scrubOnExist(ctx, scrubTxsOnExit);
- }
+
return result;
}
- private void increasePrepares()
- {
- if (getStatisticsEnabled()) prepares++;
- }
-
@Override
@SuppressWarnings("unchecked")
public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
@@ -167,7 +144,7 @@
}
if (log.isDebugEnabled()) log.debug(" executing commit() with local TX " + ltx + " under global tx " + gtx);
txManager.commit();
- increaseCommits();
+ if (getStatisticsEnabled()) commits++;
}
finally
{
@@ -187,10 +164,7 @@
{
ctx.throwIfNeeded(throwable);
}
- finally
- {
- scrubOnExist(ctx, true);
- }
+
return null;
}
@@ -211,7 +185,6 @@
{
log.warn("No local transaction for this remotely originating rollback. Possibly rolling back before a prepare call was broadcast?");
txTable.remove(gtx);
- scrubOnExist(ctx, true);
return null;
}
// disconnect if we have a current tx associated
@@ -229,7 +202,7 @@
}
if (log.isDebugEnabled()) log.debug("executing with local TX " + ltx + " under global tx " + gtx);
txManager.rollback();
- increaseRollbacks();
+ if (getStatisticsEnabled()) rollbacks++;
}
finally
{
@@ -251,26 +224,48 @@
{
ctx.throwIfNeeded(throwable);
}
- finally
- {
- scrubOnExist(ctx, true);
- }
+
return null;
}
+ @Override
+ public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable
+ {
+ return invokeNextInterceptor(ctx, command); // no transaction handling here: the command is passed straight up the chain
+ }
+
/**
- * we should scrub txs after every call to prevent race conditions
- * basically any other call coming in on the same thread and hijacking any running tx's
- * was highlighted in JBCACHE-606
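+ * Tests if we already have a tx running. If so, the global transaction is attached to this invocation before the
+ * command is passed up the chain; if not, it is passed up as-is (OptimisticTxInterceptor adds implicit local txs).
+ *
+ * @return the result of the invocation, or null if the invocation context chooses not to rethrow a failure
+ * @throws Throwable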
*/
- private void scrubOnExist(InvocationContext ctx, boolean scrubTxsOnExit)
+ @Override
+ public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
{
- if (scrubTxsOnExit)
+ try
{
- setTransactionalContext(null, null, ctx);
+ return attachGtxAndPassUpChain(ctx, command);
}
+ catch (Throwable throwable)
+ {
+ ctx.throwIfNeeded(throwable);
+ return null;
+ }
}
+ protected Object attachGtxAndPassUpChain(InvocationContext ctx, VisitableCommand command) throws Throwable
+ {
+ Transaction tx = ctx.getTransaction();
+ if (tx != null) attachGlobalTransaction(ctx, tx, command); // skipped when the context carries no transaction
+ return invokeNextInterceptor(ctx, command); // the command goes up the chain in either case
+ }
+
+ // ------------------------------------------------------------------------
+ // JMX statistics
+ // ------------------------------------------------------------------------
+
public long getPrepares()
{
return prepares;
@@ -306,42 +301,56 @@
// --------------------------------------------------------------
+ /**
+ * Handles a remotely originating prepare call, by creating a local transaction for the remote global transaction
+ * and replaying modifications in this new local transaction.
+ *
+ * @param ctx invocation context
+ * @param command prepare command
+ * @return result of the prepare, typically a null.
+ * @throws Throwable in the event of problems.
+ */
private Object handleRemotePrepare(InvocationContext ctx, PrepareCommand command) throws Throwable
{
+ // the InvocationContextInterceptor would have set this for us
GlobalTransaction gtx = ctx.getGlobalTransaction();
- // Is there a local transaction associated with GTX ?
+
+ // Is there a local transaction associated with GTX? (not the current tx associated with the thread, which may be
+ // in the invocation context)
Transaction ltx = txTable.getLocalTransaction(gtx);
Transaction currentTx = txManager.getTransaction();
- Object retval = null;
+ Object retval = null;
+ boolean success = false;
try
{
if (ltx == null)
{
if (currentTx != null) txManager.suspend();
- ltx = createLocalTxForGlobalTx(gtx, ctx);// creates new LTX and associates it with a GTX
+ // create a new local transaction
+ ltx = createLocalTx();
+ // associate this with a global tx
+ txTable.put(ltx, gtx);
+ if (trace) log.trace("Created new tx for gtx " + gtx);
+
if (log.isDebugEnabled())
- {
log.debug("Started new local tx as result of remote prepare: local tx=" + ltx + " (status=" + ltx.getStatus() + "), gtx=" + gtx);
- }
}
else
{
//this should be valid
- if (!ctx.isValidTransaction())
+ if (!TransactionTable.isValid(ltx))
throw new CacheException("Transaction " + ltx + " not in correct state to be prepared");
- //associate this thread with this ltx if this ltx is NOT the current tx.
+ //associate this thread with the local transaction associated with the global transaction, IF the localTx is NOT the current tx.
if (currentTx == null || !ltx.equals(currentTx))
{
+ if (trace) log.trace("Suspending current tx " + currentTx);
txManager.suspend();
txManager.resume(ltx);
}
}
- if (trace)
- {
- log.trace("Resuming existing transaction " + ltx + ", global TX=" + gtx);
- }
+ if (trace) log.trace("Resuming existing tx " + ltx + ", global tx=" + gtx);
// at this point we have a non-null ltx
@@ -353,239 +362,42 @@
if (entry == null)
{
// create a new transaction entry
- entry = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry(ltx) : new TransactionEntry(ltx);
- log.debug("creating new tx entry");
+ if (log.isDebugEnabled()) log.debug("creating new tx entry");
+ entry = createNewTransactionEntry(ltx);
txTable.put(gtx, entry);
- ctx.setTransactionEntry(entry);
- if (trace) log.trace("TxTable contents: " + txTable);
}
- setTransactionalContext(ltx, gtx, ctx);
+ setTransactionalContext(ltx, gtx, entry, ctx);
+
// register a sync handler for this tx.
- registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx), ctx);
+ registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, entry), ctx);
- if (configuration.isNodeLockingOptimistic())
+ // replay modifications
+ success = replayModifications(ctx, ltx, command);
+
+ // now pass the prepare command up the chain as well.
+ if (command.isOnePhaseCommit())
{
- retval = handleOptimisticPrepare(ctx, gtx, ltx, (OptimisticPrepareCommand) command);
+ if (trace)
+ log.trace("Using one-phase prepare. Not propagating the prepare call up the stack until called to do so by the sync handler.");
}
else
{
- retval = handlePessimisticPrepare(ctx, ltx, command);
+ // now pass up the prepare method itself.
+ invokeNextInterceptor(ctx, command);
}
+ // JBCACHE-361 Confirm that the transaction is ACTIVE
+ assertTxIsStillValid(ltx);
}
finally
{
- txManager.suspend();// suspends ltx - could be null
- // resume whatever else we had going.
- if (currentTx != null) txManager.resume(currentTx);
- if (log.isDebugEnabled()) log.debug("Finished remote prepare " + gtx);
- }
+ // if we are running a one-phase commit, perform a commit or rollback now.
+ if (trace) log.trace("Are we running a 1-phase commit? " + command.isOnePhaseCommit());
- return retval;
- }
- // handler methods.
- // --------------------------------------------------------------
-
-
- @Override
- public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command) throws Throwable
- {
- return invokeNextInterceptor(ctx, command);
- }
-
- /**
- * Tests if we already have a tx running. If so, register a sync handler for this method invocation.
- * if not, create a local tx if we're using opt locking.
- *
- * @return
- * @throws Throwable
- */
- @Override
- public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
- {
- Object result;
- try
- {
- Transaction tx = ctx.getTransaction();
- // if there is no current tx and we're using opt locking, we need to use an implicit tx.
- boolean implicitTransaction = configuration.isNodeLockingOptimistic() && tx == null;
- if (implicitTransaction)
- {
- tx = createLocalTx();
- // we need to attach this tx to the InvocationContext.
- ctx.setTransaction(tx);
- }
- if (tx != null)
- attachGlobalTransaction(ctx, tx, command);
-
- GlobalTransaction gtx = ctx.getGlobalTransaction();
-
- try
- {
- result = invokeNextInterceptor(ctx, command);
- if (implicitTransaction)
- {
- copyInvocationScopeOptionsToTxScope(ctx);
- copyForcedCacheModeToTxScope(ctx);
- txManager.commit();
- }
- }
- catch (Throwable t)
- {
- if (implicitTransaction)
- {
- log.warn("Rolling back, exception encountered", t);
- try
- {
- setTransactionalContext(tx, gtx, ctx);
- txManager.rollback();
- }
- catch (Throwable th)
- {
- log.warn("Roll back failed encountered", th);
- }
- throw t;
- }
- else
- {
- throw t;
- }
- }
- return result;
- }
- catch (Throwable throwable)
- {
- ctx.throwIfNeeded(throwable);
- return null;
- }
- }
-
- private void copyForcedCacheModeToTxScope(InvocationContext ctx)
- {
- Option optionOverride = ctx.getOptionOverrides();
- if (optionOverride != null
- && (optionOverride.isForceAsynchronous() || optionOverride.isForceSynchronous()))
- {
- TransactionEntry entry = ctx.getTransactionEntry();
- if (entry != null)
- {
- if (optionOverride.isForceAsynchronous())
- entry.setForceAsyncReplication(true);
- else
- entry.setForceSyncReplication(true);
- }
- }
- }
-
- private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction tx, VisitableCommand command) throws Throwable
- {
- if (trace)
- {
- log.trace(" local transaction exists - registering global tx if not present for " + Thread.currentThread());
- }
- if (trace)
- {
- GlobalTransaction tempGtx = txTable.get(tx);
- log.trace("Associated gtx in txTable is " + tempGtx);
- }
-
- // register a sync handler for this tx - only if the globalTransaction is not remotely initiated.
- GlobalTransaction gtx = registerTransaction(tx, ctx);
- if (gtx != null)
- {
- command = replaceGtx(command, gtx);
- }
- else
- {
- // get the current globalTransaction from the txTable.
- gtx = txTable.get(tx);
- }
-
- // make sure we attach this globalTransaction to the invocation context.
- ctx.setGlobalTransaction(gtx);
-
- return command;
- }
-
- /**
- * This is called by invoke() if we are in a remote gtx's prepare() phase.
- * Finds the appropriate tx, suspends any existing txs, registers a sync handler
- * and passes up the chain.
- * <p/>
- * Resumes any existing txs before returning.
- *
- * @throws Throwable
- */
- private Object handleOptimisticPrepare(InvocationContext ctx, GlobalTransaction gtx, Transaction ltx, OptimisticPrepareCommand command) throws Throwable
- {
- Object retval;
- if (log.isDebugEnabled()) log.debug("Handling optimistic remote prepare " + gtx);
- replayVisitorWithInject.visitCollection(ctx, command.getModifications());
- retval = invokeNextInterceptor(ctx, command);
- // JBCACHE-361 Confirm that the transaction is ACTIVE
- if (!TransactionTable.isActive(ltx))
- {
- throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE;" +
- " is " + ltx.getStatus());
- }
- return retval;
- }
-
- private Object handlePessimisticPrepare(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Exception
- {
- boolean success = true;
- Object retval = null;
- try
- {
- // now pass up the prepare method itself.
- try
- {
- replayVisitorNoInject.visitCollection(ctx, command.getModifications());
- if (command.isOnePhaseCommit())
- {
- if (trace)
- log.trace("Using one-phase prepare. Not propagating the prepare call up the stack until called to do so by the sync handler.");
- }
- else
- {
- retval = invokeNextInterceptor(ctx, command);
- }
-
- // JBCACHE-361 Confirm that the transaction is ACTIVE
- if (!ctx.isValidTransaction())
- {
- throw new ReplicationException("prepare() failed -- " +
- "local transaction status is not valid;" +
- " is " + ltx.getStatus());
- }
- }
- catch (Throwable th)
- {
- log.error("prepare method invocation failed", th);
- retval = th;
- success = false;
- if (retval instanceof Exception)
- {
- throw (Exception) retval;
- }
- }
- }
- finally
- {
-
- if (trace)
- {
- log.trace("Are we running a 1-phase commit? " + command.isOnePhaseCommit());
- }
- // 4. If commit == true (one-phase-commit): commit (or rollback) the TX; this will cause
- // {before/after}Completion() to be called in all registered interceptors: the TransactionInterceptor
- // will then commit/rollback against the cache
-
if (command.isOnePhaseCommit())
{
try
{
- // invokeOnePhaseCommitMethod(globalTransaction, modifications.size() > 0, success);
if (success)
{
ltx.commit();
@@ -618,92 +430,76 @@
transactions.remove(ltx);// JBAS-298
}
}
+
+ txManager.suspend();// suspends ltx - could be null
+ // resume whatever else we had going.
+ if (currentTx != null) txManager.resume(currentTx);
+ if (log.isDebugEnabled()) log.debug("Finished remote prepare " + gtx);
}
+
return retval;
}
- public class ModificationsReplayVisitor extends AbstractVisitor
+ protected TransactionEntry createNewTransactionEntry(Transaction tx) throws Exception
{
- private final boolean injectDataVersions;
+ return new TransactionEntry(tx);
+ }
- public ModificationsReplayVisitor(boolean injectDataVersions)
+ private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction tx, VisitableCommand command) throws Throwable
+ {
+ if (trace)
{
- this.injectDataVersions = injectDataVersions;
+ log.trace(" local transaction exists - registering global tx if not present for " + Thread.currentThread());
}
-
- @Override
- public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+ if (trace)
{
- Object result = invokeNextInterceptor(ctx, command);
- assertTxIsActive(ctx);
- return result;
+ GlobalTransaction tempGtx = txTable.get(tx);
+ log.trace("Associated gtx in txTable is " + tempGtx);
}
- @Override
- public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
+ // register a sync handler for this tx - only if the globalTransaction is not remotely initiated.
+ GlobalTransaction gtx = registerTransaction(tx, ctx);
+ if (gtx != null)
{
- return handleDataVersionCommand(ctx, command);
+ command = replaceGtx(command, gtx);
}
-
- @Override
- public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+ else
{
- return handleDataVersionCommand(ctx, command);
+ // get the current globalTransaction from the txTable.
+ gtx = txTable.get(tx);
}
- @Override
- public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand command) throws Throwable
- {
- return handleDataVersionCommand(ctx, command);
- }
+ // make sure we attach this globalTransaction to the invocation context.
+ ctx.setGlobalTransaction(gtx);
- @Override
- public Object visitRemoveDataCommand(InvocationContext ctx, RemoveDataCommand command) throws Throwable
- {
- return handleDataVersionCommand(ctx, command);
- }
+ return command;
+ }
- @Override
- public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand command) throws Throwable
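+ /**
+ * Replays the modifications carried by a prepare command, passing each one up the interceptor chain.
+ *
+ * @param ctx invocation context
+ * @param ltx local transaction, asserted to still be valid after each modification
+ * @param command prepare command whose modifications are replayed
+ * @return true if every modification was replayed, false if any failure was caught (and logged)
+ * @throws Throwable declared by the signature; failures during replay are caught and reported via the return value
+ */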
+ protected boolean replayModifications(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Throwable
+ {
+ try
{
- return handleDataVersionCommand(ctx, command);
- }
-
- private Object handleDataVersionCommand(InvocationContext ctx, VersionedDataCommand command) throws Throwable
- {
- if (!injectDataVersions) return handleDefault(ctx, command);
- Option originalOption = ctx.getOptionOverrides();
- if (command.isVersioned())
+ // replay modifications
+ for (ReversibleCommand modification : command.getModifications())
{
- Option option = new Option();
- option.setDataVersion(command.getDataVersion());
- ctx.setOptionOverrides(option);
+ invokeNextInterceptor(ctx, modification);
+ assertTxIsStillValid(ltx);
}
- Object retval;
- try
- {
- retval = invokeNextInterceptor(ctx, command);
- assertTxIsActive(ctx);
- }
- catch (Throwable t)
- {
- log.error("method invocation failed", t);
- throw t;
- }
- finally
- {
- ctx.setOptionOverrides(originalOption);
- }
- return retval;
+ return true;
}
-
- private void assertTxIsActive(InvocationContext ctx)
- throws SystemException
+ catch (Throwable th)
{
- if (!TransactionTable.isActive(ctx.getTransaction()))
- {
- throw new ReplicationException("prepare() failed -- " + "local transaction status is not STATUS_ACTIVE; is " + ctx.getTransaction().getStatus());
- }
+ log.error("prepare failed!", th);
+ return false;
}
}
@@ -719,16 +515,6 @@
}
}
- private void increaseCommits()
- {
- if (getStatisticsEnabled()) commits++;
- }
-
- private void increaseRollbacks()
- {
- if (getStatisticsEnabled()) rollbacks++;
- }
-
/**
* Handles a commit or a rollback. Called by the synch handler. Simply tests that we are in the correct tx and
* passes the meth call up the interceptor chain.
@@ -760,38 +546,24 @@
// Transaction phase runners
// --------------------------------------------------------------
+ protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
+ {
+ return commandsFactory.buildPrepareCommand(gtx, modifications, rpcManager.getLocalAddress(), onePhaseCommit);
+ }
+
/**
- * creates a commit() MethodCall and feeds it to handleCommitRollback();
+ * Creates a commit command (or a one-phase prepare command) and feeds it to handleCommitRollback().
*/
- protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, Transaction tx, List modifications, List clModifications, boolean onePhaseCommit)
+ protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List modifications, List clModifications, boolean onePhaseCommit)
{
// set the hasMods flag in the invocation ctx. This should not be replicated, just used locally by the interceptors.
ctx.setTxHasMods(modifications != null && modifications.size() > 0);
ctx.setCacheLoaderHasMods(clModifications != null && clModifications.size() > 0);
try
{
- VisitableCommand commitCommand;
- if (onePhaseCommit)
- {
- // running a 1-phase commit.
- if (configuration.isNodeLockingOptimistic())
- {
- commitCommand = commandsFactory.buildOptimisticPrepareCommand(gtx, null);
- }
- else
- {
- commitCommand = commandsFactory.buildPrepareCommand(gtx, modifications, rpcManager.getLocalAddress(), true);
- }
- }
- else
- {
- commitCommand = commandsFactory.buildCommitCommand(gtx);
- }
+ VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx, modifications, true) : commandsFactory.buildCommitCommand(gtx);
- if (trace)
- {
- log.trace(" running commit for " + gtx);
- }
+ if (trace) log.trace("Running commit for " + gtx);
handleCommitRollback(ctx, commitCommand);
}
@@ -819,38 +591,27 @@
}
}
-
- private void cleanupStaleLocks(InvocationContext ctx) throws Throwable
+ protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable
{
TransactionEntry entry = ctx.getTransactionEntry();
- if (entry != null)
- {
- entry.releaseAllLocksLIFO(ctx.getGlobalTransaction());
- }
+ if (entry != null) entry.releaseAllLocksLIFO(ctx.getGlobalTransaction());
}
/**
- * creates a rollback() MethodCall and feeds it to handleCommitRollback();
- *
- * @param gtx
+ * Creates a rollback command and feeds it to handleCommitRollback().
*/
protected void runRollbackPhase(InvocationContext ctx, GlobalTransaction gtx, Transaction tx, List<ReversibleCommand> modifications)
{
- //Transaction ltx = null;
try
{
ctx.setTxHasMods(modifications != null && modifications.size() > 0);
// JBCACHE-457
VisitableCommand rollbackCommand = commandsFactory.buildRollbackCommand(gtx);
- if (trace)
- {
- log.trace(" running rollback for " + gtx);
- }
+ if (trace) log.trace(" running rollback for " + gtx);
//JBCACHE-359 Store a lookup for the globalTransaction so a listener
// callback can find it
- //ltx = getLocalTxForGlobalTx(globalTransaction);
rollbackTransactions.put(tx, gtx);
handleCommitRollback(ctx, rollbackCommand);
@@ -861,7 +622,7 @@
}
finally
{
- if (tx != null) rollbackTransactions.remove(tx);
+ rollbackTransactions.remove(tx);
}
}
@@ -884,48 +645,35 @@
return newList;
}
+ private boolean isOnePhaseCommit()
+ {
+ if (!configuration.getCacheMode().isSynchronous())
+ {
+ // this is a REPL_ASYNC call - do 1-phase commit. break!
+ if (trace) log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()");
+ return true;
+ }
+ return false;
+ }
+
/**
* Handles a local prepare - invoked by the sync handler. Tests if the current tx matches the gtx passed in to the
* method call and passes the prepare() call up the chain.
- *
- * @return
- * @throws Throwable
*/
@SuppressWarnings("deprecation")
public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List<ReversibleCommand> modifications) throws Throwable
{
- // build the method call
- VisitableCommand prepareCommand;
- // if (cache.getCacheModeInternal() != CacheImpl.REPL_ASYNC)
- // {
// running a 2-phase commit.
- if (configuration.isNodeLockingOptimistic())
- {
- prepareCommand = commandsFactory.buildOptimisticPrepareCommand(gtx, modifications, null, rpcManager.getLocalAddress(), false);
- }
- else if (configuration.getCacheMode() != Configuration.CacheMode.REPL_ASYNC)
- {
- prepareCommand = commandsFactory.buildPrepareCommand(gtx, modifications, rpcManager.getLocalAddress(),
- false);// don't commit or rollback - wait for call
- }
- //}
- else
- {
- // this is a REPL_ASYNC call - do 1-phase commit. break!
- if (trace) log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()");
- return null;
- }
+ VisitableCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
- // passes a prepare call up the local interceptor chain. The replication interceptor
- // will do the broadcasting if needed. This is so all requests (local/remote) are
- // treated the same
Object result;
// Is there a local transaction associated with GTX ?
Transaction ltx = ctx.getTransaction();
//if ltx is not null and it is already running
- if (txManager.getTransaction() != null && ltx != null && txManager.getTransaction().equals(ltx))
+ Transaction currentTransaction = txManager.getTransaction();
+ if (currentTransaction != null && ltx != null && currentTransaction.equals(ltx))
{
VisitableCommand originalCommand = ctx.getCommand();
ctx.setCommand(prepareCommand);
@@ -953,6 +701,20 @@
// Private helper methods
// --------------------------------------------------------------
+ protected void assertTxIsStillValid(Transaction tx)
+ {
+ if (!TransactionTable.isActive(tx))
+ {
+ try
+ {
+ throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE; is " + tx.getStatus());
+ }
+ catch (SystemException e)
+ {
+ throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE; Unable to retrieve transaction status.");
+ }
+ }
+ }
/**
* Creates a gtx (if one doesnt exist), a sync handler, and registers the tx.
@@ -964,41 +726,42 @@
private GlobalTransaction registerTransaction(Transaction tx, InvocationContext ctx) throws Exception
{
GlobalTransaction gtx;
- if (TransactionTable.isValid(tx) && transactions.put(tx, NULL) == null)
+
+ if (TransactionTable.isValid(tx) && transactions.add(tx))
{
gtx = txTable.getCurrentTransaction(tx, true);
+ TransactionEntry entry;
if (ctx.getGlobalTransaction() == null)
{
ctx.setGlobalTransaction(gtx);
- ctx.setTransactionEntry(txTable.get(gtx));
+ entry = txTable.get(gtx);
+ ctx.setTransactionEntry(entry);
}
+ else
+ {
+ entry = ctx.getTransactionEntry();
+ }
if (gtx.isRemote())
{
// should be no need to register a handler since this a remotely initiated globalTransaction
- if (trace)
- {
- log.trace("is a remotely initiated gtx so no need to register a tx for it");
- }
+ if (trace) log.trace("is a remotely initiated gtx so no need to register a tx for it");
}
else
{
- if (trace)
- {
- log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);
- }
+ if (trace) log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);
+
// see the comment in the LocalSyncHandler for the last isOriginLocal param.
- LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, !ctx.isOriginLocal());
+ LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, entry, !ctx.isOriginLocal());
registerHandler(tx, myHandler, ctx);
}
}
- else if ((gtx = (GlobalTransaction) rollbackTransactions.get(tx)) != null)
+ else if ((gtx = rollbackTransactions.get(tx)) != null)
{
if (trace) log.trace("Transaction " + tx + " is already registered and is rolling back.");
}
else
{
if (trace) log.trace("Transaction " + tx + " is already registered.");
-
}
return gtx;
}
@@ -1022,11 +785,11 @@
}
/**
- * Replaces the global transaction in a method call with a new global transaction passed in.
+ * Replaces the global transaction in a VisitableCommand with a new global transaction passed in.
*/
- private VisitableCommand replaceGtx(VisitableCommand m, final GlobalTransaction gtx) throws Throwable
+ private VisitableCommand replaceGtx(VisitableCommand command, final GlobalTransaction gtx) throws Throwable
{
- m.acceptVisitor(null, new AbstractVisitor()
+ command.acceptVisitor(null, new AbstractVisitor()
{
@Override
public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand command) throws Throwable
@@ -1091,7 +854,7 @@
return null;
}
});
- return m;
+ return command;
}
/**
@@ -1100,7 +863,7 @@
* @return
* @throws Exception
*/
- private Transaction createLocalTx() throws Exception
+ protected Transaction createLocalTx() throws Exception
{
if (trace)
{
@@ -1113,23 +876,6 @@
return localTx;
}
- /**
- * Creates a new local transaction for a given global transaction.
- *
- * @param gtx
- * @return
- * @throws Exception
- */
- private Transaction createLocalTxForGlobalTx(GlobalTransaction gtx, InvocationContext ctx) throws Exception
- {
- Transaction localTx = createLocalTx();
- txTable.put(localTx, gtx);
- // attach this to the context
- ctx.setTransaction(localTx);
- if (trace) log.trace("Created new tx for gtx " + gtx);
- return localTx;
- }
-
// ------------------------------------------------------------------------
// Synchronization classes
// ------------------------------------------------------------------------
@@ -1144,26 +890,27 @@
TransactionEntry entry = null;
protected InvocationContext ctx; // the context for this call.
- RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx)
+ RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx, TransactionEntry entry)
{
this.gtx = gtx;
this.tx = tx;
+ this.entry = entry;
}
public void beforeCompletion()
{
if (trace) log.trace("Running beforeCompletion on gtx " + gtx);
- entry = txTable.get(gtx);
+
if (entry == null)
{
log.error("Transaction has a null transaction entry - beforeCompletion() will fail.");
- log.error("TxTable contents: " + txTable);
throw new IllegalStateException("cannot find transaction entry for " + gtx);
}
modifications = entry.getModifications();
ctx = invocationContextContainer.get();
- ctx.setTransactionEntry(entry);
+ setTransactionalContext(tx, gtx, entry, ctx);
+
if (ctx.isOptionsUninitialised() && entry.getOption() != null) ctx.setOptionOverrides(entry.getOption());
assertCanContinue();
@@ -1179,21 +926,18 @@
if (ctx == null)
{
ctx = invocationContextContainer.get();
- }
+ setTransactionalContext(tx, gtx, entry, ctx);
- entry = txTable.get(gtx);
- ctx.setTransactionEntry(entry);
-
- if (ctx.isOptionsUninitialised() && entry != null && entry.getOption() != null)
- {
- // use the options from the transaction entry instead
- ctx.setOptionOverrides(entry.getOption());
+ if (ctx.isOptionsUninitialised() && entry != null && entry.getOption() != null)
+ {
+ // use the options from the transaction entry instead
+ ctx.setOptionOverrides(entry.getOption());
+ }
}
try
{
assertCanContinue();
- setTransactionalContext(tx, gtx, ctx);
try
{
@@ -1220,11 +964,9 @@
switch (status)
{
case Status.STATUS_COMMITTED:
-
- // if this is optimistic or sync repl
- boolean onePhaseCommit = !configuration.isNodeLockingOptimistic() && configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
+ boolean onePhaseCommit = !configuration.getCacheMode().isSynchronous();
if (log.isDebugEnabled()) log.debug("Running commit phase. One phase? " + onePhaseCommit);
- runCommitPhase(ctx, gtx, tx, modifications, cacheLoaderModifications, onePhaseCommit);
+ runCommitPhase(ctx, gtx, modifications, cacheLoaderModifications, onePhaseCommit);
log.debug("Finished commit phase");
break;
case Status.STATUS_UNKNOWN:
@@ -1240,12 +982,17 @@
throw new IllegalStateException("illegal status: " + status);
}
}
+ catch (Exception th)
+ {
+ log.trace("Caught exception ", th);
+
+ }
finally
{
// clean up the tx table
txTable.remove(gtx);
txTable.remove(tx);
- setTransactionalContext(null, null, ctx);
+ setTransactionalContext(null, null, null, ctx);
cleanupInternalState();
}
}
@@ -1308,9 +1055,9 @@
* @param tx
* @param remoteLocal
*/
- LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, boolean remoteLocal)
+ LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, TransactionEntry entry, boolean remoteLocal)
{
- super(gtx, tx);
+ super(gtx, tx, entry);
this.remoteLocal = remoteLocal;
}
@@ -1321,7 +1068,7 @@
ctx.setOriginLocal(!remoteLocal); // this is the LOCAL sync handler after all!
// fetch the modifications before the transaction is committed
// (and thus removed from the txTable)
- setTransactionalContext(tx, gtx, ctx);
+ setTransactionalContext(tx, gtx, entry, ctx);
if (!entry.hasModifications())
{
if (trace) log.trace("No modifications in this tx. Skipping beforeCompletion()");
@@ -1329,7 +1076,7 @@
return;
}
- // set any transaction wide options as current for this thread.
+ // set any transaction wide options as current for this thread, caching original options that would then be reset
originalOptions = ctx.getOptionOverrides();
transactionalOptions = entry.getOption();
ctx.setOptionOverrides(transactionalOptions);
@@ -1343,8 +1090,9 @@
case Status.STATUS_PREPARING:
// run a prepare call.
modifications = compact(modifications);
- Object result = runPreparePhase(ctx, gtx, modifications);
+ Object result = isOnePhaseCommit() ? null : runPreparePhase(ctx, gtx, modifications);
+
if (result instanceof Throwable)
{
if (log.isDebugEnabled())
@@ -1376,7 +1124,7 @@
finally
{
localRollbackOnly = false;
- setTransactionalContext(null, null, ctx);
+ setTransactionalContext(null, null, null, ctx);
ctx.setOptionOverrides(originalOptions);
}
}
@@ -1387,9 +1135,8 @@
// could happen if a rollback is called and beforeCompletion() doesn't get called.
if (ctx == null) ctx = invocationContextContainer.get();
ctx.setLocalRollbackOnly(localRollbackOnly);
+ setTransactionalContext(tx, gtx, entry, ctx);
ctx.setOptionOverrides(transactionalOptions);
- ctx.setTransaction(tx);
- ctx.setGlobalTransaction(gtx);
try
{
super.afterCompletion(status);
Modified: core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2008-05-12 20:45:48 UTC (rev 5833)
+++ core/trunk/src/test/java/org/jboss/cache/optimistic/AsyncFullStackInterceptorTest.java 2008-05-13 14:10:18 UTC (rev 5834)
@@ -146,27 +146,18 @@
mgr.resume(tx);
- boolean fail = false;
- try
- {
- mgr.commit();
- }
- catch (Exception e)
- {
- fail = true;
+ // one-phase-commits won't throw an exception on failure.
+ mgr.commit();
- }
assertNull(mgr.getTransaction());
assertEquals(0, cache.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache.getTransactionTable().getNumLocalTransactions());
- assertEquals(true, fail);
-
assertTrue(cache.exists(Fqn.fromString("/one/two")));
assertNotNull(cache.getNode("/one"));
assertEquals(false, cache.getRoot().getLock().isLocked());
- assertEquals(false, ((NodeSPI<Object, Object>) cache.getNode("/one")).getLock().isLocked());
- assertEquals(false, ((NodeSPI<Object, Object>) cache.getNode("/one/two")).getLock().isLocked());
+ assertEquals(false, cache.getNode("/one").getLock().isLocked());
+ assertEquals(false, cache.getNode("/one/two").getLock().isLocked());
assertNotNull(cache.getNode("/one").getChild("two"));
assertEquals(pojo2, cache.get(Fqn.fromString("/one/two"), "key1"));
@@ -350,18 +341,9 @@
mgr.resume(tx);
- boolean fail = false;
- try
- {
- mgr.commit();
- }
- catch (Exception e)
- {
- fail = true;
+ // 1-pc commits won't throw an exception if there is a problem.
+ mgr.commit();
- }
-
- assertEquals(true, fail);
assertNull(mgr.getTransaction());
assertEquals(0, cache.getTransactionTable().getNumGlobalTransactions());
assertEquals(0, cache.getTransactionTable().getNumLocalTransactions());
16 years, 7 months
JBoss Cache SVN: r5833 - core/trunk/src/test/java/org/jboss/cache/statetransfer.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-05-12 16:45:48 -0400 (Mon, 12 May 2008)
New Revision: 5833
Modified:
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java
Log:
fixed issue
Modified: core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java 2008-05-12 19:24:51 UTC (rev 5832)
+++ core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java 2008-05-12 20:45:48 UTC (rev 5833)
@@ -105,7 +105,8 @@
semaphore.release(count);
// Sleep to ensure the threads get all the semaphore tickets
- waitUntillUsersFinish(activators);
+ TestingUtil.sleepThread((long) 1000);
+
// Reacquire the semaphore tickets; when we have them all
// we know the threads are done
for (int i = 0; i < count; i++)
@@ -159,15 +160,6 @@
}
- private void waitUntillUsersFinish(CacheActivator[] activators)
- throws Exception
- {
- for (CacheActivator activator : activators)
- {
- activator.waitUntillFinished();
- }
- }
-
private void waitTillAllReplicationsFinish(int count, CacheSPI[] caches)
throws Exception
{
16 years, 7 months
JBoss Cache SVN: r5832 - in core/trunk/src: test/java/org/jboss/cache/api and 1 other directory.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-05-12 15:24:51 -0400 (Mon, 12 May 2008)
New Revision: 5832
Modified:
core/trunk/src/main/java/org/jboss/cache/commands/read/GetDataMapCommand.java
core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java
Log:
JBCACHE-1337 - Node.getData() is immutable but not defensively copied.
Modified: core/trunk/src/main/java/org/jboss/cache/commands/read/GetDataMapCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/read/GetDataMapCommand.java 2008-05-12 17:20:48 UTC (rev 5831)
+++ core/trunk/src/main/java/org/jboss/cache/commands/read/GetDataMapCommand.java 2008-05-12 19:24:51 UTC (rev 5832)
@@ -4,9 +4,8 @@
import org.jboss.cache.InvocationContext;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.commands.Visitor;
+import org.jboss.cache.util.MapCopy;
-import java.util.Collections;
-
/**
* Implements functionality defined by {@link org.jboss.cache.Cache#getData(org.jboss.cache.Fqn)}
* <p/>
@@ -39,7 +38,7 @@
{
NodeSPI n = dataContainer.peek(fqn);
if (n == null) return null;
- return Collections.unmodifiableMap(n.getDataDirect());
+ return new MapCopy(n.getDataDirect());
}
public Object acceptVisitor(InvocationContext ctx, Visitor visitor) throws Throwable
Modified: core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java 2008-05-12 17:20:48 UTC (rev 5831)
+++ core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java 2008-05-12 19:24:51 UTC (rev 5832)
@@ -216,6 +216,56 @@
}
}
+ public void testDefensiveCopyOfData()
+ {
+ rootNode.put("key", "value");
+ Map<Object, Object> data = rootNode.getData();
+ Set<Object> keys = rootNode.getKeys();
+
+ assert keys.size() == 1;
+ assert keys.contains("key");
+
+ assert data.size() == 1;
+ assert data.containsKey("key");
+
+ // now change stuff.
+
+ rootNode.put("key2", "value2");
+
+ // assert that the collections we initially got have not changed.
+ assert keys.size() == 1;
+ assert keys.contains("key");
+
+ assert data.size() == 1;
+ assert data.containsKey("key");
+ }
+
+ public void testDefensiveCopyOfChildren()
+ {
+ Fqn childFqn = Fqn.fromString("/child");
+ rootNode.addChild(childFqn).put("k", "v");
+ Set<Node<Object, Object>> children = rootNode.getChildren();
+ Set<Object> childrenNames = rootNode.getChildrenNames();
+
+ assert childrenNames.size() == 1;
+ assert childrenNames.contains(childFqn.getLastElement());
+
+ assert children.size() == 1;
+ assert children.iterator().next().getFqn().equals(childFqn);
+
+ // now change stuff.
+
+ rootNode.addChild(Fqn.fromString("/child2"));
+
+ // assert that the collections we initially got have not changed.
+ assert childrenNames.size() == 1;
+ assert childrenNames.contains(childFqn.getLastElement());
+
+ assert children.size() == 1;
+ assert children.iterator().next().getFqn().equals(childFqn);
+ }
+
+
public void testImmutabilityOfChildren()
{
rootNode.addChild(A);
16 years, 7 months
JBoss Cache SVN: r5831 - in core/trunk/src: main/java/org/jboss/cache/eviction and 6 other directories.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-05-12 13:20:48 -0400 (Mon, 12 May 2008)
New Revision: 5831
Added:
core/trunk/src/test/java/org/jboss/cache/util/internals/
core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java
Modified:
core/trunk/src/main/java/org/jboss/cache/config/EvictionConfig.java
core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java
core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java
core/trunk/src/test/java/org/jboss/cache/invocation/InterceptorChainTest.java
core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java
Log:
fixed finding of start/stop methods
Modified: core/trunk/src/main/java/org/jboss/cache/config/EvictionConfig.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/EvictionConfig.java 2008-05-12 16:45:25 UTC (rev 5830)
+++ core/trunk/src/main/java/org/jboss/cache/config/EvictionConfig.java 2008-05-12 17:20:48 UTC (rev 5831)
@@ -22,6 +22,7 @@
package org.jboss.cache.config;
import org.jboss.cache.RegionManager;
+import org.jboss.cache.Fqn;
import org.jboss.cache.eviction.EvictionPolicy;
import org.jboss.cache.util.Util;
@@ -211,4 +212,20 @@
}
+ /**
+ * Returns the <code>EvictionRegionConfig</code> corresponding to the given region fqn, or <code>null</code> if no
+ * match is found.
+ */
+ public EvictionRegionConfig getEvictionRegionConfig(String region)
+ {
+ Fqn<String> fqn = Fqn.fromString(region);
+ for (EvictionRegionConfig evConfig : getEvictionRegionConfigs())
+ {
+ if (evConfig.getRegionFqn().equals(fqn))
+ {
+ return evConfig;
+ }
+ }
+ return null;
+ }
}
Modified: core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java 2008-05-12 16:45:25 UTC (rev 5830)
+++ core/trunk/src/main/java/org/jboss/cache/eviction/BaseEvictionAlgorithm.java 2008-05-12 17:20:48 UTC (rev 5831)
@@ -417,7 +417,7 @@
// if a collection is only guaranteed sort order by adding to the collection,
// this implementation will not guarantee sort order.
ne.setNumberOfNodeVisits(ne.getNumberOfNodeVisits() + 1);
- ne.setModifiedTimeStamp(System.currentTimeMillis());
+ ne.setModifiedTimeStamp(evictedEventNode.getCreationTimestamp());
}
protected void processRemovedElement(EvictedEventNode evictedEventNode) throws EvictionException
@@ -438,7 +438,7 @@
ne.setNumberOfElements(ne.getNumberOfElements() - 1);
// also treat it as a node visit.
ne.setNumberOfNodeVisits(ne.getNumberOfNodeVisits() + 1);
- ne.setModifiedTimeStamp(System.currentTimeMillis());
+ ne.setModifiedTimeStamp(evictedEventNode.getCreationTimestamp());
}
protected void processAddedElement(EvictedEventNode evictedEventNode) throws EvictionException
@@ -459,7 +459,7 @@
// also treat it as a node visit.
ne.setNumberOfNodeVisits(ne.getNumberOfNodeVisits() + 1);
- ne.setModifiedTimeStamp(System.currentTimeMillis());
+ ne.setModifiedTimeStamp(evictedEventNode.getCreationTimestamp());
// log.error ("*** Processing nodeAdded for fqn " + fqn + " NodeEntry's hashcode is " + ne.hashCode());
}
Modified: core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java 2008-05-12 16:45:25 UTC (rev 5830)
+++ core/trunk/src/main/java/org/jboss/cache/eviction/EvictionTimerTask.java 2008-05-12 17:20:48 UTC (rev 5831)
@@ -74,6 +74,11 @@
return processedRegions.contains(region);
}
+ public Set<Region> getProcessedRegions()
+ {
+ return processedRegions;
+ }
+
public void stop()
{
log.debug("Stopping eviction timer");
@@ -98,36 +103,44 @@
*/
public void run()
{
- synchronized (processedRegions)
- {
- log.trace("***** eviction kicks in");
- for (Region region : processedRegions)
- {
- final EvictionPolicy policy = region.getEvictionPolicy();
-
- synchronized (region)
- {
- final EvictionAlgorithm algo = policy.getEvictionAlgorithm();
- if (algo == null)
- throw new NullPointerException("algorithm null");
- try
- {
- algo.process(region);
- }
- catch (EvictionException e)
- {
- log.error("run(): error processing eviction with exception: " + e.toString()
- + " will reset the eviction queue list.");
- region.resetEvictionQueues();
- log.debug("trace", e);
- }
- }
- }
- }
+ processRegions();
}
};
evictionThread.schedule(tt, wakeupIntervalSeconds * 1000, wakeupIntervalSeconds * 1000);
}
+
+ private void processRegions()
+ {
+ synchronized (processedRegions)
+ {
+ for (Region region : processedRegions)
+ {
+ handleRegion(region);
+ }
+ }
+ }
+
+ private void handleRegion(Region region)
+ {
+ synchronized (region)
+ {
+ final EvictionPolicy policy = region.getEvictionPolicy();
+ final EvictionAlgorithm algo = policy.getEvictionAlgorithm();
+ if (algo == null)
+ throw new NullPointerException("algorithm null");
+ try
+ {
+ algo.process(region);
+ }
+ catch (EvictionException e)
+ {
+ log.error("run(): error processing eviction with exception: " + e.toString()
+ + " will reset the eviction queue list.");
+ region.resetEvictionQueues();
+ log.debug("trace", e);
+ }
+ }
+ }
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java 2008-05-12 16:45:25 UTC (rev 5830)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java 2008-05-12 17:20:48 UTC (rev 5831)
@@ -14,6 +14,7 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.ArrayList;
/**
* Knows how to build and manage an chain of interceptors. Also in charge with invoking methods on the chain.
@@ -271,6 +272,22 @@
return invocationContextContainer.get();
}
+ /**
+ * Returns all interceptors which extend the given command interceptor.
+ */
+ public List<CommandInterceptor> getInterceptorsWhichExtend(Class<? extends CommandInterceptor> interceptorClass)
+ {
+ List<CommandInterceptor> result = new ArrayList<CommandInterceptor>();
+ for (CommandInterceptor interceptor : asList())
+ {
+ boolean isSubclass = interceptorClass.isAssignableFrom(interceptor.getClass());
+ if (isSubclass)
+ {
+ result.add(interceptor);
+ }
+ }
+ return result;
+ }
public String toString()
{
Modified: core/trunk/src/test/java/org/jboss/cache/invocation/InterceptorChainTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/invocation/InterceptorChainTest.java 2008-05-12 16:45:25 UTC (rev 5830)
+++ core/trunk/src/test/java/org/jboss/cache/invocation/InterceptorChainTest.java 2008-05-12 17:20:48 UTC (rev 5831)
@@ -126,6 +126,18 @@
assert txInterceptor.getNext().equals(invalidationInterceptor);
}
+ public void testGetInterceptorsWhichExtend()
+ {
+ InvocationContextInterceptor ic2 = (InvocationContextInterceptor) create(InvocationContextInterceptor.class);
+ chain.appendIntereceptor(ic2);
+ List<CommandInterceptor> result = chain.getInterceptorsWhichExtend(InvocationContextInterceptor.class);
+ assert result.contains(icInterceptor);
+ assert result.contains(ic2);
+ assert result.size() == 2;
+ result = chain.getInterceptorsWhichExtend(CommandInterceptor.class);
+ assert result.size() == chain.asList().size();
+ }
+
public void removeInterceptorWithtType()
{
chain.addInterceptor(txInterceptor, 1);
Modified: core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java 2008-05-12 16:45:25 UTC (rev 5830)
+++ core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java 2008-05-12 17:20:48 UTC (rev 5831)
@@ -50,11 +50,26 @@
return extractField(target.getClass(), target, fieldName);
}
- private static Object extractField(Class type, Object target, String fieldName)
+ public static void replaceField(Object newValue, String fieldName, Object owner, Class baseType)
{
Field field;
try
{
+ field = baseType.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(owner, newValue);
+ } catch (Exception e)
+ {
+ throw new RuntimeException(e);//just to simplify exception handeling
+ }
+ }
+
+
+ public static Object extractField(Class type, Object target, String fieldName)
+ {
+ Field field;
+ try
+ {
field = type.getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(target);
@@ -65,8 +80,7 @@
{
e.printStackTrace();
return null;
- }
- else
+ } else
{
// try with superclass!!
return extractField(type.getSuperclass(), target, fieldName);
@@ -272,8 +286,7 @@
if (members == null || memberCount > members.size())
{
return false;
- }
- else if (memberCount < members.size())
+ } else if (memberCount < members.size())
{
// This is an exceptional condition
StringBuffer sb = new StringBuffer("Cache at address ");
@@ -315,8 +328,7 @@
if (members == null || memberCount > members.size())
{
return false;
- }
- else if (memberCount < members.size())
+ } else if (memberCount < members.size())
{
if (barfIfTooManyMembers)
{
@@ -339,8 +351,7 @@
sb.append(')');
throw new IllegalStateException(sb.toString());
- }
- else return false;
+ } else return false;
}
return true;
@@ -596,8 +607,7 @@
if (c == null)
{
System.out.println(" ** Cache " + count + " is null!");
- }
- else
+ } else
{
System.out.println(" ** Cache " + count + " is " + c.getLocalAddress());
System.out.println(" " + CachePrinter.printCacheLockingInfo(c));
Modified: core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java 2008-05-12 16:45:25 UTC (rev 5830)
+++ core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferConcurrencyTest.java 2008-05-12 17:20:48 UTC (rev 5831)
@@ -7,20 +7,14 @@
package org.jboss.cache.statetransfer;
-import org.jboss.cache.Cache;
-import org.jboss.cache.CacheException;
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.DefaultCacheFactory;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.Node;
-import org.jboss.cache.Region;
-import org.jboss.cache.eviction.LRUConfiguration;
+import org.jboss.cache.*;
import org.jboss.cache.config.Configuration;
-import org.jboss.cache.config.EvictionRegionConfig;
import org.jboss.cache.config.Configuration.CacheMode;
import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.marshall.InactiveRegionException;
import org.jboss.cache.misc.TestingUtil;
+import org.jboss.cache.util.internals.EvictionController;
+import org.jboss.cache.util.internals.ReplicationQueueNotifier;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.Test;
@@ -111,8 +105,7 @@
semaphore.release(count);
// Sleep to ensure the threads get all the semaphore tickets
- TestingUtil.sleepThread((long) 1000);
-
+ waitUntillUsersFinish(activators);
// Reacquire the semaphore tickets; when we have them all
// we know the threads are done
for (int i = 0; i < count; i++)
@@ -124,10 +117,10 @@
}
}
- // Sleep to allow any async calls to clear
+ // allow any async calls to clear
if (!sync)
{
- TestingUtil.sleepThread(2000);
+ waitTillAllReplicationsFinish(count, caches);
}
// Ensure the caches held by the activators see all the values
@@ -153,6 +146,7 @@
}
catch (Exception ex)
{
+ ex.printStackTrace();
fail(ex.getLocalizedMessage());
}
finally
@@ -165,6 +159,24 @@
}
+ private void waitUntillUsersFinish(CacheActivator[] activators)
+ throws Exception
+ {
+ for (CacheActivator activator : activators)
+ {
+ activator.waitUntillFinished();
+ }
+ }
+
+ private void waitTillAllReplicationsFinish(int count, CacheSPI[] caches)
+ throws Exception
+ {
+ for (int i = 0; i < count; i++)
+ {
+ new ReplicationQueueNotifier(caches[i]).waitUntillAllReplicated(2000);
+ }
+ }
+
/**
* Starts two caches where each cache has N regions. We put some data in each of the regions.
* We run two threads where each thread creates a cache then goes into a loop where it
@@ -230,7 +242,7 @@
// Sleep to allow any async calls to clear
if (!sync)
{
- TestingUtil.sleepThread(1000);
+ waitTillAllReplicationsFinish(count, caches);
}
// Ensure the caches held by the activators see all the values
@@ -411,9 +423,11 @@
TestingUtil.sleepThread(100);
}
- // Sleep to allow any in transit msgs to clear
- // if (!sync)
- TestingUtil.sleepThread(1000);
+ // Sleep to allow any async calls to clear
+ if (!sync)
+ {
+ waitTillAllReplicationsFinish(count, caches);
+ }
// Ensure the stressors saw no exceptions
for (int i = 0; i < count; i++)
@@ -425,8 +439,6 @@
}
- TestingUtil.sleepThread(1000);
-
// Compare cache contents
for (int i = 0; i < count; i++)
{
@@ -456,13 +468,10 @@
}
}
}
-
}
/**
* Test for JBCACHE-913
- *
- * @throws Exception
*/
public void testEvictionSeesStateTransfer() throws Exception
{
@@ -493,8 +502,6 @@
/**
* Further test for JBCACHE-913
- *
- * @throws Exception
*/
public void testEvictionAfterStateTransfer() throws Exception
{
@@ -516,24 +523,21 @@
}
}
- Thread.sleep(5000);
- assert cache1.getRoot().getChild(Fqn.fromString("/org/jboss/data")).getChildren().size() == 5000;
+ EvictionController ec1 = new EvictionController(cache1);
+ ec1.startEviction();
+ assert cache1.getRoot().getChild(Fqn.fromString("/org/jboss/data")).getChildren().size() == 5000;
+
c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
final Cache<Object, Object> cache2 = new DefaultCacheFactory().createCache(c, false);
- log.info("***** before starting the second cache");
cache2.start();
- log.info("***** after starting the second cache");
caches.put("evict2", cache2);
Node<Object, Object> parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/data"));
Set children = parent.getChildren();
- System.out.println("children.size() = " + children.size());
- System.out.println("cache1.children size = " + cache1.getRoot().getChild(Fqn.fromString("/org/jboss/data")).getChildren().size());
- log.info("***** cache1.children size = " + cache1.getRoot().getChild(Fqn.fromString("/org/jboss/data")).getChildren().size());
- log.info("***** children.size() = " + children.size());
- assertTrue("Minimum number of base children transferred", children.size() >= 4999); //4999 because the root of the region will also be counted, as it is not resident
+ //4999 because the root of the region will also be counted, as it is not resident
+ assertTrue("Minimum number of base children transferred", children.size() >= 4999);
// Sleep 2.5 secs so the nodes we are about to create in data won't
// exceed the 4 sec TTL when eviction thread runs
@@ -628,8 +632,8 @@
assertNull("No exceptions in p1", p1.ex);
assertNull("No exceptions in p2", p2.ex);
- // Sleep 5.1 secs so we are sure the eviction thread ran
- TestingUtil.sleepThread(5100);
+ EvictionController ec2 = new EvictionController(cache2);
+ ec2.startEviction();
parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
children = parent.getChildren();
@@ -648,61 +652,18 @@
// Sleep more to let the eviction thread run again,
// which will evict all data nodes due to their ttl of 4 secs
- TestingUtil.sleepThread(8100);
+ ec2.evictRegionWithTimeToLive("/org/jboss/test/data");
parent = cache2.getRoot().getChild(Fqn.fromString("/org/jboss/test/data"));
- children = parent.getChildren();
- if (children != null)
+ if (parent != null)
{
- assertEquals("All data children evicted", 0, children.size());
- }
- }
-
- /**
- * tests that after the state transfer takes place the correct number of nodes is being evcited.
- */
- public void testEvictionAfterStateTransferSimple() throws Exception
- {
- Configuration c = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, true);
- String baseRegion = "/base";
- int maxRegionNodeCount = 5;
-
-// //set max node node to 5 on default
- ((LRUConfiguration)c.getEvictionConfig().getEvictionRegionConfigs().get(0).getEvictionPolicyConfig()).setMaxNodes(maxRegionNodeCount);
- EvictionRegionConfig baseRegionConfig = c.getEvictionConfig().getEvictionRegionConfigs().get(1).clone();
- baseRegionConfig.setRegionFqn(Fqn.fromString(baseRegion));
- ((LRUConfiguration)baseRegionConfig.getEvictionPolicyConfig()).setMaxAgeSeconds(1000);
- ((LRUConfiguration)baseRegionConfig.getEvictionPolicyConfig()).setMaxNodes(maxRegionNodeCount);
- c.getEvictionConfig().getEvictionRegionConfigs().add(baseRegionConfig);
-
-
- Cache<Object, Object> cache1 = new DefaultCacheFactory().createCache(c, true);
- caches.put("evict1", cache1);
- cache1.getRegion(Fqn.fromString(baseRegion), true).activate();
-
- for (int i = 0; i < maxRegionNodeCount + 5; i++)
- {
- cache1.put(Fqn.fromString(baseRegion + "/" + i), "key", "base" + i);
- if (i == 0)
+ children = parent.getChildren();
+ if (children != null)
{
- cache1.getRoot().getChild(Fqn.fromString(baseRegion)).setResident(true); //so that it won't be counted for eviction
+ assertEquals("All data children evicted", 0, children.size());
}
}
- cache1.put(Fqn.fromString("/org/jboss/test/data/" + 0), "key", "data" + 0);
-
- Thread.sleep(5000);
- assert cache1.getRoot().getChild(Fqn.fromString(baseRegion)).getChildren().size() == maxRegionNodeCount;
- System.out.println("cache1.getRoot().getChild(Fqn.fromString(baseRegion)).getChildren().size() = " + cache1.getRoot().getChild(Fqn.fromString(baseRegion)).getChildren());
-
- final Cache<Object, Object> cache2 = new DefaultCacheFactory().createCache(c.clone(), false);
- cache2.start();
- caches.put("evict2", cache2);
- Thread.sleep(5000);
-
- Node parent = cache2.getRoot().getChild(Fqn.fromString(baseRegion));
- Set children = parent.getChildren();
}
-
private class CacheActivator extends CacheUser
{
@@ -722,42 +683,11 @@
System.out.println("---- Cache" + name + " = " + cache.getLocalAddress() + " being used");
TestingUtil.sleepRandom(5000);
createAndActivateRegion(cache, A_B);
-// waitUntillAllChachesActivatedRegion();
System.out.println(name + " activated region" + " " + System.currentTimeMillis());
Fqn childFqn = Fqn.fromRelativeElements(A_B, name);
-
cache.put(childFqn, "KEY", "VALUE");
- // System.out.println(name + " put fqn " + childFqn + " " + System.currentTimeMillis());
-
}
- /**
- * If we do not wait for all being activated, following scenario might happen:
- * A activates the /a/b
- * A puts something in /a/b and replicates
- * B fails to accept the replication as it has the /a/b region inactive.
- * <p/>
- * So we cannot expect all the put operation to replicate accross all the members from the cluser, WITHOUTH
- * having the region active on ALL members.
- */
- private void waitUntillAllChachesActivatedRegion()
- {
- boolean allActive = true;
- do
- {
- allActive = true;
- for (Cache cache : caches)
- {
- if (cache.getRegion(A_B, false) == null || !cache.getRegion(A_B, false).isActive())
- {
- allActive = false;
- }
- }
- TestingUtil.sleepThread(1000);
- } while (!allActive);
- System.out.println("---- /a/b is active on all cache instances");
- }
-
public Object getCacheValue(Fqn fqn) throws CacheException
{
return cache.get(fqn, "KEY");
Added: core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/EvictionController.java 2008-05-12 17:20:48 UTC (rev 5831)
@@ -0,0 +1,89 @@
+package org.jboss.cache.util.internals;
+
+import org.jboss.cache.*;
+import org.jboss.cache.config.EvictionConfig;
+import org.jboss.cache.config.EvictionRegionConfig;
+import org.jboss.cache.eviction.EvictionTimerTask;
+import org.jboss.cache.eviction.LRUConfiguration;
+import org.jboss.cache.misc.TestingUtil;
+
+import java.lang.reflect.Method;
+import java.util.Timer;
+
+/**
+ * When used on a cache, disables the default eviction behavior and supplies a means of kicking off eviction
+ * programmatically. It is intended for replacing Thread.sleep(xyz)-like statements in which the executing tests wait
+ * until eviction finishes.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ * @since 2.2
+ */
+public class EvictionController
+{
+ CacheSPI cache;
+ EvictionTimerTask timerTask;
+
+ public EvictionController(Cache cache)
+ {
+ this.cache = (CacheSPI) cache;
+ RegionManager regionManager = this.cache.getRegionManager();
+ if (regionManager == null)
+ {
+ throw new IllegalStateException("Null region manager; is the cache started?");
+ }
+ timerTask = (EvictionTimerTask) TestingUtil.extractField(regionManager, "evictionTimerTask");
+ if (timerTask == null)
+ {
+ throw new IllegalStateException("No timer task!!!");
+ }
+ Timer evictionThread = (Timer) TestingUtil.extractField(timerTask, "evictionThread");
+ evictionThread.cancel();
+ }
+
+ public void startEviction() throws Exception
+ {
+ Method method = EvictionTimerTask.class.getDeclaredMethod("processRegions", null);
+ method.setAccessible(true);
+ method.invoke(timerTask);
+ }
+
+ /**
+ * Evicts the given region but only after ensuring that region's TTL passed.
+ */
+ public void evictRegionWithTimeToLive(String region) throws Exception
+ {
+ EvictionConfig evConfig = cache.getConfiguration().getEvictionConfig();
+ EvictionRegionConfig erConfig = evConfig.getEvictionRegionConfig(region);
+ if (erConfig == null)
+ {
+ throw new IllegalStateException("No such region!");
+ }
+ int ttl = 0;
+ if (erConfig.getEvictionPolicyConfig() instanceof LRUConfiguration)
+ {
+ LRUConfiguration configuration = (LRUConfiguration) erConfig.getEvictionPolicyConfig();
+ ttl = configuration.getTimeToLiveSeconds();
+ } else
+ {
+ throw new IllegalArgumentException("Only LRU being handled for now; please add other implementations here");
+ }
+ TestingUtil.sleepThread(ttl * 1000 + 500);
+ evictRegion(region);
+ }
+
+ /**
+ * Only evicts the given region.
+ */
+ public void evictRegion(String regionStr) throws Exception
+ {
+ for (Region region : timerTask.getProcessedRegions())
+ {
+ if (region.getFqn().equals(Fqn.fromString(regionStr)))
+ {
+ Method method = EvictionTimerTask.class.getDeclaredMethod("handleRegion", Region.class);
+ method.setAccessible(true);
+ method.invoke(timerTask, region);
+ }
+ }
+ }
+}
Added: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationQueueNotifier.java 2008-05-12 17:20:48 UTC (rev 5831)
@@ -0,0 +1,89 @@
+package org.jboss.cache.util.internals;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.cluster.ReplicationQueue;
+import org.jboss.cache.factories.ComponentRegistry;
+import org.jboss.cache.interceptors.BaseRpcInterceptor;
+import org.jboss.cache.interceptors.InterceptorChain;
+import org.jboss.cache.interceptors.base.CommandInterceptor;
+import org.jboss.cache.invocation.CacheInvocationDelegate;
+import org.jboss.cache.misc.TestingUtil;
+
+import java.util.List;
+
+/**
+ * Knows how to notify callers of certain state changes in the replication queue.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ * @since 2.2
+ */
+public class ReplicationQueueNotifier
+{
+ private CacheInvocationDelegate cache;
+ private Object replicated = new Object();
+
+ public ReplicationQueueNotifier(Cache cache)
+ {
+ this.cache = (CacheInvocationDelegate) cache;
+ if (!isAsync(cache))
+ {
+ throw new RuntimeException("No queue events expected on a sync cache!");
+ }
+ replaceInternal();
+ }
+
+ private boolean isAsync(Cache cache)
+ {
+ return cache.getConfiguration().getCacheMode() == Configuration.CacheMode.INVALIDATION_ASYNC ||
+ cache.getConfiguration().getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
+ }
+
+ private void replaceInternal()
+ {
+ ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
+ InterceptorChain ic = componentRegistry.getComponent(InterceptorChain.class);
+ List<CommandInterceptor> commands = ic.getInterceptorsWhichExtend(BaseRpcInterceptor.class);
+ for (CommandInterceptor interceptor: commands)
+ {
+ ReplicationQueue original = (ReplicationQueue) TestingUtil.extractField(BaseRpcInterceptor.class, interceptor, "replicationQueue");
+ TestingUtil.replaceField(new ReplicationQueueDelegate(original),"replicationQueue", interceptor, BaseRpcInterceptor.class);
+ log("replaced replicationQueue in " + interceptor.getClass());
+ }
+ }
+
+ public void waitUntillAllReplicated(long timeout) throws Exception
+ {
+ synchronized (replicated)
+ {
+ replicated.wait(timeout);
+ }
+ log("returning from waitUntillAllReplicated call");
+ }
+
+ private class ReplicationQueueDelegate extends ReplicationQueue
+ {
+ ReplicationQueue original;
+
+ private ReplicationQueueDelegate(ReplicationQueue original)
+ {
+ this.original = original;
+ }
+
+ @Override
+ public void flush()
+ {
+ log("Flush invoked!");
+ original.flush();
+ synchronized (replicated)
+ {
+ replicated.notifyAll();
+ }
+ }
+ }
+
+ private void log(String str)
+ {
+ System.out.println("ReplicationQueueNotifier >>> " + str);
+ }
+}
16 years, 7 months