[jbosscache-commits] JBoss Cache SVN: r7905 - in core/trunk/src/main/java/org/jboss/cache: factories and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Mar 16 16:37:52 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-03-16 16:37:52 -0400 (Mon, 16 Mar 2009)
New Revision: 7905

Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
   core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java
   core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java
Log:
Re-implemented NBST using RPC instead of a partial FLUSH

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-03-16 20:37:52 UTC (rev 7905)
@@ -63,7 +63,6 @@
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.RspFilter;
 import org.jgroups.protocols.TP;
-import org.jgroups.protocols.pbcast.FLUSH;
 import org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER;
 import org.jgroups.stack.ProtocolStack;
 import org.jgroups.util.Rsp;
@@ -156,11 +155,12 @@
 
    public abstract class FlushTracker
    {
-      private final ReclosableLatch flushBlockGate = new ReclosableLatch();
+      // closed whenever a FLUSH is in progress.  Open by default.
+      final ReclosableLatch flushBlockGate = new ReclosableLatch(true);
       private final AtomicInteger flushCompletionCount = new AtomicInteger();
+      // closed whenever a FLUSH is NOT in progress.  Closed by default.
+      final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
 
-      private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
-
       public void block()
       {
          flushBlockGate.close();
@@ -179,46 +179,27 @@
          return flushCompletionCount.get();
       }
 
-      public abstract void lockProcessingLock();
+      public abstract void lockProcessingLock() throws InterruptedException;
+
       public abstract void unlockProcessingLock();
-      public abstract void lockSuspendProcessingLock();
+
+      public abstract void lockSuspendProcessingLock() throws InterruptedException;
+
       public abstract void unlockSuspendProcessingLock();
 
-      public void waitForFlushCompletion(long timeout)
+      public void waitForFlushCompletion(long timeout) throws InterruptedException
       {
-         for (; ;)
+         if (channel.flushSupported() && !flushBlockGate.await(timeout, TimeUnit.MILLISECONDS))
          {
-            try
-            {
-               if (channel.flushSupported() && !flushBlockGate.await(timeout, TimeUnit.MILLISECONDS))
-               {
-                  throw new TimeoutException("State retrieval timed out waiting for flush to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")");
-               }
-               return;
-            }
-            catch (InterruptedException e)
-            {
-               Thread.currentThread().interrupt();
-            }
+            throw new TimeoutException("State retrieval timed out waiting for flush to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")");
          }
       }
 
-      public void waitForFlushStart(long timeout)
+      public void waitForFlushStart(long timeout) throws InterruptedException
       {
-         for (; ;)
+         if (channel.flushSupported() && !flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
          {
-            try
-            {
-               if (channel.flushSupported() && !flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
-               {
-                  throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )");
-               }
-               return;
-            }
-            catch (InterruptedException e)
-            {
-               Thread.currentThread().interrupt();
-            }
+            throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )");
          }
       }
    }
