[jbosscache-commits] JBoss Cache SVN: r7678 - in core/trunk/src: main/java/org/jboss/cache/factories and 5 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Feb 11 01:00:34 EST 2009


Author: jason.greene at jboss.com
Date: 2009-02-11 01:00:34 -0500 (Wed, 11 Feb 2009)
New Revision: 7678

Added:
   core/trunk/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java
   core/trunk/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManager.java
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
   core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
   core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
   core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
   core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
   core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
   core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
Log:
Eliminate known race conditions in NBST (non-blocking state transfer)


Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java	2009-02-10 20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -21,16 +21,16 @@
  */
 package org.jboss.cache;
 
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.jboss.cache.RPCManagerImpl.FlushTracker;
 import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.lock.TimeoutException;
 import org.jgroups.Address;
 import org.jgroups.Channel;
 import org.jgroups.blocks.RspFilter;
 
-import java.util.List;
-import java.util.Vector;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Provides a mechanism for communicating with other caches in the cluster.  For now this is based on JGroups as an underlying
  * transport, and in future more transport options may become available.
@@ -153,7 +153,17 @@
     */
    Channel getChannel();
 
-   public void waitForFlush(long timeout);
+   /**
+    * Returns the last state transfer source address.
+    *
+    * @return the last state transfer source address
+    */
+   public Address getLastStateTransferSource();
 
-   public Address getLastStateTransferSource();
+   /**
+    * Returns the flush tracker associated with this manager.
+    *
+    * @return the current flush tracker
+    */
+   public FlushTracker getFlushTracker();
 }

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-02-10 20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -21,12 +21,29 @@
  */
 package org.jboss.cache;
 
+import java.net.URL;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import javax.transaction.TransactionManager;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.commands.ReplicableCommand;
 import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.RuntimeConfig;
 import org.jboss.cache.config.Configuration.NodeLockingScheme;
-import org.jboss.cache.config.RuntimeConfig;
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Start;
@@ -65,18 +82,6 @@
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
 
-import javax.transaction.TransactionManager;
-import java.net.URL;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Manager that handles all RPC calls between JBoss Cache instances
  *
@@ -97,17 +102,8 @@
     * True if this Cache is the coordinator.
     */
    private volatile boolean coordinator = false;
-   /**
-    * Thread gate used to block Dispatcher during JGroups FLUSH protocol
-    */
-   private final ReclosableLatch flushBlockGate = new ReclosableLatch();
 
    /**
-    * Thread gate used by NBST to wait for a flush
-    */
-   private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
-
-   /**
     * The most recent state transfer source
     */
    volatile Address lastStateTransferSource;
@@ -135,7 +131,9 @@
    private volatile boolean isInLocalMode;
    private ComponentRegistry componentRegistry;
    private LockManager lockManager;
+   private FlushTracker flushTracker = new FlushTracker();
 
+
    @Inject
    public void setupDependencies(ChannelMessageListener messageListener, Configuration configuration, Notifier notifier,
                                  CacheSPI spi, Marshaller marshaller, TransactionTable txTable,
@@ -155,6 +153,129 @@
       this.lockManager = lockManager;
    }
 
+   public class FlushTracker
+   {
+      private final ReclosableLatch flushBlockGate = new ReclosableLatch();
+      private final AtomicInteger flushCompletionCount = new AtomicInteger();
+      private final ReentrantReadWriteLock coordinationLock = new ReentrantReadWriteLock();
+      private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
+
+      public void block()
+      {
+         flushBlockGate.close();
+         flushWaitGate.open();
+      }
+
+      public void unblock()
+      {
+         flushWaitGate.close();
+         flushCompletionCount.incrementAndGet();
+         flushBlockGate.open();
+      }
+
+      public int getFlushCompletionCount()
+      {
+         return flushCompletionCount.get();
+      }
+
+      public void lockProcessingLock()
+      {
+         if (! configuration.isNonBlockingStateTransfer())
+            return;
+
+         for (;;)
+         {
+            try
+            {
+               if (!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
+                  throw new TimeoutException("Could not obtain processing lock");
+
+               return;
+            }
+            catch (InterruptedException e)
+            {
+               Thread.currentThread().interrupt();
+            }
+         }
+      }
+
+      public void unlockProcessingLock()
+      {
+         if (! configuration.isNonBlockingStateTransfer())
+            return;
+
+         coordinationLock.readLock().unlock();
+      }
+
+      public void lockSuspendProcessingLock()
+      {
+         if (! configuration.isNonBlockingStateTransfer())
+            return;
+
+         for (;;)
+         {
+            try
+            {
+               if (!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
+                  throw new TimeoutException("Could not obtain processing lock");
+
+               return;
+            }
+            catch (InterruptedException e)
+            {
+               Thread.currentThread().interrupt();
+            }
+         }
+      }
+
+      public void unlockSuspendProcessingLock()
+      {
+         if (! configuration.isNonBlockingStateTransfer())
+            return;
+
+         if (coordinationLock.isWriteLockedByCurrentThread())
+            coordinationLock.writeLock().unlock();
+      }
+
+      public void waitForFlushCompletion(long timeout)
+      {
+         for (; ;)
+         {
+            try
+            {
+               if (channel.flushSupported() && !flushBlockGate.await(timeout, TimeUnit.MILLISECONDS))
+               {
+                  throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + timeout + " millis) ");
+               }
+               return;
+            }
+            catch (InterruptedException e)
+            {
+               Thread.currentThread().interrupt();
+            }
+         }
+      }
+
+      public void waitForFlushStart(long timeout)
+      {
+         for (; ;)
+         {
+            try
+            {
+               if (channel.flushSupported() && !flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
+               {
+                  throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + timeout + " millis) ");
+               }
+               return;
+            }
+            catch (InterruptedException e)
+            {
+               Thread.currentThread().interrupt();
+            }
+         }
+      }
+   }
+
    // ------------ START: Lifecycle methods ------------
 
    @Start(priority = 15)
@@ -243,9 +364,9 @@
                }
             }
 
