Author: manik.surtani(a)jboss.com
Date: 2008-05-14 06:51:05 -0400 (Wed, 14 May 2008)
New Revision: 5839
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/trunk/src/test/java/org/jboss/cache/optimistic/DataVersionPersistenceTest.java
core/trunk/src/test/java/org/jboss/cache/transaction/ConcurrentBankTest.java
Log:
One-phase commit and optimistic locking
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2008-05-14
10:24:03 UTC (rev 5838)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/CacheStoreInterceptor.java 2008-05-14
10:51:05 UTC (rev 5839)
@@ -176,12 +176,7 @@
@Override
public Object handleOptimisticPrepareCommand(InvocationContext ctx,
OptimisticPrepareCommand command) throws Throwable
{
- if (inTransaction())
- {
- if (trace) log.trace("transactional so don't put stuff in the cloader
yet.");
- prepareCacheLoader(command.getGlobalTransaction(), ctx.getTransactionEntry(),
command.isOnePhaseCommit());
- }
- return invokeNextInterceptor(ctx, command);
+ return handlePrepareCommand(ctx, command);
}
@Override
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2008-05-14
10:24:03 UTC (rev 5838)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticLockingInterceptor.java 2008-05-14
10:51:05 UTC (rev 5839)
@@ -92,7 +92,7 @@
}
finally
{
- if (!succeeded || command.isOnePhaseCommit()) unlock(ctx, gtx);
+ if (!succeeded) unlock(ctx, gtx);
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java 2008-05-14
10:24:03 UTC (rev 5838)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticTxInterceptor.java 2008-05-14
10:51:05 UTC (rev 5839)
@@ -29,6 +29,11 @@
{
protected final ModificationsReplayVisitor replayVisitor = new
ModificationsReplayVisitor();
+ public OptimisticTxInterceptor()
+ {
+ optimistic = true;
+ }
+
@Override
public Object visitOptimisticPrepareCommand(InvocationContext ctx,
OptimisticPrepareCommand command) throws Throwable
{
@@ -108,7 +113,8 @@
@Override
protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List
modifications, boolean onePhaseCommit)
{
- return commandsFactory.buildOptimisticPrepareCommand(gtx, modifications, null,
rpcManager.getLocalAddress(), onePhaseCommit);
+ // optimistic locking NEVER does one-phase prepares.
+ return commandsFactory.buildOptimisticPrepareCommand(gtx, modifications, null,
rpcManager.getLocalAddress(), false);
}
/**
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java 2008-05-14
10:24:03 UTC (rev 5838)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticValidatorInterceptor.java 2008-05-14
10:51:05 UTC (rev 5839)
@@ -136,13 +136,7 @@
}
}
log.debug("Successfully validated nodes");
- Object retval = invokeNextInterceptor(ctx, command);
- if (command.isOnePhaseCommit())
- {
- // do a comit-phase
- commitTransaction(ctx);
- }
- return retval;
+ return invokeNextInterceptor(ctx, command);
}
private void commitTransaction(InvocationContext ctx)
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-05-14
10:24:03 UTC (rev 5838)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2008-05-14
10:51:05 UTC (rev 5839)
@@ -74,6 +74,7 @@
private long prepares = 0;
private long commits = 0;
private long rollbacks = 0;
+ protected boolean optimistic = false;
@Inject
public void intialize(RPCManager rpcManager,
@@ -646,7 +647,7 @@
private boolean isOnePhaseCommit()
{
- if (!configuration.getCacheMode().isSynchronous())
+ if (!configuration.getCacheMode().isSynchronous() && !optimistic)
{
// this is a REPL_ASYNC call - do 1-phase commit. break!
if (trace) log.trace("This is a REPL_ASYNC call (1 phase commit) - do
nothing for beforeCompletion()");
@@ -961,7 +962,7 @@
switch (status)
{
case Status.STATUS_COMMITTED:
- boolean onePhaseCommit =
!configuration.getCacheMode().isSynchronous();
+ boolean onePhaseCommit = isOnePhaseCommit();
if (log.isDebugEnabled()) log.debug("Running commit phase. One
phase? " + onePhaseCommit);
runCommitPhase(ctx, gtx, modifications, onePhaseCommit);
log.debug("Finished commit phase");
Modified:
core/trunk/src/test/java/org/jboss/cache/optimistic/DataVersionPersistenceTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/optimistic/DataVersionPersistenceTest.java 2008-05-14
10:24:03 UTC (rev 5838)
+++
core/trunk/src/test/java/org/jboss/cache/optimistic/DataVersionPersistenceTest.java 2008-05-14
10:51:05 UTC (rev 5839)
@@ -14,6 +14,8 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.io.IOException;
+
/**
* Tests whether data versions are transferred along with state
*
@@ -27,7 +29,7 @@
private CacheLoader loader;
@BeforeMethod
- public void setUp()
+ public void setUp() throws IOException
{
cache = new DefaultCacheFactory().createCache(false);
@@ -36,6 +38,7 @@
CacheLoaderConfig clc = new CacheLoaderConfig();
CacheLoaderConfig.IndividualCacheLoaderConfig iclc = new
CacheLoaderConfig.IndividualCacheLoaderConfig();
+ iclc.setProperties("debug=true");
iclc.setClassName(DummySharedInMemoryCacheLoader.class.getName());
clc.addIndividualCacheLoaderConfig(iclc);
Modified: core/trunk/src/test/java/org/jboss/cache/transaction/ConcurrentBankTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/transaction/ConcurrentBankTest.java 2008-05-14
10:24:03 UTC (rev 5838)
+++
core/trunk/src/test/java/org/jboss/cache/transaction/ConcurrentBankTest.java 2008-05-14
10:51:05 UTC (rev 5839)
@@ -36,17 +36,17 @@
public class ConcurrentBankTest
{
private CacheSPI<Object, Integer> cache;
- private static Log logger_ = LogFactory.getLog(ConcurrentBankTest.class);
+ private static Log log = LogFactory.getLog(ConcurrentBankTest.class);
private final Fqn NODE = Fqn.fromString("/cachetest");
private final int ROLLBACK_CHANCE = 100;
private static String customer[] = {"cu1", "cu2",
"cu3"};
private static final int BOOKINGS = 1000;
- private static boolean _testFailedinThread = false;
+ private static boolean testFailedinThread = false;
private void failMain()
{
- _testFailedinThread = true;
+ testFailedinThread = true;
}
@BeforeMethod(alwaysRun = true)
@@ -69,53 +69,33 @@
TransactionSetup.cleanup();
}
- public void testConcurrentBooking()
+ public void testConcurrentBooking() throws Exception
{
Teller one, two;
- try
+ if (cache.getRoot().get(NODE) == null)
{
- if (cache.getRoot().get(NODE) == null)
- {
- cache.put(NODE, "cu1", 1000);
- cache.put(NODE, "cu2", 1000);
- cache.put(NODE, "cu3", 1000);
- }
+ cache.put(NODE, "cu1", 1000);
+ cache.put(NODE, "cu2", 1000);
+ cache.put(NODE, "cu3", 1000);
+ }
- one = new Teller("one", cache);
- two = new Teller("two", cache);
+ one = new Teller("one", cache);
+ two = new Teller("two", cache);
- one.start();
- TestingUtil.sleepThread((long) 100);
- two.start();
- one.join();
- two.join();
+ one.start();
+ TestingUtil.sleepThread((long) 100);
+ two.start();
+ one.join();
+ two.join();
- log("lock info:\n" + CachePrinter.printCacheLockingInfo(cache) +
_testFailedinThread);
- if (_testFailedinThread)
- fail();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- fail(e.toString());
- }
- finally
- {
- /*
- try {
- cache.remove(NODE);
- } catch (Exception e) {
- e.printStackTrace();
- fail();
- }
- */
- }
+ log("lock info:\n" + CachePrinter.printCacheLockingInfo(cache) +
testFailedinThread);
+ assert !testFailedinThread;
}
private static void log(String msg)
{
// System.out.println("-- [" + Thread.currentThread() + "]:
" + msg);
- logger_.info("-- [" + Thread.currentThread() + "]: " + msg);
+ log.info("-- [" + Thread.currentThread() + "]: " + msg);
}
private class Teller extends Thread