@@ -247,21 +228,11 @@
    {
       private final ReentrantReadWriteLock coordinationLock = new ReentrantReadWriteLock();
 
-      public void lockProcessingLock()
+      public void lockProcessingLock() throws InterruptedException
       {
-         for (;;)
+         if (!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
          {
-            try
-            {
-               if (!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
-                  throw new TimeoutException("Could not obtain processing lock");
-
-               return;
-            }
-            catch (InterruptedException e)
-            {
-               Thread.currentThread().interrupt();
-            }
+            throw new TimeoutException("Could not obtain processing lock");
          }
       }
 
@@ -270,30 +241,38 @@
          coordinationLock.readLock().unlock();
       }
 
-      public void lockSuspendProcessingLock()
+      public void lockSuspendProcessingLock() throws InterruptedException
       {
-         for (;;)
+         if (!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
          {
-            try
-            {
-               if (!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
-                  throw new TimeoutException("Could not obtain processing lock");
-
-               return;
-            }
-            catch (InterruptedException e)
-            {
-               Thread.currentThread().interrupt();
-            }
+            throw new TimeoutException("Could not obtain processing lock");
          }
       }
 
       public void unlockSuspendProcessingLock()
       {
          if (coordinationLock.isWriteLockedByCurrentThread())
+         {
             coordinationLock.writeLock().unlock();
+         }
       }
 
+      public void waitForFlushCompletion(long timeout) throws InterruptedException
+      {
+         if (!flushBlockGate.await(timeout, TimeUnit.MILLISECONDS))
+         {
+            throw new TimeoutException("State retrieval timed out waiting for flush to unblock. (timeout = " + CachePrinter.prettyPrint(timeout) + ")");
+         }
+      }
+
+      public void waitForFlushStart(long timeout) throws InterruptedException
+      {
+         if (!flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
+         {
+            throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + CachePrinter.prettyPrint(timeout) + " )");
+         }
+      }
+
    }
 
    // ------------ START: Lifecycle methods ------------
@@ -332,7 +311,7 @@
                   // Allow commands to be ACKed during state transfer
                   if (nonBlocking)
                   {
-                     componentRegistry.setBlockInStarting(false);
+                     componentRegistry.setStatusCheckNecessary(false);
                   }
                   channel.connect(configuration.getClusterName());
                   if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
@@ -376,17 +355,19 @@
                }
             }
 
-            if (log.isInfoEnabled()) log.info("state was retrieved successfully (in " + CachePrinter.prettyPrint((System.currentTimeMillis() - start)) + ")");
+            if (log.isInfoEnabled())
+            {
+               log.info("state was retrieved successfully (in " + CachePrinter.prettyPrint((System.currentTimeMillis() - start)) + ")");
+            }
       }
    }
 
    private void sanityCheckJGroupsStack(JChannel channel)
    {
       if (channel.getProtocolStack().findProtocol(STREAMING_STATE_TRANSFER.class) == null)
+      {
          throw new ConfigurationException("JGroups channel does not use STREAMING_STATE_TRANSFER!  This is a requirement for non-blocking state transfer.  Either make sure your JGroups configuration uses STREAMING_STATE_TRANSFER or disable non-blocking state transfer.");
-
-      if (channel.getProtocolStack().findProtocol(FLUSH.class) == null)
-         throw new ConfigurationException("JGroups channel does not use FLUSH!  This is a requirement for non-blocking state transfer.  Either make sure your JGroups configuration uses FLUSH or disable non-blocking state transfer.");
+      }
    }
 
    private void sanityCheckConfiguration(boolean nonBlockingStateTransfer, boolean fetchStateOnStart)
@@ -394,12 +375,16 @@
       if (isInLocalMode || !nonBlockingStateTransfer || !fetchStateOnStart) return; // don't care about these cases!
 
       if (configuration.getNodeLockingScheme() != NodeLockingScheme.MVCC)
+      {
          throw new ConfigurationException("Non-blocking state transfer is only supported with the MVCC node locking scheme.  Please change your node locking scheme to MVCC or disable non-blocking state transfer.");
+      }
 
       if (isUsingBuddyReplication)
+      {
          throw new ConfigurationException("Non-blocking state transfer cannot be used with buddy replication at this time.  Please disable either buddy replication or non-blocking state transfer.");
+      }
    }