-            if (log.isDebugEnabled())
+            if (log.isInfoEnabled())
             {
-               log.debug("state was retrieved successfully (in " + (System.currentTimeMillis() - start) + " milliseconds)");
+               log.info("state was retrieved successfully (in " + (System.currentTimeMillis() - start) + " milliseconds)");
             }
       }
 
@@ -273,9 +394,9 @@
 
             try
             {
-               if (log.isTraceEnabled())
+               if (log.isInfoEnabled())
                {
-                  log.trace("Trying to fetch state from: " + member);
+                  log.info("Trying to fetch state from: " + member);
                }
                if (getState(null, member))
                {
@@ -286,18 +407,18 @@
             }
             catch (Exception e)
             {
-               if (log.isTraceEnabled())
+               if (log.isDebugEnabled())
                {
-                  log.trace("Error while fetching state", e);
+                  log.debug("Error while fetching state", e);
                }
             }
          }
 
          if (!success)
          {
-            if (trace)
+            if (log.isWarnEnabled())
             {
-               log.trace("Could not find available peer for state, backing off and retrying");
+               log.warn("Could not find available peer for state, backing off and retrying");
             }
 
             try
@@ -428,12 +549,12 @@
       if (configuration.isUseRegionBasedMarshalling())
       {
          rpcDispatcher = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
-               spi, invocationContextContainer, interceptorChain, componentRegistry, flushBlockGate);
+               spi, invocationContextContainer, interceptorChain, componentRegistry, this);
       }
       else
       {
          rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
-               invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry, flushBlockGate);
+               invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry, this);
       }
       checkAppropriateConfig();
       rpcDispatcher.setRequestMarshaller(marshaller);
