Author: manik.surtani(a)jboss.com
Date: 2007-11-29 12:06:22 -0500 (Thu, 29 Nov 2007)
New Revision: 4790
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
Log:
JBCACHE-1227 - group requests hanging when no node is able to provide desired response
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2007-11-29
16:58:27 UTC (rev 4789)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2007-11-29
17:06:22 UTC (rev 4790)
@@ -321,7 +321,7 @@
MethodCall dGrav =
MethodCallFactory.create(MethodDeclarations.dataGravitationMethod, fqn, searchSubtrees);
// doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could
return null results from nodes that do
// not have either the primary OR backup, and stop polling other valid nodes.
- List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav,
GroupRequest.GET_FIRST, true, buddyManager.getBuddyCommunicationTimeout(), new
ResponseValidityFilter());
+ List resps = cache.getRPCManager().callRemoteMethods(mbrs, dGrav,
GroupRequest.GET_ALL, true, buddyManager.getBuddyCommunicationTimeout(), new
ResponseValidityFilter(mbrs, cache.getLocalAddress()));
if (log.isTraceEnabled())
{
log.trace("got responses " + resps);
@@ -486,20 +486,32 @@
public static class ResponseValidityFilter implements RspFilter
{
private int numValidResponses = 0;
+ private List<Address> pendingResponders;
+ public ResponseValidityFilter(List<Address> expected, Address localAddress)
+ {
+ // so for now I used a list to keep it consistent
+ this.pendingResponders = new ArrayList<Address>(expected);
+ // We'll never get a response from ourself
+ this.pendingResponders.remove(localAddress);
+ }
+
public boolean isAcceptable(Object object, Address address)
{
- if (!(object instanceof GravitateResult)) return false;
+ pendingResponders.remove(address);
- GravitateResult response = (GravitateResult) object;
- if (response.isDataFound()) numValidResponses++;
-
- return response.isDataFound();
+ if (object instanceof GravitateResult)
+ {
+ GravitateResult response = (GravitateResult) object;
+ if (response.isDataFound()) numValidResponses++;
+ }
+ // always return true to make sure a response is logged by the JGroups
RpcDispatcher.
+ return true;
}
public boolean needMoreResponses()
{
- return numValidResponses < 1;
+ return numValidResponses < 1 && pendingResponders.size() > 0;
}
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java 2007-11-29
16:58:27 UTC (rev 4789)
+++ core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java 2007-11-29
17:06:22 UTC (rev 4790)
@@ -24,6 +24,7 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -90,7 +91,7 @@
MethodCall clusteredGet =
MethodCallFactory.create(MethodDeclarations.clusteredGetMethod, call, false);
List resps = null;
// JBCACHE-1186
- resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet,
GroupRequest.GET_FIRST, true, config.getTimeout(), new ResponseValidityFilter());
+ resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet,
GroupRequest.GET_ALL, true, config.getTimeout(), new ResponseValidityFilter(mbrs,
cache.getLocalAddress()));
if (resps == null)
{
@@ -327,21 +328,34 @@
public static class ResponseValidityFilter implements RspFilter
{
private int numValidResponses = 0;
+ private List<Address> pendingResponders;
+ public ResponseValidityFilter(List<Address> expected, Address localAddress)
+ {
+ this.pendingResponders = new ArrayList<Address>(expected);
+ // We'll never get a response from ourself
+ this.pendingResponders.remove(localAddress);
+ }
+
public boolean isAcceptable(Object object, Address address)
{
- if (!(object instanceof List)) return false;
+ pendingResponders.remove(address);
- List response = (List) object;
- Boolean foundResult = (Boolean) response.get(0);
- if (foundResult) numValidResponses++;
-
- return foundResult;
+ if (object instanceof List)
+ {
+ List response = (List) object;
+ Boolean foundResult = (Boolean) response.get(0);
+ if (foundResult) numValidResponses++;
+ }
+ // always return true to make sure a response is logged by the JGroups
RpcDispatcher.
+ return true;
}
public boolean needMoreResponses()
{
- return numValidResponses < 1;
+ return numValidResponses < 1 && pendingResponders.size() > 0;
}
+
}
+
}
Show replies by date