-   
+
    private void startNonBlockStateTransfer(List<Address> members)
    {
       if (members.size() < 2)
@@ -440,7 +425,10 @@
          if (!success)
          {
             wait <<= 2;
-            if (log.isWarnEnabled()) log.warn("Could not find available peer for state, backing off and retrying after "+wait+" millis.  Retries left: " + (numRetries -1 -i) );
+            if (log.isWarnEnabled())
+            {
+               log.warn("Could not find available peer for state, backing off and retrying after " + wait + " millis.  Retries left: " + (numRetries - 1 - i));
+            }
 
             try
             {
@@ -460,7 +448,7 @@
          throw new CacheException("Unable to fetch state on startup");
       }
 
-      componentRegistry.setBlockInStarting(true);
+      componentRegistry.setStatusCheckNecessary(true);
    }
 
    public void disconnect()
@@ -522,7 +510,10 @@
          {
             ReflectionUtil.setValue(configuration, "accessible", true);
             configuration.setUsingMultiplexer(true);
-            if (log.isDebugEnabled()) log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() + " using stack " + configuration.getMultiplexerStack());
+            if (log.isDebugEnabled())
+            {
+               log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() + " using stack " + configuration.getMultiplexerStack());
+            }
          }
          else
          {
@@ -697,8 +688,14 @@
          if (rpcDispatcher == null) return null;
          int modeToUse = mode;
          int preferredMode;
-         if ((preferredMode = spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1) modeToUse = preferredMode;
-         if (trace) log.trace("callRemoteMethods(): valid members are " + recipients + " methods: " + command + " Using OOB? " + useOutOfBandMessage + " modeToUse: " + modeToUse);
+         if ((preferredMode = spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
+         {
+            modeToUse = preferredMode;
+         }
+         if (trace)
+         {
+            log.trace("callRemoteMethods(): valid members are " + recipients + " methods: " + command + " Using OOB? " + useOutOfBandMessage + " modeToUse: " + modeToUse);
+         }
 
          flushTracker.lockProcessingLock();
          unlock = true;
@@ -707,7 +704,10 @@
          useOutOfBandMessage = false;
          RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
          if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
-         if (trace) log.trace("(" + getLocalAddress() + "): responses for method " + command.getClass().getSimpleName() + ":\n" + rsps);
+         if (trace)
+         {
+            log.trace("(" + getLocalAddress() + "): responses for method " + command.getClass().getSimpleName() + ":\n" + rsps);
+         }
          // short-circuit no-return-value calls.
          if (rsps == null) return Collections.emptyList();
          List<Object> retval = new ArrayList<Object>(rsps.size());
@@ -751,7 +751,9 @@
       {
          computeStats(success);
          if (unlock)
+         {
             flushTracker.unlockProcessingLock();
+         }
       }
    }
 
@@ -779,7 +781,10 @@
          // should this really be throwing an exception?  Are there valid use cases where partial state may not be available? - Manik
          // Yes -- cache is configured LOCAL but app doesn't know it -- Brian
          //throw new IllegalArgumentException("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
-         if (log.isWarnEnabled()) log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
+         if (log.isWarnEnabled())
+         {
+            log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
+         }
          return;
       }
 
@@ -796,13 +801,19 @@
          return;
       }
 
-      if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
+      if (log.isDebugEnabled())
+      {
+         log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
+      }
       boolean successfulTransfer = false;
       for (Address target : targets)
       {
          try
          {
-            if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
+            if (log.isDebugEnabled())
+            {
+               log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
+            }
             messageListener.setStateSet(false);
             successfulTransfer = getState(stateId, target);
             if (successfulTransfer)
@@ -817,17 +828,26 @@
                   successfulTransfer = false;
                }
             }
-            if (log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
+            if (log.isDebugEnabled())
+            {
+               log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
+            }
             if (successfulTransfer) break;
          }
          catch (IllegalStateException ise)
          {
             // thrown by the JGroups channel if state retrieval fails.
-            if (log.isInfoEnabled()) log.info("Channel problems fetching state.  Continuing on to next provider. ", ise);
+            if (log.isInfoEnabled())
+            {
+               log.info("Channel problems fetching state.  Continuing on to next provider. ", ise);
+            }
          }
       }
 
-      if (!successfulTransfer && log.isDebugEnabled()) log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
+      if (!successfulTransfer && log.isDebugEnabled())
+      {
+         log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
+      }
    }
 
    private boolean getState(String stateId, Address target) throws ChannelNotConnectedException, ChannelClosedException