@@ -552,6 +673,7 @@
    public List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand command, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception
    {
       boolean success = true;
+      boolean unlock = false;
       try
       {
          // short circuit if we don't have an RpcDispatcher!
@@ -566,10 +688,11 @@
          {
             log.trace("callRemoteMethods(): valid members are " + recipients + " methods: " + command + " Using OOB? " + useOutOfBandMessage + " modeToUse: " + modeToUse);
          }
-         if (channel.flushSupported() && !flushBlockGate.await(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
-         {
-            throw new TimeoutException("State retrieval timed out waiting for flush unblock. (timeout = " + configuration.getStateRetrievalTimeout() + " millis) ");
-         }
+
+         flushTracker.lockProcessingLock();
+         unlock = true;
+         flushTracker.waitForFlushCompletion(configuration.getStateRetrievalTimeout());
+
          useOutOfBandMessage = false;
          RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
          if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
@@ -619,6 +742,8 @@
       finally
       {
          computeStats(success);
+         if (unlock)
+            flushTracker.unlockProcessingLock();
       }
    }
 
@@ -722,24 +847,6 @@
       return ((JChannel) channel).getState(target, stateId, configuration.getStateRetrievalTimeout(), !configuration.isNonBlockingStateTransfer());
    }
 
-   public void waitForFlush(long timeout)
-   {
-      for (; ;)
-      {
-         try
-         {
-            if (channel.flushSupported() && !flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
-            {
-               throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + timeout + " millis) ");
-            }
-            return;
-         }
-         catch (InterruptedException e)
-         {
-            Thread.currentThread().interrupt();
-         }
-      }
-   }
 
    // ------------ END: Partial state transfer methods ------------
 
@@ -895,9 +1002,9 @@
       {
          try
          {
-            flushBlockGate.close();
-            flushWaitGate.open();
             if (log.isDebugEnabled()) log.debug("Block received at " + getLocalAddress());
+
+            flushTracker.block();
             notifier.notifyCacheBlocked(true);
             notifier.notifyCacheBlocked(false);
 
@@ -917,14 +1024,13 @@
       {
          try
          {
-            flushWaitGate.close();
             if (log.isDebugEnabled()) log.debug("UnBlock received at " + getLocalAddress());
 
             notifier.notifyCacheUnblocked(true);
             notifier.notifyCacheUnblocked(false);
+            flushTracker.unblock();
 
             if (log.isDebugEnabled()) log.debug("UnBlock processed at " + getLocalAddress());
-            flushBlockGate.open();
          }
          catch (Throwable e)
          {
@@ -1034,4 +1140,9 @@
          }
       }
    }
+
+   public FlushTracker getFlushTracker()
+   {
+      return flushTracker;
+   }
 }

Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2009-02-10 20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -93,7 +93,7 @@
    // component and method containers
    final Map<String, Component> componentLookup = new HashMap<String, Component>();
 
-   CacheStatus state = CacheStatus.INSTANTIATED;
+   volatile CacheStatus state = CacheStatus.INSTANTIATED;
 
    /**
     * Hook to shut down the cache when the JVM exits.
@@ -899,10 +899,11 @@
             Thread.currentThread().interrupt();
          }
       }
-      else
+      else if (blockInStarting)
       {
          log.warn("Received a remote call but the cache is not in STARTED state - ignoring call.");
       }
+
       return false;
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java	2009-02-10 20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -38,6 +38,7 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.commons.httpclient.cookie.IgnoreCookiesSpec;
 import org.jboss.cache.CacheException;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Region;
@@ -93,6 +94,8 @@
    protected static final int MAGICNUMBER_DOUBLE = 28;
    protected static final int MAGICNUMBER_OBJECT = 29;
    protected static final int MAGICNUMBER_TXLOG_ENTRY = 50;
+   protected static final int MAGICNUMBER_REQUEST_IGNORED_RESPONSE = 51;
+   protected static final int MAGICNUMBER_EXTENDED_RESPONSE = 52;
    protected static final int MAGICNUMBER_NULL = 99;
    protected static final int MAGICNUMBER_SERIALIZABLE = 100;
    protected static final int MAGICNUMBER_REF = 101;
@@ -448,6 +451,15 @@
          out.writeByte(MAGICNUMBER_GRAVITATERESULT);
          marshallGravitateResult((GravitateResult) o, out, refMap);
       }
+      else if (o instanceof RequestIgnoredResponse)
+      {
+         out.writeByte(MAGICNUMBER_REQUEST_IGNORED_RESPONSE);
+      }
+      else if (o instanceof ExtendedResponse)
+      {
+         out.writeByte(MAGICNUMBER_EXTENDED_RESPONSE);
+         marshallExtendedResponse((ExtendedResponse)o, out, refMap);
+      }
       else if (o instanceof Serializable)
       {
          if (trace)
@@ -464,6 +476,12 @@
       }
    }
 
+   private void marshallExtendedResponse(ExtendedResponse response, ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
+   {
+      out.writeBoolean(response.isReplayIgnoredRequests());
+      marshallObject(response.getResponse(), out, refMap);
+   }
+
    private void marshallLogEntry(LogEntry log, ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
    {
       marshallObject(log.getTransaction(), out, refMap);
@@ -673,6 +691,10 @@
             return retVal;
          case MAGICNUMBER_GRAVITATERESULT:
             return unmarshallGravitateResult(in, refMap);
+         case MAGICNUMBER_REQUEST_IGNORED_RESPONSE:
+            return new RequestIgnoredResponse();
+         case MAGICNUMBER_EXTENDED_RESPONSE:
+            return unmarshallExtendedResponse(in, refMap);
          default:
             if (log.isErrorEnabled())
             {
@@ -683,8 +705,17 @@
       throw new Exception("Unknown magic number " + magicNumber);
    }
 
+   private ExtendedResponse unmarshallExtendedResponse(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+   {
+      boolean replayIgnoredRequests = in.readBoolean();
+      ExtendedResponse response = new ExtendedResponse(unmarshallObject(in, refMap));
+      response.setReplayIgnoredRequests(replayIgnoredRequests);
+
+      return response;
+   }
+
    @SuppressWarnings("unchecked")
-   private Object unmarshallLogEntry(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+   private LogEntry unmarshallLogEntry(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
    {
       GlobalTransaction gtx = (GlobalTransaction)unmarshallObject(in, refMap);
       List<WriteCommand> mods = (List<WriteCommand>)unmarshallObject(in, refMap);

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2009-02-10 20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -22,16 +22,21 @@
 package org.jboss.cache.marshall;
 
 import java.io.NotSerializableException;
+import java.util.Map;
 import java.util.Vector;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.jboss.cache.InvocationContext;
+import org.jboss.cache.RPCManager;
+import org.jboss.cache.RPCManagerImpl.FlushTracker;
 import org.jboss.cache.commands.ReplicableCommand;
 import org.jboss.cache.commands.VisitableCommand;
 import org.jboss.cache.commands.remote.AnnounceBuddyPoolNameCommand;
@@ -50,6 +55,7 @@
 import org.jgroups.MembershipListener;
 import org.jgroups.Message;
 import org.jgroups.MessageListener;
+import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.RpcDispatcher;
 import org.jgroups.blocks.RspFilter;
 import org.jgroups.util.Buffer;
@@ -72,20 +78,20 @@
    private AtomicInteger replicationProcessorCount;
    private boolean asyncSerial;
    private Configuration configuration;
-   private ReclosableLatch flushGate;
+   private RPCManager rpcManager;
    private ReplicationObserver replicationObserver;
 
    public CommandAwareRpcDispatcher() {}
 
    public CommandAwareRpcDispatcher(Channel channel, MessageListener l, MembershipListener l2,
                                     Object serverObj, InvocationContextContainer container, InterceptorChain interceptorChain,
-                                    ComponentRegistry componentRegistry, ReclosableLatch flushGate)
+                                    ComponentRegistry componentRegistry, RPCManager manager)
    {
       super(channel, l, l2, serverObj);
       this.invocationContextContainer = container;
       this.componentRegistry = componentRegistry;
       this.interceptorChain = interceptorChain;
-      this.flushGate = flushGate;
+      this.rpcManager = manager;
 
       trace = log.isTraceEnabled();
 
@@ -198,7 +204,8 @@
          log.trace(new StringBuilder("dests=").append(dests).append(", command=").append(command).
                append(", mode=").append(mode).append(", timeout=").append(timeout));
 
-      ReplicationTask replicationTask = new ReplicationTask(command, oob, dests, mode, timeout, anycasting, filter);
+      boolean supportReplay = configuration.isNonBlockingStateTransfer();
+      ReplicationTask replicationTask = new ReplicationTask(command, oob, dests, mode, timeout, anycasting, supportReplay, filter);
       Future<RspList> response = replicationProcessor.submit(replicationTask);
       if (asyncSerial)
       {
@@ -253,25 +260,41 @@
 
    protected Object executeCommand(ReplicableCommand cmd, Message req) throws Throwable
    {
+      boolean unlock = false;
+      FlushTracker flushTracker = rpcManager.getFlushTracker();
+
       try
       {
          if (cmd == null) throw new NullPointerException("Unable to execute a null command!  Message was " + req);
          if (trace) log.trace("Executing command: " + cmd + " [sender=" + req.getSrc() + "]");
 
-         if (channel.flushSupported() && !flushGate.await(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
+         boolean replayIgnored = false;
+
+
+         if (configuration.isNonBlockingStateTransfer())
          {
-            throw new TimeoutException("State retrieval timed out waiting for flush unblock. (timeout = " + configuration.getStateRetrievalTimeout() + " millis) ");
+            int flushCount  = flushTracker.getFlushCompletionCount();
+            flushTracker.lockProcessingLock();
+            unlock = true;
+
+            flushTracker.waitForFlushCompletion(configuration.getStateRetrievalTimeout());
+
+            // If this thread blocked during a NBST flush, then inform the sender
+            // it needs to replay ignored messages
+            replayIgnored = flushTracker.getFlushCompletionCount() != flushCount;
          }
 
+         Object ret;
+
          if (cmd instanceof VisitableCommand)
          {
             InvocationContext ctx = invocationContextContainer.get();
             ctx.setOriginLocal(false);
             if (!componentRegistry.invocationsAllowed(false))
             {
-               return null;
+               return new RequestIgnoredResponse();
             }
-            return interceptorChain.invoke(ctx, (VisitableCommand) cmd);
+            ret = interceptorChain.invoke(ctx, (VisitableCommand) cmd);
          }
          else
          {
@@ -283,15 +306,27 @@
                   cmd instanceof RemoveFromBuddyGroupCommand)
                   && !componentRegistry.invocationsAllowed(false))
             {
-               return null;
+               return new RequestIgnoredResponse();
             }
-            return cmd.perform(null);
+            ret = cmd.perform(null);
          }
+
+         if (replayIgnored)
+         {
+            ExtendedResponse extended = new ExtendedResponse(ret);
+            extended.setReplayIgnoredRequests(true);
+            ret = extended;
+         }
+
+         return ret;
       }
       finally
       {
          if (replicationObserver != null)
             replicationObserver.afterExecutingCommand(cmd);
+
+         if (unlock)
+            flushTracker.unlockProcessingLock();
       }
    }
 
@@ -309,9 +344,10 @@
       private int mode;
       private long timeout;
       private boolean anycasting;
+      private boolean supportReplay;
       private RspFilter filter;
 
-      private ReplicationTask(ReplicableCommand command, boolean oob, Vector<Address> dests, int mode, long timeout, boolean anycasting, RspFilter filter)
+      private ReplicationTask(ReplicableCommand command, boolean oob, Vector<Address> dests, int mode, long timeout, boolean anycasting, boolean supportReplay, RspFilter filter)
       {
          this.command = command;
          this.oob = oob;
@@ -319,6 +355,7 @@
          this.mode = mode;
          this.timeout = timeout;
          this.anycasting = anycasting;
+         this.supportReplay = supportReplay;
          this.filter = filter;
       }
 
@@ -338,6 +375,9 @@
          Message msg = new Message();
          msg.setBuffer(buf);
          if (oob) msg.setFlag(Message.OOB);
+
+         // Replay capability requires responses from all members!
+         int mode = supportReplay ? GroupRequest.GET_ALL : this.mode;
          RspList retval = castMessage(dests, msg, mode, timeout, anycasting, filter);
          if (trace) log.trace("responses: " + retval);
 
@@ -347,6 +387,36 @@
 
          if (retval == null)
             throw new NotSerializableException("RpcDispatcher returned a null.  This is most often caused by args for " + command.getClass().getSimpleName() + " not being serializable.");
+
+         if (supportReplay)
+         {
+            boolean replay = false;
+            Vector<Address> ignorers = new Vector<Address>();
+            for (Map.Entry<Address, Rsp> entry : retval.entrySet())
+            {
+               Object value = entry.getValue().getValue();
+               if (value instanceof RequestIgnoredResponse)
+               {
+                  ignorers.add(entry.getKey());
+               }
+               else if (value instanceof ExtendedResponse)
+               {
+                  ExtendedResponse extended = (ExtendedResponse) value;
+                  replay |= extended.isReplayIgnoredRequests();
+                  entry.getValue().setValue(extended.getResponse());
+               }
+            }
+
+            if (replay && ignorers.size() > 0)
+            {
+               if (trace)
+                  log.trace("Replaying message to ignoring senders: " + ignorers);
+               RspList responses = castMessage(ignorers, msg, GroupRequest.GET_ALL, timeout, anycasting, filter);
+               if (responses != null)
+                  retval.putAll(responses);
+            }
+         }
+
          return retval;
       }
    }

Added: core/trunk/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -0,0 +1,53 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.cache.marshall;
+
+/**
+ * A response with extended information
+ *
+ * @author Jason T. Greene
+ */
+public class ExtendedResponse
+{
+   private boolean replayIgnoredRequests;
+   private final Object response;
+
+   public ExtendedResponse(Object response)
+   {
+      this.response = response;
+   }
+
+   public boolean isReplayIgnoredRequests()
+   {
+      return replayIgnoredRequests;
+   }
+
+   public void setReplayIgnoredRequests(boolean replayIgnoredRequests)
+   {
+      this.replayIgnoredRequests = replayIgnoredRequests;
+   }
+
+   public Object getResponse()
+   {
+      return response;
+   }
+}


Property changes on: core/trunk/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java	2009-02-10 20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -21,6 +21,7 @@
  */
 package org.jboss.cache.marshall;
 
+import org.jboss.cache.RPCManager;
 import org.jboss.cache.commands.ReplicableCommand;
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.interceptors.InterceptorChain;
@@ -47,9 +48,9 @@
     */
    public InactiveRegionAwareRpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object serverObj,
                                            InvocationContextContainer container, InterceptorChain interceptorChain,
-                                           ComponentRegistry componentRegistry, ReclosableLatch flushBlockGate)
+                                           ComponentRegistry componentRegistry, RPCManager manager)
    {
-      super(channel, l, l2, serverObj, container, interceptorChain, componentRegistry, flushBlockGate);
+      super(channel, l, l2, serverObj, container, interceptorChain, componentRegistry, manager);
    }
 
    @Override

