[jbosscache-commits] JBoss Cache SVN: r4790 - in core/trunk/src/main/java/org/jboss/cache: loader and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Nov 29 12:06:22 EST 2007


Author: manik.surtani at jboss.com
Date: 2007-11-29 12:06:22 -0500 (Thu, 29 Nov 2007)
New Revision: 4790

Modified:
   core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
Log:
JBCACHE-1227 - group requests hanging when no node is able to provide desired response

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2007-11-29 16:58:27 UTC (rev 4789)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2007-11-29 17:06:22 UTC (rev 4790)
@@ -321,7 +321,7 @@
       MethodCall dGrav = MethodCallFactory.create(MethodDeclarations.dataGravitationMethod, fqn, searchSubtrees);
       // doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could return null results from nodes that do
       // not have either the primary OR backup, and stop polling other valid nodes.
-      List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav, GroupRequest.GET_FIRST, true, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter());
+      List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav, GroupRequest.GET_ALL, true, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs, cache.getLocalAddress()));
       if (log.isTraceEnabled())
       {
          log.trace("got responses " + resps);
@@ -486,20 +486,32 @@
    public static class ResponseValidityFilter implements RspFilter
    {
       private int numValidResponses = 0;
+      private List<Address> pendingResponders;
 
+      public ResponseValidityFilter(List<Address> expected, Address localAddress)
+      {
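+         // Copy the expected members so the removals below do not mutate the caller's list; kept as a List (presumably for consistency with the caller-supplied member list).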
+         this.pendingResponders = new ArrayList<Address>(expected);
+         // We'll never get a response from ourselves
+         this.pendingResponders.remove(localAddress);
+      }
+
       public boolean isAcceptable(Object object, Address address)
       {
-         if (!(object instanceof GravitateResult)) return false;
+         pendingResponders.remove(address);
 
-         GravitateResult response = (GravitateResult) object;
-         if (response.isDataFound()) numValidResponses++;
-
-         return response.isDataFound();
+         if (object instanceof GravitateResult)
+         {
+            GravitateResult response = (GravitateResult) object;
+            if (response.isDataFound()) numValidResponses++;
+         }
+         // always return true to make sure a response is logged by the JGroups RpcDispatcher.
+         return true;
       }
 
       public boolean needMoreResponses()
       {
-         return numValidResponses < 1;
+         return numValidResponses < 1 && pendingResponders.size() > 0;
       }
    }
 }

Modified: core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java	2007-11-29 16:58:27 UTC (rev 4789)
+++ core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java	2007-11-29 17:06:22 UTC (rev 4790)
@@ -24,6 +24,7 @@
 
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -90,7 +91,7 @@
       MethodCall clusteredGet = MethodCallFactory.create(MethodDeclarations.clusteredGetMethod, call, false);
       List resps = null;
       // JBCACHE-1186
-      resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet, GroupRequest.GET_FIRST, true, config.getTimeout(), new ResponseValidityFilter());
+      resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet, GroupRequest.GET_ALL, true, config.getTimeout(), new ResponseValidityFilter(mbrs, cache.getLocalAddress()));
 
       if (resps == null)
       {
@@ -327,21 +328,34 @@
    public static class ResponseValidityFilter implements RspFilter
    {
       private int numValidResponses = 0;
+      private List<Address> pendingResponders;
 
+      public ResponseValidityFilter(List<Address> expected, Address localAddress)
+      {
+         this.pendingResponders = new ArrayList<Address>(expected);
+         // We'll never get a response from ourselves
+         this.pendingResponders.remove(localAddress);
+      }
+
       public boolean isAcceptable(Object object, Address address)
       {
-         if (!(object instanceof List)) return false;
+         pendingResponders.remove(address);
 
-         List response = (List) object;
-         Boolean foundResult = (Boolean) response.get(0);
-         if (foundResult) numValidResponses++;
-
-         return foundResult;
+         if (object instanceof List)
+         {
+            List response = (List) object;
+            Boolean foundResult = (Boolean) response.get(0);
+            if (foundResult) numValidResponses++;
+         }
+         // always return true to make sure a response is logged by the JGroups RpcDispatcher.
+         return true;
       }
 
       public boolean needMoreResponses()
       {
-         return numValidResponses < 1;
+         return numValidResponses < 1 && pendingResponders.size() > 0;
       }
+
    }
+
 }




More information about the jbosscache-commits mailing list