[jbosscache-commits] JBoss Cache SVN: r5775 - in core/trunk/src: main/java/org/jboss/cache/buddyreplication and 10 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Apr 30 12:10:12 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-04-30 12:10:12 -0400 (Wed, 30 Apr 2008)
New Revision: 5775

Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManager.java
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyGroup.java
   core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
   core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
   core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java
   core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
   core/trunk/src/test/java/org/jboss/cache/api/NodeReplicatedMoveTest.java
   core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
   core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
   core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java
   core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
Log:
Performance enhancements relating to interactions with the membership list and JGroups rpc dispatcher APIs.

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -6,6 +6,7 @@
 import org.jgroups.blocks.RspFilter;
 
 import java.util.List;
+import java.util.Vector;
 
 /**
  * Provides a mechanism for communicating with other caches in the cluster.  For now this is based on JGroups as an underlying
@@ -45,44 +46,41 @@
     * Invokes an RPC call on other caches in the cluster.
     *
     * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param methodCall          the method call to invoke
+    * @param cacheCommand        the cache command to invoke
     * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
-    * @param excludeSelf         if true, the message is not looped back to the originator.
     * @param timeout             a timeout after which to throw a replication exception.
     * @param responseFilter      a response filter with which to filter out failed/unwanted/invalid responses.
     * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand cacheCommand, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
+   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
 
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
     * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param CacheCommand        the method call to invoke
+    * @param cacheCommand        the cache command to invoke
     * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
-    * @param excludeSelf         if true, the message is not looped back to the originator.
     * @param timeout             a timeout after which to throw a replication exception.
     * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand cacheCommand, int mode, boolean excludeSelf, long timeout, boolean useOutOfBandMessage) throws Exception;
+   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage) throws Exception;
 
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
     * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param CacheCommand        the method call to invoke
+    * @param cacheCommand        the cache command to invoke
     * @param synchronous         if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
-    * @param excludeSelf         if true, the message is not looped back to the originator.
     * @param timeout             a timeout after which to throw a replication exception.
     * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand cacheCommand, boolean synchronous, boolean excludeSelf, int timeout, boolean useOutOfBandMessage) throws Exception;
+   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception;
 
    /**
     * @return true if the current Channel is the coordinator of the cluster.

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -60,7 +60,9 @@
 {
    private Channel channel;
    private final Log log = LogFactory.getLog(RPCManagerImpl.class);
-   private final List<Address> members = new LinkedList<Address>();
+   private List<Address> members;
+
+   private final Object coordinatorLock = new Object();
    /**
     * True if this Cache is the coordinator.
     */
@@ -208,14 +210,9 @@
          log.info("Stopping the RpcDispatcher");
          disp.stop();
       }
-      if (members != null)
-      {
-         synchronized (members)
-         {
-            members.clear();
-         }
-      }
 
+      if (members != null) members = null;
+
       coordinator = false;
 
       disp = null;
@@ -272,6 +269,9 @@
          configuration.getRuntimeConfig().setChannel(channel);
       }
 
+      // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
+      // remote instances will be received by self.
+      channel.setOpt(Channel.LOCAL, false);
       channel.setOpt(Channel.AUTO_RECONNECT, true);
       channel.setOpt(Channel.AUTO_GETSTATE, fetchState);
       channel.setOpt(Channel.BLOCK, true);
@@ -373,53 +373,28 @@
 
    // ------------ START: RPC call methods ------------
 
-   public List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand command, int mode, boolean excludeSelf, long timeout, boolean useOutOfBandMessage) throws Exception
+   public List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand command, int mode, long timeout, boolean useOutOfBandMessage) throws Exception
    {
-      return callRemoteMethods(recipients, command, mode, excludeSelf, timeout, null, useOutOfBandMessage);
+      return callRemoteMethods(recipients, command, mode, timeout, null, useOutOfBandMessage);
    }
 
-   public List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand command, boolean synchronous, boolean excludeSelf, int timeout, boolean useOutOfBandMessage) throws Exception
+   public List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand command, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception
    {
-      return callRemoteMethods(recipients, command, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE, excludeSelf, timeout, useOutOfBandMessage);
+      return callRemoteMethods(recipients, command, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE, timeout, useOutOfBandMessage);
    }
 
