[jbosscache-commits] JBoss Cache SVN: r5161 - in core/trunk/src: main/java/org/jboss/cache/buddyreplication and 7 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Jan 17 17:34:36 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-01-17 17:34:36 -0500 (Thu, 17 Jan 2008)
New Revision: 5161

Added:
   core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
   core/trunk/src/main/java/org/jboss/cache/CacheSPI.java
   core/trunk/src/main/java/org/jboss/cache/RPCManager.java
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/RegionManager.java
   core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
   core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
   core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
   core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
   core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
   core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java
   core/trunk/src/test/java/org/jboss/cache/api/CacheSPITest.java
   core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java
   core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
   core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
Log:
Refactored RPCManager and CacheImpl so that the channel is now created and owned solely by the RPCManager

Modified: core/trunk/src/main/java/org/jboss/cache/CacheImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheImpl.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/CacheImpl.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -11,7 +11,6 @@
 import org.jboss.cache.buddyreplication.BuddyManager;
 import org.jboss.cache.buddyreplication.GravitateResult;
 import org.jboss.cache.config.Configuration;
-import org.jboss.cache.config.RuntimeConfig;
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.factories.InterceptorChainFactory;
 import org.jboss.cache.factories.annotations.ComponentName;
@@ -22,11 +21,9 @@
 import org.jboss.cache.loader.CacheLoaderManager;
 import org.jboss.cache.lock.IsolationLevel;
 import org.jboss.cache.lock.LockStrategyFactory;
-import org.jboss.cache.lock.LockUtil;
 import org.jboss.cache.lock.LockingException;
 import org.jboss.cache.lock.NodeLock;
 import org.jboss.cache.lock.TimeoutException;
-import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.MethodCall;
 import org.jboss.cache.marshall.MethodCallFactory;
@@ -35,29 +32,27 @@
 import org.jboss.cache.notifications.Notifier;
 import org.jboss.cache.notifications.event.NodeModifiedEvent;
 import org.jboss.cache.optimistic.DataVersion;
-import org.jboss.cache.remoting.jgroups.CacheMessageListener;
-import org.jboss.cache.statetransfer.StateTransferManager;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.OptimisticTransactionEntry;
 import org.jboss.cache.transaction.TransactionEntry;
 import org.jboss.cache.transaction.TransactionTable;
 import org.jboss.cache.util.CachePrinter;
-import org.jboss.cache.util.ThreadGate;
-import org.jboss.cache.util.reflect.ReflectionUtil;
-import org.jgroups.*;
-import org.jgroups.blocks.GroupRequest;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.blocks.RspFilter;
-import org.jgroups.util.Rsp;
-import org.jgroups.util.RspList;
+import org.jgroups.Address;
 
 import javax.management.MBeanServerFactory;
 import javax.transaction.Status;
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
-import java.io.NotSerializableException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * The default implementation class of {@link org.jboss.cache.Cache} and {@link org.jboss.cache.CacheSPI}.  This class
@@ -82,11 +77,6 @@
    private Log log = LogFactory.getLog(CacheImpl.class);
 
    /**
-    * Thread gate used to block Dispatcher during JGroups FLUSH protocol
-    */
-   private final ThreadGate flushBlockGate = new ThreadGate();
-
-   /**
     * Root node.
     */
    private NodeSPI root;
@@ -97,31 +87,6 @@
    private RegionManager regionManager = null;
 
    /**
-    * The JGroups JChannel in use.
-    */
-   protected Channel channel = null;
-
-   /**
-    * True if this CacheImpl is the coordinator.
-    */
-   private volatile boolean coordinator = false;
-
-   /**
-    * List of cluster group members.
-    */
-   private final Vector<Address> members = new Vector<Address>();
-
-   /**
-    * JGroups RpcDispatcher in use.
-    */
-   private RpcDispatcher disp = null;
-
-   /**
-    * JGroups message listener.
-    */
-   private CacheMessageListener messageListener;
-
-   /**
     * Maintains mapping of transactions (keys) and Modifications/Undo-Operations
     */
    private TransactionTable transactionTable;
@@ -148,11 +113,6 @@
    private CacheLoaderManager cacheLoaderManager;
 
    /**
-    * Queue used to replicate updates when mode is repl-async
-    */
-   private ReplicationQueue repl_queue = null;
-
-   /**
     * The current lifecycle state.
     */
    CacheStatus cacheStatus;
@@ -185,6 +145,7 @@
     * from a shutdown hook.
     */
    private boolean invokedFromShutdownHook;
+   private RPCManager rpcManager;
 
    /**
     * Constructs an uninitialized CacheImpl.
@@ -218,8 +179,8 @@
    @Inject
    private void injectDependencies(Notifier notifier, RegionManager regionManager, TransactionManager transactionManager, Marshaller marshaller,
                                    TransactionTable transactionTable, NodeFactory nodeFactory,
-                                   CacheSPI spi, CacheMessageListener messageListener, @ComponentName("remoteDelegate")RemoteCacheInvocationDelegate remoteDelegate,
-                                   Interceptor interceptorChain, BuddyManager buddyManager)
+                                   CacheSPI spi, @ComponentName("remoteDelegate")RemoteCacheInvocationDelegate remoteDelegate,
+                                   Interceptor interceptorChain, BuddyManager buddyManager, RPCManager rpcManager)
    {
       this.notifier = notifier;
       this.regionManager = regionManager;
@@ -227,11 +188,11 @@
       this.transactionTable = transactionTable;
       this.nodeFactory = nodeFactory;
       this.spi = spi;
-      this.messageListener = messageListener;
       this.remoteDelegate = remoteDelegate;
       this.marshaller = marshaller;
       this.interceptorChain = interceptorChain;
       this.buddyManager = buddyManager;
+      this.rpcManager = rpcManager;
    }
 
    public Configuration getConfiguration()
@@ -256,34 +217,6 @@
    }
 
    /**
-    * Returns the local channel address.
-    */
-   public Address getLocalAddress()
-   {
-      return channel != null ? channel.getLocalAddress() : null;
-   }
-
-   /**
-    * Returns the members as a List.
-    * This list may be concurrently modified.
-    */
-   public List<Address> getMembers()
-   {
-      synchronized (members)
-      {
-         return new ArrayList<Address>(members);
-      }
-   }
-
-   /**
-    * Returns <code>true</code> if this node is the group coordinator.
-    */
-   public boolean isCoordinator()
-   {
-      return coordinator;
-   }
-
-   /**
     * Returns the transaction table.
     */
    public TransactionTable getTransactionTable()
@@ -291,38 +224,7 @@
       return transactionTable;
    }
 
-   private void setUseReplQueue(boolean flag)
-   {
-      if (flag)
-      {
-         if (repl_queue == null)
-         {
-            repl_queue = new ReplicationQueue(this, configuration.getReplQueueInterval(), configuration.getReplQueueMaxElements());
-            if (configuration.getReplQueueInterval() >= 0)
-            {
-               repl_queue.start();
-            }
-         }
-      }
-      else
-      {
-         if (repl_queue != null)
-         {
-            repl_queue.stop();
-            repl_queue = null;
-         }
-      }
-   }
-
    /**
-    * Returns the replication queue.
-    */
-   public ReplicationQueue getReplicationQueue()
-   {
-      return repl_queue;
-   }
-
-   /**
     * Sets the cache locking isolation level.
     */
    private void setIsolationLevel(IsolationLevel level)
@@ -339,99 +241,6 @@
    }
 
    /**
-    * Fetches the group state from the current coordinator. If successful, this
-    * will trigger JChannel setState() call.
-    */
-   public void fetchState(long timeout) throws ChannelClosedException, ChannelNotConnectedException
-   {
-      if (channel == null)
-      {
-         throw new ChannelNotConnectedException();
-      }
-      boolean rc = channel.getState(null, timeout);
-      if (rc)
-      {
-         log.debug("fetchState(): state was retrieved successfully");
-      }
-      else
-      {
-         log.debug("fetchState(): state could not be retrieved (first member)");
-      }
-   }
-
-   public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception
-   {
-      String encodedStateId = sourceTarget + StateTransferManager.PARTIAL_STATE_DELIMITER + integrationTarget;
-      fetchPartialState(sources, encodedStateId);
-   }
-
-   public void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception
-   {
-      if (subtree == null)
-      {
-         throw new IllegalArgumentException("Cannot fetch partial state. Null subtree.");
-      }
-      fetchPartialState(sources, subtree.toString());
-   }
-
-   private void fetchPartialState(List<Address> sources, String stateId) throws Exception
-   {
-      if (sources == null || sources.isEmpty() || stateId == null)
-      {
-         // should this really be throwing an exception?  Are there valid use cases where partial state may not be available? - Manik
-         // Yes -- cache is configured LOCAL but app doesn't know it -- Brian
-         //throw new IllegalArgumentException("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
-         if (log.isWarnEnabled())
-         {
-            log.warn("Cannot fetch partial state, targets are " + sources +
-                  " and stateId is " + stateId);
-         }
-         return;
-      }
-
-      List<Address> targets = new LinkedList<Address>(sources);
-
-      //skip *this* node as a target
-      targets.remove(getLocalAddress());
-
-      if (targets.isEmpty())
-      {
-         // Definitely no exception here -- this happens every time the 1st node in the
-         // cluster activates a region!! -- Brian
-         log.debug("Cannot fetch partial state. There are no target members specified");
-         return;
-      }
-
-      log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
-      boolean successfulTransfer = false;
-      for (Address target : targets)
-      {
-         log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
-         messageListener.setStateSet(false);
-         successfulTransfer = channel.getState(target, stateId, configuration.getStateRetrievalTimeout());
-         if (successfulTransfer)
-         {
-            try
-            {
-               messageListener.waitForState();
-            }
-            catch (Exception transferFailed)
-            {
-               successfulTransfer = false;
-            }
-         }
-         log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
-         if (successfulTransfer)
-            break;
-      }
-
-      if (!successfulTransfer)
-      {
-         log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
-      }
-   }
-
-   /**
     * Lifecycle method. This is like initialize.
     *
     * @throws Exception
@@ -493,18 +302,11 @@
       componentRegistry.wire();
       correctRootNodeType();
 
-      setUseReplQueue(configuration.isUseReplQueue());
       setIsolationLevel(configuration.getIsolationLevel());
 
       cacheStatus = CacheStatus.CREATED;
    }
 
-   protected boolean shouldFetchStateOnStartup()
-   {
-      boolean loaderFetch = cacheLoaderManager != null && cacheLoaderManager.isFetchPersistentState();
-      return !configuration.isInactiveOnStartup() && buddyManager == null && (configuration.isFetchInMemoryState() || loaderFetch);
-   }
-
    /**
     * Creates a new root node if one does not exist, or if the existing one does not match the type according to the configuration.
     */
@@ -602,81 +404,17 @@
          case REPL_ASYNC:
          case INVALIDATION_ASYNC:
          case INVALIDATION_SYNC:
