Author: manik.surtani(a)jboss.com
Date: 2008-01-22 14:34:40 -0500 (Tue, 22 Jan 2008)
New Revision: 5197
Added:
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManager.java
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java
Log:
JBCACHE-1270 - use OOB messages for 2-phase commit messages
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2008-01-22 19:06:36 UTC (rev
5196)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2008-01-22 19:34:40 UTC (rev
5197)
@@ -43,42 +43,45 @@
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is null,
the call is broadcast to the entire cluster.
- * @param methodCall the method call to invoke
- * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
- * @param excludeSelf if true, the message is not looped back to the originator.
- * @param timeout a timeout after which to throw a replication exception.
- * @param responseFilter a response filter with which to filter out
failed/unwanted/invalid responses.
+ * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
+ * @param methodCall the method call to invoke
+ * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
+ * @param excludeSelf if true, the message is not looped back to the
originator.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param responseFilter a response filter with which to filter out
failed/unwanted/invalid responses.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws
Exception;
+ List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter, boolean
useOutOfBandMessage) throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is null, the
call is broadcast to the entire cluster.
- * @param methodCall the method call to invoke
- * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
- * @param excludeSelf if true, the message is not looped back to the originator.
- * @param timeout a timeout after which to throw a replication exception.
+ * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
+ * @param methodCall the method call to invoke
+ * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
+ * @param excludeSelf if true, the message is not looped back to the
originator.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, int mode, boolean excludeSelf, long timeout) throws Exception;
+ List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, int mode, boolean excludeSelf, long timeout, boolean useOutOfBandMessage)
throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is null, the
call is broadcast to the entire cluster.
- * @param methodCall the method call to invoke
- * @param synchronous if true, sets group request mode to {@link
org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link
org.jgroups.blocks.GroupRequest#GET_NONE}.
- * @param excludeSelf if true, the message is not looped back to the originator.
- * @param timeout a timeout after which to throw a replication exception.
+ * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
+ * @param methodCall the method call to invoke
+ * @param synchronous if true, sets group request mode to {@link
org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link
org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param excludeSelf if true, the message is not looped back to the
originator.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, boolean synchronous, boolean excludeSelf, int timeout) throws Exception;
+ List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, boolean synchronous, boolean excludeSelf, int timeout, boolean
useOutOfBandMessage) throws Exception;
/**
* @return true if the current Channel is the coordinator of the cluster.
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-01-22 19:06:36 UTC
(rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-01-22 19:34:40 UTC
(rev 5197)
@@ -20,6 +20,7 @@
import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.remoting.jgroups.CacheMessageListener;
import org.jboss.cache.statetransfer.StateTransferManager;
@@ -350,17 +351,17 @@
// ------------ START: RPC call methods ------------
- public List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, int mode, boolean excludeSelf, long timeout) throws Exception
+ public List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, int mode, boolean excludeSelf, long timeout, boolean useOutOfBandMessage)
throws Exception
{
- return callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout,
null);
+ return callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout, null,
useOutOfBandMessage);
}
- public List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, boolean synchronous, boolean excludeSelf, int timeout) throws Exception
+ public List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, boolean synchronous, boolean excludeSelf, int timeout, boolean
useOutOfBandMessage) throws Exception
{
- return callRemoteMethods(recipients, methodCall, synchronous ? GroupRequest.GET_ALL
: GroupRequest.GET_NONE, excludeSelf, timeout);
+ return callRemoteMethods(recipients, methodCall, synchronous ? GroupRequest.GET_ALL
: GroupRequest.GET_NONE, excludeSelf, timeout, useOutOfBandMessage);
}
- public List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws
Exception
+ public List<Object> callRemoteMethods(List<Address> recipients, MethodCall
methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter, boolean
useOutOfBandMessage) throws Exception
{
int modeToUse = mode;
int preferredMode;
@@ -406,8 +407,8 @@
throw new TimeoutException("State retrieval timed out waiting for flush
unblock.");
}
rsps = responseFilter == null
- ? disp.callRemoteMethods(validMembers, methodCall, modeToUse, timeout,
isUsingBuddyReplication)
- : disp.callRemoteMethods(validMembers, methodCall, modeToUse, timeout,
isUsingBuddyReplication, false, responseFilter);
+ ? disp.callRemoteMethods(validMembers, methodCall, modeToUse, timeout,
isUsingBuddyReplication, useOutOfBandMessage)
+ : disp.callRemoteMethods(validMembers, methodCall, modeToUse, timeout,
isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
// a null response is 99% likely to be due to a marshalling problem - we throw a
NSE, this needs to be changed when
// JGroups supports
http://jira.jboss.com/jira/browse/JGRP-193
@@ -452,6 +453,12 @@
return retval;
}
+ private boolean isCommitMethod(MethodCall call)
+ {
+ return call.getMethodId() == MethodDeclarations.commitMethod_id ||
+ (call.getMethodId() == MethodDeclarations.replicateMethod_id &&
isCommitMethod((MethodCall) call.getArgs()[0]));
+ }
+
// ------------ END: RPC call methods ------------
// ------------ START: Partial state transfer methods ------------
Modified: core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java 2008-01-22 19:06:36 UTC
(rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java 2008-01-22 19:34:40 UTC
(rev 5197)
@@ -157,7 +157,7 @@
try
{
// send to all live nodes in the cluster
- rpcManager.callRemoteMethods(null,
MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true,
5000);
+ rpcManager.callRemoteMethods(null,
MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true, 5000,
false);
}
catch (Throwable t)
{
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-01-22
19:06:36 UTC (rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-01-22
19:34:40 UTC (rev 5197)
@@ -1019,7 +1019,7 @@
}
}
- rpcManager.callRemoteMethods(recipients, call, sync, true,
config.getBuddyCommunicationTimeout());
+ rpcManager.callRemoteMethods(recipients, call, sync, true,
config.getBuddyCommunicationTimeout(), false);
}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-01-22
19:06:36 UTC (rev 5196)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-01-22
19:34:40 UTC (rev 5197)
@@ -73,6 +73,11 @@
}
}
+ protected void replicateCall(InvocationContext ctx, MethodCall call, boolean sync,
Option o, boolean useOutOfBandMessage) throws Throwable
+ {
+ replicateCall(ctx, null, call, sync, o, true, useOutOfBandMessage);
+ }
+
protected void replicateCall(InvocationContext ctx, MethodCall call, boolean sync,
Option o) throws Throwable
{
replicateCall(ctx, null, call, sync, o);
@@ -80,10 +85,10 @@
protected void replicateCall(InvocationContext ctx, List<Address> recipients,
MethodCall call, boolean sync, Option o) throws Throwable
{
- replicateCall(ctx, recipients, call, sync, o, true);
+ replicateCall(ctx, recipients, call, sync, o, true, false);
}
- protected void replicateCall(InvocationContext ctx, List<Address> recipients,
MethodCall call, boolean sync, Option o, boolean wrapMethodCallInReplicateMethod) throws
Throwable
+ protected void replicateCall(InvocationContext ctx, List<Address> recipients,
MethodCall call, boolean sync, Option o, boolean wrapMethodCallInReplicateMethod, boolean
useOutOfBandMessage) throws Throwable
{
if (trace) log.trace("Broadcasting call " + call + " to recipient
list " + recipients);
@@ -123,7 +128,9 @@
toCall,
sync, // is synchronised?
true, // ignore self?
- (int) syncReplTimeout);
+ (int) syncReplTimeout,
+ useOutOfBandMessage
+ );
if (trace) log.trace("responses=" + rsps);
if (sync) checkResponses(rsps);
}
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2008-01-22
19:06:36 UTC (rev 5196)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2008-01-22
19:34:40 UTC (rev 5197)
@@ -262,7 +262,7 @@
if (transactionMods.containsKey(gtx))
{
if (trace) log.trace("Broadcasting commit for gtx " + gtx);
- replicateCall(ctx, getMembersOutsideBuddyGroup(),
MethodCallFactory.create(MethodDeclarations.commitMethod_id, gtx), syncCommunications,
ctx.getOptionOverrides());
+ replicateCall(ctx, getMembersOutsideBuddyGroup(),
MethodCallFactory.create(MethodDeclarations.commitMethod_id, gtx), syncCommunications,
ctx.getOptionOverrides(), true, true);
}
else
{
@@ -310,7 +310,7 @@
// remove main Fqn
if (trace) log.trace("Performing cleanup on [" + backup.backupFqn +
"]");
// remove backup Fqn
- replicateCall(ctx, cache.getMembers(), cleanup, syncCommunications,
ctx.getOptionOverrides(), false);
+ replicateCall(ctx, cache.getMembers(), cleanup, syncCommunications,
ctx.getOptionOverrides(), false, false);
}
else
{
@@ -335,7 +335,7 @@
MethodCall dGrav =
MethodCallFactory.create(MethodDeclarations.dataGravitationMethod_id, fqn,
searchSubtrees);
// doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could
return null results from nodes that do
// not have either the primary OR backup, and stop polling other valid nodes.
- List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav,
GroupRequest.GET_ALL, true, buddyManager.getBuddyCommunicationTimeout(), new
ResponseValidityFilter(mbrs, cache.getLocalAddress()));
+ List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav,
GroupRequest.GET_ALL, true, buddyManager.getBuddyCommunicationTimeout(), new
ResponseValidityFilter(mbrs, cache.getLocalAddress()), false);
if (trace)
{
log.trace("got responses " + resps);
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2008-01-22
19:06:36 UTC (rev 5196)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java 2008-01-22
19:34:40 UTC (rev 5197)
@@ -217,7 +217,7 @@
if (log.isDebugEnabled())
log.debug("running remote commit for " + gtx + " and
coord=" + cache.getLocalAddress());
- replicateCall(ctx, commit_method, remoteCallSync, ctx.getOptionOverrides());
+ replicateCall(ctx, commit_method, remoteCallSync, ctx.getOptionOverrides(),
true);
}
catch (Exception e)
{
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java 2008-01-22
19:06:36 UTC (rev 5196)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java 2008-01-22
19:34:40 UTC (rev 5197)
@@ -45,7 +45,7 @@
{
return nextInterceptor(ctx);
}
- replicateCall(ctx, ctx.getMethodCall(), configuration.isSyncCommitPhase(),
ctx.getOptionOverrides());
+ replicateCall(ctx, ctx.getMethodCall(), configuration.isSyncCommitPhase(),
ctx.getOptionOverrides(), true);
return nextInterceptor(ctx);
}
Modified: core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java 2008-01-22
19:06:36 UTC (rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java 2008-01-22
19:34:40 UTC (rev 5197)
@@ -103,7 +103,7 @@
MethodCall clusteredGet =
MethodCallFactory.create(MethodDeclarations.clusteredGetMethod_id, call, false);
List resps = null;
// JBCACHE-1186
- resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet,
GroupRequest.GET_ALL, true, config.getTimeout(), new ResponseValidityFilter(mbrs,
cache.getLocalAddress()));
+ resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet,
GroupRequest.GET_ALL, true, config.getTimeout(), new ResponseValidityFilter(mbrs,
cache.getLocalAddress()), false);
if (resps == null)
{
Modified:
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2008-01-22
19:06:36 UTC (rev 5196)
+++
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2008-01-22
19:34:40 UTC (rev 5197)
@@ -177,7 +177,7 @@
{
// specify what we expect called on the mock Rpc Manager. For params we
don't care about, just use ANYTHING.
// setting the mock object to expect the "sync" param to be false.
- expect(rpcManager.callRemoteMethods(anyAddresses(), (MethodCall) anyObject(),
eq(false), anyBoolean(), anyInt())).andReturn(null);
+ expect(rpcManager.callRemoteMethods(anyAddresses(), (MethodCall) anyObject(),
eq(false), anyBoolean(), anyInt(), anyBoolean())).andReturn(null);
}
replay(rpcManager);
@@ -240,7 +240,7 @@
List<Address> memberList = originalRpcManager.getMembers();
expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
expect(barfingRpcManager.getLocalAddress()).andReturn(originalRpcManager.getLocalAddress()).anyTimes();
- expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (MethodCall)
anyObject(), anyBoolean(), anyBoolean(), anyInt())).andThrow(new
RuntimeException("Barf!")).anyTimes();
+ expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (MethodCall)
anyObject(), anyBoolean(), anyBoolean(), anyInt(), anyBoolean())).andThrow(new
RuntimeException("Barf!")).anyTimes();
replay(barfingRpcManager);
TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(),
barfingRpcManager, RPCManager.class);
Modified:
core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java 2008-01-22
19:06:36 UTC (rev 5196)
+++
core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java 2008-01-22
19:34:40 UTC (rev 5197)
@@ -97,7 +97,7 @@
MethodCallFactory.create(MethodDeclarations.getKeyValueMethodLocal_id, fqn,
key, false),
false);
- List responses = cache1.getRPCManager().callRemoteMethods(null, call, true, true,
15000);
+ List responses = cache1.getRPCManager().callRemoteMethods(null, call, true, true,
15000, false);
List response1 = (List) responses.get(0);// response from the first (and only)
node
Boolean found = (Boolean) response1.get(0);
@@ -127,7 +127,7 @@
MethodCall call =
MethodCallFactory.create(MethodDeclarations.dataGravitationMethod_id,
fqn, false);
- List responses = cache1.getRPCManager().callRemoteMethods(null, call, true, true,
15000);
+ List responses = cache1.getRPCManager().callRemoteMethods(null, call, true, true,
15000, false);
GravitateResult data = (GravitateResult) responses.get(0);// response from the
first (and only) node
assertTrue("Should have found remote data", data.isDataFound());
Added:
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
(rev 0)
+++
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java 2008-01-22
19:34:40 UTC (rev 5197)
@@ -0,0 +1,208 @@
+package org.jboss.cache.transaction;
+
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.RPCManager;
+import org.jboss.cache.RPCManagerImpl;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.ComponentRegistry;
+import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.misc.TestingUtil;
+import org.jgroups.Address;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.RspFilter;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This is to test the scenario described in
http://jira.jboss.org/jira/browse/JBCACHE-1270
+ * <p/>
+ * i) Node A sends prepare for GTX1; synchronous. Gets applied on Node B. Locks are held
on B.
+ * ii) Node A sends commit for GTX1; *asynchronous*.
+ * iii) Node A sends lots of other messages related to other sessions.
+ * iv) Node A sends prepare for GTX2; synchronous.
+ * v) Node B is busy, and by luck the GTX2 prepare gets to UNICAST before the GTX1
commit.
+ * vi) GTX2 prepare blocks due to locks from GTX1.
+ * vii) GTX1 commit is blocked in UNICAST because another thread from Node A is
executing.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @since 2.1.0
+ */
+@Test(groups = "functional")
+public class PrepareCommitContentionTest
+{
+ CacheSPI<Object, Object> c1, c2;
+
+ @BeforeMethod
+ public void setUp() throws CloneNotSupportedException
+ {
+ c1 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object,
Object>().createCache(false);
+ c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
+
c1.getConfiguration().setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ c1.getConfiguration().setLockAcquisitionTimeout(5000);
+ c2 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object,
Object>().createCache(c1.getConfiguration().clone(), false);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ {
+ TestingUtil.killCaches(c1, c2);
+ }
+
+ public void testWithSyncCommitPhase() throws Exception
+ {
+ doTest(true, false);
+ }
+
+ public void testWithDefautCommitPhase() throws Exception
+ {
+ doTest(false, false);
+ }
+
+ /**
+ * Control case: runs the scenario with asynchronous commits and with out-of-band
+ * messages disabled ({@code doTest(false, true)}) and expects it to fail with an
+ * {@link AssertionError}.
+ * <p/>
+ * The "should have failed" check is deliberately made outside the try/catch. If it were
+ * placed inside the try block, its own AssertionError would be swallowed by the catch
+ * clause and this test could never fail.
+ *
+ * @throws Exception if the scenario fails with anything other than an AssertionError
+ */
+ public void testControl() throws Exception
+ {
+ boolean failedAsExpected = false;
+ try
+ {
+ doTest(false, true);
+ }
+ catch (AssertionError expected)
+ {
+ // the scenario is expected to fail its assertions when OOB messages are not used
+ failedAsExpected = true;
+ }
+ assert failedAsExpected : "Should fail if we don't use out of band messages for non-sync commits";
+ }
+
+ private void doTest(final boolean syncCommit, boolean noOutOfBandMessages) throws
Exception
+ {
+ c1.getConfiguration().setSyncCommitPhase(syncCommit);
+ c2.getConfiguration().setSyncCommitPhase(syncCommit);
+
+ final CountDownLatch mainThreadCommitLatch = new CountDownLatch(1);
+ final CountDownLatch secondThreadPrepareLatch = new CountDownLatch(1);
+
+ DelayingRPCManager delayingRPCManager = new
DelayingRPCManager(mainThreadCommitLatch, secondThreadPrepareLatch, syncCommit,
noOutOfBandMessages);
+ ComponentRegistry cr = TestingUtil.extractComponentRegistry(c1);
+ cr.registerComponent(RPCManager.class.getName(), delayingRPCManager,
RPCManager.class);
+
+ c1.start();
+ c2.start();
+
+ TestingUtil.blockUntilViewsReceived(60000, c1, c2);
+
+ TransactionManager tm = c1.getTransactionManager();
+
+ Thread secondTransaction = new Thread("SecondThread")
+ {
+ public void run()
+ {
+ // wait until thread1 finishes the prepare.
+ try
+ {
+ secondThreadPrepareLatch.await();
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+
+ try
+ {
+ // now replicate a put on the SAME node (/a/b/c), so this transaction's prepare contends for the locks held by the first transaction
+ TransactionManager tm = c1.getTransactionManager();
+ if (syncCommit)
+ mainThreadCommitLatch.countDown(); // we need to release the main
thread commit latch first otherwise it will deadlock!
+
+ tm.begin();
+ c1.put("/a/b/c", "k", "v2");
+ tm.commit();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+ secondTransaction.start();
+
+
+ tm.begin();
+ c1.put("/a/b/c", "k", "v");
+ tm.commit();
+
+ secondTransaction.join();
+
+ // now assert that both transactions have succeeded
+ assert c1.get("/a/b/c", "k").equals("v2");
+ assert c2.get("/a/b/c", "k").equals("v2");
+ }
+
+ public static class DelayingRPCManager extends RPCManagerImpl
+ {
+ CountDownLatch mainThreadLatch, secondThreadLatch;
+ boolean syncCommit;
+ boolean noOOBMessages = false;
+
+ public DelayingRPCManager(CountDownLatch latch1, CountDownLatch latch2, boolean
syncCommit, boolean noOOBMessages)
+ {
+ mainThreadLatch = latch1;
+ secondThreadLatch = latch2;
+ this.syncCommit = syncCommit;
+ this.noOOBMessages = noOOBMessages;
+ }
+
+ @Override
+ public List<Object> callRemoteMethods(final List<Address> recipients,
final MethodCall methodCall, final int mode, final boolean excludeSelf, final long
timeout, final RspFilter responseFilter, final boolean oob) throws Exception
+ {
+ if (isPrepareMethod(methodCall) &&
Thread.currentThread().getName().equals("SecondThread"))
+ {
+ if (!syncCommit) mainThreadLatch.countDown();
+ }
+ else if (isCommitMethod(methodCall) &&
!Thread.currentThread().getName().equals("SecondThread"))
+ {
+ Thread th = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ secondThreadLatch.countDown(); // let the 2nd thread start its prepare
+ mainThreadLatch.await(); // and block until the 2nd thread releases the main thread latch
+ Thread.sleep(1000);
+ DelayingRPCManager.super.callRemoteMethods(recipients, methodCall,
mode, excludeSelf, timeout, responseFilter, !noOOBMessages && oob);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+ th.start();
+ if (mode == GroupRequest.GET_ALL) // sync mode!
+ {
+ th.join();
+ }
+ return Collections.emptyList();
+ }
+ return super.callRemoteMethods(recipients, methodCall, mode, excludeSelf,
timeout, responseFilter, !noOOBMessages && oob);
+ }
+
+ private boolean isCommitMethod(MethodCall call)
+ {
+ return call.getMethodId() == MethodDeclarations.commitMethod_id ||
+ (call.getMethodId() == MethodDeclarations.replicateMethod_id &&
isCommitMethod((MethodCall) call.getArgs()[0]));
+ }
+
+ private boolean isPrepareMethod(MethodCall call)
+ {
+ return call.getMethodId() == MethodDeclarations.prepareMethod_id ||
+ (call.getMethodId() == MethodDeclarations.replicateMethod_id &&
isPrepareMethod((MethodCall) call.getArgs()[0]));
+ }
+ }
+}