-   public List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand command, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception
+   public List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand command, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception
    {
+      // short circuit if we don't have an RpcDispatcher!
+      if (disp == null) return null;
+
       int modeToUse = mode;
       int preferredMode;
       if ((preferredMode = spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
          modeToUse = preferredMode;
 
-      RspList rsps;
-      List<Object> retval;
-      Vector<Address> validMembers;
-
-      if (disp == null)
-      {
-         return null;
-      }
-
-      if (recipients != null)
-         validMembers = new Vector<Address>(recipients);
-      else
-      {
-         validMembers = new Vector<Address>(members);
-      }
-
-      if (excludeSelf && !validMembers.isEmpty())
-      {
-         Address local_addr = getLocalAddress();
-         if (local_addr != null) validMembers.remove(local_addr);
-      }
-
-      if (validMembers.isEmpty())
-      {
-         if (trace) log.trace("destination list is empty, discarding call");
-         return null;
-      }
-
       if (trace)
-         log.trace("callRemoteMethods(): valid members are " + validMembers + " methods: " + command + " Using OOB? " + useOutOfBandMessage);
+         log.trace("callRemoteMethods(): valid members are " + recipients + " methods: " + command + " Using OOB? " + useOutOfBandMessage);
 
       if (channel.flushSupported())
       {
@@ -429,13 +404,12 @@
 
       useOutOfBandMessage = false;
 
-      rsps = disp.invokeRemoteCommands(validMembers, command, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
+      RspList rsps = disp.invokeRemoteCommands(recipients, command, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
 
       // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
       // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
       if (rsps == null)
       {
-         // return null;
          throw new NotSerializableException("RpcDispatcher returned a null.  This is most often caused by args for " + command.getClass().getSimpleName() + " not being serializable.");
       }
       if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
@@ -443,7 +417,7 @@
       if (trace)
          log.trace("(" + getLocalAddress() + "): responses for method " + command.getClass().getSimpleName() + ":\n" + rsps);
 
-      retval = new ArrayList<Object>(rsps.size());
+      List<Object> retval = new ArrayList<Object>(rsps.size());
 
       for (Rsp rsp : rsps.values())
       {
@@ -561,10 +535,10 @@
    public List<Address> getMembers()
    {
       if (isInLocalMode) return null;
-      synchronized (members)
-      {
-         return new ArrayList<Address>(members);
-      }
+      if (members == null)
+         return Collections.emptyList();
+      else
+         return members;
    }
 
    public boolean isCoordinator()
@@ -579,14 +553,14 @@
          return null;
       }
 
-      synchronized (members)
+      synchronized (coordinatorLock)
       {
-         while (members.isEmpty())
+         while (members == null || members.isEmpty())
          {
             log.debug("getCoordinator(): waiting on viewAccepted()");
             try
             {
-               members.wait();
+               coordinatorLock.wait();
             }
             catch (InterruptedException e)
             {
@@ -594,7 +568,7 @@
                break;
             }
          }
-         return members.size() > 0 ? members.get(0) : null;
+         return members != null && members.size() > 0 ? members.get(0) : null;
       }
    }
 
@@ -605,40 +579,42 @@
    protected class MembershipListenerAdaptor implements ExtendedMembershipListener
    {
 
-      public void viewAccepted(View new_view)
+      public void viewAccepted(View newView)
       {
-         Vector<Address> new_mbrs = new_view.getMembers();
-         if (log.isInfoEnabled()) log.info("Received new cluster view: " + new_view);
-         synchronized (members)
+         Vector<Address> newMembers = newView.getMembers();
+         if (log.isInfoEnabled()) log.info("Received new cluster view: " + newView);
+         synchronized (coordinatorLock)
          {
             boolean needNotification = false;
-            if (new_mbrs != null)
+            if (newMembers != null)
             {
-               // Determine what members have been removed
-               // and roll back any tx and break any locks
-               Vector<Address> removed = new Vector<Address>(members);
-               removed.removeAll(new_mbrs);
-               removeLocksForDeadMembers(spi.getRoot(), removed);
+               if (members != null)
+               {
+                  // we had a membership list before this event.  Check to make sure we haven't lost any members,
+                  // and if so, determine what members have been removed
+                  // and roll back any tx and break any locks
+                  List<Address> removed = new ArrayList<Address>(members);
+                  removed.removeAll(newMembers);
+                  removeLocksForDeadMembers(spi.getRoot(), removed);
+               }
 
-               members.clear();
-               members.addAll(new_mbrs);
+               members = new ArrayList<Address>(newMembers); // defensive copy.
 
                needNotification = true;
             }
 
             // Now that we have a view, figure out if we are the coordinator
-            coordinator = (members.size() != 0 && members.get(0).equals(getLocalAddress()));
+            coordinator = (members != null && members.size() != 0 && members.get(0).equals(getLocalAddress()));
 
             // now notify listeners - *after* updating the coordinator. - JBCACHE-662
             if (needNotification && notifier != null)
             {
                InvocationContext ctx = spi.getInvocationContext();
-               notifier.notifyViewChange(new_view, ctx);
+               notifier.notifyViewChange(newView, ctx);
             }
 
-            // Wake up any threads that are waiting to know who the members
-            // are so they can figure out who the coordinator is
-            members.notifyAll();
+            // Wake up any threads that are waiting to know about who the coordinator is
+            coordinatorLock.notifyAll();
          }
       }
 

Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyGroup.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyGroup.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyGroup.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -15,7 +15,7 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.Vector;
 
 /**
  * Value object that represents a buddy group
@@ -39,7 +39,7 @@
    /**
     * List<Address> - a list of JGroups addresses
     */
-   private final List<Address> buddies = new CopyOnWriteArrayList<Address>();
+   private final Vector<Address> buddies = new Vector<Address>();
 
    public String getGroupName()
    {
@@ -97,4 +97,14 @@
       return b.toString();
    }
 
+   /**
+    * Added in 2.2.0 as an optimisation for JGroups which internally uses vectors.
+    *
+    * @return a list of buddies
+    * @since 2.2.0
+    */
+   public Vector<Address> getBuddiesAsVector()
+   {
+      return buddies;
+   }
 }

Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -639,6 +639,16 @@
       return buddyGroup.getBuddies();
    }
 
+   /**
+    * Created as an optimisation for JGroups, which uses vectors.
+    *
+    * @since 2.2.0
+    */
+   public Vector<Address> getBuddyAddressesAsVector()
+   {
+      return buddyGroup.getBuddiesAsVector();
+   }
+
    public List<Address> getMembersOutsideBuddyGroup()
    {
       List<Address> members = new ArrayList<Address>(rpcManager.getMembers());
@@ -981,7 +991,7 @@
          }
       }
 
-      rpcManager.callRemoteMethods(recipients, call, true, true, config.getBuddyCommunicationTimeout(), false);
+      rpcManager.callRemoteMethods(new Vector<Address>(recipients), call, true, config.getBuddyCommunicationTimeout(), false);
    }
 
 

Modified: core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -154,7 +154,7 @@
 
             ReplicateCommand replicateCommand = commandsFactory.buildReplicateCommand(toReplicate);
             // send to all live nodes in the cluster
-            rpcManager.callRemoteMethods(null, replicateCommand, false, true, 5000, false);
+            rpcManager.callRemoteMethods(null, replicateCommand, false, configuration.getSyncReplTimeout(), false);
          }
          catch (Throwable t)
          {

Modified: core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/commands/write/InvalidateCommand.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -2,9 +2,13 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.*;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.Node;
+import org.jboss.cache.NodeSPI;
 import org.jboss.cache.commands.Visitor;
-import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.Option;
 import org.jboss.cache.optimistic.DataVersion;
 
@@ -118,7 +122,10 @@
 
       // mark the node to be removed (and all children) as invalid so anyone holding a direct reference to it will
       // be aware that it is no longer valid.
-      ((NodeSPI) node).setValid(false, true);
+      NodeSPI nSPI = (NodeSPI) node;
+      nSPI.setValid(false, true);
+      // root nodes can never be invalid
+      if (fqn.isRoot()) nSPI.setValid(true, false); // non-recursive.
 
       if (dataVersion != null)
       {

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -22,6 +22,7 @@
 
 import javax.transaction.Transaction;
 import java.util.List;
+import java.util.Vector;
 
 /**
  * Acts as a base for all RPC calls - subclassed by
@@ -90,7 +91,7 @@
       replicateCall(ctx, null, call, sync, o, false);
    }
 
-   protected void replicateCall(InvocationContext ctx, List<Address> recipients, ReplicableCommand c, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable
+   protected void replicateCall(InvocationContext ctx, Vector<Address> recipients, ReplicableCommand c, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable
    {
       long syncReplTimeout = configuration.getSyncReplTimeout();
 
@@ -116,10 +117,10 @@
          }
       }
 
-      replicateCall(recipients, c, sync, true, useOutOfBandMessage, false, (int) syncReplTimeout);
+      replicateCall(recipients, c, sync, true, useOutOfBandMessage, false, syncReplTimeout);
    }
 
-   protected void replicateCall(List<Address> recipients, ReplicableCommand call, boolean sync, boolean wrapCacheCommandInReplicateMethod, boolean useOutOfBandMessage, boolean isBroadcast, int timeout) throws Throwable
+   protected void replicateCall(Vector<Address> recipients, ReplicableCommand call, boolean sync, boolean wrapCacheCommandInReplicateMethod, boolean useOutOfBandMessage, boolean isBroadcast, long timeout) throws Throwable
    {
       if (trace) log.trace("Broadcasting call " + call + " to recipient list " + recipients);
 
@@ -132,10 +133,10 @@
       {
          if (usingBuddyReplication && !isBroadcast) call = buddyManager.transformFqns((VisitableCommand) call);
 
-         List<Address> callRecipients = recipients;
+         Vector<Address> callRecipients = recipients;
          if (callRecipients == null)
          {
-            callRecipients = usingBuddyReplication && !isBroadcast ? buddyManager.getBuddyAddresses() : rpcManager.getMembers();
+            callRecipients = usingBuddyReplication && !isBroadcast ? buddyManager.getBuddyAddressesAsVector() : null;
             if (trace)
                log.trace("Setting call recipients to " + callRecipients + " since the original list of recipients passed in is null.");
          }
@@ -145,7 +146,6 @@
          List rsps = rpcManager.callRemoteMethods(callRecipients,
                toCall,
                sync, // is synchronised?
-               true, // ignore self?
                timeout,
                useOutOfBandMessage
          );

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/DataGravitatorInterceptor.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -310,7 +310,7 @@
       GravitateDataCommand command = commandsFactory.buildGravitateDataCommand(fqn, searchSubtrees);
       // doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could return null results from nodes that do
       // not have either the primary OR backup, and stop polling other valid nodes.
-      List resps = rpcManager.callRemoteMethods(mbrs, command, GroupRequest.GET_ALL, true, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs, rpcManager.getLocalAddress()), false);
+      List resps = rpcManager.callRemoteMethods(null, command, GroupRequest.GET_ALL, buddyManager.getBuddyCommunicationTimeout(), new ResponseValidityFilter(mbrs, rpcManager.getLocalAddress()), false);
 
       if (trace) log.trace("got responses " + resps);
 

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InvalidationInterceptor.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -81,12 +81,11 @@
    {
       Option optionOverride = ctx.getOptionOverrides();
       if (optionOverride != null && optionOverride.isCacheModeLocal() && (ctx.getTransaction() == null))
-      //|| MethodDeclarations.isTransactionLifecycleMethod(ctx.getMethodCall().getMethodId())))
       {
          // skip replication!!
          return true;
       }
-      if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") method call " + ctx.getMethodCall());
+      if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") Command " + ctx.getCommand());
       return false;
    }
 
@@ -134,7 +133,7 @@
       Transaction tx = ctx.getTransaction();
       if (tx != null && !optimistic)
       {
-         log.debug("Entering InvalidationInterceptor's prepare phase");
+         if (trace) log.trace("Entering InvalidationInterceptor's prepare phase");
          // fetch the modifications before the transaction is committed (and thus removed from the txTable)
          GlobalTransaction gtx = ctx.getGlobalTransaction();
          TransactionEntry entry = txTable.get(gtx);
@@ -146,7 +145,7 @@
          }
          else
          {
-            log.debug("Nothing to invalidate - no modifications in the transaction.");
+            if (trace) log.trace("Nothing to invalidate - no modifications in the transaction.");
          }
       }
       return retval;
@@ -182,7 +181,7 @@
          GlobalTransaction gtx = ctx.getGlobalTransaction();
          List<ReversibleCommand> modifications = txMods.remove(gtx);
          broadcastInvalidate(modifications, gtx, tx, ctx);
-         log.debug("Committing.  Broadcasting invalidations.");
+         if (trace) log.trace("Committing.  Broadcasting invalidations.");
       }
       return retval;
    }

Modified: core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -493,7 +493,7 @@
 
    protected void assertValid()
    {
-      if (!node.isValid())
+      if (!getFqn().isRoot() && !node.isValid())
          throw new NodeNotValidException("Node " + getFqn() + " is not valid.  Perhaps it has been moved or removed.");
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/main/java/org/jboss/cache/loader/ClusteredCacheLoader.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -112,11 +112,10 @@
    private Object callRemote(DataCommand dataCommand) throws Exception
    {
       if (trace) log.trace("cache=" + cache.getLocalAddress() + "; calling with " + dataCommand);
-      List<Address> mbrs = cache.getMembers();
       ClusteredGetCommand clusteredGet = commandsFactory.buildClusteredGetCommand(false, dataCommand);
-      List resps = null;
+      List resps;
       // JBCACHE-1186
-      resps = cache.getRPCManager().callRemoteMethods(mbrs, clusteredGet, GroupRequest.GET_ALL, true, config.getTimeout(), new ResponseValidityFilter(mbrs, cache.getLocalAddress()), false);
+      resps = cache.getRPCManager().callRemoteMethods(null, clusteredGet, GroupRequest.GET_ALL, config.getTimeout(), new ResponseValidityFilter(cache.getMembers(), cache.getLocalAddress()), false);
 
       if (resps == null)
       {

Modified: core/trunk/src/test/java/org/jboss/cache/api/NodeReplicatedMoveTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/NodeReplicatedMoveTest.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/test/java/org/jboss/cache/api/NodeReplicatedMoveTest.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -11,6 +11,7 @@
 import org.jboss.cache.DefaultCacheFactory;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Node;
+import org.jboss.cache.NodeNotExistsException;
 import org.jboss.cache.NodeSPI;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.Configuration.CacheMode;
@@ -93,8 +94,11 @@
       cache2.stop();
       cache1.destroy();
       cache2.destroy();
-      cache1.getConfiguration().setNodeLockingScheme(Configuration.NodeLockingScheme.OPTIMISTIC);
-      cache2.getConfiguration().setNodeLockingScheme(Configuration.NodeLockingScheme.OPTIMISTIC);
+      if (optimistic)
+      {
+         cache1.getConfiguration().setNodeLockingScheme(Configuration.NodeLockingScheme.OPTIMISTIC);
+         cache2.getConfiguration().setNodeLockingScheme(Configuration.NodeLockingScheme.OPTIMISTIC);
+      }
       cache1.getConfiguration().setCacheMode(Configuration.CacheMode.INVALIDATION_SYNC);
       cache2.getConfiguration().setCacheMode(Configuration.CacheMode.INVALIDATION_SYNC);
       cache1.start();
@@ -109,8 +113,8 @@
       assertEquals(vA, cache1.getRoot().getChild(A).get(k));
       assertEquals(vB, cache1.getRoot().getChild(A).getChild(B).get(k));
 
-      assertInvalidated(cache2, A, "Should be invalidated");
-      assertInvalidated(cache2, Fqn.fromRelativeElements(A, B.getLastElement()), "Should be invalidated");
+      assertInvalidated(cache2, A, "Should be invalidated", optimistic);
+      assertInvalidated(cache2, Fqn.fromRelativeElements(A, B.getLastElement()), "Should be invalidated", optimistic);
 
       // now move...
       cache1.move(nodeB.getFqn(), Fqn.ROOT);
@@ -118,22 +122,30 @@
       assertEquals(vA, cache1.getRoot().getChild(A).get(k));
       assertEquals(vB, cache1.getRoot().getChild(B).get(k));
 
-      assertInvalidated(cache2, A, "Should be invalidated");
-      assertInvalidated(cache2, B, "Should be invalidated");
+      assertInvalidated(cache2, A, "Should be invalidated", optimistic);
+      assertInvalidated(cache2, B, "Should be invalidated", optimistic);
 
       // now make sure a node exists on cache 2
       cache2.getRoot().addChild(A).put("k2", "v2");
 
       // te invalidation will happen in afterCompletion, hence no exception!
-      cache1.move(B, A);// should throw an NPE
+      try
+      {
+         cache1.move(B, A);// should throw an NPE
+         if (!optimistic) assert false : "Should throw an exception!";
+      }
+      catch (NodeNotExistsException expected)
+      {
+         if (optimistic) assert false : "Should not have thrown an exception!";
+      }
    }
 
-   private void assertInvalidated(Cache cache, Fqn fqn, String msg)
+   private void assertInvalidated(Cache cache, Fqn fqn, String msg, boolean optimistic)
    {
       assert cache.getRoot().getChild(fqn) == null : msg;
       NodeSPI n = ((CacheSPI) cache).peek(fqn, true, true);
-      assert n != null : msg;
-      assert !n.isValid() : msg;
+      assert n == null || optimistic : msg;
+      assert !optimistic || !n.isValid() : msg;
    }
 
    public void testReplTxCommit() throws Exception

Modified: core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -28,6 +28,7 @@
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import java.util.List;
+import java.util.Vector;
 
 @Test(groups = {"functional", "jgroups", "transaction"})
 public abstract class PutForExternalReadTestBase
@@ -158,7 +159,7 @@
          assertEquals("PFER should have been a no-op", value, cache2.get(fqn, key));
    }
 
-   private List<Address> anyAddresses()
+   private Vector<Address> anyAddresses()
    {
       anyObject();
       return null;
@@ -178,7 +179,7 @@
       {
          // specify what we expect called on the mock Rpc Manager.  For params we don't care about, just use ANYTHING.
          // setting the mock object to expect the "sync" param to be false.
-         expect(rpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand) anyObject(), eq(false), anyBoolean(), anyInt(), anyBoolean())).andReturn(null);
+         expect(rpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand) anyObject(), eq(false), anyLong(), anyBoolean())).andReturn(null);
       }
 
       replay(rpcManager);
@@ -241,7 +242,7 @@
          List<Address> memberList = originalRpcManager.getMembers();
          expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
          expect(barfingRpcManager.getLocalAddress()).andReturn(originalRpcManager.getLocalAddress()).anyTimes();
-         expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand) anyObject(), anyBoolean(), anyBoolean(), anyInt(), anyBoolean())).andThrow(new RuntimeException("Barf!")).anyTimes();
+         expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand) anyObject(), anyBoolean(), anyLong(), anyBoolean())).andThrow(new RuntimeException("Barf!")).anyTimes();
          replay(barfingRpcManager);
 
          TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(), barfingRpcManager, RPCManager.class);

Modified: core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/test/java/org/jboss/cache/cluster/ReplicationQueueTest.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -16,7 +16,7 @@
 import org.testng.annotations.Test;
 
 import java.util.Collections;
-import java.util.List;
+import java.util.Vector;
 import java.util.concurrent.CountDownLatch;
 
 @Test(groups = "functional")
@@ -82,7 +82,7 @@
 
       // now try the last PUT which should result in the queue being flushed.
       expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
-      expect(mockRpcManager.callRemoteMethods((List<Address>) anyObject(), (ReplicableCommand) anyObject(), anyBoolean(), anyBoolean(), anyInt(), anyBoolean())).andReturn(Collections.emptyList()).anyTimes();
+      expect(mockRpcManager.callRemoteMethods((Vector<Address>) anyObject(), (ReplicableCommand) anyObject(), anyBoolean(), anyLong(), anyBoolean())).andReturn(Collections.emptyList()).anyTimes();
       replay(mockRpcManager);
 
       cache.put("/a/b/c/LAST", "k", "v");
@@ -111,7 +111,7 @@
 
       // expect basic cluster related calls
       expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
-      expect(mockRpcManager.callRemoteMethods((List<Address>) anyObject(), (ReplicableCommand) anyObject(), anyBoolean(), anyBoolean(), anyInt(), anyBoolean())).andReturn(Collections.emptyList()).anyTimes();
+      expect(mockRpcManager.callRemoteMethods((Vector<Address>) anyObject(), (ReplicableCommand) anyObject(), anyBoolean(), anyLong(), anyBoolean())).andReturn(Collections.emptyList()).anyTimes();
       replay(mockRpcManager);
 
       Thread[] threads = new Thread[numThreads];

Modified: core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/ReturnValueMarshallingTest.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -102,7 +102,7 @@
       DataCommand command = new GetKeyValueCommand(fqn, key, false);
       ClusteredGetCommand clusteredGet = new ClusteredGetCommand(false, command);
 
-      List responses = cache1.getRPCManager().callRemoteMethods(null, clusteredGet, true, true, 15000, false);
+      List responses = cache1.getRPCManager().callRemoteMethods(null, clusteredGet, true, 15000, false);
       List response1 = (List) responses.get(0);// response from the first (and only) node
 
       Boolean found = (Boolean) response1.get(0);
@@ -130,7 +130,7 @@
 
       GravitateDataCommand gravitateDataCommand = new GravitateDataCommand(fqn, false);
 
-      List responses = cache1.getRPCManager().callRemoteMethods(null, gravitateDataCommand, true, true, 15000, false);
+      List responses = cache1.getRPCManager().callRemoteMethods(null, gravitateDataCommand, true, 15000, false);
       GravitateResult data = (GravitateResult) responses.get(0);// response from the first (and only) node
 
       assertTrue("Should have found remote data", data.isDataFound());

Modified: core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java	2008-04-30 15:51:35 UTC (rev 5774)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java	2008-04-30 16:10:12 UTC (rev 5775)
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Vector;
 
 /**
  * This is to test the scenario described in http://jira.jboss.org/jira/browse/JBCACHE-1270
@@ -101,22 +102,22 @@
          }
       }
 
-      public List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand cacheCommand, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception
+      public List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception
       {
          logCall(cacheCommand, useOutOfBandMessage);
-         return delegate.callRemoteMethods(recipients, cacheCommand, mode, excludeSelf, timeout, responseFilter, useOutOfBandMessage);
+         return delegate.callRemoteMethods(recipients, cacheCommand, mode, timeout, responseFilter, useOutOfBandMessage);
       }
 
-      public List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand cacheCommand, int mode, boolean excludeSelf, long timeout, boolean useOutOfBandMessage) throws Exception
+      public List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage) throws Exception
       {
          logCall(cacheCommand, useOutOfBandMessage);
-         return delegate.callRemoteMethods(recipients, cacheCommand, mode, excludeSelf, timeout, useOutOfBandMessage);
+         return delegate.callRemoteMethods(recipients, cacheCommand, mode, timeout, useOutOfBandMessage);
       }
 
-      public List<Object> callRemoteMethods(List<Address> recipients, ReplicableCommand command, boolean synchronous, boolean excludeSelf, int timeout, boolean useOutOfBandMessage) throws Exception
+      public List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand command, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception
       {
          logCall(command, useOutOfBandMessage);
-         return delegate.callRemoteMethods(recipients, command, synchronous, excludeSelf, timeout, useOutOfBandMessage);
+         return delegate.callRemoteMethods(recipients, command, synchronous, timeout, useOutOfBandMessage);
       }
 
       public boolean isCoordinator()




More information about the jbosscache-commits mailing list