-            if (log.isDebugEnabled()) log.debug("cache mode is " + configuration.getCacheMode());
-            initialiseChannelAndRpcDispatcher();
-
-            //connect and transfer state
-            if (shouldFetchStateOnStartup())
-            {
-               try
-               {
-                  long start = System.currentTimeMillis();
-                  channel.connect(configuration.getClusterName(), null, null, configuration.getStateRetrievalTimeout());
-                  // reconfigure log category so that the instance name is reflected as well.
-                  configureLogCategory();
-                  //if I am not the only and the first member than wait for a state to arrive
-                  if (getMembers().size() > 1)
-                  {
-                     messageListener.waitForState();
-                  }
-
-                  if (log.isDebugEnabled())
-                  {
-                     log.debug("connected, state was retrieved successfully (in " + (System.currentTimeMillis() - start)
-                           + " milliseconds)");
-                  }
-               }
-               catch (StateTransferException ste)
-               {
-                  // make sure we disconnect from the channel before we throw this exception!
-                  // JBCACHE-761
-                  channel.disconnect();
-                  channel.close();
-                  throw new CacheException("Unable to fetch state on startup", ste);
-               }
-               catch (ChannelException e)
-               {
-                  throw new CacheException("Unable to connect to JGroups channel", e);
-               }
-               catch (Exception ex)
-               {
-                  throw new CacheException("Unable to fetch state on startup", ex);
-               }
-            }
-            //otherwise just connect
-            else
-            {
-               try
-               {
-                  channel.connect(configuration.getClusterName());
-                  // reconfigure log category so that the instance name is reflected as well.
-                  configureLogCategory();
-               }
-               catch (ChannelException e)
-               {
-                  throw new CacheException("Unable to connect to JGroups channel", e);
-               }
-            }
-            if (log.isInfoEnabled())
-            {
-               log.info("CacheImpl local address is " + channel.getLocalAddress());
-            }
-            if (buddyManager != null && buddyManager.isEnabled())
-            {
-               //buddyManager.init(this);
-               buddyManager.init();
-               if (configuration.isUseReplQueue())
-               {
-                  log.warn("Replication queue not supported when using buddy replication.  Disabling repliction queue.");
-                  configuration.setUseReplQueue(false);
-                  repl_queue = null;
-               }
-            }
+            // reconfigure log category so that the instance name is reflected as well.
+            configureLogCategory();
             break;
          default:
             throw new IllegalArgumentException("cache mode " + configuration.getCacheMode() + " is invalid");
       }
 
