[jbosscache-commits] JBoss Cache SVN: r7649 - in core/trunk/src: main/java/org/jboss/cache/config and 8 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Feb 4 18:54:51 EST 2009


Author: jason.greene at jboss.com
Date: 2009-02-04 18:54:51 -0500 (Wed, 04 Feb 2009)
New Revision: 7649

Added:
   core/trunk/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java
   core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
   core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManager.java
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
   core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
   core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
   core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
   core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
   core/trunk/src/test/resources/log4j.xml
Log:
Implement NBST

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -22,12 +22,14 @@
 package org.jboss.cache;
 
 import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.lock.TimeoutException;
 import org.jgroups.Address;
 import org.jgroups.Channel;
 import org.jgroups.blocks.RspFilter;
 
 import java.util.List;
 import java.util.Vector;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Provides a mechanism for communicating with other caches in the cluster.  For now this is based on JGroups as an underlying
@@ -150,4 +152,8 @@
     * @return a channel
     */
    Channel getChannel();
+
+   public void waitForFlush(long timeout);
+
+   public Address getLastStateTransferSource();
 }

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -51,8 +51,10 @@
 import org.jboss.cache.util.reflect.ReflectionUtil;
 import org.jgroups.Address;
 import org.jgroups.Channel;
+import org.jgroups.ChannelClosedException;
 import org.jgroups.ChannelException;
 import org.jgroups.ChannelFactory;
+import org.jgroups.ChannelNotConnectedException;
 import org.jgroups.ExtendedMembershipListener;
 import org.jgroups.JChannel;
 import org.jgroups.StateTransferException;
@@ -100,7 +102,18 @@
     * Thread gate used to block Dispatcher during JGroups FLUSH protocol
     */
    private final ReclosableLatch flushBlockGate = new ReclosableLatch();
+
    /**
+    * Thread gate used by NBST to wait for a flush
+    */
+   private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
+
+   /**
+    * The most recent state transfer source
+    */
+   volatile Address lastStateTransferSource;
+
+   /**
     * JGroups RpcDispatcher in use.
     */
    private CommandAwareRpcDispatcher rpcDispatcher = null;
@@ -109,7 +122,7 @@
     * JGroups message listener.
     */
    private ChannelMessageListener messageListener;
-   private Configuration configuration;
+   Configuration configuration;
    private Notifier notifier;
    private CacheSPI spi;
    private InvocationContextContainer invocationContextContainer;
@@ -148,6 +161,7 @@
    @Start(priority = 15)
    public void start()
    {
+
       switch (configuration.getCacheMode())
       {
          case LOCAL:
@@ -161,56 +175,131 @@
          case INVALIDATION_SYNC:
             isInLocalMode = false;
             isUsingBuddyReplication = configuration.getBuddyReplicationConfig() != null && configuration.getBuddyReplicationConfig().isEnabled();
-            if (log.isDebugEnabled()) log.debug("Cache mode is " + configuration.getCacheMode());
+            if (log.isDebugEnabled())
+               log.debug("Cache mode is " + configuration.getCacheMode());
 
             boolean fetchState = shouldFetchStateOnStartup();
-            initialiseChannelAndRpcDispatcher(fetchState);
+            boolean nonBlocking = configuration.isNonBlockingStateTransfer();
+            initialiseChannelAndRpcDispatcher(fetchState && !nonBlocking);
 
-            if (fetchState)
+            if (!fetchState || nonBlocking)
             {
                try
                {
-                  long start = System.currentTimeMillis();
-                  // connect and state transfer
-                  channel.connect(configuration.getClusterName(), null, null, configuration.getStateRetrievalTimeout());
-                  //if I am not the only and the first member than wait for a state to arrive
-                  if (getMembers().size() > 1) messageListener.waitForState();
-
-                  if (log.isDebugEnabled())
-                     log.debug("connected, state was retrieved successfully (in " + (System.currentTimeMillis() - start) + " milliseconds)");
+                  // Allow commands to be ACKed during state transfer
+                  if (nonBlocking)
+                     componentRegistry.setBlockInStarting(false);
+                  channel.connect(configuration.getClusterName());
+                  if (log.isInfoEnabled())
+                     log.info("Cache local address is " + getLocalAddress());
                }
-               catch (StateTransferException ste)
-               {
-                  // make sure we disconnect from the channel before we throw this exception!
-                  // JBCACHE-761
-                  disconnect();
-                  throw new CacheException("Unable to fetch state on startup", ste);
-               }
                catch (ChannelException e)
                {
                   throw new CacheException("Unable to connect to JGroups channel", e);
                }
-               catch (Exception ex)
-               {
-                  throw new CacheException("Unable to fetch state on startup", ex);
-               }
+
+               if (!fetchState)
+                  return;
             }
+
+
+            List<Address> members = getMembers();
+
+            long start = System.currentTimeMillis();
+            if (nonBlocking)
+            {
+               startNonBlockStateTransfer(members);
+            }
             else
             {
-               //otherwise just connect
                try
                {
-                  channel.connect(configuration.getClusterName());
+                  channel.connect(configuration.getClusterName(), null, null, configuration.getStateRetrievalTimeout());
+                  if (log.isInfoEnabled())
+                     log.info("Cache local address is " + getLocalAddress());
+
+                  if (members.size() > 1)
+                     messageListener.waitForState();
                }
                catch (ChannelException e)
                {
                   throw new CacheException("Unable to connect to JGroups channel", e);
                }
+               catch (Exception ex)
+               {
+                  // make sure we disconnect from the channel before we throw this exception!
+                  // JBCACHE-761
+                  disconnect();
+                  throw new CacheException("Unable to fetch state on startup", ex);
+               }
             }
-            if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
+
+            if (log.isDebugEnabled())
+               log.debug("state was retrieved successfully (in " + (System.currentTimeMillis() - start) + " milliseconds)");
       }
+
    }
 