Added: core/trunk/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -0,0 +1,32 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.cache.marshall;
+
+/**
+ * Indicates that the request was ignored.
+ *
+ * @author Jason T. Greene
+ */
+public class RequestIgnoredResponse
+{
+
+}


Property changes on: core/trunk/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java	2009-02-10 20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,6 +37,7 @@
 import org.jboss.cache.Node;
 import org.jboss.cache.RPCManager;
 import org.jboss.cache.Version;
+import org.jboss.cache.RPCManagerImpl.FlushTracker;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Start;
@@ -136,68 +138,93 @@
          CacheLoader cacheLoader = cache.getCacheLoaderManager() == null ? null : cache.getCacheLoaderManager().getCacheLoader();
          if (cacheLoader != null && generatePersistent)
          {
-            if (log.isTraceEnabled())
-            {
-               log.trace("writing persistent state for " + fqn + ",using " + cache.getCacheLoaderManager().getCacheLoader().getClass());
-            }
-
-            if (fqn.isRoot())
-            {
-               cacheLoader.loadEntireState(out);
-            }
-            else
-            {
-               cacheLoader.loadState(fqn, out);
-            }
-
-            if (log.isTraceEnabled())
-            {
-               log.trace("persistent state succesfully written");
-            }
+            writePersistentData(out, fqn, cacheLoader);
          }
          delimitStream(out);
 
          if (nonBlocking && generateTransient)
          {
-            for (int nonProgress = 0, size = txLog.size(); size > 0;)
-            {
-               if (log.isTraceEnabled())
-                  log.trace("Tx Log remaining entries = " + size);
-               txLog.writeCommitLog(cache.getMarshaller(), out);
-               int newSize = txLog.size();
+            writeTxLog(out);
+         }
 
-               // If size did not decrease then we did not make progress, and could be wasting
-               // our time. Limit this to the specified max.
-               if (newSize >= size && ++nonProgress >= maxNonProgressingLogWrites)
-                  break;
+      }
+      catch (Exception e)
+      {
+         cache.getMarshaller().objectToObjectStream(new NodeDataExceptionMarker(e, cache.getLocalAddress()), out);
+         throw e;
+      }
+      finally
+      {
+         if (nonBlocking)
+            txLog.deactivate();
+      }
+   }
 
