Author: manik.surtani(a)jboss.com
Date: 2007-09-25 09:05:59 -0400 (Tue, 25 Sep 2007)
New Revision: 4502
Modified:
core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
core/trunk/src/main/java/org/jboss/cache/RPCManager.java
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java
Log:
JBCACHE-1186 - Use a JGroups RspFilter to filter responses for the clustered cache loader.
Note that this imposes a dependency on JGroups 2.6.
Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2007-09-24 20:54:55 UTC (rev
4501)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java 2007-09-25 13:05:59 UTC (rev
4502)
@@ -61,6 +61,7 @@
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.blocks.RspFilter;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
@@ -472,7 +473,7 @@
if (log.isWarnEnabled())
{
log.warn("Cannot fetch partial state, targets are " + sources +
- " and stateId is " + stateId);
+ " and stateId is " + stateId);
}
return;
}
@@ -555,7 +556,7 @@
* @throws Error
*/
private void handleLifecycleTransitionFailure(Throwable t)
- throws CacheException, RuntimeException, Error
+ throws CacheException, RuntimeException, Error
{
cacheStatus = CacheStatus.FAILED;
if (t instanceof CacheException)
@@ -839,7 +840,7 @@
catch (CacheException e)
{
log.warn("Needed to call stop() before destroying but stop() "
+
- "threw exception. Proceeding to destroy", e);
+ "threw exception. Proceeding to destroy", e);
}
}
else
@@ -913,7 +914,7 @@
if (failed)
{
log.warn("Attempted to stop() from FAILED state, " +
- "but caught exception; try calling destroy()", t);
+ "but caught exception; try calling destroy()", t);
}
handleLifecycleTransitionFailure(t);
}
@@ -1044,7 +1045,7 @@
protected void createEvictionPolicy()
{
if (configuration.getEvictionConfig() != null
- && configuration.getEvictionConfig().isValidConfig())
+ && configuration.getEvictionConfig().isValidConfig())
{
regionManager.setEvictionConfig(configuration.getEvictionConfig());
regionManager.setUsingEvictions(true);
@@ -1146,8 +1147,8 @@
try
{
child = factory.createDataNode(name,
- subtree.getAncestor(i + 1),
- parent, null, true);
+ subtree.getAncestor(i + 1),
+ parent, null, true);
parent.addChild(name, child);
}
finally
@@ -1241,7 +1242,7 @@
if (broken && log.isTraceEnabled())
{
log.trace("Broke lock for node " + node.getFqn() +
- " held by " + deadOwner);
+ " held by " + deadOwner);
}
}
@@ -1293,7 +1294,7 @@
else
{
throw new CacheException("Initial state transfer failed: " +
- "Channel.getState() returned false");
+ "Channel.getState() returned false");
}
}
}
@@ -1422,7 +1423,7 @@
if (log.isTraceEnabled())
{
log.trace(new
StringBuffer("_get(").append("\"").append(fqn).append("\",
\"").append(key).append("\", \"").
- append(sendNodeEvent).append("\")"));
+ append(sendNodeEvent).append("\")"));
}
if (sendNodeEvent) notifier.notifyNodeVisited(fqn, true, ctx);
NodeSPI<K, V> n = findNode(fqn);
@@ -1848,8 +1849,9 @@
for (Object c : s)
{
NodeSPI child = (NodeSPI) c;
- if (!child.isDeleted()) {
- E e = (E)child.getFqn().getLastElement();
+ if (!child.isDeleted())
+ {
+ E e = (E) child.getFqn().getLastElement();
childNames.add(e);
}
}
@@ -2023,11 +2025,17 @@
@Deprecated
public List callRemoteMethods(List<Address> mbrs, MethodCall method_call,
boolean synchronous, boolean exclude_self, long
timeout)
- throws Exception
+ throws Exception
{
return callRemoteMethods(mbrs, method_call, synchronous ? GroupRequest.GET_ALL :
GroupRequest.GET_NONE, exclude_self, timeout);
}
+ @Deprecated
+ public List callRemoteMethods(List<Address> mbrs, MethodCall method_call, int
mode, boolean exclude_self, long timeout)
+ throws Exception
+ {
+ return callRemoteMethods(mbrs, method_call, mode, exclude_self, timeout, null);
+ }
/**
* Overloaded to allow a finer grained control over JGroups mode
@@ -2042,8 +2050,8 @@
* @deprecated Note this is due to be moved to an interceptor.
*/
@Deprecated
- public List callRemoteMethods(List<Address> mbrs, MethodCall method_call, int
mode, boolean exclude_self, long timeout)
- throws Exception
+ public List callRemoteMethods(List<Address> mbrs, MethodCall method_call, int
mode, boolean exclude_self, long timeout, RspFilter rspFilter)
+ throws Exception
{
RspList rsps = null;
Rsp rsp;
@@ -2091,12 +2099,10 @@
if (channel.flushSupported())
{
flushBlockGate.await(configuration.getStateRetrievalTimeout());
- rsps = disp.callRemoteMethods(validMembers, method_call, mode, timeout,
buddyManager != null && buddyManager.isEnabled());
}
- else
- {
- rsps = disp.callRemoteMethods(validMembers, method_call, mode, timeout,
buddyManager != null && buddyManager.isEnabled());
- }
+ rsps = rspFilter == null
+ ? disp.callRemoteMethods(validMembers, method_call, mode, timeout,
buddyManager != null && buddyManager.isEnabled())
+ : disp.callRemoteMethods(validMembers, method_call, mode, timeout,
buddyManager != null && buddyManager.isEnabled(), false, rspFilter);
// a null response is 99% likely to be due to a marshalling problem - we throw a
NSE, this needs to be changed when
// JGroups supports
http://jira.jboss.com/jira/browse/JGRP-193
@@ -2154,7 +2160,7 @@
@Deprecated
public List callRemoteMethods(List<Address> members, Method method, Object[]
args,
boolean synchronous, boolean exclude_self, long
timeout)
- throws Exception
+ throws Exception
{
return callRemoteMethods(members, MethodCallFactory.create(method, args),
synchronous, exclude_self, timeout);
}
@@ -2175,7 +2181,7 @@
public List callRemoteMethods(Vector<Address> members, String method_name,
Class[] types, Object[] args,
boolean synchronous, boolean exclude_self, long
timeout)
- throws Exception
+ throws Exception
{
Method method = getClass().getDeclaredMethod(method_name, types);
return callRemoteMethods(members, method, args, synchronous, exclude_self,
timeout);
@@ -2241,7 +2247,7 @@
* Otherwise they will not be created (used by rollback()).
*/
public void _put(GlobalTransaction tx, String fqn, Map<K, V> data, boolean
create_undo_ops)
- throws CacheException
+ throws CacheException
{
_put(tx, Fqn.fromString(fqn), data, create_undo_ops);
}
@@ -2263,7 +2269,7 @@
* Otherwise they will not be created (used by rollback()).
*/
public void _put(GlobalTransaction tx, Fqn fqn, Map<K, V> data, boolean
create_undo_ops)
- throws CacheException
+ throws CacheException
{
_put(tx, fqn, data, create_undo_ops, false);
}
@@ -2285,7 +2291,7 @@
* Otherwise they will not be created (used by rollback()).
*/
public void _put(GlobalTransaction tx, Fqn fqn, Map<K, V> data, boolean
create_undo_ops, boolean erase_contents)
- throws CacheException
+ throws CacheException
{
if (log.isTraceEnabled())
{
@@ -2320,7 +2326,7 @@
* @return Previous value (if any)
*/
public Object _put(GlobalTransaction tx, String fqn, K key, V value, boolean
create_undo_ops)
- throws CacheException
+ throws CacheException
{
return _put(tx, Fqn.fromString(fqn), key, value, create_undo_ops);
}
@@ -2330,9 +2336,9 @@
try
{
return tx != null && (
- tx.getStatus() == javax.transaction.Status.STATUS_ROLLEDBACK ||
- tx.getStatus() == javax.transaction.Status.STATUS_ROLLING_BACK ||
- tx.getStatus() == javax.transaction.Status.STATUS_MARKED_ROLLBACK);
+ tx.getStatus() == javax.transaction.Status.STATUS_ROLLEDBACK ||
+ tx.getStatus() == javax.transaction.Status.STATUS_ROLLING_BACK
||
+ tx.getStatus() ==
javax.transaction.Status.STATUS_MARKED_ROLLBACK);
}
catch (Exception e)
{
@@ -2347,12 +2353,12 @@
* @return Previous value (if any)
*/
public Object _put(GlobalTransaction tx, Fqn fqn, K key, V value, boolean
create_undo_ops)
- throws CacheException
+ throws CacheException
{
if (log.isTraceEnabled())
{
log.trace(new StringBuffer("_put(").append(tx).append(",
\"").
- append(fqn).append("\", k=").append(key).append(",
v=").append(value).append(")"));
+ append(fqn).append("\", k=").append(key).append(",
v=").append(value).append(")"));
}
@@ -2407,7 +2413,7 @@
}
public boolean _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean
sendNodeEvent)
- throws CacheException
+ throws CacheException
{
return _remove(tx, fqn, create_undo_ops, sendNodeEvent, false);
}
@@ -2421,7 +2427,7 @@
* @param sendNodeEvent
*/
public boolean _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean
sendNodeEvent, boolean eviction)
- throws CacheException
+ throws CacheException
{
return _remove(tx, fqn, create_undo_ops, sendNodeEvent, eviction, null);
}
@@ -2441,7 +2447,7 @@
* @throws CacheException
*/
public boolean _remove(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops, boolean
sendNodeEvent, boolean eviction, DataVersion version)
- throws CacheException
+ throws CacheException
{
NodeSPI<K, V> n;
@@ -2551,7 +2557,7 @@
* @return Object
*/
public V _remove(GlobalTransaction tx, String fqn, K key, boolean create_undo_ops)
- throws CacheException
+ throws CacheException
{
return _remove(tx, Fqn.fromString(fqn), key, create_undo_ops);
}
@@ -2564,7 +2570,7 @@
* @return Object
*/
public V _remove(GlobalTransaction tx, Fqn fqn, K key, boolean create_undo_ops)
- throws CacheException
+ throws CacheException
{
MethodCall undo_op = null;
V old_value = null;
@@ -2609,7 +2615,7 @@
* Internal method to remove data from a node.
*/
public void _removeData(GlobalTransaction tx, String fqn, boolean create_undo_ops)
- throws CacheException
+ throws CacheException
{
_removeData(tx, Fqn.fromString(fqn), create_undo_ops);
}
@@ -2618,7 +2624,7 @@
* Internal method to remove data from a node.
*/
public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops)
- throws CacheException
+ throws CacheException
{
_removeData(tx, fqn, create_undo_ops, true);
}
@@ -2627,7 +2633,7 @@
* Internal method to remove data from a node.
*/
public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops,
boolean sendNodeEvent)
- throws CacheException
+ throws CacheException
{
_removeData(tx, fqn, create_undo_ops, sendNodeEvent, false);
}
@@ -2636,7 +2642,7 @@
* Internal method to remove data from a node.
*/
public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops,
boolean sendNodeEvent, boolean eviction)
- throws CacheException
+ throws CacheException
{
_removeData(tx, fqn, create_undo_ops, sendNodeEvent, eviction, null);
}
@@ -2645,7 +2651,7 @@
* Internal method to remove data from a node.
*/
public void _removeData(GlobalTransaction tx, Fqn fqn, boolean create_undo_ops,
boolean sendNodeEvent, boolean eviction, DataVersion version)
- throws CacheException
+ throws CacheException
{
MethodCall undo_op = null;
@@ -2673,7 +2679,7 @@
if (!data.isEmpty())
{
undo_op = MethodCallFactory.create(MethodDeclarations.putDataMethodLocal,
- tx, fqn, new HashMap<K,V>(data),
false);
+ tx, fqn, new HashMap<K, V>(data), false);
}
}
@@ -2790,10 +2796,10 @@
/**
* Very much like an evict(), except that regardless of whether there is a child
present, this call will never
* remove the node from memory - just remove its contents.
- *
+ * <p/>
* Also, potentially throws a cache exception if data versioning is used and the node
in memory has a newer data
* version than what is passed in.
- *
+ * <p/>
* Finally, the data version of the in-memory node is updated to the version being
evicted to prevent versions
* going out of sync.
*
@@ -2823,7 +2829,7 @@
put(fqn, m);
getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
NodeSPI nodeSPI = (NodeSPI) root.getChild(fqn);
- nodeSPI.setVersion(versionToInvalidate);
+ nodeSPI.setVersion(versionToInvalidate);
}
}
@@ -2846,7 +2852,7 @@
* Compensating method to {@link #_remove(GlobalTransaction,Fqn,boolean)}.
*/
public void _addChild(GlobalTransaction gtx, Fqn parent_fqn, Object child_name, Node
cn, boolean undoOps)
- throws CacheException
+ throws CacheException
{
NodeSPI childNode = (NodeSPI) cn;
if (log.isTraceEnabled())
@@ -2973,7 +2979,7 @@
* @return a GravitateResult which contains the data for the gravitation
*/
public GravitateResult gravitateData(Fqn fqn, boolean searchSubtrees)
- throws CacheException
+ throws CacheException
{
// we need to get the state for this Fqn and its sub-nodes.
@@ -3189,7 +3195,7 @@
* Should not be called.
*/
public void _lock(Fqn fqn, NodeLock.LockType lock_type, boolean recursive)
- throws TimeoutException, LockingException
+ throws TimeoutException, LockingException
{
throw new UnsupportedOperationException("method _lock() should not be invoked
on CacheImpl");
}
@@ -3464,7 +3470,7 @@
else
{
log.error("Caught " + t.getClass().getName()
- + " while responding to state transfer request", t);
+ + " while responding to state transfer request", t);
}
}
@@ -3546,7 +3552,7 @@
out = new MarshalledValueOutputStream(baos);
getStateTransferManager().getState(out, Fqn.fromString(sourceRoot),
- configuration.getStateRetrievalTimeout(),
true, true);
+ configuration.getStateRetrievalTimeout(), true, true);
}
catch (Throwable t)
{
@@ -4215,18 +4221,18 @@
protected String getDefaultProperties()
{
return "UDP(mcast_addr=224.0.0.36;mcast_port=55566;ip_ttl=32;" +
- "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
- "PING(timeout=1000;num_initial_members=2):" +
- "MERGE2(min_interval=5000;max_interval=10000):" +
- "FD_SOCK:" +
- "VERIFY_SUSPECT(timeout=1500):" +
- "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):"
+
- "UNICAST(timeout=600,1200,2400,4800):" +
- "pbcast.STABLE(desired_avg_gossip=20000):" +
- "FRAG(frag_size=8192):" +
- "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
- "shun=false;print_local_addr=true):" +
- "pbcast.STATE_TRANSFER";
+ "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):" +
+ "PING(timeout=1000;num_initial_members=2):" +
+ "MERGE2(min_interval=5000;max_interval=10000):" +
+ "FD_SOCK:" +
+ "VERIFY_SUSPECT(timeout=1500):" +
+ "pbcast.NAKACK(gc_lag=50;retransmit_timeout=600,1200,2400,4800):"
+
+ "UNICAST(timeout=600,1200,2400,4800):" +
+ "pbcast.STABLE(desired_avg_gossip=20000):" +
+ "FRAG(frag_size=8192):" +
+ "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;" +
+ "shun=false;print_local_addr=true):" +
+ "pbcast.STATE_TRANSFER";
}
private void initialiseCacheLoaderManager() throws CacheException
@@ -4287,7 +4293,7 @@
if (log.isDebugEnabled())
{
log.debug("Created Multiplexer Channel for cache cluster " +
configuration.getClusterName() +
- " using stack " +
configuration.getMultiplexerStack());
+ " using stack " + configuration.getMultiplexerStack());
}
}
else
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2007-09-24 20:54:55 UTC (rev
4501)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2007-09-25 13:05:59 UTC (rev
4502)
@@ -2,12 +2,19 @@
import org.jboss.cache.marshall.MethodCall;
import org.jgroups.Address;
+import org.jgroups.blocks.RspFilter;
import java.lang.reflect.Method;
import java.util.List;
public interface RPCManager
{
+ /**
+ * The same as {@link
#callRemoteMethods(java.util.List,org.jboss.cache.marshall.MethodCall,int,boolean,long)}
except that it adds a JGroups
+ * {@link org.jgroups.blocks.RspFilter} to the list of parameters, which is used to
filter results.
+ */
+ public List callRemoteMethods(List<Address> recipients, MethodCall methodCall,
int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws Exception;
+
public List callRemoteMethods(List<Address> recipients, MethodCall methodCall,
int mode, boolean excludeSelf, long timeout) throws Exception;
public boolean isCoordinator();
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2007-09-24 20:54:55 UTC
(rev 4501)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2007-09-25 13:05:59 UTC
(rev 4502)
@@ -8,6 +8,7 @@
import org.jboss.cache.marshall.MethodCall;
import org.jgroups.Address;
+import org.jgroups.blocks.RspFilter;
import java.lang.reflect.Method;
import java.util.List;
@@ -36,6 +37,12 @@
// for now, we delegate RPC calls to deprecated methods in CacheImpl.
@SuppressWarnings("deprecation")
+ public List callRemoteMethods(List<Address> recipients, MethodCall methodCall,
int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws Exception
+ {
+ return c.callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout,
responseFilter);
+ }
+
+ @SuppressWarnings("deprecation")
public List callRemoteMethods(List<Address> recipients, MethodCall methodCall,
int mode, boolean excludeSelf, long timeout) throws Exception
{
return c.callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout);
Modified: core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java 2007-09-24
20:54:55 UTC (rev 4501)
+++ core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java 2007-09-25
13:05:59 UTC (rev 4502)
@@ -20,6 +20,7 @@
import org.jboss.cache.marshall.MethodDeclarations;
import org.jgroups.Address;
import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.RspFilter;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -87,7 +88,10 @@
if (log.isTraceEnabled()) log.trace("cache=" + cache.getLocalAddress() +
"; calling with " + call);
List<Address> mbrs = cache.getMembers();
MethodCall clusteredGet =
MethodCallFactory.create(MethodDeclarations.clusteredGetMethod, call, false);
- List resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet,
GroupRequest.GET_FIRST, true, config.getTimeout());
+ List resps = null;
+ // JBCACHE-1186
+ resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet,
GroupRequest.GET_FIRST, true, config.getTimeout(), new ResponseValidityFilter());
+
if (resps == null)
{
if (log.isInfoEnabled()) log.info("No replies to call " + call +
". Perhaps we're alone in the cluster?");
@@ -319,4 +323,25 @@
public void setRegionManager(RegionManager manager)
{
}
+
+ public static class ResponseValidityFilter implements RspFilter
+ {
+ private int numValidResponses = 0;
+
+ public boolean isAcceptable(Object object, Address address)
+ {
+ if (!(object instanceof List)) return false;
+
+ List response = (List) object;
+ Boolean foundResult = (Boolean) response.get(0);
+ if (foundResult) numValidResponses++;
+
+ return foundResult;
+ }
+
+ public boolean needMoreResponses()
+ {
+ return numValidResponses < 1;
+ }
+ }
}
Modified: core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java 2007-09-24
20:54:55 UTC (rev 4501)
+++
core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java 2007-09-25
13:05:59 UTC (rev 4502)
@@ -6,16 +6,7 @@
*/
package org.jboss.cache.loader;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-
import junit.framework.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheImpl;
@@ -26,12 +17,22 @@
import org.jboss.cache.lock.TimeoutException;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+
/**
* Tests ClusteredCacheLoader
*
* @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
*/
+@Test(groups = {"functional"})
public class ClusteredCacheLoaderTest extends AbstractCacheLoaderTestBase
{
private static Log log = LogFactory.getLog(ClusteredCacheLoaderTest.class);