+   private void startNonBlockStateTransfer(List<Address> members)
+   {
+
+      if (members.size() < 2)
+         return;
+
+      boolean success = false;
+
+      outer:
+      for (int i = 0, wait = 1000; i < 5; i++)
+      {
+         for (Address member : members)
+         {
+            if (member.equals(getLocalAddress()))
+               continue;
+
+            try
+            {
+               if (log.isTraceEnabled())
+                  log.trace("Trying to fetch state from: " + member);
+               if (getState(null, member))
+               {
+                  messageListener.waitForState();
+                  success = true;
+                  break outer;
+               }
+            }
+            catch (Exception e)
+            {
+               if (log.isTraceEnabled())
+                  log.trace("Error while fetching state", e);
+            }
+         }
+
+         if (!success)
+         {
+            if (trace)
+               log.trace("Could not find available peer for state, backing off and retrying");
+
+            try
+            {
+               Thread.sleep(wait <<= 2);
+            }
+            catch (InterruptedException e)
+            {
+               Thread.currentThread().interrupt();
+            }
+         }
+
+      }
+
+      if (!success)
+      {
+         disconnect();
+         throw new CacheException("Unable to fetch state on startup");
+      }
+
+      componentRegistry.setBlockInStarting(true);
+   }
+
    public void disconnect()
    {
       if (channel != null && channel.isOpen())
@@ -316,12 +405,12 @@
       if (configuration.isUseRegionBasedMarshalling())
       {
          rpcDispatcher = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
-               spi, invocationContextContainer, interceptorChain, componentRegistry);
+               spi, invocationContextContainer, interceptorChain, componentRegistry, flushBlockGate);
       }
       else
       {
          rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
-               invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry);
+               invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry, flushBlockGate);
       }
       checkAppropriateConfig();
       rpcDispatcher.setRequestMarshaller(marshaller);
@@ -556,7 +645,7 @@
             if (log.isDebugEnabled())
                log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
             messageListener.setStateSet(false);
-            successfulTransfer = channel.getState(target, stateId, configuration.getStateRetrievalTimeout());
+            successfulTransfer = getState(stateId, target);
             if (successfulTransfer)
             {
                try
@@ -588,6 +677,31 @@
 
    }
 
+   private boolean getState(String stateId, Address target) throws ChannelNotConnectedException, ChannelClosedException
+   {
+      lastStateTransferSource = target;
+      return ((JChannel)channel).getState(target, stateId, configuration.getStateRetrievalTimeout(), !configuration.isNonBlockingStateTransfer());
+   }
+
+   public void waitForFlush(long timeout)
+   {
+      for (;;)
+      {
+         try
+         {
+            if (channel.flushSupported() && !flushWaitGate.await(timeout, TimeUnit.MILLISECONDS))
+            {
+               throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + timeout+ " millis) ");
+            }
+            return;
+         }
+         catch (InterruptedException e)
+         {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
    // ------------ END: Partial state transfer methods ------------
 
    // ------------ START: Informational methods ------------
@@ -599,6 +713,11 @@
       return address == null ? "null" : address.toString();
    }
 
+   public Address getLastStateTransferSource()
+   {
+      return lastStateTransferSource;
+   }
+
    public Address getLocalAddress()
    {
       return channel != null ? channel.getLocalAddress() : null;
@@ -734,6 +853,7 @@
          try
          {
             flushBlockGate.close();
+            flushWaitGate.open();
             if (log.isDebugEnabled()) log.debug("Block received at " + getLocalAddress());
             notifier.notifyCacheBlocked(true);
             notifier.notifyCacheBlocked(false);
@@ -754,6 +874,7 @@
       {
          try
          {
+            flushWaitGate.close();
             if (log.isDebugEnabled()) log.debug("UnBlock received at " + getLocalAddress());
 
             notifier.notifyCacheUnblocked(true);
@@ -862,4 +983,4 @@
          }
       }
    }
-}
\ No newline at end of file
+}

Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -187,6 +187,7 @@
    private boolean exposeManagementStatistics = true;
    @Dynamic
    private boolean fetchInMemoryState = true;
+   private boolean nonBlockingStateTransfer = false;
    private short replicationVersion = DEFAULT_REPLICATION_VERSION;
    @Dynamic
    private long lockAcquisitionTimeout = 10000;
@@ -474,7 +475,7 @@
 
    /**
     * Sets the queue size of the bounded queue used to store async serialization events on.  This defaults to 50,000.
-    * 
+    *
     * @param serializationExecutorQueueSize queue size to use
     */
    public void setSerializationExecutorQueueSize(int serializationExecutorQueueSize)
@@ -1141,4 +1142,14 @@
       return null;
    }
 
+   public boolean isNonBlockingStateTransfer()
+   {
+      return nonBlockingStateTransfer;
+   }
+
+   public void setNonBlockingStateTransfer(boolean nonBlockingStateTransfer)
+   {
+      this.nonBlockingStateTransfer = nonBlockingStateTransfer;
+   }
+
 }

Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -105,6 +105,8 @@
     */
    private boolean invokedFromShutdownHook;
 
