[jbosscache-commits] JBoss Cache SVN: r4789 - in core/tags/2.1.0.CR2/src/main/java/org/jboss/cache: loader and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Nov 29 11:58:28 EST 2007


Author: manik.surtani at jboss.com
Date: 2007-11-29 11:58:27 -0500 (Thu, 29 Nov 2007)
New Revision: 4789

Modified:
   core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
   core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
Log:
JBCACHE-1227 - group requests hanging when no node is able to provide desired response

Modified: core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
--- core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2007-11-27 19:04:35 UTC (rev 4788)
+++ core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2007-11-29 16:58:27 UTC (rev 4789)
@@ -32,6 +32,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -321,7 +322,7 @@
       MethodCall dGrav = MethodCallFactory.create(MethodDeclarations.dataGravitationMethod, fqn, searchSubtrees);
       // doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could return null results from nodes that do
       // not have either the primary OR backup, and stop polling other valid nodes.
-      List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav, GroupRequest.GET_FIRST, true, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter());
+      List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav, GroupRequest.GET_ALL, true, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs, cache.getLocalAddress()));
       if (log.isTraceEnabled())
       {
          log.trace("got responses " + resps);
@@ -486,20 +487,31 @@
    public static class ResponseValidityFilter implements RspFilter
    {
       private int numValidResponses = 0;
-
+      private List<Address> pendingResponders;
+      
+      public ResponseValidityFilter(List<Address> expected, Address localAddress)
+      {
+         this.pendingResponders = new ArrayList<Address>(expected);
+         // We'll never get a response from ourself
+         this.pendingResponders.remove(localAddress);
+      }
+      
       public boolean isAcceptable(Object object, Address address)
       {
-         if (!(object instanceof GravitateResult)) return false;
-
-         GravitateResult response = (GravitateResult) object;
-         if (response.isDataFound()) numValidResponses++;
-
-         return response.isDataFound();
+         pendingResponders.remove(address);
+         
+         if (object instanceof GravitateResult)
+         {
+            GravitateResult response = (GravitateResult) object;
+            if (response.isDataFound()) numValidResponses++;
+         }
+         // always return true to make sure a response is logged by the JGroups RpcDispatcher.
+         return true;
       }
 
       public boolean needMoreResponses()
       {
-         return numValidResponses < 1;
+         return numValidResponses < 1 && pendingResponders.size() > 0;
       }
    }
 }

Modified: core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
===================================================================
--- core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java	2007-11-27 19:04:35 UTC (rev 4788)
+++ core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java	2007-11-29 16:58:27 UTC (rev 4789)
@@ -13,6 +13,7 @@
 import org.jboss.cache.Modification;
 import org.jboss.cache.NodeSPI;
 import org.jboss.cache.RegionManager;
+import org.jboss.cache.buddyreplication.GravitateResult;
 import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
 import org.jboss.cache.lock.StripedLock;
 import org.jboss.cache.marshall.MethodCall;
@@ -29,6 +30,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.ArrayList;
 
 /**
  * A cache loader that consults other members in the cluster for values.  Does
@@ -90,7 +92,7 @@
       MethodCall clusteredGet = MethodCallFactory.create(MethodDeclarations.clusteredGetMethod, call, false);
       List resps = null;
       // JBCACHE-1186
-      resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet, GroupRequest.GET_FIRST, true, config.getTimeout(), new ResponseValidityFilter());
+      resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet, GroupRequest.GET_ALL, true, config.getTimeout(), new ResponseValidityFilter(mbrs, cache.getLocalAddress()));
 
       if (resps == null)
       {
@@ -327,21 +329,33 @@
    public static class ResponseValidityFilter implements RspFilter
    {
       private int numValidResponses = 0;
+      private List<Address> pendingResponders;
 
+      public ResponseValidityFilter(List<Address> expected, Address localAddress)
+      {
+         this.pendingResponders = new ArrayList<Address>(expected);
+         // We'll never get a response from ourself
+         this.pendingResponders.remove(localAddress);
+      }
+
       public boolean isAcceptable(Object object, Address address)
       {
-         if (!(object instanceof List)) return false;
+         pendingResponders.remove(address);
 
-         List response = (List) object;
-         Boolean foundResult = (Boolean) response.get(0);
-         if (foundResult) numValidResponses++;
-
-         return foundResult;
+         if (object instanceof List)
+         {
+            List response = (List) object;
+            Boolean foundResult = (Boolean) response.get(0);
+            if (foundResult) numValidResponses++;
+         }
+         // always return true to make sure a response is logged by the JGroups RpcDispatcher.
+         return true;
       }
 
       public boolean needMoreResponses()
       {
-         return numValidResponses < 1;
+         return numValidResponses < 1 && pendingResponders.size() > 0;
       }
+
    }
 }




More information about the jbosscache-commits mailing list