Author: manik.surtani(a)jboss.com
Date: 2008-04-30 12:10:12 -0400 (Wed, 30 Apr 2008)
New Revision: 5775
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManager.java
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyGroup.java
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java
core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
core/trunk/src/test/java/org/jboss/cache/api/NodeReplicatedMoveTest.java
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
Log:
Performance enhancements relating to interactions with the membership list and JGroups rpc
dispatcher APIs.
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2008-04-30 15:51:35 UTC (rev
5774)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2008-04-30 16:10:12 UTC (rev
5775)
@@ -6,6 +6,7 @@
import org.jgroups.blocks.RspFilter;
import java.util.List;
+import java.util.Vector;
/**
* Provides a mechanism for communicating with other caches in the cluster. For now this
is based on JGroups as an underlying
@@ -45,44 +46,41 @@
* Invokes an RPC call on other caches in the cluster.
*
* @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
- * @param methodCall the method call to invoke
+ * @param cacheCommand the cache command to invoke
* @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
- * @param excludeSelf if true, the message is not looped back to the
originator.
* @param timeout a timeout after which to throw a replication exception.
* @param responseFilter a response filter with which to filter out
failed/unwanted/invalid responses.
* @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand
cacheCommand, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter,
boolean useOutOfBandMessage) throws Exception;
+ List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean
useOutOfBandMessage) throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
* @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
- * @param CacheCommand the method call to invoke
+ * @param cacheCommand the cache command to invoke
* @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
- * @param excludeSelf if true, the message is not looped back to the
originator.
* @param timeout a timeout after which to throw a replication exception.
* @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand
cacheCommand, int mode, boolean excludeSelf, long timeout, boolean useOutOfBandMessage)
throws Exception;
+ List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage)
throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
* @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
- * @param CacheCommand the method call to invoke
+ * @param cacheCommand the cache command to invoke
* @param synchronous if true, sets group request mode to {@link
org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link
org.jgroups.blocks.GroupRequest#GET_NONE}.
- * @param excludeSelf if true, the message is not looped back to the
originator.
* @param timeout a timeout after which to throw a replication exception.
* @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand
cacheCommand, boolean synchronous, boolean excludeSelf, int timeout, boolean
useOutOfBandMessage) throws Exception;
+ List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean
useOutOfBandMessage) throws Exception;
/**
* @return true if the current Channel is the coordinator of the cluster.
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-04-30 15:51:35 UTC
(rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-04-30 16:10:12 UTC
(rev 5775)
@@ -60,7 +60,9 @@
{
private Channel channel;
private final Log log = LogFactory.getLog(RPCManagerImpl.class);
- private final List<Address> members = new LinkedList<Address>();
+ private List<Address> members;
+
+ private final Object coordinatorLock = new Object();
/**
* True if this Cache is the coordinator.
*/
@@ -208,14 +210,9 @@
log.info("Stopping the RpcDispatcher");
disp.stop();
}
- if (members != null)
- {
- synchronized (members)
- {
- members.clear();
- }
- }
+ if (members != null) members = null;
+
coordinator = false;
disp = null;
@@ -272,6 +269,9 @@
configuration.getRuntimeConfig().setChannel(channel);
}
+ // Channel.LOCAL *must* be set to false so we don't see our own messages -
otherwise invalidations targeted at
+ // remote instances will be received by self.
+ channel.setOpt(Channel.LOCAL, false);
channel.setOpt(Channel.AUTO_RECONNECT, true);
channel.setOpt(Channel.AUTO_GETSTATE, fetchState);
channel.setOpt(Channel.BLOCK, true);
@@ -373,53 +373,28 @@
// ------------ START: RPC call methods ------------
- public List<Object> callRemoteMethods(List<Address> recipients,
ReplicableCommand command, int mode, boolean excludeSelf, long timeout, boolean
useOutOfBandMessage) throws Exception
+ public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand command, int mode, long timeout, boolean useOutOfBandMessage) throws
Exception
{
- return callRemoteMethods(recipients, command, mode, excludeSelf, timeout, null,
useOutOfBandMessage);
+ return callRemoteMethods(recipients, command, mode, timeout, null,
useOutOfBandMessage);
}
- public List<Object> callRemoteMethods(List<Address> recipients,
ReplicableCommand command, boolean synchronous, boolean excludeSelf, int timeout, boolean
useOutOfBandMessage) throws Exception
+ public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand command, boolean synchronous, long timeout, boolean useOutOfBandMessage)
throws Exception
{
- return callRemoteMethods(recipients, command, synchronous ? GroupRequest.GET_ALL :
GroupRequest.GET_NONE, excludeSelf, timeout, useOutOfBandMessage);
+ return callRemoteMethods(recipients, command, synchronous ? GroupRequest.GET_ALL :
GroupRequest.GET_NONE, timeout, useOutOfBandMessage);
}
- public List<Object> callRemoteMethods(List<Address> recipients,
ReplicableCommand command, int mode, boolean excludeSelf, long timeout, RspFilter
responseFilter, boolean useOutOfBandMessage) throws Exception
+ public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand command, int mode, long timeout, RspFilter responseFilter, boolean
useOutOfBandMessage) throws Exception
{
+ // short circuit if we don't have an RpcDispatcher!
+ if (disp == null) return null;
+
int modeToUse = mode;
int preferredMode;
if ((preferredMode =
spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
modeToUse = preferredMode;
- RspList rsps;
- List<Object> retval;
- Vector<Address> validMembers;
-
- if (disp == null)
- {
- return null;
- }
-
- if (recipients != null)
- validMembers = new Vector<Address>(recipients);
- else
- {
- validMembers = new Vector<Address>(members);
- }
-
- if (excludeSelf && !validMembers.isEmpty())
- {
- Address local_addr = getLocalAddress();
- if (local_addr != null) validMembers.remove(local_addr);
- }
-
- if (validMembers.isEmpty())
- {
- if (trace) log.trace("destination list is empty, discarding call");
- return null;
- }
-
if (trace)
- log.trace("callRemoteMethods(): valid members are " + validMembers +
" methods: " + command + " Using OOB? " + useOutOfBandMessage);
+ log.trace("callRemoteMethods(): valid members are " + recipients +
" methods: " + command + " Using OOB? " + useOutOfBandMessage);
if (channel.flushSupported())
{
@@ -429,13 +404,12 @@
useOutOfBandMessage = false;
- rsps = disp.invokeRemoteCommands(validMembers, command, modeToUse, timeout,
isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
+ RspList rsps = disp.invokeRemoteCommands(recipients, command, modeToUse, timeout,
isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
// a null response is 99% likely to be due to a marshalling problem - we throw a
NSE, this needs to be changed when
// JGroups supports
http://jira.jboss.com/jira/browse/JGRP-193
if (rsps == null)
{
- // return null;
throw new NotSerializableException("RpcDispatcher returned a null. This is
most often caused by args for " + command.getClass().getSimpleName() + " not
being serializable.");
}
if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
@@ -443,7 +417,7 @@
if (trace)
log.trace("(" + getLocalAddress() + "): responses for method
" + command.getClass().getSimpleName() + ":\n" + rsps);
- retval = new ArrayList<Object>(rsps.size());
+ List<Object> retval = new ArrayList<Object>(rsps.size());
for (Rsp rsp : rsps.values())
{
@@ -561,10 +535,10 @@
public List<Address> getMembers()
{
if (isInLocalMode) return null;
- synchronized (members)
- {
- return new ArrayList<Address>(members);
- }
+ if (members == null)
+ return Collections.emptyList();
+ else
+ return members;
}
public boolean isCoordinator()
@@ -579,14 +553,14 @@
return null;
}
- synchronized (members)
+ synchronized (coordinatorLock)
{
- while (members.isEmpty())
+ while (members == null || members.isEmpty())
{
log.debug("getCoordinator(): waiting on viewAccepted()");
try
{
- members.wait();
+ coordinatorLock.wait();
}
catch (InterruptedException e)
{
@@ -594,7 +568,7 @@
break;
}
}
- return members.size() > 0 ? members.get(0) : null;
+ return members != null && members.size() > 0 ? members.get(0) :
null;
}
}
@@ -605,40 +579,42 @@
protected class MembershipListenerAdaptor implements ExtendedMembershipListener
{
- public void viewAccepted(View new_view)
+ public void viewAccepted(View newView)
{
- Vector<Address> new_mbrs = new_view.getMembers();
- if (log.isInfoEnabled()) log.info("Received new cluster view: " +
new_view);
- synchronized (members)
+ Vector<Address> newMembers = newView.getMembers();
+ if (log.isInfoEnabled()) log.info("Received new cluster view: " +
newView);
+ synchronized (coordinatorLock)
{
boolean needNotification = false;
- if (new_mbrs != null)
+ if (newMembers != null)
{
- // Determine what members have been removed
- // and roll back any tx and break any locks
- Vector<Address> removed = new Vector<Address>(members);
- removed.removeAll(new_mbrs);
- removeLocksForDeadMembers(spi.getRoot(), removed);
+ if (members != null)
+ {
+ // we had a membership list before this event. Check to make sure we
haven't lost any members,
+ // and if so, determine what members have been removed
+ // and roll back any tx and break any locks
+ List<Address> removed = new ArrayList<Address>(members);
+ removed.removeAll(newMembers);
+ removeLocksForDeadMembers(spi.getRoot(), removed);
+ }
- members.clear();
- members.addAll(new_mbrs);
+ members = new ArrayList<Address>(newMembers); // defensive copy.
needNotification = true;
}
// Now that we have a view, figure out if we are the coordinator
- coordinator = (members.size() != 0 &&
members.get(0).equals(getLocalAddress()));
+ coordinator = (members != null && members.size() != 0 &&
members.get(0).equals(getLocalAddress()));
// now notify listeners - *after* updating the coordinator. - JBCACHE-662
if (needNotification && notifier != null)
{
InvocationContext ctx = spi.getInvocationContext();
- notifier.notifyViewChange(new_view, ctx);
+ notifier.notifyViewChange(newView, ctx);
}
- // Wake up any threads that are waiting to know who the members
- // are so they can figure out who the coordinator is
- members.notifyAll();
+ // Wake up any threads that are waiting to know about who the coordinator is
+ coordinatorLock.notifyAll();
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyGroup.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyGroup.java 2008-04-30
15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyGroup.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -15,7 +15,7 @@
import java.util.Collections;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Vector;
/**
* Value object that represents a buddy group
@@ -39,7 +39,7 @@
/**
* List<Address> - a list of JGroups addresses
*/
- private final List<Address> buddies = new
CopyOnWriteArrayList<Address>();
+ private final Vector<Address> buddies = new Vector<Address>();
public String getGroupName()
{
@@ -97,4 +97,14 @@
return b.toString();
}
+ /**
+ * Added in 2.2.0 as an optimisation for JGroups which internally uses vectors.
+ *
+ * @return a list of buddies
+ * @since 2.2.0
+ */
+ public Vector<Address> getBuddiesAsVector()
+ {
+ return buddies;
+ }
}
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-04-30
15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -639,6 +639,16 @@
return buddyGroup.getBuddies();
}
+ /**
+ * Created as an optimisation for JGroups, which uses vectors.
+ *
+ * @since 2.2.0
+ */
+ public Vector<Address> getBuddyAddressesAsVector()
+ {
+ return buddyGroup.getBuddiesAsVector();
+ }
+
public List<Address> getMembersOutsideBuddyGroup()
{
List<Address> members = new
ArrayList<Address>(rpcManager.getMembers());
@@ -981,7 +991,7 @@
}
}
- rpcManager.callRemoteMethods(recipients, call, true, true,
config.getBuddyCommunicationTimeout(), false);
+ rpcManager.callRemoteMethods(new Vector<Address>(recipients), call, true,
config.getBuddyCommunicationTimeout(), false);
}
Modified: core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java 2008-04-30
15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -154,7 +154,7 @@
ReplicateCommand replicateCommand =
commandsFactory.buildReplicateCommand(toReplicate);
// send to all live nodes in the cluster
- rpcManager.callRemoteMethods(null, replicateCommand, false, true, 5000,
false);
+ rpcManager.callRemoteMethods(null, replicateCommand, false,
configuration.getSyncReplTimeout(), false);
}
catch (Throwable t)
{
Modified: core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java 2008-04-30
15:51:35 UTC (rev 5774)
+++
core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -2,9 +2,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.*;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.Node;
+import org.jboss.cache.NodeSPI;
import org.jboss.cache.commands.Visitor;
-import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Option;
import org.jboss.cache.optimistic.DataVersion;
@@ -118,7 +122,10 @@
// mark the node to be removed (and all children) as invalid so anyone holding a
direct reference to it will
// be aware that it is no longer valid.
- ((NodeSPI) node).setValid(false, true);
+ NodeSPI nSPI = (NodeSPI) node;
+ nSPI.setValid(false, true);
+ // root nodes can never be invalid
+ if (fqn.isRoot()) nSPI.setValid(true, false); // non-recursive.
if (dataVersion != null)
{
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-04-30
15:51:35 UTC (rev 5774)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -22,6 +22,7 @@
import javax.transaction.Transaction;
import java.util.List;
+import java.util.Vector;
/**
* Acts as a base for all RPC calls - subclassed by
@@ -90,7 +91,7 @@
replicateCall(ctx, null, call, sync, o, false);
}
- protected void replicateCall(InvocationContext ctx, List<Address> recipients,
ReplicableCommand c, boolean sync, Option o, boolean useOutOfBandMessage) throws
Throwable
+ protected void replicateCall(InvocationContext ctx, Vector<Address> recipients,
ReplicableCommand c, boolean sync, Option o, boolean useOutOfBandMessage) throws
Throwable
{
long syncReplTimeout = configuration.getSyncReplTimeout();
@@ -116,10 +117,10 @@
}
}
- replicateCall(recipients, c, sync, true, useOutOfBandMessage, false, (int)
syncReplTimeout);
+ replicateCall(recipients, c, sync, true, useOutOfBandMessage, false,
syncReplTimeout);
}
- protected void replicateCall(List<Address> recipients, ReplicableCommand call,
boolean sync, boolean wrapCacheCommandInReplicateMethod, boolean useOutOfBandMessage,
boolean isBroadcast, int timeout) throws Throwable
+ protected void replicateCall(Vector<Address> recipients, ReplicableCommand call,
boolean sync, boolean wrapCacheCommandInReplicateMethod, boolean useOutOfBandMessage,
boolean isBroadcast, long timeout) throws Throwable
{
if (trace) log.trace("Broadcasting call " + call + " to recipient
list " + recipients);
@@ -132,10 +133,10 @@
{
if (usingBuddyReplication && !isBroadcast) call =
buddyManager.transformFqns((VisitableCommand) call);
- List<Address> callRecipients = recipients;
+ Vector<Address> callRecipients = recipients;
if (callRecipients == null)
{
- callRecipients = usingBuddyReplication && !isBroadcast ?
buddyManager.getBuddyAddresses() : rpcManager.getMembers();
+ callRecipients = usingBuddyReplication && !isBroadcast ?
buddyManager.getBuddyAddressesAsVector() : null;
if (trace)
log.trace("Setting call recipients to " + callRecipients +
" since the original list of recipients passed in is null.");
}
@@ -145,7 +146,6 @@
List rsps = rpcManager.callRemoteMethods(callRecipients,
toCall,
sync, // is synchronised?
- true, // ignore self?
timeout,
useOutOfBandMessage
);
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2008-04-30
15:51:35 UTC (rev 5774)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -310,7 +310,7 @@
GravitateDataCommand command = commandsFactory.buildGravitateDataCommand(fqn,
searchSubtrees);
// doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could
return null results from nodes that do
// not have either the primary OR backup, and stop polling other valid nodes.
- List resps = rpcManager.callRemoteMethods(mbrs, command, GroupRequest.GET_ALL,
true, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs,
rpcManager.getLocalAddress()), false);
+ List resps = rpcManager.callRemoteMethods(null, command, GroupRequest.GET_ALL,
buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs,
rpcManager.getLocalAddress()), false);
if (trace) log.trace("got responses " + resps);
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2008-04-30
15:51:35 UTC (rev 5774)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -81,12 +81,11 @@
{
Option optionOverride = ctx.getOptionOverrides();
if (optionOverride != null && optionOverride.isCacheModeLocal() &&
(ctx.getTransaction() == null))
- //||
MethodDeclarations.isTransactionLifecycleMethod(ctx.getMethodCall().getMethodId())))
{
// skip replication!!
return true;
}
- if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") method
call " + ctx.getMethodCall());
+ if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") Command
" + ctx.getCommand());
return false;
}
@@ -134,7 +133,7 @@
Transaction tx = ctx.getTransaction();
if (tx != null && !optimistic)
{
- log.debug("Entering InvalidationInterceptor's prepare phase");
+ if (trace) log.trace("Entering InvalidationInterceptor's prepare
phase");
// fetch the modifications before the transaction is committed (and thus removed
from the txTable)
GlobalTransaction gtx = ctx.getGlobalTransaction();
TransactionEntry entry = txTable.get(gtx);
@@ -146,7 +145,7 @@
}
else
{
- log.debug("Nothing to invalidate - no modifications in the
transaction.");
+ if (trace) log.trace("Nothing to invalidate - no modifications in the
transaction.");
}
}
return retval;
@@ -182,7 +181,7 @@
GlobalTransaction gtx = ctx.getGlobalTransaction();
List<ReversibleCommand> modifications = txMods.remove(gtx);
broadcastInvalidate(modifications, gtx, tx, ctx);
- log.debug("Committing. Broadcasting invalidations.");
+ if (trace) log.trace("Committing. Broadcasting invalidations.");
}
return retval;
}
Modified: core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java 2008-04-30
15:51:35 UTC (rev 5774)
+++
core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -493,7 +493,7 @@
protected void assertValid()
{
- if (!node.isValid())
+ if (!getFqn().isRoot() && !node.isValid())
throw new NodeNotValidException("Node " + getFqn() + " is not
valid. Perhaps it has been moved or removed.");
}
Modified: core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java 2008-04-30
15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -112,11 +112,10 @@
private Object callRemote(DataCommand dataCommand) throws Exception
{
if (trace) log.trace("cache=" + cache.getLocalAddress() + "; calling
with " + dataCommand);
- List<Address> mbrs = cache.getMembers();
ClusteredGetCommand clusteredGet = commandsFactory.buildClusteredGetCommand(false,
dataCommand);
- List resps = null;
+ List resps;
// JBCACHE-1186
- resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet,
GroupRequest.GET_ALL, true, config.getTimeout(), new ResponseValidityFilter(mbrs,
cache.getLocalAddress()), false);
+ resps = cache.getRPCManager().callRemoteMethods(null, clusteredGet,
GroupRequest.GET_ALL, config.getTimeout(), new ResponseValidityFilter(cache.getMembers(),
cache.getLocalAddress()), false);
if (resps == null)
{
Modified: core/trunk/src/test/java/org/jboss/cache/api/NodeReplicatedMoveTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/NodeReplicatedMoveTest.java 2008-04-30
15:51:35 UTC (rev 5774)
+++ core/trunk/src/test/java/org/jboss/cache/api/NodeReplicatedMoveTest.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -11,6 +11,7 @@
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
import org.jboss.cache.Node;
+import org.jboss.cache.NodeNotExistsException;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.Configuration.CacheMode;
@@ -93,8 +94,11 @@
cache2.stop();
cache1.destroy();
cache2.destroy();
-
cache1.getConfiguration().setNodeLockingScheme(Configuration.NodeLockingScheme.OPTIMISTIC);
-
cache2.getConfiguration().setNodeLockingScheme(Configuration.NodeLockingScheme.OPTIMISTIC);
+ if (optimistic)
+ {
+
cache1.getConfiguration().setNodeLockingScheme(Configuration.NodeLockingScheme.OPTIMISTIC);
+
cache2.getConfiguration().setNodeLockingScheme(Configuration.NodeLockingScheme.OPTIMISTIC);
+ }
cache1.getConfiguration().setCacheMode(Configuration.CacheMode.INVALIDATION_SYNC);
cache2.getConfiguration().setCacheMode(Configuration.CacheMode.INVALIDATION_SYNC);
cache1.start();
@@ -109,8 +113,8 @@
assertEquals(vA, cache1.getRoot().getChild(A).get(k));
assertEquals(vB, cache1.getRoot().getChild(A).getChild(B).get(k));
- assertInvalidated(cache2, A, "Should be invalidated");
- assertInvalidated(cache2, Fqn.fromRelativeElements(A, B.getLastElement()),
"Should be invalidated");
+ assertInvalidated(cache2, A, "Should be invalidated", optimistic);
+ assertInvalidated(cache2, Fqn.fromRelativeElements(A, B.getLastElement()),
"Should be invalidated", optimistic);
// now move...
cache1.move(nodeB.getFqn(), Fqn.ROOT);
@@ -118,22 +122,30 @@
assertEquals(vA, cache1.getRoot().getChild(A).get(k));
assertEquals(vB, cache1.getRoot().getChild(B).get(k));
- assertInvalidated(cache2, A, "Should be invalidated");
- assertInvalidated(cache2, B, "Should be invalidated");
+ assertInvalidated(cache2, A, "Should be invalidated", optimistic);
+ assertInvalidated(cache2, B, "Should be invalidated", optimistic);
// now make sure a node exists on cache 2
cache2.getRoot().addChild(A).put("k2", "v2");
// te invalidation will happen in afterCompletion, hence no exception!
- cache1.move(B, A);// should throw an NPE
+ try
+ {
+ cache1.move(B, A);// should throw an NPE
+ if (!optimistic) assert false : "Should throw an exception!";
+ }
+ catch (NodeNotExistsException expected)
+ {
+ if (optimistic) assert false : "Should not have thrown an
exception!";
+ }
}
- private void assertInvalidated(Cache cache, Fqn fqn, String msg)
+ private void assertInvalidated(Cache cache, Fqn fqn, String msg, boolean optimistic)
{
assert cache.getRoot().getChild(fqn) == null : msg;
NodeSPI n = ((CacheSPI) cache).peek(fqn, true, true);
- assert n != null : msg;
- assert !n.isValid() : msg;
+ assert n == null || optimistic : msg;
+ assert !optimistic || !n.isValid() : msg;
}
public void testReplTxCommit() throws Exception
Modified:
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2008-04-30
15:51:35 UTC (rev 5774)
+++
core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -28,6 +28,7 @@
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.List;
+import java.util.Vector;
@Test(groups = {"functional", "jgroups", "transaction"})
public abstract class PutForExternalReadTestBase
@@ -158,7 +159,7 @@
assertEquals("PFER should have been a no-op", value, cache2.get(fqn,
key));
}
- private List<Address> anyAddresses()
+ private Vector<Address> anyAddresses()
{
anyObject();
return null;
@@ -178,7 +179,7 @@
{
// specify what we expect called on the mock Rpc Manager. For params we
don't care about, just use ANYTHING.
// setting the mock object to expect the "sync" param to be false.
- expect(rpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand)
anyObject(), eq(false), anyBoolean(), anyInt(), anyBoolean())).andReturn(null);
+ expect(rpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand)
anyObject(), eq(false), anyLong(), anyBoolean())).andReturn(null);
}
replay(rpcManager);
@@ -241,7 +242,7 @@
List<Address> memberList = originalRpcManager.getMembers();
expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
expect(barfingRpcManager.getLocalAddress()).andReturn(originalRpcManager.getLocalAddress()).anyTimes();
- expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand)
anyObject(), anyBoolean(), anyBoolean(), anyInt(), anyBoolean())).andThrow(new
RuntimeException("Barf!")).anyTimes();
+ expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand)
anyObject(), anyBoolean(), anyLong(), anyBoolean())).andThrow(new
RuntimeException("Barf!")).anyTimes();
replay(barfingRpcManager);
TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(),
barfingRpcManager, RPCManager.class);
Modified: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java 2008-04-30
15:51:35 UTC (rev 5774)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -16,7 +16,7 @@
import org.testng.annotations.Test;
import java.util.Collections;
-import java.util.List;
+import java.util.Vector;
import java.util.concurrent.CountDownLatch;
@Test(groups = "functional")
@@ -82,7 +82,7 @@
// now try the last PUT which should result in the queue being flushed.
expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
- expect(mockRpcManager.callRemoteMethods((List<Address>) anyObject(),
(ReplicableCommand) anyObject(), anyBoolean(), anyBoolean(), anyInt(),
anyBoolean())).andReturn(Collections.emptyList()).anyTimes();
+ expect(mockRpcManager.callRemoteMethods((Vector<Address>) anyObject(),
(ReplicableCommand) anyObject(), anyBoolean(), anyLong(),
anyBoolean())).andReturn(Collections.emptyList()).anyTimes();
replay(mockRpcManager);
cache.put("/a/b/c/LAST", "k", "v");
@@ -111,7 +111,7 @@
// expect basic cluster related calls
expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
- expect(mockRpcManager.callRemoteMethods((List<Address>) anyObject(),
(ReplicableCommand) anyObject(), anyBoolean(), anyBoolean(), anyInt(),
anyBoolean())).andReturn(Collections.emptyList()).anyTimes();
+ expect(mockRpcManager.callRemoteMethods((Vector<Address>) anyObject(),
(ReplicableCommand) anyObject(), anyBoolean(), anyLong(),
anyBoolean())).andReturn(Collections.emptyList()).anyTimes();
replay(mockRpcManager);
Thread[] threads = new Thread[numThreads];
Modified:
core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java 2008-04-30
15:51:35 UTC (rev 5774)
+++
core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -102,7 +102,7 @@
DataCommand command = new GetKeyValueCommand(fqn, key, false);
ClusteredGetCommand clusteredGet = new ClusteredGetCommand(false, command);
- List responses = cache1.getRPCManager().callRemoteMethods(null, clusteredGet, true,
true, 15000, false);
+ List responses = cache1.getRPCManager().callRemoteMethods(null, clusteredGet, true,
15000, false);
List response1 = (List) responses.get(0);// response from the first (and only)
node
Boolean found = (Boolean) response1.get(0);
@@ -130,7 +130,7 @@
GravitateDataCommand gravitateDataCommand = new GravitateDataCommand(fqn, false);
- List responses = cache1.getRPCManager().callRemoteMethods(null,
gravitateDataCommand, true, true, 15000, false);
+ List responses = cache1.getRPCManager().callRemoteMethods(null,
gravitateDataCommand, true, 15000, false);
GravitateResult data = (GravitateResult) responses.get(0);// response from the
first (and only) node
assertTrue("Should have found remote data", data.isDataFound());
Modified:
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java 2008-04-30
15:51:35 UTC (rev 5774)
+++
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java 2008-04-30
16:10:12 UTC (rev 5775)
@@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Vector;
/**
* This is to test the scenario described in
http://jira.jboss.org/jira/browse/JBCACHE-1270
@@ -101,22 +102,22 @@
}
}
- public List<Object> callRemoteMethods(List<Address> recipients,
ReplicableCommand cacheCommand, int mode, boolean excludeSelf, long timeout, RspFilter
responseFilter, boolean useOutOfBandMessage) throws Exception
+ public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean
useOutOfBandMessage) throws Exception
{
logCall(cacheCommand, useOutOfBandMessage);
- return delegate.callRemoteMethods(recipients, cacheCommand, mode, excludeSelf,
timeout, responseFilter, useOutOfBandMessage);
+ return delegate.callRemoteMethods(recipients, cacheCommand, mode, timeout,
responseFilter, useOutOfBandMessage);
}
- public List<Object> callRemoteMethods(List<Address> recipients,
ReplicableCommand cacheCommand, int mode, boolean excludeSelf, long timeout, boolean
useOutOfBandMessage) throws Exception
+ public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage)
throws Exception
{
logCall(cacheCommand, useOutOfBandMessage);
- return delegate.callRemoteMethods(recipients, cacheCommand, mode, excludeSelf,
timeout, useOutOfBandMessage);
+ return delegate.callRemoteMethods(recipients, cacheCommand, mode, timeout,
useOutOfBandMessage);
}
- public List<Object> callRemoteMethods(List<Address> recipients,
ReplicableCommand command, boolean synchronous, boolean excludeSelf, int timeout, boolean
useOutOfBandMessage) throws Exception
+ public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand command, boolean synchronous, long timeout, boolean useOutOfBandMessage)
throws Exception
{
logCall(command, useOutOfBandMessage);
- return delegate.callRemoteMethods(recipients, command, synchronous, excludeSelf,
timeout, useOutOfBandMessage);
+ return delegate.callRemoteMethods(recipients, command, synchronous, timeout,
useOutOfBandMessage);
}
public boolean isCoordinator()