+      // these 2 components need to be started manually since they can only be started after ALL other components have started.
+      if (rpcManager != null) rpcManager.start();
+      if (buddyManager != null) buddyManager.init();
+
       //now attempt to preload the cache from the loader - Manik
       if (cacheLoaderManager != null)
       {
@@ -779,25 +517,8 @@
 
       // The rest of these should have already been taken care of in stop,
       // but we do it here as well in case stop failed.
+      rpcManager.stop();
 
-      if (channel != null)
-      {
-         if (channel.isOpen())
-         {
-            try
-            {
-               channel.close();
-               channel.disconnect();
-            }
-            catch (Exception toLog)
-            {
-               log.error("Problem closing channel; setting it to null", toLog);
-            }
-         }
-         channel = null;
-         configuration.getRuntimeConfig().setChannel(null);
-      }
-      disp = null;
       transactionManager = null;
 
       componentRegistry.reset();
@@ -843,35 +564,6 @@
 
       componentRegistry.stop();
 
-      if (channel != null)
-      {
-         log.info("stop(): closing the channel");
-         killChannel();
-         channel = null;
-         configuration.getRuntimeConfig().setChannel(null);
-      }
-
-      if (disp != null)
-      {
-         log.info("stop(): stopping the dispatcher");
-         disp.stop();
-         disp = null;
-      }
-      if (members != null)
-      {
-         synchronized (members)
-         {
-            members.clear();
-         }
-      }
-
-      coordinator = false;
-
-      if (repl_queue != null)
-      {
-         repl_queue.stop();
-      }
-
       if (notifier != null)
       {
          notifier.notifyCacheStopped(spi, spi.getInvocationContext());
@@ -908,37 +600,6 @@
    }
 
    /**
-    * Returns the address of the coordinator or null if there is no
-    * coordinator.
-    * Waits until the membership view is updated.
-    */
-   public Address getCoordinator()
-   {
-      if (channel == null)
-      {
-         return null;
-      }
-
-      synchronized (members)
-      {
-         while (members.isEmpty())
-         {
-            log.debug("getCoordinator(): waiting on viewAccepted()");
-            try
-            {
-               members.wait();
-            }
-            catch (InterruptedException e)
-            {
-               log.error("getCoordinator(): Interrupted while waiting for members to be set", e);
-               break;
-            }
-         }
-         return members.size() > 0 ? members.get(0) : null;
-      }
-   }
-
-   /**
     * Evicts the node at <code>subtree</code> along with all descendant nodes.
     *
     * @param subtree Fqn indicating the uppermost node in the
@@ -979,55 +640,6 @@
 
    }
 
-   private void removeLocksForDeadMembers(NodeSPI node, List deadMembers)
-   {
-      Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
-      NodeLock lock = node.getLock();
-      Object owner = lock.getWriterOwner();
-
-      if (isLockOwnerDead(owner, deadMembers))
-      {
-         deadOwners.add((GlobalTransaction) owner);
-      }
-
-      for (Object readOwner : lock.getReaderOwners())
-      {
-         if (isLockOwnerDead(readOwner, deadMembers))
-         {
-            deadOwners.add((GlobalTransaction) readOwner);
-         }
-      }
-
-      for (GlobalTransaction deadOwner : deadOwners)
-      {
-         boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
-         boolean broken = LockUtil.breakTransactionLock(lock, deadOwner, localTx, this);
-
-         if (broken && trace)
-         {
-            log.trace("Broke lock for node " + node.getFqn() +
-                  " held by " + deadOwner);
-         }
-      }
-
-      // Recursively unlock children
-      for (Object child : node.getChildrenDirect())
-      {
-         removeLocksForDeadMembers((NodeSPI) child, deadMembers);
-      }
-   }
-
-   private boolean isLockOwnerDead(Object owner, List deadMembers)
-   {
-      boolean result = false;
-      if (owner != null && owner instanceof GlobalTransaction)
-      {
-         Object addr = ((GlobalTransaction) owner).getAddress();
-         result = deadMembers.contains(addr);
-      }
-      return result;
-   }
-
    // -----------  End Marshalling and State Transfer -----------------------
 
    /**
@@ -1342,155 +954,6 @@
       return count;
    }
 
-   /* ---------------------- Remote method calls -------------------- */
-
-   /**
-    * @param mbrs
-    * @param method_call
-    * @param synchronous
-    * @param exclude_self
-    * @param timeout
-    * @return
-    * @throws Exception
-    * @deprecated Note this is due to be moved to an interceptor.
-    */
-   @Deprecated
-   public List callRemoteMethods(List<Address> mbrs, MethodCall method_call,
-                                 boolean synchronous, boolean exclude_self, long timeout)
-         throws Exception
-   {
-      return callRemoteMethods(mbrs, method_call, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE, exclude_self, timeout);
-   }
-
-   @Deprecated
-   public List callRemoteMethods(List<Address> mbrs, MethodCall method_call, int mode, boolean exclude_self, long timeout)
-         throws Exception
-   {
-      return callRemoteMethods(mbrs, method_call, mode, exclude_self, timeout, null);
-   }
-
-   /**
-    * Overloaded to allow a finer grained control over JGroups mode
-    *
-    * @param mbrs
-    * @param method_call
-    * @param mode
-    * @param exclude_self
-    * @param timeout
-    * @return
-    * @throws Exception
-    * @deprecated Note this is due to be moved to an interceptor.
-    */
-   @Deprecated
-   public List callRemoteMethods(List<Address> mbrs, MethodCall method_call, int mode, boolean exclude_self, long timeout, RspFilter rspFilter)
-         throws Exception
-   {
-      int modeToUse = mode;
-      int preferredMode;
-      if ((preferredMode = spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
-         modeToUse = preferredMode;
-
-      RspList rsps;
-      List retval;
-      Vector<Address> validMembers;
-
-      if (disp == null)
-      {
-         return null;
-      }
-
-      if (mbrs != null)
-         validMembers = new Vector<Address>(mbrs);
-      else
-      {
-         synchronized (members)
-         {
-            validMembers = new Vector<Address>(this.members);
-         }
-      }
-
-      if (exclude_self && !validMembers.isEmpty())
-      {
-         Object local_addr = getLocalAddress();
-         if (local_addr != null)
-         {
-            validMembers.remove(local_addr);
-         }
-      }
-      if (validMembers.isEmpty())
-      {
-         if (trace)
-         {
-            log.trace("destination list is empty, discarding call");
-         }
-         return null;
-      }
-
-      if (trace)
-      {
-         log.trace("callRemoteMethods(): valid members are " + validMembers + " methods: " + method_call);
-      }
-
-      if (channel.flushSupported())
-      {
-         if (!flushBlockGate.await(configuration.getStateRetrievalTimeout()))
-            throw new TimeoutException("State retrieval timed out waiting for flush unblock.");
-      }
-      rsps = rspFilter == null
-            ? disp.callRemoteMethods(validMembers, method_call, modeToUse, timeout, buddyManager != null && buddyManager.isEnabled())
-            : disp.callRemoteMethods(validMembers, method_call, modeToUse, timeout, buddyManager != null && buddyManager.isEnabled(), false, rspFilter);
-
-      // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
-      // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
-      if (rsps == null)
-      {
-         // return null;
-         throw new NotSerializableException("RpcDispatcher returned a null.  This is most often caused by args for " + method_call.getName() + " not being serializable.");
-      }
-      if (mode == GroupRequest.GET_NONE)
-      {
-         return Collections.EMPTY_LIST;// async case
-      }
-
-      if (trace)
-      {
-         log.trace("(" + getLocalAddress() + "): responses for method " + method_call.getName() + ":\n" + rsps);
-      }
-
-      retval = new ArrayList(rsps.size());
-
-      for (Rsp rsp : rsps.values())
-      {
-         if (rsp.wasSuspected() || !rsp.wasReceived())
-         {
-            CacheException ex;
-            if (rsp.wasSuspected())
-            {
-               ex = new SuspectException("Suspected member: " + rsp.getSender());
-            }
-            else
-            {
-               ex = new TimeoutException("Replication timeout for " + rsp.getSender());
-            }
-            retval.add(new ReplicationException("rsp=" + rsp, ex));
-         }
-         else
-         {
-            Object value = rsp.getValue();
-            if (value instanceof Exception && !(value instanceof ReplicationException))
-            {
-               // if we have any application-level exceptions make sure we throw them!!
-               if (trace) log.trace("Recieved exception'" + value + "' from " + rsp.getSender());
-               throw (Exception) value;
-            }
-            retval.add(value);
-         }
-      }
-      return retval;
-   }
-
-   /* -------------------- End Remote method calls ------------------ */
-
    /* --------------------- Callbacks -------------------------- */
 
    /* ----- These are VERSIONED callbacks to facilitate JBCACHE-843.  Also see docs/design/DataVersion.txt --- */
@@ -2281,7 +1744,7 @@
 
          if (backupNodeFqn == null && searchSubtrees)
          {
-            backupNodeFqn = BuddyManager.getBackupFqn(BuddyManager.getGroupNameFromAddress(getLocalAddress()), fqn);
+            backupNodeFqn = BuddyManager.getBackupFqn(BuddyManager.getGroupNameFromAddress(rpcManager.getLocalAddress()), fqn);
          }
 
          List<NodeData> list = getNodeData(new LinkedList<NodeData>(), (NodeSPI) actualNode);
@@ -2483,114 +1946,31 @@
       StringBuilder category = new StringBuilder(getClass().getName());
       if (configuration != null)
       {
-         String clusterName = configuration.getClusterName();
-         if (clusterName != null)
+         if (rpcManager != null)
          {
-            category.append('.');
-            category.append(clusterName);
-            if (channel != null && channel.getLocalAddress() != null)
+            String clusterName = configuration.getClusterName();
+            if (clusterName != null)
             {
                category.append('.');
-               category.append(channel.getLocalAddress().toString().replace('.', '_'));
+               category.append(clusterName);
+               if (rpcManager.getLocalAddress() != null)
+               {
+                  category.append('.');
+                  category.append(rpcManager.getLocalAddress().toString().replace('.', '_'));
+               }
             }
          }
+         else
+         {
+            // we're in LOCAL mode
+            category.append("_LOCAL");
+         }
       }
       // replace .s with _s otherwise Log4J will strip them out
       log = LogFactory.getLog(category.toString());
       trace = log.isTraceEnabled();
    }
 
-   /**
-    * Kills the JGroups channel; an unclean channel disconnect
-    */
-   public void killChannel()
-   {
-      if (channel != null)
-      {
-         channel.disconnect();
-         channel.close();
-      }
-   }
-
-   /*----------------------- MembershipListener ------------------------*/
-
-   protected class MembershipListenerAdaptor implements ExtendedMembershipListener
-   {
-
-      public void viewAccepted(View new_view)
-      {
-         Vector<Address> new_mbrs = new_view.getMembers();
-         if (log.isInfoEnabled()) log.info("viewAccepted(): " + new_view);
-         synchronized (members)
-         {
-            boolean needNotification = false;
-            if (new_mbrs != null)
-            {
-               // Determine what members have been removed
-               // and roll back any tx and break any locks
-               Vector<Address> removed = new Vector<Address>(members);
-               removed.removeAll(new_mbrs);
-               removeLocksForDeadMembers(root, removed);
-
-               members.removeAllElements();
-               members.addAll(new_mbrs);
-
-               needNotification = true;
-            }
-
-            // Now that we have a view, figure out if we are the coordinator
-            coordinator = (members.size() != 0 && members.get(0).equals(getLocalAddress()));
-
-            // now notify listeners - *after* updating the coordinator. - JBCACHE-662
-            if (needNotification && notifier != null)
-            {
-               InvocationContext ctx = spi.getInvocationContext();
-               notifier.notifyViewChange(new_view, ctx);
-            }
-
-            // Wake up any threads that are waiting to know who the members
-            // are so they can figure out who the coordinator is
-            members.notifyAll();
-         }
-      }
-
-      /**
-       * Called when a member is suspected.
-       */
-      public void suspect(Address suspected_mbr)
-      {
-      }
-
-      /**
-       * Indicates that a channel has received a BLOCK event from FLUSH protocol.
-       */
-      public void block()
-      {
-         flushBlockGate.close();
-         if (log.isDebugEnabled()) log.debug("Block received at " + getLocalAddress());
-
-         remoteDelegate.block();
-
-         if (log.isDebugEnabled()) log.debug("Block processed at " + getLocalAddress());
-      }
-
-      /**
-       * Indicates that a channel has received a UNBLOCK event from FLUSH protocol.
-       */
-      public void unblock()
-      {
-         if (log.isDebugEnabled()) log.debug("UnBlock received at " + getLocalAddress());
-
-         remoteDelegate.unblock();
-
-         if (log.isDebugEnabled()) log.debug("UnBlock processed at " + getLocalAddress());
-         flushBlockGate.open();
-      }
-
-   }
-
-   /*------------------- End of MembershipListener ----------------------*/
-
    /* ------------------------------ Private methods --------------------------- */
 
    /**
@@ -2712,7 +2092,7 @@
       GlobalTransaction gtx = transactionTable.get(tx);
       if (gtx == null && createIfNotExists)
       {
-         Address addr = getLocalAddress();
+         Address addr = rpcManager.getLocalAddress();
          gtx = GlobalTransaction.create(addr);
          transactionTable.put(tx, gtx);
          TransactionEntry ent = configuration.isNodeLockingOptimistic() ? new OptimisticTransactionEntry() : new TransactionEntry();
@@ -2862,89 +2242,6 @@
       return toReturn;
    }
 
-   private void initialiseChannelAndRpcDispatcher() throws CacheException
-   {
-      channel = configuration.getRuntimeConfig().getChannel();
-      if (channel == null)
-      {
-         // Try to create a multiplexer channel
-         channel = getMultiplexerChannel();
-
-         if (channel != null)
-         {
-            ReflectionUtil.setValue(configuration, "accessible", true);
-            configuration.setUsingMultiplexer(true);
-            if (log.isDebugEnabled())
-            {
-               log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() +
-                     " using stack " + configuration.getMultiplexerStack());
-            }
-         }
-         else
-         {
-
-            try
-            {
-               if (configuration.getClusterConfig() == null)
-               {
-                  log.debug("setting cluster properties to default value");
-                  channel = new JChannel(configuration.getDefaultClusterConfig());
-               }
-               else
-               {
-                  if (trace)
-                  {
-                     log.trace("Cache cluster properties: " + configuration.getClusterConfig());
-                  }
-                  channel = new JChannel(configuration.getClusterConfig());
-               }
-            }
-            catch (ChannelException el)
-            {
-               el.printStackTrace();
-            }
-         }
-
-         configuration.getRuntimeConfig().setChannel(channel);
-      }
-
-      channel.setOpt(Channel.AUTO_RECONNECT, true);
-      channel.setOpt(Channel.AUTO_GETSTATE, true);
-      channel.setOpt(Channel.BLOCK, true);
-
-      // always use the InactiveRegionAwareRpcDispatcher - exceptions due to regions not being active should not propagate to remote
-      // nodes as errors. - Manik
-      disp = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(), remoteDelegate);
-
-      disp.setRequestMarshaller(marshaller);
-      disp.setResponseMarshaller(marshaller);
-
-      if (trace) log.trace("Started with RpcDispatcher " + disp);
-   }
-
-   private JChannel getMultiplexerChannel() throws CacheException
-   {
-      String stackName = configuration.getMultiplexerStack();
-
-      RuntimeConfig rtc = configuration.getRuntimeConfig();
-      ChannelFactory channelFactory = rtc.getMuxChannelFactory();
-      JChannel muxchannel = null;
-
-      if (channelFactory != null)
-      {
-         try
-         {
-            muxchannel = (JChannel) channelFactory.createMultiplexerChannel(stackName, configuration.getClusterName());
-         }
-         catch (Exception e)
-         {
-            throw new CacheException("Failed to create multiplexed channel using stack " + stackName, e);
-         }
-      }
-
-      return muxchannel;
-   }
-
    // ================== methods to implement Cache and CacheSPI interfaces ============================
 
    public List<Interceptor> getInterceptorChain()

Modified: core/trunk/src/main/java/org/jboss/cache/CacheSPI.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/CacheSPI.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/CacheSPI.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -18,7 +18,6 @@
 import org.jboss.cache.statetransfer.StateTransferManager;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.TransactionTable;
-import org.jgroups.Address;
 
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
@@ -285,12 +284,6 @@
     */
    Set<Fqn> getInternalFqns();
 
-   @Deprecated
-   void fetchPartialState(List<Address> members, Fqn subtreeRoot) throws Exception;
-
-   @Deprecated
-   void fetchPartialState(List<Address> members, Fqn subtreeRoot, Fqn integrationPoint) throws Exception;
-
    int getNumberOfLocksHeld();
 
    /**

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -6,24 +6,120 @@
 
 import java.util.List;
 
+/**
+ * Provides a mechanism for communicating with other caches in the cluster.  For now this is based on JGroups as an underlying
+ * transport, and in future more transport options may become available.
+ * <p/>
+ * Implementations have a simple lifecycle:
+ * <ul>
+ * <li>start() - starts the underlying channel based on configuration options injected, and connects the channel</li>
+ * <li>disconnect() - disconnects the channel</li>
+ * <li>stop() - stops the dispatcher and releases resources</li>
+ * </ul>
+ *
+ * @author Manik Surtani
+ * @since 2.1.0
+ */
 public interface RPCManager
 {
    /**
-    * The same as {@link #callRemoteMethods(java.util.List,org.jboss.cache.marshall.MethodCall,int,boolean,long)} except that it adds a JGroups
-    * {@link org.jgroups.blocks.RspFilter} to the list of parameters, which is used to filter results.
+    * Disconnects and closes the underlying JGroups channel.
     */
-   public List callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws Exception;
+   void disconnect();
 
-   public List callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout) throws Exception;
+   /**
+    * Stops the RPCDispatcher and frees resources.  Closes and disconnects the underlying JGroups channel if this is
+    * still open/connected.
+    */
+   void stop();
 
-   public boolean isCoordinator();
+   /**
+    * Starts the RPCManager by connecting the underlying JGroups channel (if configured for replication).  Connecting
+    * the channel may also involve state transfer (if configured) so the interceptor chain should be started and
+    * available before this method is called.
+    */
+   void start();
 
-   public Address getCoordinator();
+   /**
+    * Invokes an RPC call on other caches in the cluster.
+    *
+    * @param recipients     a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param methodCall     the method call to invoke
+    * @param mode           the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
+    * @param excludeSelf    if true, the message is not looped back to the originator.
+    * @param timeout        a timeout after which to throw a replication exception.
+    * @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
+    * @return a list of responses from each member contacted.
+    * @throws Exception in the event of problems.
+    */
+   List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws Exception;
 
-   public List callRemoteMethods(List<Address> recipients, MethodCall methodCall, boolean synchronous, boolean excludeSelf, int timeout) throws Exception;
+   /**
+    * Invokes an RPC call on other caches in the cluster.
+    *
+    * @param recipients  a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param methodCall  the method call to invoke
+    * @param mode        the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
+    * @param excludeSelf if true, the message is not looped back to the originator.
+    * @param timeout     a timeout after which to throw a replication exception.
+    * @return a list of responses from each member contacted.
+    * @throws Exception in the event of problems.
+    */
+   List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout) throws Exception;
 
    /**
-    * @return Returns the replication queue (if one is used), null otherwise.
+    * Invokes an RPC call on other caches in the cluster.
+    *
+    * @param recipients  a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param methodCall  the method call to invoke
+    * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+    * @param excludeSelf if true, the message is not looped back to the originator.
+    * @param timeout     a timeout after which to throw a replication exception.
+    * @return a list of responses from each member contacted.
+    * @throws Exception in the event of problems.
     */
-   public ReplicationQueue getReplicationQueue();
+   List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, boolean synchronous, boolean excludeSelf, int timeout) throws Exception;
+
+   /**
+    * @return true if the current Channel is the coordinator of the cluster.
+    */
+   boolean isCoordinator();
+
+   /**
+    * @return the Address of the current coordinator.
+    */
+   Address getCoordinator();
+
+   /**
+    * Retrieves the local JGroups channel's address
+    *
+    * @return an Address
+    */
+   Address getLocalAddress();
+
+   /**
+    * Returns a defensively copied list of members in the current cluster view.
+    */
+   List<Address> getMembers();
+
+   /**
+    * Retrieves partial state from remote instances.
+    *
+    * @param sources           sources to consider for a state transfer
+    * @param sourceTarget      Fqn on source to retrieve state for
+    * @param integrationTarget integration point on local cache to apply state
+    * @throws Exception in the event of problems
+    */
+   void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception;
+
+   /**
+    * Retrieves partial state from remote instances.
+    *
+    * @param sources sources to consider for a state transfer
+    * @param subtree Fqn subtree to retrieve.  Will be integrated at the same point.
+    * @throws Exception in the event of problems
+    */
+   void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception;
+
+
 }

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -6,12 +6,52 @@
  */
 package org.jboss.cache;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.RuntimeConfig;