-               size = newSize;
-            }
+   private void writePersistentData(ObjectOutputStream out, Fqn fqn, CacheLoader cacheLoader) throws Exception
+   {
+      if (log.isTraceEnabled())
+      {
+         log.trace("writing persistent state for " + fqn + ",using " + cache.getCacheLoaderManager().getCacheLoader().getClass());
+      }
 
-            // Signal to sender that we need a flush to get a consistent view
-            // of the remaining transactions.
-            delimitStream(out);
-            out.flush();
-            rpcManager.waitForFlush(flushTimeout);
+      if (fqn.isRoot())
+      {
+         cacheLoader.loadEntireState(out);
+      }
+      else
+      {
+         cacheLoader.loadState(fqn, out);
+      }
 
-            // Write remaining transactions
+      if (log.isTraceEnabled())
+      {
+         log.trace("persistent state succesfully written");
+      }
+   }
+
+   private void writeTxLog(ObjectOutputStream out) throws Exception
+   {
+      FlushTracker flushTracker = rpcManager.getFlushTracker();
+
+      try
+      {
+         for (int nonProgress = 0, size = txLog.size(); size > 0;)
+         {
+            if (log.isTraceEnabled())
+               log.trace("Tx Log remaining entries = " + size);
             txLog.writeCommitLog(cache.getMarshaller(), out);
-            delimitStream(out);
+            int newSize = txLog.size();
 
-            // Write all non-completed prepares
-            txLog.writePendingPrepares(cache.getMarshaller(), out);
-            delimitStream(out);
+            // If size did not decrease then we did not make progress, and could be wasting
+            // our time. Limit this to the specified max.
+            if (newSize >= size && ++nonProgress >= maxNonProgressingLogWrites)
+               break;
+
+            size = newSize;
          }
 
+         // Wait on incoming and outgoing threads to line up in front of
+         // the flush gate.
+         flushTracker.lockSuspendProcessingLock();
+
+         // Signal to sender that we need a flush to get a consistent view
+         // of the remaining transactions.
+         delimitStream(out);
+         out.flush();
+         flushTracker.waitForFlushStart(flushTimeout);
+
+         // Write remaining transactions
+         txLog.writeCommitLog(cache.getMarshaller(), out);
+         delimitStream(out);
+
+         // Write all non-completed prepares
+         txLog.writePendingPrepares(cache.getMarshaller(), out);
+         delimitStream(out);
+         out.flush();
       }
-      catch (Exception e)
-      {
-         cache.getMarshaller().objectToObjectStream(new NodeDataExceptionMarker(e, cache.getLocalAddress()), out);
-         throw e;
-      }
       finally
       {
-         txLog.deactivate();
+         flushTracker.unlockSuspendProcessingLock();
       }
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2009-02-10 20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -230,6 +230,10 @@
 
          notifyAllNodesCreated(cache.getInvocationContext(), target);
       }
