[jbosscache-commits] JBoss Cache SVN: r5197 - in core/trunk/src: main/java/org/jboss/cache/buddyreplication and 5 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Jan 22 14:34:41 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-01-22 14:34:40 -0500 (Tue, 22 Jan 2008)
New Revision: 5197

Added:
   core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManager.java
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
   core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
   core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
   core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java
Log:
JBCACHE-1270 - use OOB messages for 2-phase commit messages

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java	2008-01-22 19:06:36 UTC (rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -43,42 +43,45 @@
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
-    * @param recipients     a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param methodCall     the method call to invoke
-    * @param mode           the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
-    * @param excludeSelf    if true, the message is not looped back to the originator.
-    * @param timeout        a timeout after which to throw a replication exception.
-    * @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
+    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param methodCall          the method call to invoke
+    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
+    * @param excludeSelf         if true, the message is not looped back to the originator.
+    * @param timeout             a timeout after which to throw a replication exception.
+    * @param responseFilter      a response filter with which to filter out failed/unwanted/invalid responses.
+    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws Exception;
+   List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
 
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
-    * @param recipients  a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param methodCall  the method call to invoke
-    * @param mode        the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
-    * @param excludeSelf if true, the message is not looped back to the originator.
-    * @param timeout     a timeout after which to throw a replication exception.
+    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param methodCall          the method call to invoke
+    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
+    * @param excludeSelf         if true, the message is not looped back to the originator.
+    * @param timeout             a timeout after which to throw a replication exception.
+    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout) throws Exception;
+   List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout, boolean useOutOfBandMessage) throws Exception;
 
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
-    * @param recipients  a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param methodCall  the method call to invoke
-    * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
-    * @param excludeSelf if true, the message is not looped back to the originator.
-    * @param timeout     a timeout after which to throw a replication exception.
+    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param methodCall          the method call to invoke
+    * @param synchronous         if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+    * @param excludeSelf         if true, the message is not looped back to the originator.
+    * @param timeout             a timeout after which to throw a replication exception.
+    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, boolean synchronous, boolean excludeSelf, int timeout) throws Exception;
+   List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, boolean synchronous, boolean excludeSelf, int timeout, boolean useOutOfBandMessage) throws Exception;
 
    /**
     * @return true if the current Channel is the coordinator of the cluster.

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-01-22 19:06:36 UTC (rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -20,6 +20,7 @@
 import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.marshall.MethodDeclarations;
 import org.jboss.cache.notifications.Notifier;
 import org.jboss.cache.remoting.jgroups.CacheMessageListener;
 import org.jboss.cache.statetransfer.StateTransferManager;
@@ -350,17 +351,17 @@
 
    // ------------ START: RPC call methods ------------
 
-   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout) throws Exception
+   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout, boolean useOutOfBandMessage) throws Exception
    {
-      return callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout, null);
+      return callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout, null, useOutOfBandMessage);
    }
 
-   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, boolean synchronous, boolean excludeSelf, int timeout) throws Exception
+   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, boolean synchronous, boolean excludeSelf, int timeout, boolean useOutOfBandMessage) throws Exception
    {
-      return callRemoteMethods(recipients, methodCall, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE, excludeSelf, timeout);
+      return callRemoteMethods(recipients, methodCall, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE, excludeSelf, timeout, useOutOfBandMessage);
    }
 
-   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws Exception
+   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception
    {
       int modeToUse = mode;
       int preferredMode;
@@ -406,8 +407,8 @@
             throw new TimeoutException("State retrieval timed out waiting for flush unblock.");
       }
       rsps = responseFilter == null
-            ? disp.callRemoteMethods(validMembers, methodCall, modeToUse, timeout, isUsingBuddyReplication)
-            : disp.callRemoteMethods(validMembers, methodCall, modeToUse, timeout, isUsingBuddyReplication, false, responseFilter);
+            ? disp.callRemoteMethods(validMembers, methodCall, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage)
+            : disp.callRemoteMethods(validMembers, methodCall, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
 
       // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
       // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
@@ -452,6 +453,12 @@
       return retval;
    }
 
+   private boolean isCommitMethod(MethodCall call)
+   {
+      return call.getMethodId() == MethodDeclarations.commitMethod_id ||
+            (call.getMethodId() == MethodDeclarations.replicateMethod_id && isCommitMethod((MethodCall) call.getArgs()[0]));
+   }
+
    // ------------ END: RPC call methods ------------
 
    // ------------ START: Partial state transfer methods ------------

Modified: core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java	2008-01-22 19:06:36 UTC (rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -157,7 +157,7 @@
          try
          {
             // send to all live nodes in the cluster
-            rpcManager.callRemoteMethods(null, MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true, 5000);
+            rpcManager.callRemoteMethods(null, MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true, 5000, false);
          }
          catch (Throwable t)
          {

Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-01-22 19:06:36 UTC (rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -1019,7 +1019,7 @@
          }
       }
 
-      rpcManager.callRemoteMethods(recipients, call, sync, true, config.getBuddyCommunicationTimeout());
+      rpcManager.callRemoteMethods(recipients, call, sync, true, config.getBuddyCommunicationTimeout(), false);
    }
 
 

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-01-22 19:06:36 UTC (rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -73,6 +73,11 @@
       }
    }
 
+   protected void replicateCall(InvocationContext ctx, MethodCall call, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable
+   {
+      replicateCall(ctx, null, call, sync, o, true, useOutOfBandMessage);
+   }
+
    protected void replicateCall(InvocationContext ctx, MethodCall call, boolean sync, Option o) throws Throwable
    {
       replicateCall(ctx, null, call, sync, o);
@@ -80,10 +85,10 @@
 
    protected void replicateCall(InvocationContext ctx, List<Address> recipients, MethodCall call, boolean sync, Option o) throws Throwable
    {
-      replicateCall(ctx, recipients, call, sync, o, true);
+      replicateCall(ctx, recipients, call, sync, o, true, false);
    }
 
-   protected void replicateCall(InvocationContext ctx, List<Address> recipients, MethodCall call, boolean sync, Option o, boolean wrapMethodCallInReplicateMethod) throws Throwable
+   protected void replicateCall(InvocationContext ctx, List<Address> recipients, MethodCall call, boolean sync, Option o, boolean wrapMethodCallInReplicateMethod, boolean useOutOfBandMessage) throws Throwable
    {
 
       if (trace) log.trace("Broadcasting call " + call + " to recipient list " + recipients);
@@ -123,7 +128,9 @@
                toCall,
                sync, // is synchronised?
                true, // ignore self?
-               (int) syncReplTimeout);
+               (int) syncReplTimeout,
+               useOutOfBandMessage
+         );
          if (trace) log.trace("responses=" + rsps);
          if (sync) checkResponses(rsps);
       }

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2008-01-22 19:06:36 UTC (rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -262,7 +262,7 @@
       if (transactionMods.containsKey(gtx))
       {
          if (trace) log.trace("Broadcasting commit for gtx " + gtx);
-         replicateCall(ctx, getMembersOutsideBuddyGroup(), MethodCallFactory.create(MethodDeclarations.commitMethod_id, gtx), syncCommunications, ctx.getOptionOverrides());
+         replicateCall(ctx, getMembersOutsideBuddyGroup(), MethodCallFactory.create(MethodDeclarations.commitMethod_id, gtx), syncCommunications, ctx.getOptionOverrides(), true, true);
       }
       else
       {
@@ -310,7 +310,7 @@
          // remove main Fqn
          if (trace) log.trace("Performing cleanup on [" + backup.backupFqn + "]");
          // remove backup Fqn
-         replicateCall(ctx, cache.getMembers(), cleanup, syncCommunications, ctx.getOptionOverrides(), false);
+         replicateCall(ctx, cache.getMembers(), cleanup, syncCommunications, ctx.getOptionOverrides(), false, false);
       }
       else
       {
@@ -335,7 +335,7 @@
       MethodCall dGrav = MethodCallFactory.create(MethodDeclarations.dataGravitationMethod_id, fqn, searchSubtrees);
       // doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could return null results from nodes that do
       // not have either the primary OR backup, and stop polling other valid nodes.
-      List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav, GroupRequest.GET_ALL, true, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs, cache.getLocalAddress()));
+      List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav, GroupRequest.GET_ALL, true, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs, cache.getLocalAddress()), false);
       if (trace)
       {
          log.trace("got responses " + resps);

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java	2008-01-22 19:06:36 UTC (rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/OptimisticReplicationInterceptor.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -217,7 +217,7 @@
             if (log.isDebugEnabled())
                log.debug("running remote commit for " + gtx + " and coord=" + cache.getLocalAddress());
 
-            replicateCall(ctx, commit_method, remoteCallSync, ctx.getOptionOverrides());
+            replicateCall(ctx, commit_method, remoteCallSync, ctx.getOptionOverrides(), true);
          }
          catch (Exception e)
          {

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java	2008-01-22 19:06:36 UTC (rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -45,7 +45,7 @@
       {
          return nextInterceptor(ctx);
       }
-      replicateCall(ctx, ctx.getMethodCall(), configuration.isSyncCommitPhase(), ctx.getOptionOverrides());
+      replicateCall(ctx, ctx.getMethodCall(), configuration.isSyncCommitPhase(), ctx.getOptionOverrides(), true);
       return nextInterceptor(ctx);
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java	2008-01-22 19:06:36 UTC (rev 5196)
+++ core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -103,7 +103,7 @@
       MethodCall clusteredGet = MethodCallFactory.create(MethodDeclarations.clusteredGetMethod_id, call, false);
       List resps = null;
       // JBCACHE-1186
-      resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet, GroupRequest.GET_ALL, true, config.getTimeout(), new ResponseValidityFilter(mbrs, cache.getLocalAddress()));
+      resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet, GroupRequest.GET_ALL, true, config.getTimeout(), new ResponseValidityFilter(mbrs, cache.getLocalAddress()), false);
 
       if (resps == null)
       {

Modified: core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java	2008-01-22 19:06:36 UTC (rev 5196)
+++ core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -177,7 +177,7 @@
       {
          // specify what we expect called on the mock Rpc Manager.  For params we don't care about, just use ANYTHING.
          // setting the mock object to expect the "sync" param to be false.
-         expect(rpcManager.callRemoteMethods(anyAddresses(), (MethodCall) anyObject(), eq(false), anyBoolean(), anyInt())).andReturn(null);
+         expect(rpcManager.callRemoteMethods(anyAddresses(), (MethodCall) anyObject(), eq(false), anyBoolean(), anyInt(), anyBoolean())).andReturn(null);
       }
 
       replay(rpcManager);
@@ -240,7 +240,7 @@
          List<Address> memberList = originalRpcManager.getMembers();
          expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
          expect(barfingRpcManager.getLocalAddress()).andReturn(originalRpcManager.getLocalAddress()).anyTimes();
-         expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (MethodCall) anyObject(), anyBoolean(), anyBoolean(), anyInt())).andThrow(new RuntimeException("Barf!")).anyTimes();
+         expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (MethodCall) anyObject(), anyBoolean(), anyBoolean(), anyInt(), anyBoolean())).andThrow(new RuntimeException("Barf!")).anyTimes();
          replay(barfingRpcManager);
 
          TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(), barfingRpcManager, RPCManager.class);

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java	2008-01-22 19:06:36 UTC (rev 5196)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -97,7 +97,7 @@
             MethodCallFactory.create(MethodDeclarations.getKeyValueMethodLocal_id, fqn, key, false),
             false);
 
-      List responses = cache1.getRPCManager().callRemoteMethods(null, call, true, true, 15000);
+      List responses = cache1.getRPCManager().callRemoteMethods(null, call, true, true, 15000, false);
       List response1 = (List) responses.get(0);// response from the first (and only) node
 
       Boolean found = (Boolean) response1.get(0);
@@ -127,7 +127,7 @@
       MethodCall call = MethodCallFactory.create(MethodDeclarations.dataGravitationMethod_id,
             fqn, false);
 
-      List responses = cache1.getRPCManager().callRemoteMethods(null, call, true, true, 15000);
+      List responses = cache1.getRPCManager().callRemoteMethods(null, call, true, true, 15000, false);
       GravitateResult data = (GravitateResult) responses.get(0);// response from the first (and only) node
 
       assertTrue("Should have found remote data", data.isDataFound());

Added: core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java	2008-01-22 19:34:40 UTC (rev 5197)
@@ -0,0 +1,208 @@
+package org.jboss.cache.transaction;
+
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.RPCManager;
+import org.jboss.cache.RPCManagerImpl;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.ComponentRegistry;
+import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.marshall.MethodDeclarations;
+import org.jboss.cache.misc.TestingUtil;
+import org.jgroups.Address;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.RspFilter;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * This is to test the scenario described in http://jira.jboss.org/jira/browse/JBCACHE-1270
+ * <p/>
+ * i) Node A sends prepare for GTX1; synchronous. Gets applied on Node B. Locks are held on B.
+ * ii) Node A sends commit for GTX1; *asynchronous*.
+ * iii) Node A sends lots of other messages related to other sessions.
+ * iv) Node A sends prepare for GTX2; synchronous.
+ * v) Node B is busy, and by luck the GTX2 prepare gets to UNICAST before the GTX1 commit.
+ * vi) GTX2 prepare blocks due to locks from GTX1.
+ * vii) GTX1 commit is blocked in UNICAST because another thread from Node A is executing.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+@Test(groups = "functional")
+public class PrepareCommitContentionTest
+{
+   CacheSPI<Object, Object> c1, c2;
+
+   @BeforeMethod
+   public void setUp() throws CloneNotSupportedException
+   {
+      c1 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object, Object>().createCache(false);
+      c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      c1.getConfiguration().setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      c1.getConfiguration().setLockAcquisitionTimeout(5000);
+      c2 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object, Object>().createCache(c1.getConfiguration().clone(), false);
+   }
+
+   @AfterMethod
+   public void tearDown()
+   {
+      TestingUtil.killCaches(c1, c2);
+   }
+
+   public void testWithSyncCommitPhase() throws Exception
+   {
+      doTest(true, false);
+   }
+
+   public void testWithDefautCommitPhase() throws Exception
+   {
+      doTest(false, false);
+   }
+
+   public void testControl() throws Exception
+   {
+      try
+      {
+         doTest(false, true);
+         assert false : "Should fail if we don't use out of band messages for non-sync commits";
+      }
+      catch (AssertionError expected)
+      {
+         // should fail
+      }
+   }
+
+   private void doTest(final boolean syncCommit, boolean noOutOfBandMessages) throws Exception
+   {
+      c1.getConfiguration().setSyncCommitPhase(syncCommit);
+      c2.getConfiguration().setSyncCommitPhase(syncCommit);
+
+      final CountDownLatch mainThreadCommitLatch = new CountDownLatch(1);
+      final CountDownLatch secondThreadPrepareLatch = new CountDownLatch(1);
+
+      DelayingRPCManager delayingRPCManager = new DelayingRPCManager(mainThreadCommitLatch, secondThreadPrepareLatch, syncCommit, noOutOfBandMessages);
+      ComponentRegistry cr = TestingUtil.extractComponentRegistry(c1);
+      cr.registerComponent(RPCManager.class.getName(), delayingRPCManager, RPCManager.class);
+
+      c1.start();
+      c2.start();
+
+      TestingUtil.blockUntilViewsReceived(60000, c1, c2);
+
+      TransactionManager tm = c1.getTransactionManager();
+
+      Thread secondTransaction = new Thread("SecondThread")
+      {
+         public void run()
+         {
+            // wait until thread1 finishes the prepare.
+            try
+            {
+               secondThreadPrepareLatch.await();
+            }
+            catch (InterruptedException e)
+            {
+               // do nothing
+            }
+
+            try
+            {
+               // now replicate a put on the SAME node (/a/b/c) that the main thread's transaction wrote to, so this prepare contends for its locks
+               TransactionManager tm = c1.getTransactionManager();
+               if (syncCommit)
+                  mainThreadCommitLatch.countDown(); // we need to release the main thread commit latch first otherwise it will deadlock!
+
+               tm.begin();
+               c1.put("/a/b/c", "k", "v2");
+               tm.commit();
+            }
+            catch (Exception e)
+            {
+               e.printStackTrace();
+            }
+         }
+      };
+      secondTransaction.start();
+
+
+      tm.begin();
+      c1.put("/a/b/c", "k", "v");
+      tm.commit();
+
+      secondTransaction.join();
+
+      // now assert that both transactions have succeeded
+      assert c1.get("/a/b/c", "k").equals("v2");
+      assert c2.get("/a/b/c", "k").equals("v2");
+   }
+
+   public static class DelayingRPCManager extends RPCManagerImpl
+   {
+      CountDownLatch mainThreadLatch, secondThreadLatch;
+      boolean syncCommit;
+      boolean noOOBMessages = false;
+
+      public DelayingRPCManager(CountDownLatch latch1, CountDownLatch latch2, boolean syncCommit, boolean noOOBMessages)
+      {
+         mainThreadLatch = latch1;
+         secondThreadLatch = latch2;
+         this.syncCommit = syncCommit;
+         this.noOOBMessages = noOOBMessages;
+      }
+
+      @Override
+      public List<Object> callRemoteMethods(final List<Address> recipients, final MethodCall methodCall, final int mode, final boolean excludeSelf, final long timeout, final RspFilter responseFilter, final boolean oob) throws Exception
+      {
+         if (isPrepareMethod(methodCall) && Thread.currentThread().getName().equals("SecondThread"))
+         {
+            if (!syncCommit) mainThreadLatch.countDown();
+         }
+         else if (isCommitMethod(methodCall) && !Thread.currentThread().getName().equals("SecondThread"))
+         {
+            Thread th = new Thread()
+            {
+               public void run()
+               {
+                  try
+                  {
+                     secondThreadLatch.countDown(); // let the 2nd thread start its prepare
+                     mainThreadLatch.await(); // and block until the second thread releases the main thread latch
+                     Thread.sleep(1000);
+                     DelayingRPCManager.super.callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout, responseFilter, !noOOBMessages && oob);
+                  }
+                  catch (Exception e)
+                  {
+                     e.printStackTrace();
+                  }
+               }
+            };
+            th.start();
+            if (mode == GroupRequest.GET_ALL) // sync mode!
+            {
+               th.join();
+            }
+            return Collections.emptyList();
+         }
+         return super.callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout, responseFilter, !noOOBMessages && oob);
+      }
+
+      private boolean isCommitMethod(MethodCall call)
+      {
+         return call.getMethodId() == MethodDeclarations.commitMethod_id ||
+               (call.getMethodId() == MethodDeclarations.replicateMethod_id && isCommitMethod((MethodCall) call.getArgs()[0]));
+      }
+
+      private boolean isPrepareMethod(MethodCall call)
+      {
+         return call.getMethodId() == MethodDeclarations.prepareMethod_id ||
+               (call.getMethodId() == MethodDeclarations.replicateMethod_id && isPrepareMethod((MethodCall) call.getArgs()[0]));
+      }
+   }
+}




More information about the jbosscache-commits mailing list