+import org.jboss.cache.factories.annotations.ComponentName;
 import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.Stop;
+import org.jboss.cache.invocation.RemoteCacheInvocationDelegate;
+import org.jboss.cache.loader.CacheLoaderManager;
+import org.jboss.cache.lock.LockUtil;
+import org.jboss.cache.lock.NodeLock;
+import org.jboss.cache.lock.TimeoutException;
+import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
+import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.MethodCall;
+import org.jboss.cache.notifications.Notifier;
+import org.jboss.cache.remoting.jgroups.CacheMessageListener;
+import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.cache.util.ThreadGate;
+import org.jboss.cache.util.reflect.ReflectionUtil;
 import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.ChannelException;
+import org.jgroups.ChannelFactory;
+import org.jgroups.ExtendedMembershipListener;
+import org.jgroups.JChannel;
+import org.jgroups.StateTransferException;
+import org.jgroups.View;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.RpcDispatcher;
 import org.jgroups.blocks.RspFilter;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
 
+import javax.transaction.TransactionManager;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
+import java.util.Vector;
 
 /**
  * Manager that handles all RPC calls between JBoss Cache instances
@@ -20,61 +60,603 @@
  */
 public class RPCManagerImpl implements RPCManager
 {
-   private CacheImpl c;
+   private Channel channel;
+   private Log log = LogFactory.getLog(RPCManagerImpl.class);
+   private List<Address> members = new LinkedList<Address>();
+   /**
+    * True if this Cache is the coordinator.
+    */
+   private volatile boolean coordinator = false;
+   /**
+    * Thread gate used to block Dispatcher during JGroups FLUSH protocol
+    */
+   private final ThreadGate flushBlockGate = new ThreadGate();
+   /**
+    * JGroups RpcDispatcher in use.
+    */
+   private RpcDispatcher disp = null;
 
    /**
-    * Empty ctor for mock object creation/unit testing
+    * JGroups message listener.
     */
-   public RPCManagerImpl()
+   private CacheMessageListener messageListener;
+   private Configuration configuration;
+   private Notifier notifier;
+   private CacheSPI spi;
+   private boolean trace = log.isTraceEnabled();
+   private BuddyManager buddyManager;
+   private RemoteCacheInvocationDelegate remoteDelegate;
+   private CacheLoaderManager cacheLoaderManager;
+   private Marshaller marshaller;
+   private TransactionManager txManager;
+   private TransactionTable txTable;
+
+
+   @Inject
+   private void setupDependencies(CacheMessageListener messageListener, Configuration configuration,
+                                  Notifier notifier, CacheSPI spi, BuddyManager buddyManager, Marshaller marshaller,
+                                  @ComponentName("remoteDelegate")RemoteCacheInvocationDelegate remoteDelegate,
+                                  CacheLoaderManager cacheLoaderManager, TransactionTable txTable, TransactionManager txManager)
    {
+      this.messageListener = messageListener;
+      this.configuration = configuration;
+      this.notifier = notifier;
+      this.spi = spi;
+      this.buddyManager = buddyManager;
+      this.remoteDelegate = remoteDelegate;
+      this.cacheLoaderManager = cacheLoaderManager;
+      this.marshaller = marshaller;
+      this.txManager = txManager;
+      this.txTable = txTable;
    }
 
-   @Inject
-   private void setupDependencies(CacheImpl c)
+   // ------------ START: Lifecycle methods ------------
+
+   // This is called manually, rather than by using @Start, since it needs to be called AFTER all other components are started.
+
+   public void start()
    {
-      this.c = c;
+      switch (configuration.getCacheMode())
+      {
+         case LOCAL:
+            log.debug("cache mode is local, will not create the channel");
+            break;
+         case REPL_SYNC:
+         case REPL_ASYNC:
+         case INVALIDATION_ASYNC:
+         case INVALIDATION_SYNC:
+            if (log.isDebugEnabled()) log.debug("Cache mode is " + configuration.getCacheMode());
+
+            initialiseChannelAndRpcDispatcher();
+
+            if (shouldFetchStateOnStartup())
+            {
+               try
+               {
+                  long start = System.currentTimeMillis();
+                  // connect and state transfer
+                  channel.connect(configuration.getClusterName(), null, null, configuration.getStateRetrievalTimeout());
+                  // if I am not the only (and hence not the first) member, then wait for the state to arrive
+                  if (getMembers().size() > 1) messageListener.waitForState();
+
+                  if (log.isDebugEnabled())
+                     log.debug("connected, state was retrieved successfully (in " + (System.currentTimeMillis() - start) + " milliseconds)");
+               }
+               catch (StateTransferException ste)
+               {
+                  // make sure we disconnect from the channel before we throw this exception!
+                  // JBCACHE-761
+                  disconnect();
+                  throw new CacheException("Unable to fetch state on startup", ste);
+               }
+               catch (ChannelException e)
+               {
+                  throw new CacheException("Unable to connect to JGroups channel", e);
+               }
+               catch (Exception ex)
+               {
+                  throw new CacheException("Unable to fetch state on startup", ex);
+               }
+            }
+            else
+            {
+               //otherwise just connect
+               try
+               {
+                  channel.connect(configuration.getClusterName());
+               }
+               catch (ChannelException e)
+               {
+                  throw new CacheException("Unable to connect to JGroups channel", e);
+               }
+            }
+            if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
+      }
    }
 
-   public RPCManagerImpl(CacheSPI c)
+   public void disconnect()
    {
-      this.c = (CacheImpl) c;
+      if (channel != null && channel.isOpen())
+      {
+         log.info("Disconnecting and closing the Channel");
+         channel.disconnect();
+         channel.close();
+      }
    }
 
-   // for now, we delegate RPC calls to deprecated methods in CacheImpl.
+   @Stop
+   public void stop()
+   {
+      try
+      {
+         disconnect();
+      }
+      catch (Exception toLog)
+      {
+         log.error("Problem closing channel; setting it to null", toLog);
+      }
 
-   @SuppressWarnings("deprecation")
-   public List callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws Exception
+      channel = null;
+      configuration.getRuntimeConfig().setChannel(null);
+      if (disp != null)
+      {
+         log.info("Stopping the RpcDispatcher");
+         disp.stop();
+         disp = null;
+      }
+      if (members != null)
+      {
+         synchronized (members)
+         {
+            members.clear();
+         }
+      }
+
+      coordinator = false;
+
+      disp = null;
+   }
+
+   /**
+    * @return true if we need to fetch state on startup.  I.e., initiate a state transfer.
+    */
+   private boolean shouldFetchStateOnStartup()
    {
-      return c.callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout, responseFilter);
+      boolean loaderFetch = cacheLoaderManager != null && cacheLoaderManager.isFetchPersistentState();
+      return !configuration.isInactiveOnStartup() && buddyManager == null && (configuration.isFetchInMemoryState() || loaderFetch);
    }
 
-   @SuppressWarnings("deprecation")
-   public List callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout) throws Exception
+   private void initialiseChannelAndRpcDispatcher() throws CacheException
    {
-      return c.callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout);
+      channel = configuration.getRuntimeConfig().getChannel();
+      if (channel == null)
+      {
+         // Try to create a multiplexer channel
+         channel = getMultiplexerChannel();
+
+         if (channel != null)
+         {
+            ReflectionUtil.setValue(configuration, "accessible", true);
+            configuration.setUsingMultiplexer(true);
+            if (log.isDebugEnabled())
+               log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() + " using stack " + configuration.getMultiplexerStack());
+         }
+         else
+         {
+            try
+            {
+               if (configuration.getClusterConfig() == null)
+               {
+                  log.debug("setting cluster properties to default value");
+                  channel = new JChannel(configuration.getDefaultClusterConfig());
+               }
+               else
+               {
+                  if (trace)
+                  {
+                     log.trace("Cache cluster properties: " + configuration.getClusterConfig());
+                  }
+                  channel = new JChannel(configuration.getClusterConfig());
+               }
+            }
+            catch (ChannelException el)
+            {
+               el.printStackTrace();
+            }
+         }
+
+         configuration.getRuntimeConfig().setChannel(channel);
+      }
+
+      channel.setOpt(Channel.AUTO_RECONNECT, true);
+      channel.setOpt(Channel.AUTO_GETSTATE, true);
+      channel.setOpt(Channel.BLOCK, true);
+
+      // always use the InactiveRegionAwareRpcDispatcher - exceptions due to regions not being active should not propagate to remote
+      // nodes as errors. - Manik
+      disp = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(), remoteDelegate);
+
+      disp.setRequestMarshaller(marshaller);
+      disp.setResponseMarshaller(marshaller);
    }
 