+   private volatile boolean blockInStarting = true;
+
    /**
     * Creates an instance of the component registry.  The configuration passed in is automatically registered.
     *
@@ -202,6 +204,7 @@
       s.add(RegionManagerFactory.class);
       s.add(NodeMetaFactory.class);
       s.add(CommandsMetaFactory.class);
+      s.add(TransactionLogFactory.class);
       return s;
    }
 
@@ -883,7 +886,7 @@
       log.trace("Is remotely originating.");
 
       // else if this is a remote call and the status is STARTING, wait until the cache starts.
-      if (state == CacheStatus.STARTING)
+      if (state == CacheStatus.STARTING && blockInStarting)
       {
          log.trace("Cache is starting; block.");
          try
@@ -1023,4 +1026,9 @@
       HashSet<Component> defensiveCopy = new HashSet<Component>(componentLookup.values());
       return Collections.unmodifiableSet(defensiveCopy);
    }
+
+   public void setBlockInStarting(boolean blockInStarting)
+   {
+      this.blockInStarting = blockInStarting;
+   }
 }

Added: core/trunk/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.factories;
+
+import org.jboss.cache.factories.annotations.DefaultFactoryFor;
+import org.jboss.cache.transaction.TransactionLog;
+
+/**
+ * Constructs {@link org.jboss.cache.transaction.TransactionLog} instances.
+ *
+ * @author Jason T. Greene
+ * @since 3.0
+ */
+ at DefaultFactoryFor(classes = TransactionLog.class)
+public class TransactionLogFactory extends ComponentFactory
+{
+   protected <T> T construct(Class<T> componentType)
+   {
+      return componentType.cast(new TransactionLog());
+   }
+}


Property changes on: core/trunk/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -21,6 +21,20 @@
  */
 package org.jboss.cache.interceptors;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
 import org.jboss.cache.CacheException;
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.RPCManager;
@@ -51,21 +65,11 @@
 import org.jboss.cache.notifications.Notifier;
 import org.jboss.cache.transaction.GlobalTransaction;
 import org.jboss.cache.transaction.TransactionContext;
+import org.jboss.cache.transaction.TransactionLog;
 import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.cache.util.Immutables;
 import org.jboss.cache.util.concurrent.ConcurrentHashSet;
 
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * This interceptor is the new default at the head of all interceptor chains,
  * and makes transactional attributes available to all interceptors in the chain.
@@ -83,12 +87,14 @@
    private InvocationContextContainer invocationContextContainer;
    private ComponentRegistry componentRegistry;
    private ContextFactory contextFactory;
+   private TransactionLog transactionLog;
 
    /**
     * List <Transaction>that we have registered for
     */
    private final Set<Transaction> transactions = new ConcurrentHashSet<Transaction>();
    private final Map<Transaction, GlobalTransaction> rollbackTransactions = new ConcurrentHashMap<Transaction, GlobalTransaction>(16);
+
    private long prepares = 0;
    private long commits = 0;
    private long rollbacks = 0;