@@ -989,21 +1009,24 @@
        */
       public void block()
       {
-         try
+         if (!configuration.isNonBlockingStateTransfer())
          {
-            if (log.isDebugEnabled()) log.debug("Block received at " + getLocalAddress());
+            try
+            {
+               if (log.isDebugEnabled()) log.debug("Block received at " + getLocalAddress());
 
-            flushTracker.block();
-            notifier.notifyCacheBlocked(true);
-            notifier.notifyCacheBlocked(false);
+               flushTracker.block();
+               notifier.notifyCacheBlocked(true);
+               notifier.notifyCacheBlocked(false);
 
-            if (log.isDebugEnabled()) log.debug("Block processed at " + getLocalAddress());
+               if (log.isDebugEnabled()) log.debug("Block processed at " + getLocalAddress());
+            }
+            catch (Throwable e)
+            {
+               //do not rethrow! jgroups might behave funny, resulting even in deadlock
+               log.error("Error found while processing block()", e);
+            }
          }
-         catch (Throwable e)
-         {
-            //do not rethrow! jgroups might behave funny, resulting even in deadlock
-            log.error("Error found while processing block()", e);
-         }
       }
 
       /**
@@ -1011,23 +1034,25 @@
        */
       public void unblock()
       {
-         try
+         if (!configuration.isNonBlockingStateTransfer())
          {
-            if (log.isDebugEnabled()) log.debug("UnBlock received at " + getLocalAddress());
+            try
+            {
+               if (log.isDebugEnabled()) log.debug("UnBlock received at " + getLocalAddress());
 
-            notifier.notifyCacheUnblocked(true);
-            notifier.notifyCacheUnblocked(false);
-            flushTracker.unblock();
+               notifier.notifyCacheUnblocked(true);
+               notifier.notifyCacheUnblocked(false);
+               flushTracker.unblock();
 
-            if (log.isDebugEnabled()) log.debug("UnBlock processed at " + getLocalAddress());
+               if (log.isDebugEnabled()) log.debug("UnBlock processed at " + getLocalAddress());
+            }
+            catch (Throwable e)
+            {
+               //do not rethrow! jgroups might behave funny, resulting even in deadlock
+               log.error("Error found while processing unblock", e);
+            }
          }
-         catch (Throwable e)
-         {
-            //do not rethrow! jgroups might behave funny, resulting even in deadlock
-            log.error("Error found while processing unblock", e);
-         }
       }
-
    }
 
    //jmx operations

Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2009-03-16 20:37:52 UTC (rev 7905)
@@ -105,7 +105,7 @@
     */
    private boolean invokedFromShutdownHook;
 
-   private volatile boolean blockInStarting = true;
+   private volatile boolean statusCheckNecessary = true;
 
    /**
     * Creates an instance of the component registry.  The configuration passed in is automatically registered.
@@ -886,23 +886,26 @@
       if (trace) log.trace("Is remotely originating.");
 
       // else if this is a remote call and the status is STARTING, wait until the cache starts.
-      if (state == CacheStatus.STARTING && blockInStarting)
+      if (statusCheckNecessary)
       {
-         if (trace) log.trace("Cache is starting; block.");
-         try
+         if (state == CacheStatus.STARTING)
          {
-            blockUntilCacheStarts();
-            return true;
+            if (trace) log.trace("Cache is starting; block.");
+            try
+            {
+               blockUntilCacheStarts();
+               return true;
+            }
+            catch (InterruptedException e)
+            {
+               Thread.currentThread().interrupt();
+            }
          }
-         catch (InterruptedException e)
+         else
          {
-            Thread.currentThread().interrupt();
+            log.warn("Received a remote call but the cache is not in STARTED state - ignoring call.");
          }
       }
-      else if (blockInStarting)
-      {
-         log.warn("Received a remote call but the cache is not in STARTED state - ignoring call.");
-      }
 
       return false;
    }
@@ -1022,14 +1025,14 @@
    /**
     * Returns an immutable set contating all the components that exists in the reporsitory at this moment.
     */
-   public Set<Component> getRegiteredComponents()
+   public Set<Component> getRegisteredComponents()
    {
       HashSet<Component> defensiveCopy = new HashSet<Component>(componentLookup.values());
       return Collections.unmodifiableSet(defensiveCopy);
    }
 
-   public void setBlockInStarting(boolean blockInStarting)
+   public void setStatusCheckNecessary(boolean statusCheckNecessary)
    {
-      this.blockInStarting = blockInStarting;
+      this.statusCheckNecessary = statusCheckNecessary;
    }
 }