+   private JChannel getMultiplexerChannel() throws CacheException
+   {
+      String stackName = configuration.getMultiplexerStack();
+
+      RuntimeConfig rtc = configuration.getRuntimeConfig();
+      ChannelFactory channelFactory = rtc.getMuxChannelFactory();
+      JChannel muxchannel = null;
+
+      if (channelFactory != null)
+      {
+         try
+         {
+            muxchannel = (JChannel) channelFactory.createMultiplexerChannel(stackName, configuration.getClusterName());
+         }
+         catch (Exception e)
+         {
+            throw new CacheException("Failed to create multiplexed channel using stack " + stackName, e);
+         }
+      }
+
+      return muxchannel;
+   }
+
+
+   private void removeLocksForDeadMembers(NodeSPI node, List deadMembers)
+   {
+      Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
+      NodeLock lock = node.getLock();
+      Object owner = lock.getWriterOwner();
+
+      if (isLockOwnerDead(owner, deadMembers))
+      {
+         deadOwners.add((GlobalTransaction) owner);
+      }
+
+      for (Object readOwner : lock.getReaderOwners())
+      {
+         if (isLockOwnerDead(readOwner, deadMembers))
+         {
+            deadOwners.add((GlobalTransaction) readOwner);
+         }
+      }
+
+      for (GlobalTransaction deadOwner : deadOwners)
+      {
+         boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
+         boolean broken = LockUtil.breakTransactionLock(lock, deadOwner, localTx, txTable, txManager);
+
+         if (broken && trace) log.trace("Broke lock for node " + node.getFqn() + " held by " + deadOwner);
+      }
+
+      // Recursively unlock children
+      for (Object child : node.getChildrenDirect())
+      {
+         removeLocksForDeadMembers((NodeSPI) child, deadMembers);
+      }
+   }
+
+   private boolean isLockOwnerDead(Object owner, List deadMembers)
+   {
+      boolean result = false;
+      if (owner != null && owner instanceof GlobalTransaction)
+      {
+         Object addr = ((GlobalTransaction) owner).getAddress();
+         result = deadMembers.contains(addr);
+      }
+      return result;
+   }
+
+   // ------------ END: Lifecycle methods ------------
+
+   // ------------ START: RPC call methods ------------
+
+   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout) throws Exception
+   {
+      return callRemoteMethods(recipients, methodCall, mode, excludeSelf, timeout, null);
+   }
+
+   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, boolean synchronous, boolean excludeSelf, int timeout) throws Exception
+   {
+      return callRemoteMethods(recipients, methodCall, synchronous ? GroupRequest.GET_ALL : GroupRequest.GET_NONE, excludeSelf, timeout);
+   }
+
+   public List<Object> callRemoteMethods(List<Address> recipients, MethodCall methodCall, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter) throws Exception
+   {
+      int modeToUse = mode;
+      int preferredMode;
+      if ((preferredMode = spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
+         modeToUse = preferredMode;
+
+      RspList rsps;
+      List<Object> retval;
+      Vector<Address> validMembers;
+
+      if (disp == null)
+      {
+         return null;
+      }
+
+      if (recipients != null)
+         validMembers = new Vector<Address>(recipients);
+      else
+      {
+         synchronized (members)
+         {
+            validMembers = new Vector<Address>(members);
+         }
+      }
+
+      if (excludeSelf && !validMembers.isEmpty())
+      {
+         Address local_addr = getLocalAddress();
+         if (local_addr != null) validMembers.remove(local_addr);
+      }
+
+      if (validMembers.isEmpty())
+      {
+         if (trace) log.trace("destination list is empty, discarding call");
+         return null;
+      }
+
+      if (trace) log.trace("callRemoteMethods(): valid members are " + validMembers + " methods: " + methodCall);
+
+      if (channel.flushSupported())
+      {
+         if (!flushBlockGate.await(configuration.getStateRetrievalTimeout()))
+            throw new TimeoutException("State retrieval timed out waiting for flush unblock.");
+      }
+      rsps = responseFilter == null
+            ? disp.callRemoteMethods(validMembers, methodCall, modeToUse, timeout, buddyManager != null && buddyManager.isEnabled())
+            : disp.callRemoteMethods(validMembers, methodCall, modeToUse, timeout, buddyManager != null && buddyManager.isEnabled(), false, responseFilter);
+
+      // a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
+      // JGroups supports http://jira.jboss.com/jira/browse/JGRP-193
+      if (rsps == null)
+      {
+         // return null;
+         throw new NotSerializableException("RpcDispatcher returned a null.  This is most often caused by args for " + methodCall.getName() + " not being serializable.");
+      }
+      if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
+
+      if (trace) log.trace("(" + getLocalAddress() + "): responses for method " + methodCall.getName() + ":\n" + rsps);
+
+      retval = new ArrayList<Object>(rsps.size());
+
+      for (Rsp rsp : rsps.values())
+      {
+         if (rsp.wasSuspected() || !rsp.wasReceived())
+         {
+            CacheException ex;
+            if (rsp.wasSuspected())
+            {
+               ex = new SuspectException("Suspected member: " + rsp.getSender());
+            }
+            else
+            {
+               ex = new TimeoutException("Replication timeout for " + rsp.getSender());
+            }
+            retval.add(new ReplicationException("rsp=" + rsp, ex));
+         }
+         else
+         {
+            Object value = rsp.getValue();
+            if (value instanceof Exception && !(value instanceof ReplicationException))
+            {
+               // if we have any application-level exceptions make sure we throw them!!
+               if (trace) log.trace("Recieved exception'" + value + "' from " + rsp.getSender());
+               throw (Exception) value;
+            }
+            retval.add(value);
+         }
+      }
+      return retval;
+   }
+
+   // ------------ END: RPC call methods ------------
+
+   // ------------ START: Partial state transfer methods ------------
+
+   public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception
+   {
+      String encodedStateId = sourceTarget + StateTransferManager.PARTIAL_STATE_DELIMITER + integrationTarget;
+      fetchPartialState(sources, encodedStateId);
+   }
+
+   public void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception
+   {
+      if (subtree == null)
+      {
+         throw new IllegalArgumentException("Cannot fetch partial state. Null subtree.");
+      }
+      fetchPartialState(sources, subtree.toString());
+   }
+
+   private void fetchPartialState(List<Address> sources, String stateId) throws Exception
+   {
+      if (sources == null || sources.isEmpty() || stateId == null)
+      {
+         // should this really be throwing an exception?  Are there valid use cases where partial state may not be available? - Manik
+         // Yes -- cache is configured LOCAL but app doesn't know it -- Brian
+         //throw new IllegalArgumentException("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
+         if (log.isWarnEnabled())
+            log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
+         return;
+      }
+
+      List<Address> targets = new LinkedList<Address>(sources);
+
+      //skip *this* node as a target
+      targets.remove(getLocalAddress());
+
+      if (targets.isEmpty())
+      {
+         // Definitely no exception here -- this happens every time the 1st node in the
+         // cluster activates a region!! -- Brian
+         if (log.isDebugEnabled()) log.debug("Cannot fetch partial state. There are no target members specified");
+         return;
+      }
+
+      if (log.isDebugEnabled())
+         log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
+      boolean successfulTransfer = false;
+      for (Address target : targets)
+      {
+         if (log.isDebugEnabled())
+            log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
+         messageListener.setStateSet(false);
+         successfulTransfer = channel.getState(target, stateId, configuration.getStateRetrievalTimeout());
+         if (successfulTransfer)
+         {
+            try
+            {
+               messageListener.waitForState();
+            }
+            catch (Exception transferFailed)
+            {
+               successfulTransfer = false;
+            }
+         }
+         if (log.isDebugEnabled())
+            log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
+         if (successfulTransfer) break;
+      }
+
+      if (!successfulTransfer)
+      {
+         if (log.isDebugEnabled())
+            log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
+      }
+   }
+
+   // ------------ END: Partial state transfer methods ------------
+
+   // ------------ START: Informational methods ------------
+
+   public Address getLocalAddress()
+   {
+      return channel != null ? channel.getLocalAddress() : null;
+   }
+
+   public List<Address> getMembers()
+   {
+      synchronized (members)
+      {
+         return new ArrayList<Address>(members);
+      }
+   }
+
    public boolean isCoordinator()
    {
-      return c.isCoordinator();
+      return coordinator;
    }
 
    public Address getCoordinator()
    {
-      return c.getCoordinator();
+      if (channel == null)
+      {
+         return null;
+      }
+
+      synchronized (members)
+      {
+         while (members.isEmpty())
+         {
+            log.debug("getCoordinator(): waiting on viewAccepted()");
+            try
+            {
+               members.wait();
+            }
+            catch (InterruptedException e)
+            {
+               log.error("getCoordinator(): Interrupted while waiting for members to be set", e);
+               break;
+            }
+         }
+         return members.size() > 0 ? members.get(0) : null;
+      }
    }
 
-   @SuppressWarnings("deprecation")
-   public List callRemoteMethods(List<Address> recipients, MethodCall methodCall, boolean synchronous, boolean excludeSelf, int timeout) throws Exception
+   // ------------ END: Informational methods ------------
+
+   /*----------------------- MembershipListener ------------------------*/
+
+   protected class MembershipListenerAdaptor implements ExtendedMembershipListener
    {
-      return c.callRemoteMethods(recipients, methodCall, synchronous, excludeSelf, timeout);
+
+      public void viewAccepted(View new_view)
+      {
+         Vector<Address> new_mbrs = new_view.getMembers();
+         if (log.isInfoEnabled()) log.info("viewAccepted(): " + new_view);
+         synchronized (members)
+         {
+            boolean needNotification = false;
+            if (new_mbrs != null)
+            {
+               // Determine what members have been removed
+               // and roll back any tx and break any locks
+               Vector<Address> removed = new Vector<Address>(members);
+               removed.removeAll(new_mbrs);
+               removeLocksForDeadMembers(spi.getRoot(), removed);
+
+               members.clear();
+               members.addAll(new_mbrs);
+
+               needNotification = true;
+            }
+
+            // Now that we have a view, figure out if we are the coordinator
+            coordinator = (members.size() != 0 && members.get(0).equals(getLocalAddress()));
+
+            // now notify listeners - *after* updating the coordinator. - JBCACHE-662
+            if (needNotification && notifier != null)
+            {
+               InvocationContext ctx = spi.getInvocationContext();
+               notifier.notifyViewChange(new_view, ctx);
+            }
+
+            // Wake up any threads that are waiting to know who the members
+            // are so they can figure out who the coordinator is
+            members.notifyAll();
+         }
+      }
+
+      /**
+       * Called when a member is suspected.
+       */
+      public void suspect(Address suspected_mbr)
+      {
+      }
+
+      /**
+       * Indicates that a channel has received a BLOCK event from FLUSH protocol.
+       */
+      public void block()
+      {
+         flushBlockGate.close();
+         if (log.isDebugEnabled()) log.debug("Block received at " + getLocalAddress());
+
+         remoteDelegate.block();
+
+         if (log.isDebugEnabled()) log.debug("Block processed at " + getLocalAddress());
+      }
+
+      /**
+       * Indicates that a channel has received an UNBLOCK event from FLUSH protocol.
+       */
+      public void unblock()
+      {
+         if (log.isDebugEnabled()) log.debug("UnBlock received at " + getLocalAddress());
+
+         remoteDelegate.unblock();
+
+         if (log.isDebugEnabled()) log.debug("UnBlock processed at " + getLocalAddress());
+         flushBlockGate.open();
+      }
+
    }
 
-   /**
-    * @return Returns the replication queue (if one is used), null otherwise.
-    */
-   public ReplicationQueue getReplicationQueue()
-   {
-      return c.getReplicationQueue();
-   }
-}
+   /*------------------- End of MembershipListener ----------------------*/
+}
\ No newline at end of file