@@ -99,9 +105,11 @@
    @Inject
    public void intialize(RPCManager rpcManager, ContextFactory contextFactory,
                          Notifier notifier, InvocationContextContainer icc,
-                         CommandsFactory factory, ComponentRegistry componentRegistry, LockManager lockManager)
+                         TransactionLog transactionLog, CommandsFactory factory,
+                         ComponentRegistry componentRegistry, LockManager lockManager)
    {
       this.contextFactory = contextFactory;
+      this.transactionLog = transactionLog;
       this.commandsFactory = factory;
       this.rpcManager = rpcManager;
       this.notifier = notifier;
@@ -117,10 +125,12 @@
       Object result = null;
 
       // this is a prepare, commit, or rollback.
-      if (trace) log.trace("Got gtx from invocation context " + ctx.getGlobalTransaction());
+      GlobalTransaction gtx = ctx.getGlobalTransaction();
+      if (trace) log.trace("Got gtx from invocation context " + gtx);
+
       try
       {
-         if (ctx.getGlobalTransaction().isRemote())
+         if (gtx.isRemote())
          {
             result = handleRemotePrepare(ctx, command);
             if (getStatisticsEnabled()) prepares++;
@@ -142,6 +152,8 @@
    @Override
    public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
    {
+      GlobalTransaction gtx = ctx.getGlobalTransaction();
+
       if (!ctx.getGlobalTransaction().isRemote())
       {
          if (trace) log.trace("received my own message (discarding it)");
@@ -150,7 +162,7 @@
       try
       {
          if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call on command [" + command + "]");
-         GlobalTransaction gtx = ctx.getGlobalTransaction();
+
          Transaction ltx = txTable.getLocalTransaction(gtx, true);
          // disconnect if we have a current tx associated
          Transaction currentTx = txManager.getTransaction();
@@ -194,7 +206,9 @@
    @Override
    public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
    {
-      if (!ctx.getGlobalTransaction().isRemote())
+      GlobalTransaction gtx = ctx.getGlobalTransaction();
+
+      if (!gtx.isRemote())
       {
          if (trace) log.trace("received my own message (discarding it)");
          return null;
@@ -202,7 +216,6 @@
       try
       {
          if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call on command [" + command + "]");
-         GlobalTransaction gtx = ctx.getGlobalTransaction();
          Transaction ltx = txTable.getLocalTransaction(gtx);
          if (ltx == null)
          {
@@ -268,7 +281,12 @@
    {
       try
       {
-         return attachGtxAndPassUpChain(ctx, command);
+         Object ret = attachGtxAndPassUpChain(ctx, command);
+
+         if (command instanceof WriteCommand && ctx.getTransaction() == null)
+            transactionLog.logNoTxWrite((WriteCommand)command);
+
+         return ret;
       }
       catch (Throwable throwable)
       {
@@ -382,6 +400,8 @@
          }
          else
          {
+            transactionLog.logPrepare(command);
+
             // now pass up the prepare method itself.
             invokeNextInterceptor(ctx, command);
          }
@@ -544,11 +564,22 @@
    {
       try
       {
-         VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx, modifications, true) : commandsFactory.buildCommitCommand(gtx);
-
          if (trace) log.trace("Running commit for " + gtx);
 
+         VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx, modifications, true)
+                                                         : commandsFactory.buildCommitCommand(gtx);
+
+
          handleCommitRollback(ctx, commitCommand);
+
+         if (onePhaseCommit)
+         {
+            transactionLog.logOnePhaseCommit(gtx, modifications);
+         }
+         else
+         {
+            transactionLog.logCommit(gtx);
+         }
       }
       catch (Throwable e)
       {
@@ -595,6 +626,8 @@
          VisitableCommand rollbackCommand = commandsFactory.buildRollbackCommand(gtx);
          if (trace) log.trace(" running rollback for " + gtx);
 
+         transactionLog.rollback(gtx);
+
          //JBCACHE-359 Store a lookup for the globalTransaction so a listener
          // callback can find it
          rollbackTransactions.put(tx, gtx);
@@ -630,8 +663,10 @@
    public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List<WriteCommand> modifications) throws Throwable
    {
       // running a 2-phase commit.
-      VisitableCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
+      PrepareCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
 
+      transactionLog.logPrepare(prepareCommand);
+
       Object result;
 
       // Is there a local transaction associated with GTX ?

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -21,21 +21,6 @@
  */
 package org.jboss.cache.marshall;
 
-import org.jboss.cache.CacheException;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.Region;
-import org.jboss.cache.Region.Status;
-import org.jboss.cache.buddyreplication.GravitateResult;
-import org.jboss.cache.commands.CommandsFactory;
-import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.optimistic.DefaultDataVersion;
-import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.cache.util.FastCopyHashMap;
-import org.jboss.cache.util.Immutables;
-import org.jgroups.Address;
-import org.jgroups.stack.IpAddress;
-
 import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -53,6 +38,23 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.jboss.cache.CacheException;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.Region;
+import org.jboss.cache.Region.Status;
+import org.jboss.cache.buddyreplication.GravitateResult;
+import org.jboss.cache.commands.CommandsFactory;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.optimistic.DefaultDataVersion;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.transaction.TransactionLog.LogEntry;
+import org.jboss.cache.util.FastCopyHashMap;
+import org.jboss.cache.util.Immutables;
+import org.jgroups.Address;
+import org.jgroups.stack.IpAddress;
+
 /**
  * An enhanced marshaller for RPC calls between CacheImpl instances.
  *
@@ -90,10 +92,12 @@
    protected static final int MAGICNUMBER_FLOAT = 27;
    protected static final int MAGICNUMBER_DOUBLE = 28;
    protected static final int MAGICNUMBER_OBJECT = 29;
+   protected static final int MAGICNUMBER_TXLOG_ENTRY = 50;
    protected static final int MAGICNUMBER_NULL = 99;
    protected static final int MAGICNUMBER_SERIALIZABLE = 100;
    protected static final int MAGICNUMBER_REF = 101;
 
+
    protected static final InactiveRegionException IRE = new InactiveRegionException("Cannot unmarshall to an inactive region");
 
    public CacheMarshaller200()
@@ -343,6 +347,11 @@
          if (useRefs) writeReference(out, createReference(o, refMap));
          marshallGlobalTransaction((GlobalTransaction) o, out, refMap);
       }
+      else if (o instanceof LogEntry)
+      {
+         out.writeByte(MAGICNUMBER_TXLOG_ENTRY);
+         marshallLogEntry((LogEntry)o, out, refMap);
+      }
       else if (o instanceof IpAddress)
       {
          out.writeByte(MAGICNUMBER_IPADDRESS);
@@ -455,6 +464,12 @@
       }
    }
 
+   private void marshallLogEntry(LogEntry log, ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
+   {
+      marshallObject(log.getTransaction(), out, refMap);
+      marshallObject(log.getModifications(), out, refMap);
+   }
+
    private void marshallGravitateResult(GravitateResult gravitateResult, ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
    {
       marshallObject(gravitateResult.isDataFound(), out, refMap);
@@ -605,6 +620,8 @@
             retVal = unmarshallGlobalTransaction(in, refMap);
             if (useRefs) refMap.putReferencedObject(reference, retVal);
             return retVal;
+         case MAGICNUMBER_TXLOG_ENTRY:
+            return unmarshallLogEntry(in, refMap);
          case MAGICNUMBER_IPADDRESS:
             retVal = unmarshallIpAddress(in);
             return retVal;
@@ -666,6 +683,15 @@
       throw new Exception("Unknown magic number " + magicNumber);
    }
 
+   @SuppressWarnings("unchecked")
+   private Object unmarshallLogEntry(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+   {
+      GlobalTransaction gtx = (GlobalTransaction)unmarshallObject(in, refMap);
+      List<WriteCommand> mods = (List<WriteCommand>)unmarshallObject(in, refMap);
+
+      return new LogEntry(gtx, mods);
+   }
+
    private FastCopyHashMap unmarshallFastCopyHashMap(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
    {
       FastCopyHashMap map = new FastCopyHashMap();

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -21,6 +21,16 @@
  */
 package org.jboss.cache.marshall;
 
+import java.io.NotSerializableException;
+import java.util.Vector;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.commands.ReplicableCommand;
 import org.jboss.cache.commands.VisitableCommand;
@@ -31,7 +41,9 @@
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.interceptors.InterceptorChain;
 import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.cache.lock.TimeoutException;
 import org.jboss.cache.util.concurrent.BoundedExecutors;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
 import org.jboss.cache.util.concurrent.WithinThreadExecutor;
 import org.jgroups.Address;
 import org.jgroups.Channel;
@@ -44,16 +56,6 @@
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
 
-import java.io.NotSerializableException;
-import java.util.Vector;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * A JGroups RPC dispatcher that knows how to deal with {@link org.jboss.cache.commands.ReplicableCommand}s.
  *
@@ -69,24 +71,27 @@
    private ExecutorService replicationProcessor;
    private AtomicInteger replicationProcessorCount;
    private boolean asyncSerial;
+   private Configuration configuration;
+   private ReclosableLatch flushGate;
    private ReplicationObserver replicationObserver;
 
-   public CommandAwareRpcDispatcher()
-   {
-   }
+   public CommandAwareRpcDispatcher() {}
 
-   public CommandAwareRpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object serverObj,
-                                    InvocationContextContainer container, InterceptorChain interceptorChain,
-                                    ComponentRegistry componentRegistry)
+   public CommandAwareRpcDispatcher(Channel channel, MessageListener l, MembershipListener l2,
+                                    Object serverObj, InvocationContextContainer container, InterceptorChain interceptorChain,
+                                    ComponentRegistry componentRegistry, ReclosableLatch flushGate)
    {
       super(channel, l, l2, serverObj);
       this.invocationContextContainer = container;
       this.componentRegistry = componentRegistry;
       this.interceptorChain = interceptorChain;
+      this.flushGate = flushGate;
+
       trace = log.isTraceEnabled();
 
       // what sort of a repl processor do we need?
       Configuration c = componentRegistry.getComponent(Configuration.class);
+      this.configuration = c;
       replicationProcessor = c.getRuntimeConfig().getAsyncSerializationExecutor();
       if (c.getCacheMode().isSynchronous() ||
             (replicationProcessor == null && c.getSerializationExecutorPoolSize() < 1) || requireSyncMarshalling(c)) // if an executor has not been injected and the pool size is set
@@ -125,7 +130,7 @@
    /**
     * Serial(sync) marshalling should be enabled for async optimistic caches. That is because optimistic async is a 2PC,
     * which might cause the Commit command to be send before the Prepare command, so replication will fail. This is not
-    * the same for async <b>pessimistic/mvcc</b> replication, as this uses a 1PC. 
+    * the same for async <b>pessimistic/mvcc</b> replication, as this uses a 1PC.
     */
    private boolean requireSyncMarshalling(Configuration c)
    {
@@ -253,6 +258,11 @@
          if (cmd == null) throw new NullPointerException("Unable to execute a null command!  Message was " + req);
          if (trace) log.trace("Executing command: " + cmd + " [sender=" + req.getSrc() + "]");
 
+         if (channel.flushSupported() && !flushGate.await(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
+         {
+            throw new TimeoutException("State retrieval timed out waiting for flush unblock. (timeout = " + configuration.getStateRetrievalTimeout() + " millis) ");
+         }
+
          if (cmd instanceof VisitableCommand)
          {
             InvocationContext ctx = invocationContextContainer.get();

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -25,6 +25,7 @@
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.interceptors.InterceptorChain;
 import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
 import org.jgroups.Channel;
 import org.jgroups.MembershipListener;
 import org.jgroups.Message;
@@ -46,9 +47,9 @@
     */
    public InactiveRegionAwareRpcDispatcher(Channel channel, MessageListener l, MembershipListener l2, Object serverObj,
                                            InvocationContextContainer container, InterceptorChain interceptorChain,
-                                           ComponentRegistry componentRegistry)
+                                           ComponentRegistry componentRegistry, ReclosableLatch flushBlockGate)
    {
-      super(channel, l, l2, serverObj, container, interceptorChain, componentRegistry);
+      super(channel, l, l2, serverObj, container, interceptorChain, componentRegistry, flushBlockGate);
    }
 
    @Override

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -21,25 +21,29 @@
  */
 package org.jboss.cache.statetransfer;
 
+import java.io.ObjectOutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.InternalNode;
 import org.jboss.cache.Node;
+import org.jboss.cache.RPCManager;
 import org.jboss.cache.Version;
+import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Start;
 import org.jboss.cache.loader.CacheLoader;
 import org.jboss.cache.marshall.NodeData;
 import org.jboss.cache.marshall.NodeDataExceptionMarker;
+import org.jboss.cache.transaction.TransactionLog;
 
-import java.io.ObjectOutputStream;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 public class DefaultStateTransferGenerator implements StateTransferGenerator
 {
 
@@ -48,13 +52,27 @@
    private Log log = LogFactory.getLog(getClass().getName());
 
    private CacheSPI cache;
+   private RPCManager rpcManager;
 
    private Set<Fqn> internalFqns;
 
+   private boolean nonBlocking;
+   private long flushTimeout;
+   private int maxNonProgressingLogWrites = 5;
+   private TransactionLog txLog;
+
+
+
    @Inject
-   public void inject(CacheSPI cache)
+   public void inject(CacheSPI cache, RPCManager rpcManager, Configuration configuration, TransactionLog txLog)
    {
       this.cache = cache;
+      this.nonBlocking = true;
+
+      this.flushTimeout = configuration.getStateRetrievalTimeout();
+      this.nonBlocking = configuration.isNonBlockingStateTransfer();
+      this.txLog = txLog;
+      this.rpcManager = rpcManager;
    }
 
    @Start(priority = 14)
@@ -72,6 +90,16 @@
          cache.getMarshaller().objectToObjectStream(STATE_TRANSFER_VERSION, out);
          if (generateTransient)
          {
+            if (nonBlocking)
+            {
+               if (! txLog.activate())
+                  throw new CacheException("Busy performing state transfer for someone else");
+
+               if (log.isTraceEnabled())
+                  log.trace("Transaction log activated!");
+
+            }
+
             //transient + marker
             if (log.isTraceEnabled())
             {
@@ -128,12 +156,49 @@
             }
          }
          delimitStream(out);
+
+         if (nonBlocking && generateTransient)
+         {
+            for (int nonProgress = 0, size = txLog.size(); nonProgress < maxNonProgressingLogWrites && size > 0;)
+            {
+               if (log.isTraceEnabled())
+                  log.trace("Tx Log remaining entries = " + size);
+               txLog.writeCommitLog(cache.getMarshaller(), out);
+               int newSize = txLog.size();
+
+               // If size did not decrease then we did not make progress, and could be wasting
+               // our time. Limit this to the specified max.
+               if (newSize >= size)
+                  nonProgress++;
+
+               size = newSize;
+            }
+
+            // Signal to sender that we need a flush to get a consistent view
+            // of the remaining transactions.
+            delimitStream(out);
+            out.flush();
+            rpcManager.waitForFlush(flushTimeout);
+
+            // Write remaining transactions
+            txLog.writeCommitLog(cache.getMarshaller(), out);
+            delimitStream(out);
+
+            // Write all non-completed prepares
+            txLog.writePendingPrepares(cache.getMarshaller(), out);
+            delimitStream(out);
+         }
+
       }
       catch (Exception e)
       {
          cache.getMarshaller().objectToObjectStream(new NodeDataExceptionMarker(e, cache.getLocalAddress()), out);
          throw e;
       }
+      finally
+      {
+         txLog.deactivate();
+      }
    }
 
    private Fqn getFqn(Object o)

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -21,6 +21,16 @@
  */
 package org.jboss.cache.statetransfer;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.CacheException;
@@ -30,26 +40,27 @@
 import org.jboss.cache.InvocationContext;
 import org.jboss.cache.Node;
 import org.jboss.cache.NodeSPI;
+import org.jboss.cache.RPCManager;
 import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
 import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.interceptors.InterceptorChain;
+import org.jboss.cache.invocation.InvocationContextContainer;
 import org.jboss.cache.loader.CacheLoader;
 import org.jboss.cache.loader.CacheLoaderManager;
 import org.jboss.cache.marshall.NodeData;
 import org.jboss.cache.marshall.NodeDataExceptionMarker;
 import org.jboss.cache.marshall.NodeDataMarker;
 import org.jboss.cache.notifications.event.NodeModifiedEvent;
+import org.jboss.cache.transaction.TransactionLog;
+import org.jboss.cache.transaction.TransactionLog.LogEntry;
+import org.jgroups.Address;
+import org.jgroups.Channel;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 public class DefaultStateTransferIntegrator implements StateTransferIntegrator
 {
 
@@ -60,13 +71,25 @@
 
    private Set<Fqn> internalFqns;
    private Configuration cfg;
+   private RPCManager manager;
+   private TransactionLog txLog;
    private boolean needToPersistState;   // for JBCACHE-131
+   private boolean nonBlocking;
+   private InvocationContextContainer container;
+   private InterceptorChain chain;
+   private ComponentRegistry registry;
 
    @Inject
-   public void inject(CacheSPI<?, ?> cache, Configuration cfg)
+   public void inject(CacheSPI<?, ?> cache, Configuration cfg, RPCManager rpcManager, TransactionLog txLog, InvocationContextContainer container, InterceptorChain chain, ComponentRegistry registry)
    {
       this.cache = cache;
       this.cfg = cfg;
+      this.manager = rpcManager;
+      this.nonBlocking = cfg.isNonBlockingStateTransfer();
+      this.txLog = txLog;
+      this.container = container;
+      this.chain = chain;
+      this.registry = registry;
    }
 
    @Start(priority = 14)
@@ -89,8 +112,103 @@
       {
          integratePersistentState(ois, targetRoot);
       }
+
+      // Delimiter
+      verifyMarker(cache.getMarshaller().objectFromObjectStream(ois));
+
+      if (nonBlocking)
+         integrateTxLog(ois);
    }
 
+   private void integrateTxLog(ObjectInputStream ois) throws Exception
+   {
+      if (trace)
+         log.trace("Integrating transaction log");
+
+      processCommitLog(ois);
+
+      Channel channel = manager.getChannel();
+
+      List<Address> targets = new ArrayList<Address>(2);
+      targets.add(channel.getLocalAddress());
+      targets.add(manager.getLastStateTransferSource());
+
+      if (trace)
+         log.trace("Flushing targets: " + targets);
+
+      if (!channel.startFlush(targets, false))
+         throw new CacheException("Could not flush channel! State-transfer failed!");
+
+      try
+      {
+         if (trace)
+            log.trace("Retrieving/Applying post-flush commits");
+         processCommitLog(ois);
+
+         if (trace)
+            log.trace("Retrieving/Applying pending prepares");
+         Object object = cache.getMarshaller().objectFromObjectStream(ois);
+         while (object instanceof PrepareCommand)
+         {
+            PrepareCommand command = (PrepareCommand)object;
+            if (! txLog.hasPendingPrepare(command))
+            {
+               InvocationContext ctx = container.get();
+               ctx.setOriginLocal(false);
+               ctx.getOptionOverrides().setCacheModeLocal(true);
+               ctx.getOptionOverrides().setSkipCacheStatusCheck(true);
+               chain.invoke(ctx, command);
+            }
+            object = cache.getMarshaller().objectFromObjectStream(ois);
+         }
+         verifyMarker(object);
+
+         // Block all remote commands once transfer is complete,
+         // and before FLUSH completes
+         registry.setBlockInStarting(true);
+      }
+      finally
+      {
+         if (trace)
+            log.trace("Stopping flush");
+         channel.stopFlush(targets);
+      }
+   }
+
+   private void processCommitLog(ObjectInputStream ois) throws Exception
+   {
+      Object object = cache.getMarshaller().objectFromObjectStream(ois);
+      while (object instanceof LogEntry)
+      {
+         List<WriteCommand> mods = ((LogEntry)object).getModifications();
+         log.trace("Mods = " + mods);
+         for (WriteCommand mod : mods)
+         {
+            InvocationContext ctx = container.get();
+            ctx.setOriginLocal(false);
+            ctx.getOptionOverrides().setCacheModeLocal(true);
+            ctx.getOptionOverrides().setSkipCacheStatusCheck(true);
+            chain.invoke(ctx, mod);
+         }
+
+         object = cache.getMarshaller().objectFromObjectStream(ois);
+      }
+      verifyMarker(object);
+   }
+
+   private void verifyMarker(Object object)
+   {
+      if (object instanceof NodeDataExceptionMarker)
+      {
+         NodeDataExceptionMarker e = (NodeDataExceptionMarker)object;
+         throw new CacheException("Error in state transfer stream", e.getCause());
+      }
+      else if (! (object instanceof NodeDataMarker))
+      {
+         throw new CacheException("Invalid object unmarshalled");
+      }
+   }
+
    protected void integrateTransientState(ObjectInputStream in, InternalNode target) throws Exception
    {
       boolean transientSet = false;

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -51,7 +51,6 @@
    protected static final boolean trace = log.isTraceEnabled();
 
    public static final NodeData STREAMING_DELIMITER_NODE = new NodeDataMarker();
-
    public static final String PARTIAL_STATE_DELIMITER = "_PARTIAL_STATE_DELIMITER";
 
    protected CacheSPI cache;

Added: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -0,0 +1,179 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.cache.transaction;
+
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.marshall.Marshaller;
+
+/**
+ * Logs transactions and writes for Non-Blocking State Transfer
+ *
+ * @author Jason T. Greene
+ */
+public class TransactionLog
+{
+   private final Map<GlobalTransaction, PrepareCommand> pendingPrepares = new ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
+   private final BlockingQueue<LogEntry> entries = new LinkedBlockingQueue<LogEntry>();
+   private AtomicBoolean active = new AtomicBoolean();
+
+   public static class LogEntry
+   {
+      private final GlobalTransaction transaction;
+      private final List<WriteCommand> modifications;
+
+      public LogEntry(GlobalTransaction transaction, List<WriteCommand> modifications)
+      {
+         this.transaction = transaction;
+         this.modifications = modifications;
+      }
+
+      public GlobalTransaction getTransaction()
+      {
+         return transaction;
+      }
+
+      public List<WriteCommand> getModifications()
+      {
+         return modifications;
+      }
+   }
+
+   private Log log = LogFactory.getLog(getClass().getName());
+
+   public void logPrepare(PrepareCommand command)
+   {
+      pendingPrepares.put(command.getGlobalTransaction(), command);
+   }
+
+   public void logCommit(GlobalTransaction gtx)
+   {
+      PrepareCommand command = pendingPrepares.remove(gtx);
+      if (command == null)
+      {
+         log.error("Could not find matching prepare for commit: " + gtx);
+         return;
+      }
+
+      addEntry(new LogEntry(gtx, command.getModifications()));
+   }
+
+   private void addEntry(LogEntry entry)
+   {
+      if (! isActive())
+         return;
+
+      for (;;)
+      {
+         try
+         {
+            if (log.isTraceEnabled())
+               log.trace("Added commit entry to tx log" + entry);
+
+            entries.put(entry);
+            break;
+         }
+         catch (InterruptedException e)
+         {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   public void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand> modifications)
+   {
+      // Just in case...
+      if (gtx != null) pendingPrepares.remove(gtx);
+      addEntry(new LogEntry(gtx, modifications));
+   }
+
+   public void logNoTxWrite(WriteCommand write)
+   {
+      if (! isActive())
+         return;
+
+      ArrayList<WriteCommand> list = new ArrayList<WriteCommand>();
+      list.add(write);
+      addEntry(new LogEntry(null, list));
+   }
+
+   public void rollback(GlobalTransaction gtx)
+   {
+      pendingPrepares.remove(gtx);
+   }
+
+   public boolean isActive()
+   {
+      return active.get();
+   }
+
+   public boolean activate()
+   {
+      return active.compareAndSet(false, true);
+   }
+
+   public void deactivate()
+   {
+      active.set(false);
+      entries.clear();
+   }
+
+   public int size()
+   {
+      return entries.size();
+   }
+
+   public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws Exception
+   {
+     List<LogEntry> buffer = new ArrayList<LogEntry>(10);
+
+     while (entries.drainTo(buffer, 10) > 0)
+     {
+        for (LogEntry entry : buffer)
+           marshaller.objectToObjectStream(entry, out);
+
+        buffer.clear();
+     }
+   }
+
+   public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws Exception
+   {
+      for (PrepareCommand entry : pendingPrepares.values())
+         marshaller.objectToObjectStream(entry, out);
+   }
+
+   public boolean hasPendingPrepare(PrepareCommand command)
+   {
+      return pendingPrepares.containsKey(command.getGlobalTransaction());
+   }
+}


Property changes on: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -0,0 +1,247 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.cache.statetransfer;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Configuration.CacheMode;
+import org.jboss.cache.factories.UnitTestConfigurationFactory;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.Test;
+
+ at Test(groups="functional")
+public class NonBlockingStateTransferTest
+{
+   public static final Fqn A = Fqn.fromString("/a");
+   public static final Fqn B = Fqn.fromString("/b");
+   public static final Fqn C = Fqn.fromString("/c");
+   protected static final String ADDRESS_CLASSNAME = "org.jboss.cache.marshall.data.Address";
+   protected static final String PERSON_CLASSNAME = "org.jboss.cache.marshall.data.Person";
+   public static final Fqn A_B = Fqn.fromString("/a/b");
+   public static final Fqn A_C = Fqn.fromString("/a/c");
+   public static final Fqn A_D = Fqn.fromString("/a/d");
+   public static final String JOE = "JOE";
+   public static final String BOB = "BOB";
+   public static final String JANE = "JANE";
+   public static final Integer TWENTY = 20;
+   public static final Integer FORTY = 40;
+
+   public static class DelayTransfer implements Serializable
+   {
+      private transient int count;
+
+      private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
+      {
+         in.defaultReadObject();
+      }
+
+      private void writeObject(ObjectOutputStream out) throws IOException
+      {
+         out.defaultWriteObject();
+
+         // RPC is first serialization, ST is second
+         if (count++ == 0)
+            return;
+
+         try
+         {
+            Thread.sleep(2000);
+         }
+         catch (InterruptedException e)
+         {
+         }
+      }
+
+   }
+   private static class WritingRunner implements Runnable
+   {
+      private final Cache<Object,Object> cache;
+      private final boolean tx;
+      private volatile boolean stop;
+      private volatile int result;
+
+      WritingRunner(Cache<Object, Object> cache, boolean tx)
+      {
+         this.cache = cache;
+         this.tx = tx;
+      }
+
+      public int result()
+      {
+         return result;
+      }
+
+      public void run()
+      {
+         int c = 0;
+         while (!stop)
+         {
+            try
+            {
+               if (tx)
+                  cache.getConfiguration().getRuntimeConfig().getTransactionManager().begin();
+               cache.put("/test" + c, "test", c++);
+               if (tx)
+                  cache.getConfiguration().getRuntimeConfig().getTransactionManager().commit();
+            }
+            catch (Exception e)
+            {
+            }
+         }
+         result = c;
+      }
+
+      public void stop()
+      {
+         stop = true;
+      }
+   }
+
+   private CacheSPI<Object, Object> createCache(String name)
+   {
+      Configuration config = UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC);
+      config.setClusterName(name + "-" + Thread.currentThread().getName());
+      config.setNonBlockingStateTransfer(true);
+      CacheSPI<Object, Object> cache = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(config, false, getClass());
+
+      // Use marshaller
+
+      cache.create();
+      cache.start();
+      return cache;
+   }
+
+   public void testInitialStateTransfer() throws Exception
+   {
+      CacheSPI<Object, Object> cache1 = createCache("nbst");
+
+      writeInitialData(cache1);
+
+      CacheSPI<Object, Object> cache2 = createCache("nbst");
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+
+      verifyInitialData(cache2);
+
+      TestingUtil.killCaches(cache1, cache2);
+   }
+
+
+   public void testSTWithThirdWritingNonTxCache() throws Exception
+   {
+      thirdWritingCacheTest(false, "nbst1");
+   }
+
+   public void testSTWithThirdWritingTxCache() throws Exception
+   {
+      thirdWritingCacheTest(true, "nbst2");
+   }
+
+   public void testSTWithWritingNonTxThread() throws Exception
+   {
+      writingThreadTest(false, "nbst3");
+   }
+
+   public void testSTWithWritingTxThread() throws Exception
+   {
+      writingThreadTest(true, "nbst4");
+   }
+
+
+   private void thirdWritingCacheTest(boolean tx, String name) throws InterruptedException
+   {
+      final CacheSPI<Object, Object> cache1 = createCache(name);
+      final CacheSPI<Object, Object> cache3 = createCache(name);
+
+      writeInitialData(cache1);
+
+      // Delay the transient copy, so that we get a more thorough log test
+      cache1.put("/delay", "delay", new DelayTransfer());
+
+      WritingRunner writer = new WritingRunner(cache3, tx);
+      Thread writerThread = new Thread(writer);
+      writerThread.start();
+
+      CacheSPI<Object, Object> cache2 = createCache(name);
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2, cache3}, 60000);
+
+      writer.stop();
+      writerThread.join();
+
+      verifyInitialData(cache2);
+
+      int count = writer.result();
+
+      for (int c = 0; c < count; c++)
+         assertEquals(c, cache2.get("/test" + c, "test"));
+
+      TestingUtil.killCaches(cache1, cache2, cache3);
+   }
+
+   private void verifyInitialData(CacheSPI<Object, Object> cache2)
+   {
+      assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B, "name"));
+      assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B, "age"));
+      assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C, "name"));
+      assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C, "age"));
+   }
+
+   private void writeInitialData(final CacheSPI<Object, Object> cache1)
+   {
+      cache1.put(A_B, "name", JOE);
+      cache1.put(A_B, "age", TWENTY);
+      cache1.put(A_C, "name", BOB);
+      cache1.put(A_C, "age", FORTY);
+   }
+
+   private void writingThreadTest(boolean tx, String name) throws InterruptedException
+   {
+      final CacheSPI<Object, Object> cache1 = createCache(name);
+
+      writeInitialData(cache1);
+
+      // Delay the transient copy, so that we get a more thorough log test
+      cache1.put("/delay", "delay", new DelayTransfer());
+
+      WritingRunner writer = new WritingRunner(cache1, tx);
+      Thread writerThread = new Thread(writer);
+      writerThread.start();
+
+      CacheSPI<Object, Object> cache2 = createCache(name);
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+
+      writer.stop();
+      writerThread.join();
+
+      verifyInitialData(cache2);
+
+      int count = writer.result();
+
+      for (int c = 0; c < count; c++)
+         assertEquals(c, cache2.get("/test" + c, "test"));
+
+      TestingUtil.killCaches(cache1, cache2);
+   }
+}


Property changes on: core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java	2009-02-04 23:54:51 UTC (rev 7649)
@@ -156,5 +156,15 @@
       {
          return delegate.getChannel();
       }
+
+      public Address getLastStateTransferSource()
+      {
+         return delegate.getLastStateTransferSource();
+      }
+
+      public void waitForFlush(long timeout)
+      {
+         delegate.waitForFlush(timeout);
+      }
    }
 }

Modified: core/trunk/src/test/resources/log4j.xml
===================================================================
--- core/trunk/src/test/resources/log4j.xml	2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/test/resources/log4j.xml	2009-02-04 23:54:51 UTC (rev 7649)
@@ -57,7 +57,7 @@
     <!-- ================ -->
 
     <category name="org.jboss.cache">
-        <priority value="WARN"/>
+        <priority value="TRACE"/>
     </category>
 
     <!-- these two are in separate sections since they




More information about the jbosscache-commits mailing list