+      catch (CacheException ce)
+      {
+         throw ce;
+      }
       catch (Exception e)
       {
          throw new CacheException(e);
@@ -387,6 +391,14 @@
    private List<NodeData> readNodesAsList(ObjectInputStream in) throws Exception
    {
       Object obj = cache.getMarshaller().objectFromObjectStream(in);
+      if (obj instanceof NodeDataExceptionMarker)
+      {
+         Throwable cause = ((NodeDataExceptionMarker)obj).getCause();
+         if (cause instanceof Exception)
+            throw (Exception) cause;
+
+         throw new CacheException(cause);
+      }
       if (obj instanceof NodeDataMarker) return null;
 
       return (List<NodeData>) obj;

Modified: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java	2009-02-10 20:39:38 UTC (rev 7677)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -69,7 +69,7 @@
       }
    }
 
-   private Log log = LogFactory.getLog(getClass().getName());
+   private static Log log = LogFactory.getLog(TransactionLog.class);
 
    public void logPrepare(PrepareCommand command)
    {
@@ -145,6 +145,8 @@
    public void deactivate()
    {
       active.set(false);
+      if (entries.size() > 0)
+         log.error("Unprocessed Transaction Log Entries! = " + entries.size());
       entries.clear();
    }
 

Modified: core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java	2009-02-10 20:39:38 UTC (rev 7677)
+++ core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -13,11 +13,10 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.Cache;
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
@@ -27,7 +26,6 @@
 import org.jboss.cache.factories.UnitTestConfigurationFactory;
 import org.jboss.cache.util.TestingUtil;
 import org.testng.annotations.Test;
-import org.testng.annotations.AfterMethod;
 
 @Test(groups="functional", testName = "statetransfer.NonBlockingStateTransferTest")
 public class NonBlockingStateTransferTest
@@ -46,17 +44,9 @@
    public static final Integer TWENTY = 20;
    public static final Integer FORTY = 40;
 
-   private List<Cache> createdCaches = new ArrayList<Cache>();
+   private volatile int testCount = 0;
 
-   @AfterMethod 
-   public void clearCaches()
-   {
-      for (Cache c : createdCaches)
-      {
-         TestingUtil.killCaches(c);
-      }
-      createdCaches.clear();
-   }
+   private static final Log log = LogFactory.getLog(NonBlockingStateTransferTest.class);
 
    public static class DelayTransfer implements Serializable
    {
@@ -121,6 +111,8 @@
             }
             catch (Exception e)
             {
+               e.printStackTrace();
+               log.error(e);
             }
          }
          result = c;