Modified: core/trunk/src/main/java/org/jboss/cache/RegionManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RegionManager.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/RegionManager.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -55,11 +55,13 @@
 
    protected final Set<Fqn> activationChangeNodes = Collections.synchronizedSet(new HashSet<Fqn>());
    protected Configuration configuration;
+   protected RPCManager rpcManager;
 
    @Inject
-   void injectDependencies(CacheSPI cache, Configuration configuration)
+   void injectDependencies(CacheSPI cache, Configuration configuration, RPCManager rpcManager)
    {
       this.cache = cache;
+      this.rpcManager = rpcManager;
       this.configuration = configuration;
    }
 
@@ -412,7 +414,7 @@
             }
 
             List<Address> members = cache.getMembers();
-            cache.fetchPartialState(members, subtreeRoot.getFqn());
+            rpcManager.fetchPartialState(members, subtreeRoot.getFqn());
          }
          else if (!BuddyManager.isBackupFqn(fqn))
          {
@@ -440,7 +442,7 @@
                   subtreeRoot = cache.getRoot().getChild(buddyRoot);
                   cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(false);
                }
-               cache.fetchPartialState(sources, fqn, subtreeRoot.getFqn());
+               rpcManager.fetchPartialState(sources, fqn, subtreeRoot.getFqn());
             }
          }
          else

Modified: core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/ReplicationQueue.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -9,6 +9,10 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.factories.annotations.Stop;
 import org.jboss.cache.marshall.MethodCall;
 import org.jboss.cache.marshall.MethodCallFactory;
 import org.jboss.cache.marshall.MethodDeclarations;
@@ -31,8 +35,6 @@
 
    private static Log log = LogFactory.getLog(ReplicationQueue.class);
 
-   private CacheImpl cache = null;
-
    /**
     * We flush every 5 seconds. Inactive if -1 or 0
     */
@@ -57,71 +59,47 @@
     * The timer task, only calls flush() when executed by Timer
     */
    private MyTask task = null;
+   private RPCManager rpcManager;
+   private Configuration configuration;
+   private boolean enabled;
 
-   public ReplicationQueue()
-   {
-   }
 
-   /**
-    * Constructs a new ReplicationQueue.
-    */
-   public ReplicationQueue(CacheImpl cache, long interval, long max_elements)
+   public boolean isEnabled()
    {
-      this.cache = cache;
-      this.interval = interval;
-      this.max_elements = max_elements;
+      return enabled;
    }
 
-   /**
-    * Returns the flush interval in milliseconds.
-    */
-   public long getInterval()
+   @Inject
+   private void injectDependencies(RPCManager rpcManager, Configuration configuration)
    {
-      return interval;
+      this.rpcManager = rpcManager;
+      this.configuration = configuration;
+      enabled = configuration.isUseReplQueue() && (configuration.getBuddyReplicationConfig() == null || !configuration.getBuddyReplicationConfig().isEnabled());
    }
 
    /**
-    * Sets the flush interval in milliseconds.
-    */
-   public void setInterval(long interval)
-   {
-      this.interval = interval;
-      stop();
-      start();
-   }
-
-   /**
-    * Returns the maximum number of elements to hold.
-    * If the maximum number is reached, flushes in the calling thread.
-    */
-   public long getMax_elements()
-   {
-      return max_elements;
-   }
-
-   /**
-    * Sets the maximum number of elements to hold.
-    */
-   public void setMax_elements(long max_elements)
-   {
-      this.max_elements = max_elements;
-   }
-
-   /**
     * Starts the asynchronous flush queue.
     */
+   @Start
    public synchronized void start()
    {
-      if (interval > 0)
+      this.interval = configuration.getReplQueueInterval();
+      this.max_elements = configuration.getReplQueueMaxElements();
+      // check again
+      enabled = configuration.isUseReplQueue() && (configuration.getBuddyReplicationConfig() == null || !configuration.getBuddyReplicationConfig().isEnabled());
+      if (enabled)
       {
-         if (task == null)
-            task = new MyTask();
-         if (timer == null)
+         if (interval > 0)
          {
-            timer = new Timer(true);
-            timer.schedule(task,
-                  500, // delay before initial flush
-                  interval); // interval between flushes
+            if (task == null)
+               task = new MyTask();
+            if (timer == null)
+            {
+               timer = new Timer(true);
+               timer.schedule(task,
+                     500, // delay before initial flush
+                     interval); // interval between flushes
+            }
          }
       }
    }
@@ -129,6 +107,7 @@
    /**
     * Stops the asynchronous flush queue.
     */
+   @Stop
    public synchronized void stop()
    {
       if (task != null)
@@ -178,7 +157,7 @@
          try
          {
             // send to all live nodes in the cluster
-            cache.getRPCManager().callRemoteMethods(null, MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true, 5000);
+            rpcManager.callRemoteMethods(null, MethodCallFactory.create(MethodDeclarations.replicateAllMethod_id, l), false, true, 5000);
          }
          catch (Throwable t)
          {

Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -240,7 +240,6 @@
       }
    }
 
-   // For now, this is initialised MANUALLY from CacheImpl.internalStart()
    public void init() throws CacheException
    {
       log.debug("Starting BuddyManager");

Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -107,6 +107,7 @@
       s.add(LockTableFactory.class);
       s.add(RuntimeConfigAwareFactory.class);
       s.add(TransactionManagerFactory.class);
+      s.add(ReplicationQueueFactory.class);
       return s;
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/factories/EmptyConstructorFactory.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -19,7 +19,9 @@
  * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
  * @since 2.1.0
  */
-@DefaultFactoryFor(classes = {StateTransferManager.class, TransactionTable.class, RegionManager.class, Notifier.class, CacheMessageListener.class, CacheLoaderManager.class, RemoteCacheInvocationDelegate.class, Marshaller.class, InvocationContextContainer.class})
+@DefaultFactoryFor(classes = {StateTransferManager.class, TransactionTable.class, RegionManager.class, Notifier.class,
+      CacheMessageListener.class, CacheLoaderManager.class, RemoteCacheInvocationDelegate.class, Marshaller.class,
+      InvocationContextContainer.class})
 public class EmptyConstructorFactory extends ComponentFactory
 {
    @Override

Modified: core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/factories/InterceptorChainFactory.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -106,7 +106,10 @@
 
 
       call_interceptor = createInterceptor(CallInterceptor.class);
+      // load the icInterceptor first
+      first = setFirstInterceptor(invocationCtxInterceptor);
 
+
       if (isUsingBuddyReplication()) dataGravitatorInterceptor = createInterceptor(DataGravitatorInterceptor.class);
 
       lock_interceptor = createInterceptor(PessimisticLockInterceptor.class);
@@ -145,144 +148,45 @@
          }
       }
 
-      // load the icInterceptor first
-      if (first == null) first = invocationCtxInterceptor;
-
       // load the cache management interceptor next
       if (configuration.getExposeManagementStatistics())
       {
-         if (first == null)
-         {
-            first = cacheMgmtInterceptor;
-         }
-         else
-         {
-            addInterceptor(first, cacheMgmtInterceptor);
-         }
+         addInterceptor(first, cacheMgmtInterceptor);
       }
 
       // load the tx interceptor
-      if (first == null)
-      {
-         first = txInterceptor;
-      }
-      else
-      {
-         addInterceptor(first, txInterceptor);
-      }
+      addInterceptor(first, txInterceptor);
 
-      if (first == null)
-         first = notificationInterceptor;
-      else
-         addInterceptor(first, notificationInterceptor);
+      addInterceptor(first, notificationInterceptor);
 
-      if (repl_interceptor != null)
-      {
-         if (first == null)
-         {
-            first = repl_interceptor;
-         }
-         else
-         {
-            addInterceptor(first, repl_interceptor);
-         }
-      }
+      if (repl_interceptor != null) addInterceptor(first, repl_interceptor);
 
-      if (first == null)
-      {
-         first = lock_interceptor;
-      }
-      else
-      {
-         addInterceptor(first, lock_interceptor);
-      }
+      addInterceptor(first, lock_interceptor);
 
 
-      if (unlock_interceptor != null)
-      {
-         if (first == null)
-         {
-            first = unlock_interceptor;
-         }
-         else
-         {
-            addInterceptor(first, unlock_interceptor);
-         }
-      }
+      if (unlock_interceptor != null) addInterceptor(first, unlock_interceptor);
 
       if (activation_interceptor != null)
       {
-         if (first == null)
-         {
-            first = activation_interceptor;
-         }
-         else
-         {
-            addInterceptor(first, activation_interceptor);
-         }
-         if (first == null)
-         {
-            first = passivation_interceptor;
-         }
-         else
-         {
-            addInterceptor(first, passivation_interceptor);
-         }
+         addInterceptor(first, activation_interceptor);
+         addInterceptor(first, passivation_interceptor);
       }
 
       if (cache_loader_interceptor != null)
       {
-         if (first == null)
-         {
-            first = cache_loader_interceptor;
-         }
-         else
-         {
-            addInterceptor(first, cache_loader_interceptor);
-         }
-         if (first == null)
-         {
-            first = cache_store_interceptor;
-         }
-         else
-         {
-            addInterceptor(first, cache_store_interceptor);
-         }
+         addInterceptor(first, cache_loader_interceptor);
+         addInterceptor(first, cache_store_interceptor);
       }
 
-      if (dataGravitatorInterceptor != null)
-      {
-         if (first == null)
-         {
-            first = dataGravitatorInterceptor;
-         }
-         else
-         {
-            addInterceptor(first, dataGravitatorInterceptor);
-         }
-      }
+      if (dataGravitatorInterceptor != null) addInterceptor(first, dataGravitatorInterceptor);
 
       if (configuration.getEvictionConfig() != null && configuration.getEvictionConfig().isValidConfig())
       {
          eviction_interceptor = createInterceptor(EvictionInterceptor.class);
-         if (first == null)
-         {
-            first = eviction_interceptor;
-         }
-         else
-         {
-            addInterceptor(first, eviction_interceptor);
-         }
+         addInterceptor(first, eviction_interceptor);
       }
 
-      if (first == null)
-      {
-         first = call_interceptor;
-      }
-      else
-      {
-         addInterceptor(first, call_interceptor);
-      }
+      addInterceptor(first, call_interceptor);
 
       return setLastInterceptorPointer(first, call_interceptor);
    }