Modified: core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java	2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/jmx/JmxRegistrationManager.java	2009-03-16 20:37:52 UTC (rev 7905)
@@ -196,7 +196,7 @@
    private List<ResourceDMBean> getResourceDMBeans()
    {
       List<ResourceDMBean> resourceDMBeans = new ArrayList<ResourceDMBean>();
-      for (ComponentRegistry.Component component : cacheSpi.getComponentRegistry().getRegiteredComponents())
+      for (ComponentRegistry.Component component : cacheSpi.getComponentRegistry().getRegisteredComponents())
       {
          ResourceDMBean resourceDMBean = new ResourceDMBean(component.getInstance());
          if (resourceDMBean.isManagedResource())

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2009-03-16 20:37:52 UTC (rev 7905)
@@ -21,17 +21,6 @@
  */
 package org.jboss.cache.marshall;
 
-import java.io.NotSerializableException;
-import java.util.Map;
-import java.util.Vector;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.RPCManager;
 import org.jboss.cache.RPCManagerImpl.FlushTracker;
@@ -40,6 +29,7 @@
 import org.jboss.cache.commands.remote.AnnounceBuddyPoolNameCommand;
 import org.jboss.cache.commands.remote.AssignToBuddyGroupCommand;
 import org.jboss.cache.commands.remote.RemoveFromBuddyGroupCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.interceptors.InterceptorChain;
@@ -58,6 +48,17 @@
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
 
+import java.io.NotSerializableException;
+import java.util.Map;
+import java.util.Vector;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * A JGroups RPC dispatcher that knows how to deal with {@link org.jboss.cache.commands.ReplicableCommand}s.
  *
@@ -267,7 +268,7 @@
          boolean replayIgnored = false;
 
 
-         if (configuration.isNonBlockingStateTransfer())
+         if (configuration.isNonBlockingStateTransfer() && !(cmd instanceof StateTransferControlCommand))
          {
             int flushCount  = flushTracker.getFlushCompletionCount();
             flushTracker.lockProcessingLock();
@@ -297,10 +298,7 @@
             if (trace) log.trace("This is a non-visitable command - so performing directly and not via the invoker.");
 
             // need to check cache status for all except buddy replication commands.
-            if (!(cmd instanceof AnnounceBuddyPoolNameCommand ||
-                  cmd instanceof AssignToBuddyGroupCommand ||
-                  cmd instanceof RemoveFromBuddyGroupCommand)
-                  && !componentRegistry.invocationsAllowed(false))
+            if (requiresRunningCache(cmd) && !componentRegistry.invocationsAllowed(false))
             {
                return new RequestIgnoredResponse();
             }
@@ -326,6 +324,14 @@
       }
    }
 
+   private boolean requiresRunningCache(ReplicableCommand cmd)
+   {
+      return !(cmd instanceof AnnounceBuddyPoolNameCommand ||
+                  cmd instanceof AssignToBuddyGroupCommand ||
+                  cmd instanceof RemoveFromBuddyGroupCommand ||
+                  cmd instanceof StateTransferControlCommand);
+   }
+
    @Override
    public String toString()
    {

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2009-03-16 20:37:52 UTC (rev 7905)
@@ -32,7 +32,9 @@
 import org.jboss.cache.NodeSPI;
 import org.jboss.cache.RPCManager;
 import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.commands.CommandsFactory;
 import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
 import org.jboss.cache.commands.tx.PrepareCommand;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.ComponentRegistry;
@@ -49,17 +51,16 @@
 import org.jboss.cache.transaction.TransactionLog;
 import org.jboss.cache.transaction.TransactionLog.LogEntry;
 import org.jgroups.Address;
-import org.jgroups.Channel;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Vector;
 
 public class DefaultStateTransferIntegrator implements StateTransferIntegrator
 {
@@ -71,25 +72,29 @@
 
    private Set<Fqn> internalFqns;
    private Configuration cfg;
-   private RPCManager manager;
+   private RPCManager rpcManager;
    private TransactionLog txLog;
    private boolean needToPersistState;   // for JBCACHE-131
    private boolean nonBlocking;
    private InvocationContextContainer container;
    private InterceptorChain chain;
    private ComponentRegistry registry;
+   private CommandsFactory commandsFactory;
 
    @Inject
-   public void inject(CacheSPI<?, ?> cache, Configuration cfg, RPCManager rpcManager, TransactionLog txLog, InvocationContextContainer container, InterceptorChain chain, ComponentRegistry registry)
+   public void inject(CacheSPI<?, ?> cache, Configuration cfg, RPCManager rpcManager, TransactionLog txLog,
+                      InvocationContextContainer container, InterceptorChain chain, ComponentRegistry registry,
+                      CommandsFactory commandsFactory)
    {
       this.cache = cache;
       this.cfg = cfg;
-      this.manager = rpcManager;
+      this.rpcManager = rpcManager;
       this.nonBlocking = cfg.isNonBlockingStateTransfer();
       this.txLog = txLog;
       this.container = container;
       this.chain = chain;
       this.registry = registry;
+      this.commandsFactory = commandsFactory;
    }
 
    @Start(priority = 14)
@@ -120,47 +125,21 @@
          integrateTxLog(ois);
    }
 
-   private void doPartialFlush(Channel channel, List<Address> members)
+   /**
+    * Mimics a partial flush between the current instance and the address to flush, by opening and closing the necessary
+    * latches on both ends.
+    * @param addressToFlush address to flush in addition to the current address
+    * @param block if true, mimics setting a flush.  Otherwise, mimics un-setting a flush.
+    * @throws Exception if there are issues
+    */
+   private void mimicPartialFlushViaRPC(Address addressToFlush, boolean block) throws Exception
    {
-      int retries = 5;
-      int sleepBetweenRetries = 250;
-      int sleepIncreaseFactor = 2;
-      if (trace) log.trace("Attempting a partial flush on members " + members + " with up to " + retries + " retries.");
-
-      boolean success = false;
-      int i;
-      for (i=1; i<=retries; i++)
-      {
-         if (trace) log.trace("Attempt number " + i);
-         try
-         {
-            if (success = channel.startFlush(members, false)) break;
-            if (trace) log.trace("Channel.startFlush() returned false!");
-         }
-         catch (Exception e)
-         {
-            if (trace) log.trace("Caught exception attempting a partial flush", e);
-         }
-         try
-         {
-            if (trace) log.trace("Partial state transfer failed.  Backing off for " + sleepBetweenRetries + " millis and retrying");
-            Thread.sleep(sleepBetweenRetries);
-            sleepBetweenRetries *= sleepIncreaseFactor;
-         }
-         catch (InterruptedException ie)
-         {
-            Thread.currentThread().interrupt();
-         }
-      }
-
-      if (success)
-      {
-         if (log.isDebugEnabled()) log.debug("Partial flush between " + members + " succeeded!");
-      }
-      else
-      {
-         throw new CacheException("Could initiate partial flush between " +members+ "! State-transfer failed!");
-      }
+      StateTransferControlCommand cmd = commandsFactory.buildStateTransferControlCommand(block);
+      Vector<Address> recipient = new Vector<Address>();
+      recipient.add(addressToFlush);
+      if (!block) rpcManager.getFlushTracker().unblock();
+      rpcManager.callRemoteMethods(recipient, cmd, true, cfg.getStateRetrievalTimeout(), true);
+      if (block) rpcManager.getFlushTracker().block();
    }
 
    private void integrateTxLog(ObjectInputStream ois) throws Exception
@@ -170,11 +149,8 @@
 
       processCommitLog(ois);
 
-      Channel channel = manager.getChannel();
+      mimicPartialFlushViaRPC(rpcManager.getLastStateTransferSource(), true);
 
-      List<Address> targets = Arrays.asList(channel.getLocalAddress(), manager.getLastStateTransferSource());
-      doPartialFlush(channel, targets);            
-
       try
       {
          if (trace)
@@ -201,12 +177,13 @@
 
          // Block all remote commands once transfer is complete,
          // and before FLUSH completes
-         registry.setBlockInStarting(true);
+         registry.setStatusCheckNecessary(true);
       }
       finally
       {
          if (trace) log.trace("Stopping partial flush");
-         channel.stopFlush(targets);
+//         channel.stopFlush(targets);
+         mimicPartialFlushViaRPC(rpcManager.getLastStateTransferSource(), false);
       }
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java	2009-03-16 20:37:26 UTC (rev 7904)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/StateProviderBusyException.java	2009-03-16 20:37:52 UTC (rev 7905)
@@ -1,12 +1,14 @@
 package org.jboss.cache.statetransfer;
 
+import org.jboss.cache.CacheException;
+
 /**
  * Thrown when a state provider is busy
  *
  * @author Manik Surtani
  * @since 3.1
  */
-public class StateProviderBusyException extends Exception
+public class StateProviderBusyException extends CacheException
 {
    public StateProviderBusyException()
    {




More information about the jbosscache-commits mailing list