Author: manik.surtani(a)jboss.com
Date: 2007-11-29 11:58:27 -0500 (Thu, 29 Nov 2007)
New Revision: 4789
Modified:
core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
Log:
JBCACHE-1227 - group requests hanging when no node is able to provide desired response
Modified:
core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
---
core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2007-11-27
19:04:35 UTC (rev 4788)
+++
core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2007-11-29
16:58:27 UTC (rev 4789)
@@ -32,6 +32,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -321,7 +322,7 @@
MethodCall dGrav =
MethodCallFactory.create(MethodDeclarations.dataGravitationMethod, fqn, searchSubtrees);
// doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could
return null results from nodes that do
// not have either the primary OR backup, and stop polling other valid nodes.
- List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav,
GroupRequest.GET_FIRST, true, buddyManager.getBuddyCommunicationTimeout(), new
ResponseValidityFilter());
+ List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav,
GroupRequest.GET_ALL, true, buddyManager.getBuddyCommunicationTimeout(), new
ResponseValidityFilter(mbrs, cache.getLocalAddress()));
if (log.isTraceEnabled())
{
log.trace("got responses " + resps);
@@ -486,20 +487,31 @@
public static class ResponseValidityFilter implements RspFilter
{
private int numValidResponses = 0;
-
+ private List<Address> pendingResponders;
+
+ public ResponseValidityFilter(List<Address> expected, Address localAddress)
+ {
+ this.pendingResponders = new ArrayList<Address>(expected);
+ // We'll never get a response from ourself
+ this.pendingResponders.remove(localAddress);
+ }
+
public boolean isAcceptable(Object object, Address address)
{
- if (!(object instanceof GravitateResult)) return false;
-
- GravitateResult response = (GravitateResult) object;
- if (response.isDataFound()) numValidResponses++;
-
- return response.isDataFound();
+ pendingResponders.remove(address);
+
+ if (object instanceof GravitateResult)
+ {
+ GravitateResult response = (GravitateResult) object;
+ if (response.isDataFound()) numValidResponses++;
+ }
+ // always return true to make sure a response is logged by the JGroups
RpcDispatcher.
+ return true;
}
public boolean needMoreResponses()
{
- return numValidResponses < 1;
+ return numValidResponses < 1 && pendingResponders.size() > 0;
}
}
}
Modified:
core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
===================================================================
---
core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java 2007-11-27
19:04:35 UTC (rev 4788)
+++
core/tags/2.1.0.CR2/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java 2007-11-29
16:58:27 UTC (rev 4789)
@@ -13,6 +13,7 @@
import org.jboss.cache.Modification;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.RegionManager;
+import org.jboss.cache.buddyreplication.GravitateResult;
import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
import org.jboss.cache.lock.StripedLock;
import org.jboss.cache.marshall.MethodCall;
@@ -29,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.ArrayList;
/**
* A cache loader that consults other members in the cluster for values. Does
@@ -90,7 +92,7 @@
MethodCall clusteredGet =
MethodCallFactory.create(MethodDeclarations.clusteredGetMethod, call, false);
List resps = null;
// JBCACHE-1186
- resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet,
GroupRequest.GET_FIRST, true, config.getTimeout(), new ResponseValidityFilter());
+ resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet,
GroupRequest.GET_ALL, true, config.getTimeout(), new ResponseValidityFilter(mbrs,
cache.getLocalAddress()));
if (resps == null)
{
@@ -327,21 +329,33 @@
public static class ResponseValidityFilter implements RspFilter
{
private int numValidResponses = 0;
+ private List<Address> pendingResponders;
+ public ResponseValidityFilter(List<Address> expected, Address localAddress)
+ {
+ this.pendingResponders = new ArrayList<Address>(expected);
+ // We'll never get a response from ourself
+ this.pendingResponders.remove(localAddress);
+ }
+
public boolean isAcceptable(Object object, Address address)
{
- if (!(object instanceof List)) return false;
+ pendingResponders.remove(address);
- List response = (List) object;
- Boolean foundResult = (Boolean) response.get(0);
- if (foundResult) numValidResponses++;
-
- return foundResult;
+ if (object instanceof List)
+ {
+ List response = (List) object;
+ Boolean foundResult = (Boolean) response.get(0);
+ if (foundResult) numValidResponses++;
+ }
+ // always return true to make sure a response is logged by the JGroups
RpcDispatcher.
+ return true;
}
public boolean needMoreResponses()
{
- return numValidResponses < 1;
+ return numValidResponses < 1 && pendingResponders.size() > 0;
}
+
}
}