@@ -306,6 +210,10 @@
       Interceptor invocationCtxInterceptor = createInterceptor(InvocationContextInterceptor.class);
       Interceptor notificationInterceptor = createInterceptor(NotificationInterceptor.class);
 
+      // load the icInterceptor first
+      first = setFirstInterceptor(invocationCtxInterceptor);
+
+
       if (isUsingCacheLoaders())
       {
          if (configuration.getCacheLoaderConfig().isPassivation())
@@ -353,193 +261,81 @@
          evictionInterceptor = createInterceptor(EvictionInterceptor.class);
       }
 
-      if (first == null) first = invocationCtxInterceptor;
-
       if (configuration.getExposeManagementStatistics())
       {
          cacheMgmtInterceptor = createInterceptor(CacheMgmtInterceptor.class);
-         if (first == null)
-         {
-            first = cacheMgmtInterceptor;
-         }
-         else
-         {
-            addInterceptor(first, cacheMgmtInterceptor);
-         }
+         addInterceptor(first, cacheMgmtInterceptor);
       }
 
       if (txInterceptor != null)
       {
-         if (first == null)
-         {
-            first = txInterceptor;
-         }
-         else
-         {
-            addInterceptor(first, txInterceptor);
-         }
+         addInterceptor(first, txInterceptor);
       }
 
-      if (first == null)
-         first = notificationInterceptor;
-      else
-         addInterceptor(first, notificationInterceptor);
+      addInterceptor(first, notificationInterceptor);
 
-      if (first == null)
-      {
-         first = replicationInterceptor;
-      }
-      else
-      {
-         addInterceptor(first, replicationInterceptor);
-      }
+      addInterceptor(first, replicationInterceptor);
 
       if (passivationInterceptor != null && !configuration.getCacheLoaderConfig().isFetchPersistentState())
       {
-         if (first == null)
-         {
-            first = passivationInterceptor;
-         }
-         else
-         {
-            addInterceptor(first, passivationInterceptor);
-         }
+         addInterceptor(first, passivationInterceptor);
       }
 
       // add the cache store interceptor here
       if (cacheStoreInterceptor != null && !configuration.getCacheLoaderConfig().isFetchPersistentState())
       {
-         if (first == null)
-         {
-            first = cacheStoreInterceptor;
-         }
-         else
-         {
-            addInterceptor(first, cacheStoreInterceptor);
-         }
+         addInterceptor(first, cacheStoreInterceptor);
       }
 
       // cache loader interceptor is only invoked if we are ready to write to the actual tree cache
       if (activationInterceptor != null)
       {
-         if (first == null)
-         {
-            first = activationInterceptor;
-         }
-         else
-         {
-            addInterceptor(first, activationInterceptor);
-         }
+         addInterceptor(first, activationInterceptor);
 
          if (configuration.getCacheLoaderConfig().isFetchPersistentState())
          {
-            if (first == null)
-            {
-               first = passivationInterceptor;
-            }
-            else
-            {
-               addInterceptor(first, passivationInterceptor);
-            }
+            addInterceptor(first, passivationInterceptor);
          }
       }
 
       if (cacheLoaderInterceptor != null)
       {
-         if (first == null)
-         {
-            first = cacheLoaderInterceptor;
-         }
-         else
-         {
-            addInterceptor(first, cacheLoaderInterceptor);
-         }
+         addInterceptor(first, cacheLoaderInterceptor);
 
          if (configuration.getCacheLoaderConfig().isFetchPersistentState())
          {
-            if (first == null)
-            {
-               first = cacheStoreInterceptor;
-            }
-            else
-            {
-               addInterceptor(first, cacheStoreInterceptor);
-            }
+            addInterceptor(first, cacheStoreInterceptor);
          }
       }
 
       if (dataGravitatorInterceptor != null)
       {
-         if (first == null)
-         {
-            first = dataGravitatorInterceptor;
-         }
-         else
-         {
-            addInterceptor(first, dataGravitatorInterceptor);
-         }
+         addInterceptor(first, dataGravitatorInterceptor);
       }
 
 
-      if (first == null)
-      {
-         first = lockInterceptor;
-      }
-      else
-      {
-         addInterceptor(first, lockInterceptor);
-      }
+      addInterceptor(first, lockInterceptor);
 
-      if (first == null)
-      {
-         first = validationInterceptor;
-      }
-      else
-      {
-         addInterceptor(first, validationInterceptor);
-      }
+      addInterceptor(first, validationInterceptor);
+      addInterceptor(first, createIfNotExistsInterceptor);
 
-      if (first == null)
-      {
-         first = createIfNotExistsInterceptor;
-      }
-      else
-      {
-         addInterceptor(first, createIfNotExistsInterceptor);
-      }
-
       // eviction interceptor to come before the optimistic node interceptor
-      if (first == null)
-      {
-         first = evictionInterceptor;
-      }
-      else
-      {
-         addInterceptor(first, evictionInterceptor);
-      }
+      addInterceptor(first, evictionInterceptor);
 
-      if (first == null)
-      {
-         first = nodeInterceptor;
-      }
-      else
-      {
-         addInterceptor(first, nodeInterceptor);
-      }
+      addInterceptor(first, nodeInterceptor);
 
+      addInterceptor(first, invokerInterceptor);
 
-      if (first == null)
-      {
-         first = invokerInterceptor;
-      }
-      else
-      {
-         addInterceptor(first, invokerInterceptor);
-      }
-
       return setLastInterceptorPointer(first, invokerInterceptor);
    }
 