@@ -134,84 +126,170 @@
 
    private CacheSPI<Object, Object> createCache(String name)
    {
+      return createCache(name, true);
+
+   }
+
+   private CacheSPI<Object, Object> createCache(String name, boolean start)
+   {
       Configuration config = UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC);
+      config.setSyncCommitPhase(true);
       config.setClusterName(name + "-" + Thread.currentThread().getName());
       config.setNonBlockingStateTransfer(true);
+      config.setSyncReplTimeout(30000);
       CacheSPI<Object, Object> cache = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(config, false, getClass());
 
-      // Use marshaller
-
       cache.create();
-      cache.start();
-      createdCaches.add(cache);
+      if (start)
+         cache.start();
       return cache;
-
    }
 
    public void testInitialStateTransfer() throws Exception
    {
-      CacheSPI<Object, Object> cache1 = createCache("nbst");
+      testCount++;
+      log.info("testInitialStateTransfer start - " + testCount);
+      CacheSPI<Object, Object> cache1 = null, cache2 = null;
+      try
+      {
+         cache1 = createCache("nbst");
+         writeInitialData(cache1);
 
-      writeInitialData(cache1);
+         cache2 = createCache("nbst");
 
-      CacheSPI<Object, Object> cache2 = createCache("nbst");
+         // Pause to give caches time to see each other
+         TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2 }, 60000);
 
-      // Pause to give caches time to see each other
-      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+         verifyInitialData(cache2);
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache1, cache2);
+      }
+      log.info("testInitialStateTransfer end - " + testCount);
+   }
 
-      verifyInitialData(cache2);
+   public void testConcurrentStateTransfer() throws Exception
+   {
+      testCount++;
+      log.info("testConcurrentStateTransfer start - " + testCount);
+      CacheSPI<Object, Object> cache1 = null, cache2 = null, cache3 = null, cache4 = null;
+      try
+      {
+         cache1 = createCache("nbst");
+         writeInitialData(cache1);
+
+         cache2 = createCache("nbst");
+
+         cache1.put("/delay", "delay", new DelayTransfer());
+
+         // Pause to give caches time to see each other
+         TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2 }, 60000);
+         verifyInitialData(cache2);
+
+         final CacheSPI<Object, Object >c3 = cache3 = createCache("nbst", false);
+         final CacheSPI<Object, Object >c4 = cache4 = createCache("nbst", false);
+
+         Thread t1 = new Thread(new Runnable()
+         {
+            public void run()
+            {
+               c3.start();
+            }
+         });
+         t1.start();
+
+         Thread t2 = new Thread(new Runnable()
+         {
+            public void run()
+            {
+               c4.start();
+            }
+         });
+         t2.start();
+
+         t1.join();
+         t2.join();
+
+         TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2, cache3, cache4 }, 60000);
+         verifyInitialData(cache3);
+         verifyInitialData(cache4);
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache1, cache2, cache3, cache4);
+      }
+      log.info("testConcurrentStateTransfer end - " + testCount);
    }
 
-
    public void testSTWithThirdWritingNonTxCache() throws Exception
    {
+      testCount++;
+      log.info("testSTWithThirdWritingNonTxCache start - " + testCount);
       thirdWritingCacheTest(false, "nbst1");
+      log.info("testSTWithThirdWritingNonTxCache end - " + testCount);
    }
 
    public void testSTWithThirdWritingTxCache() throws Exception
    {
+      testCount++;
+      log.info("testSTWithThirdWritingTxCache start - " + testCount);
       thirdWritingCacheTest(true, "nbst2");
+      log.info("testSTWithThirdWritingTxCache end - " + testCount);
    }
 
    public void testSTWithWritingNonTxThread() throws Exception
    {
+      testCount++;
+      log.info("testSTWithWritingNonTxThread start - " + testCount);
       writingThreadTest(false, "nbst3");
+      log.info("testSTWithWritingNonTxThread end - " + testCount);
    }
 
    public void testSTWithWritingTxThread() throws Exception
    {
+      testCount++;
+      log.info("testSTWithWritingTxThread start - " + testCount);
       writingThreadTest(true, "nbst4");
+      log.info("testSTWithWritingTxThread end - " + testCount);
    }
 
-
    private void thirdWritingCacheTest(boolean tx, String name) throws InterruptedException
    {
-      final CacheSPI<Object, Object> cache1 = createCache(name);
-      final CacheSPI<Object, Object> cache3 = createCache(name);
+      CacheSPI<Object, Object> cache1 = null, cache2 = null, cache3 = null;
+      try
+      {
+         cache1 = createCache(name);
+         cache3 = createCache(name);
 
-      writeInitialData(cache1);
+         writeInitialData(cache1);
 
-      // Delay the transient copy, so that we get a more thorough log test
-      cache1.put("/delay", "delay", new DelayTransfer());
+         // Delay the transient copy, so that we get a more thorough log test
+         cache1.put("/delay", "delay", new DelayTransfer());
 
-      WritingRunner writer = new WritingRunner(cache3, tx);
-      Thread writerThread = new Thread(writer);
-      writerThread.start();
+         WritingRunner writer = new WritingRunner(cache3, tx);
+         Thread writerThread = new Thread(writer);
+         writerThread.start();
 
-      CacheSPI<Object, Object> cache2 = createCache(name);
+         cache2 = createCache(name);
 
-      // Pause to give caches time to see each other
-      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2, cache3}, 60000);
+         // Pause to give caches time to see each other
+         TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2, cache3 }, 60000);
 
-      writer.stop();
-      writerThread.join();
+         writer.stop();
+         writerThread.join();
 
-      verifyInitialData(cache2);
+         verifyInitialData(cache2);
 
-      int count = writer.result();
+         int count = writer.result();
 
-      for (int c = 0; c < count; c++)
-         assertEquals(c, cache2.get("/test" + c, "test"));
+         for (int c = 0; c < count; c++)
+            assertEquals(c, cache2.get("/test" + c, "test"));
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache1, cache2, cache3);
+      }
    }
 
    private void verifyInitialData(CacheSPI<Object, Object> cache2)
@@ -232,30 +310,38 @@
 
    private void writingThreadTest(boolean tx, String name) throws InterruptedException
    {
-      final CacheSPI<Object, Object> cache1 = createCache(name);
+      CacheSPI<Object, Object> cache1 = null, cache2 = null;
+      try
+      {
+         cache1 = createCache(name);
 
-      writeInitialData(cache1);
+         writeInitialData(cache1);
 
-      // Delay the transient copy, so that we get a more thorough log test
-      cache1.put("/delay", "delay", new DelayTransfer());
+         // Delay the transient copy, so that we get a more thorough log test
+         cache1.put("/delay", "delay", new DelayTransfer());
 
-      WritingRunner writer = new WritingRunner(cache1, tx);
-      Thread writerThread = new Thread(writer);
-      writerThread.start();
+         WritingRunner writer = new WritingRunner(cache1, tx);
+         Thread writerThread = new Thread(writer);
+         writerThread.start();
 
-      CacheSPI<Object, Object> cache2 = createCache(name);
+         cache2 = createCache(name);
 
-      // Pause to give caches time to see each other
-      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+         // Pause to give caches time to see each other
+         TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2 }, 60000);
 
-      writer.stop();
-      writerThread.join();
+         writer.stop();
+         writerThread.join();
 
-      verifyInitialData(cache2);
+         verifyInitialData(cache2);
 
-      int count = writer.result();
+         int count = writer.result();
 
-      for (int c = 0; c < count; c++)
-         assertEquals(c, cache2.get("/test" + c, "test"));
+         for (int c = 0; c < count; c++)
+            assertEquals(c, cache2.get("/test" + c, "test"));
+      }
+      finally
+      {
+         TestingUtil.killCaches(cache1, cache2);
+      }
    }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java	2009-02-10 20:39:38 UTC (rev 7677)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java	2009-02-11 06:00:34 UTC (rev 7678)
@@ -4,6 +4,7 @@
 import org.jboss.cache.Fqn;
 import org.jboss.cache.RPCManager;
 import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.RPCManagerImpl.FlushTracker;
 import org.jboss.cache.commands.ReplicableCommand;
 import org.jboss.cache.commands.remote.ReplicateCommand;
 import org.jboss.cache.commands.tx.CommitCommand;
@@ -23,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Vector;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * This is to test the scenario described in http://jira.jboss.org/jira/browse/JBCACHE-1270
@@ -162,9 +164,9 @@
          return delegate.getLastStateTransferSource();
       }
 
-      public void waitForFlush(long timeout)
+      public FlushTracker getFlushTracker()
       {
-         delegate.waitForFlush(timeout);
+         return delegate.getFlushTracker();
       }
    }
 }




More information about the jbosscache-commits mailing list