+   public Interceptor setFirstInterceptor(Interceptor i)
+   {
+      componentRegistry.registerComponent(Interceptor.class.getName(), i, Interceptor.class);
+      return i;
+   }
+
+
    public static List<Interceptor> asList(Interceptor interceptor)
    {
       if (interceptor == null)

Added: core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ReplicationQueueFactory.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -0,0 +1,29 @@
+package org.jboss.cache.factories;
+
+import org.jboss.cache.ReplicationQueue;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.annotations.DefaultFactoryFor;
+
+/**
+ * ReplicationQueue factory
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+@DefaultFactoryFor(classes = ReplicationQueue.class)
+public class ReplicationQueueFactory extends EmptyConstructorFactory
+{
+   @Override
+   public <T> T construct(String componentName, Class<T> componentType)
+   {
+      if ((configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC || configuration.getCacheMode() == Configuration.CacheMode.INVALIDATION_ASYNC)
+            && configuration.isUseReplQueue())
+      {
+         return super.construct(componentName, componentType);
+      }
+      else
+      {
+         return null;
+      }
+   }
+}
\ No newline at end of file

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -6,6 +6,7 @@
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.RPCManager;
+import org.jboss.cache.ReplicationQueue;
 import org.jboss.cache.buddyreplication.BuddyManager;
 import org.jboss.cache.config.Configuration.CacheMode;
 import org.jboss.cache.config.Option;
@@ -32,11 +33,13 @@
    private RPCManager rpcManager;
    private boolean usingBuddyReplication;
    protected boolean defaultSynchronous;
+   private ReplicationQueue replicationQueue;
 
    @Inject
-   private void injectComponents(RPCManager rpcManager, BuddyManager buddyManager)
+   private void injectComponents(RPCManager rpcManager, BuddyManager buddyManager, ReplicationQueue replicationQueue)
    {
       this.rpcManager = rpcManager;
+      this.replicationQueue = replicationQueue;
       this.buddyManager = buddyManager;
       usingBuddyReplication = buddyManager != null && buddyManager.isEnabled();
    }
@@ -95,7 +98,7 @@
             else if (te.isForceSyncReplication()) sync = true;
          }
       }
-      if (!sync && rpcManager.getReplicationQueue() != null && !usingBuddyReplication)
+      if (!sync && replicationQueue != null && !usingBuddyReplication)
       {
          putCallOnAsyncReplicationQueue(call);
       }
@@ -130,7 +133,7 @@
    protected void putCallOnAsyncReplicationQueue(MethodCall call)
    {
       if (log.isDebugEnabled()) log.debug("Putting call " + call + " on the replication queue.");
-      rpcManager.getReplicationQueue().add(MethodCallFactory.create(MethodDeclarations.replicateMethod_id, call));
+      replicationQueue.add(MethodCallFactory.create(MethodDeclarations.replicateMethod_id, call));
    }
 
    //todo info expt for this is InvocationContext, move method there

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -37,7 +37,7 @@
       boolean resumeSuspended = false;
 
       if (trace)
-         log.trace("Invoked on cache instance [" + cache.getLocalAddress() + "] and InvocationContext [" + ctx + "]");
+         log.trace("Invoked with InvocationContext [" + ctx + "]");
 
       if (MethodDeclarations.isBlockUnblockMethod(call.getMethodId())) return nextInterceptor(ctx);
 

Modified: core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -202,16 +202,6 @@
       return cache.getInternalFqns();
    }
 
-   public void fetchPartialState(List<Address> members, Fqn subtreeRoot) throws Exception
-   {
-      cache.fetchPartialState(members, subtreeRoot);
-   }
-
-   public void fetchPartialState(List<Address> members, Fqn subtreeRoot, Fqn integrationPoint) throws Exception
-   {
-      cache.fetchPartialState(members, subtreeRoot, integrationPoint);
-   }
-
    public int getNumberOfLocksHeld()
    {
       return cache.getNumberOfLocksHeld();
@@ -307,12 +297,12 @@
 
    public Address getLocalAddress()
    {
-      return cache.getLocalAddress();
+      return rpcManager.getLocalAddress();
    }
 
    public List<Address> getMembers()
    {
-      return cache.getMembers();
+      return rpcManager.getMembers();
    }
 
    public String getVersion()

Modified: core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/main/java/org/jboss/cache/lock/LockUtil.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -2,9 +2,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.CacheImpl;
 import org.jboss.cache.Node;
-import org.jboss.cache.NodeSPI;
 import org.jboss.cache.statetransfer.StateTransferManager;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.TransactionTable;
@@ -12,7 +10,6 @@
 import javax.transaction.Status;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
-import java.util.Iterator;
 
 public abstract class LockUtil
 {
@@ -26,11 +23,8 @@
    public static boolean breakTransactionLock(NodeLock lock,
                                               GlobalTransaction gtx,
                                               boolean localTx,
-                                              CacheImpl cache)
+                                              TransactionTable tx_table, TransactionManager tm)
    {
-      TransactionTable tx_table = cache.getTransactionTable();
-      TransactionManager tm = cache.getTransactionManager();
-
       boolean broken = false;
       int tryCount = 0;
       int lastStatus = TransactionLockStatus.STATUS_BROKEN;
@@ -55,89 +49,6 @@
    }
 
    /**
-    * Forcibly acquire a read lock on the given node for the given owner,
-    * breaking any existing locks that prevent the read lock.  If the
-    * existing lock is held by a GlobalTransaction, breaking the lock may
-    * result in a rollback of the transaction.
-    *
-    * @param node         the node
-    * @param newOwner     the new owner (usually a Thread or GlobalTransaction)
-    * @param lockChildren <code>true</code> if this method should be recursively
-    *                     applied to <code>node</code>'s children.
-    */
-   public static void forceAcquireLock(NodeSPI<?, ?> node,
-                                       Object newOwner,
-                                       CacheImpl cache,
-                                       boolean lockChildren)
-   {
-
-      NodeLock lock = node.getLock();
-      boolean acquired = lock.isOwner(newOwner);
-
-      if (!acquired && log.isDebugEnabled())
-      {
-         log.debug("Force acquiring lock on node " + node.getFqn());
-      }
-
-      TransactionTable tx_table = cache.getTransactionTable();
-      TransactionManager tm = cache.getTransactionManager();
-      Object localAddress = cache.getLocalAddress();
-      boolean serializable = cache.getConfiguration().getIsolationLevel() == IsolationLevel.SERIALIZABLE;
-
-      while (!acquired)
-      {
-         Object curOwner = null;
-         boolean attempted = false;
-
-         // Keep breaking write locks until we acquire a read lock
-         // or there are no more write locks
-         while (!acquired && ((curOwner = lock.getWriterOwner()) != null))
-         {
-            acquired = acquireLockFromOwner(node, lock, curOwner, newOwner, tx_table, tm, localAddress);
-            attempted = true;
-         }
-
-         // If no more write locks, but we haven't acquired, see if we
-         // need to break read locks as well.
-         if (!acquired && serializable)
-         {
-            Iterator it = lock.getReaderOwners().iterator();
-            if (it.hasNext())
-            {
-               curOwner = it.next();
-               acquired = acquireLockFromOwner(node, lock, curOwner, newOwner, tx_table, tm, localAddress);
-               attempted = true;
-               // Don't keep iterating due to the risk of
-               // ConcurrentModificationException if readers are removed
-               // Just go back through our outer loop to get the nextInterceptor one
-            }
-         }
-
-         if (!acquired && !attempted)
-         {
-            // We only try to acquire above if someone else has the lock.
-            // Seems no one is holding a lock and it's there for the taking.
-            try
-            {
-               acquired = lock.acquire(newOwner, 1, NodeLock.LockType.READ);
-            }
-            catch (Exception ignored)
-            {
-            }
-         }
-      }
-
-      // Recursively unlock children
-      if (lockChildren)
-      {
-         for (NodeSPI n : node.getChildrenDirect())
-         {
-            forceAcquireLock(n, newOwner, cache, true);
-         }
-      }
-   }
-
-   /**
     * Attempts to acquire a read lock on <code>node</code> for
     * <code>newOwner</code>, if necessary breaking locks held by
     * <code>curOwner</code>.
@@ -158,7 +69,7 @@
       if (log.isTraceEnabled())
       {
          log.trace("Attempting to acquire lock for node " + node.getFqn() +
-                   " from owner " + curOwner);
+               " from owner " + curOwner);
       }
 
       boolean acquired = false;
@@ -192,7 +103,7 @@
          if (broken && log.isTraceEnabled())
          {
             log.trace("Broke lock for node " + node.getFqn() +
-                      " held by owner " + curOwner);
+                  " held by owner " + curOwner);
          }
 
          try
@@ -266,7 +177,7 @@
                      if (log.isTraceEnabled())
                      {
                         log.trace("Attempting to break transaction lock held " +
-                                  " by " + gtx + " by rolling back local tx");
+                              " by " + gtx + " by rolling back local tx");
                      }
                      // This thread has to join the tx
                      tm.resume(tx);
@@ -315,8 +226,8 @@
                      if (log.isTraceEnabled())
                      {
                         log.trace("Attempting to break transaction lock held " +
-                                  "by " + gtx + " by marking local tx as " +
-                                  "rollback-only");
+                              "by " + gtx + " by marking local tx as " +
+                              "rollback-only");
                      }
                      tx.setRollbackOnly();
                      break;
@@ -349,7 +260,7 @@
          // Race condition; gtx was cleared from tx_table.
          // Just double check if gtx still holds a lock
          if (gtx == lock.getWriterOwner()
-             || lock.getReaderOwners().contains(gtx))
+               || lock.getReaderOwners().contains(gtx))
          {
             // TODO should we throw an exception??
             lock.release(gtx);

Modified: core/trunk/src/test/java/org/jboss/cache/api/CacheSPITest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/CacheSPITest.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/test/java/org/jboss/cache/api/CacheSPITest.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -89,8 +89,8 @@
       Configuration conf1 = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC);
       Configuration conf2 = UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC);
 
-      cache1 = (CacheSPI<Object, Object>) new DefaultCacheFactory().createCache(conf1, false);
-      cache2 = (CacheSPI<Object, Object>) new DefaultCacheFactory().createCache(conf2, false);
+      cache1 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object, Object>().createCache(conf1, false);
+      cache2 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object, Object>().createCache(conf2, false);
 
       cache1.start();
       assertTrue("Cache1 is coordinator", cache1.getRPCManager().isCoordinator());

Modified: core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/test/java/org/jboss/cache/api/NodeAPITest.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -28,7 +28,7 @@
  * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
  * @since 2.0.0
  */
-@Test(groups = {"functional"})
+@Test(groups = "functional")
 public class NodeAPITest
 {
    private Node<Object, Object> rootNode;

Modified: core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/test/java/org/jboss/cache/api/pfer/PutForExternalReadTestBase.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -9,7 +9,6 @@
 import org.jboss.cache.Fqn;
 import org.jboss.cache.NodeSPI;
 import org.jboss.cache.RPCManager;
-import org.jboss.cache.RPCManagerImpl;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.Configuration.CacheMode;
 import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
@@ -166,11 +165,11 @@
 
    public void testAsyncForce() throws Exception
    {
-      RPCManager rpcManager = EasyMock.createMock(RPCManager.class);
+      RPCManager rpcManager = EasyMock.createNiceMock(RPCManager.class);
       RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
-
+      List<Address> memberList = originalRpcManager.getMembers();
+      expect(rpcManager.getMembers()).andReturn(memberList).anyTimes();
       // inject a mock RPC manager so that we can test whether calls made are sync or async.
-      //cache1.getConfiguration().getRuntimeConfig().setRPCManager(rpcManager);
       TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(), rpcManager, RPCManager.class);
 
       // invalidations will not trigger any rpc call sfor PFER
@@ -178,7 +177,6 @@
       {
          // specify what we expect called on the mock Rpc Manager.  For params we don't care about, just use ANYTHING.
          // setting the mock object to expect the "sync" param to be false.
-         expect(rpcManager.getReplicationQueue()).andReturn(null);
          expect(rpcManager.callRemoteMethods(anyAddresses(), (MethodCall) anyObject(), eq(false), anyBoolean(), anyInt())).andReturn(null);
       }
 
@@ -233,52 +231,58 @@
          assertEquals("parent fqn tx should have completed", value, cache2.get(parentFqn, key));
    }
 
-   public void testExceptionSuppression()
+   public void testExceptionSuppression() throws Exception
    {
-      RPCManager barfingRpcManager = new RPCManagerImpl()
+      RPCManager barfingRpcManager = EasyMock.createNiceMock(RPCManager.class);
+      RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+      try
       {
-         @Override
-         public List callRemoteMethods(List<Address> recipients, MethodCall method, boolean synchronous, boolean excludeSelf, int timeout)
-         {
-            throw new RuntimeException("Barf");
-         }
-      };
+         List<Address> memberList = originalRpcManager.getMembers();
+         expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
+         expect(barfingRpcManager.getLocalAddress()).andReturn(originalRpcManager.getLocalAddress()).anyTimes();
+         expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (MethodCall) anyObject(), anyBoolean(), anyBoolean(), anyInt())).andThrow(new RuntimeException("Barf!")).anyTimes();
+         replay(barfingRpcManager);
 
-      TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(), barfingRpcManager, RPCManager.class);
-      cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
+         TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(), barfingRpcManager, RPCManager.class);
+         cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
 
-      try
-      {
-         cache1.put(fqn, key, value);
-         if (!optimistic) fail("Should have barfed");
-      }
-      catch (RuntimeException re)
-      {
-      }
-
-      if (optimistic && !isUsingInvalidation())
-      {
-         // proves that the put did, in fact, barf.  Doesn't work for invalidations since the inability to invalidate will not cause a rollback.
-         assertNull(cache1.get(fqn, key));
-      }
-      else
-      {
-         // clean up any indeterminate state left over
          try
          {
-            cache1.removeNode(fqn);
-            // as above, the inability to invalidate will not cause an exception
-            if (!isUsingInvalidation()) fail("Should have barfed");
+            cache1.put(fqn, key, value);
+            if (!optimistic) fail("Should have barfed");
          }
          catch (RuntimeException re)
          {
          }
-      }
 
-      assertNull("Should have cleaned up", cache1.get(fqn, key));
+         if (optimistic && !isUsingInvalidation())
+         {
+            // proves that the put did, in fact, barf.  Doesn't work for invalidations since the inability to invalidate will not cause a rollback.
+            assertNull(cache1.get(fqn, key));
+         }
+         else
+         {
+            // clean up any indeterminate state left over
+            try
+            {
+               cache1.removeNode(fqn);
+               // as above, the inability to invalidate will not cause an exception
+               if (!isUsingInvalidation()) fail("Should have barfed");
+            }
+            catch (RuntimeException re)
+            {
+            }
+         }
 
-      // should not barf
-      cache1.putForExternalRead(fqn, key, value);
+         assertNull("Should have cleaned up", cache1.get(fqn, key));
+
+         // should not barf
+         cache1.putForExternalRead(fqn, key, value);
+      }
+      finally
+      {
+         TestingUtil.extractComponentRegistry(cache1).registerComponent(RPCManager.class.getName(), originalRpcManager, RPCManager.class);
+      }
    }
 
    public void testBasicPropagation() throws Exception

Modified: core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java	2008-01-17 15:22:36 UTC (rev 5160)
+++ core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java	2008-01-17 22:34:36 UTC (rev 5161)
@@ -283,7 +283,7 @@
     */
    public static boolean isCacheViewComplete(CacheImpl cache, int memberCount)
    {
-      List members = cache.getMembers();
+      List members = cache.getRPCManager().getMembers();
       if (members == null || memberCount > members.size())
       {
          return false;
@@ -292,7 +292,7 @@
       {
          // This is an exceptional condition
          StringBuffer sb = new StringBuffer("Cache at address ");
-         sb.append(cache.getLocalAddress());
+         sb.append(cache.getRPCManager().getLocalAddress());
          sb.append(" had ");
          sb.append(members.size());
          sb.append(" members; expecting ");




More information about the jbosscache-commits mailing list