[jbosscache-commits] JBoss Cache SVN: r7829 - in core/branches/flat/src: main/java/org/horizon/config and 17 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Mar 3 09:56:48 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-03-03 09:56:48 -0500 (Tue, 03 Mar 2009)
New Revision: 7829

Added:
   core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java
   core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java
Modified:
   core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
   core/branches/flat/src/main/java/org/horizon/config/Configuration.java
   core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java
   core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java
   core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java
   core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java
   core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java
   core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
   core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
   core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
   core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
   core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
   core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
   core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
   core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
   core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java
   core/branches/flat/src/main/java/org/horizon/util/Util.java
   core/branches/flat/src/main/resources/config-samples/all.xml
   core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd
   core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
   core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java
   core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
   core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
Log:
More NBST fixes

Modified: core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -22,7 +22,6 @@
 package org.horizon.commands.tx;
 
 import org.horizon.commands.ReplicableCommand;
-import org.horizon.commands.VisitableCommand;
 import org.horizon.commands.Visitor;
 import org.horizon.commands.write.WriteCommand;
 import org.horizon.context.InvocationContext;
@@ -53,7 +52,7 @@
       this.onePhaseCommit = onePhaseCommit;
    }
 
-   public void removeModifications(Collection<VisitableCommand> modificationsToRemove) {
+   public void removeModifications(Collection<WriteCommand> modificationsToRemove) {
       if (modifications != null) modifications.removeAll(modificationsToRemove);
    }
 

Modified: core/branches/flat/src/main/java/org/horizon/config/Configuration.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/Configuration.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/config/Configuration.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -58,6 +58,10 @@
       return useAsyncSerialization;
    }
 
+   public boolean isStateTransferEnabled() {
+      return fetchInMemoryState || (cacheLoaderManagerConfig != null && cacheLoaderManagerConfig.isFetchPersistentState());
+   }
+
    /**
     * Cache replication mode.
     */

Modified: core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -50,6 +50,7 @@
    private short marshallVersion = DEFAULT_MARSHALL_VERSION;
 
    private GlobalComponentRegistry gcr;
+   private long distributedSyncTimeout = 60000; // default
 
    /**
     * Behavior of the JVM shutdown hook registered by the cache
@@ -276,6 +277,15 @@
       this.marshallVersion = Version.getVersionShort(marshallVersion);
    }
 
+   public long getDistributedSyncTimeout() {
+      return distributedSyncTimeout;
+   }
+
+   public void setDistributedSyncTimeout(long distributedSyncTimeout) {
+      testImmutability("distributedSyncTimeout");
+      this.distributedSyncTimeout = distributedSyncTimeout;
+   }
+
    @Override
    public boolean equals(Object o) {
       if (this == o) return true;
@@ -312,6 +322,7 @@
          return false;
       if (transportProperties != null ? !transportProperties.equals(that.transportProperties) : that.transportProperties != null)
          return false;
+      if (distributedSyncTimeout != that.distributedSyncTimeout) return false;
 
       return true;
    }
@@ -335,6 +346,7 @@
       result = 31 * result + (clusterName != null ? clusterName.hashCode() : 0);
       result = 31 * result + (shutdownHookBehavior != null ? shutdownHookBehavior.hashCode() : 0);
       result = 31 * result + (int) marshallVersion;
+      result = (int) (31 * result + distributedSyncTimeout);
       return result;
    }
 

Modified: core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -276,7 +276,7 @@
          throw new ConfigurationException("Unable to configure eviction", e);
       }
 
-      if (p != null && !p.isEmpty()) XmlConfigHelper. setValues(cfg, p, false, true);
+      if (p != null && !p.isEmpty()) XmlConfigHelper.setValues(cfg, p, false, true);
 
       evictionConfig.setAlgorithmConfig(cfg);
       config.setEvictionConfig(evictionConfig);
@@ -369,6 +369,9 @@
          tmp = getAttributeValue(e, "clusterName");
          if (existsAttribute(tmp)) gc.setClusterName(tmp);
 
+         tmp = getAttributeValue(e, "distributedSyncTimeout");
+         if (existsAttribute(tmp)) gc.setDistributedSyncTimeout(getLong(tmp));
+
          Properties p = XmlConfigHelper.extractProperties(e);
          if (p != null) gc.setTransportProperties(p);
       }

Modified: core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -21,7 +21,7 @@
  */
 package org.horizon.context;
 
-import org.horizon.commands.VisitableCommand;
+import org.horizon.commands.write.WriteCommand;
 import org.horizon.transaction.GlobalTransaction;
 
 import javax.transaction.Transaction;
@@ -41,7 +41,7 @@
     *
     * @param command modification
     */
-   void addModification(VisitableCommand command);
+   void addModification(WriteCommand command);
 
    /**
     * Returns all modifications.  If there are no modifications in this transaction this method will return an empty
@@ -49,7 +49,7 @@
     *
     * @return list of modifications.
     */
-   List<VisitableCommand> getModifications();
+   List<WriteCommand> getModifications();
 
    /**
     * Adds a modification to the local modification list.
@@ -57,7 +57,7 @@
     * @param command command to add to list.  Should not be null.
     * @throws NullPointerException if the command to be added is null.
     */
-   void addLocalModification(VisitableCommand command);
+   void addLocalModification(WriteCommand command);
 
    /**
     * Returns all modifications that have been invoked with the LOCAL cache mode option.  These will also be in the
@@ -65,7 +65,7 @@
     *
     * @return list of LOCAL modifications, or an empty list.
     */
-   List<VisitableCommand> getLocalModifications();
+   List<WriteCommand> getLocalModifications();
 
    /**
     * Adds the key that has been removed in the scope of the current transaction.

Modified: core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -21,7 +21,7 @@
  */
 package org.horizon.context;
 
-import org.horizon.commands.VisitableCommand;
+import org.horizon.commands.write.WriteCommand;
 import org.horizon.container.MVCCEntry;
 import org.horizon.transaction.GlobalTransaction;
 import org.horizon.util.FastCopyHashMap;
@@ -50,12 +50,12 @@
    /**
     * List&lt;VisitableCommand&gt; of modifications. They will be replicated on TX commit
     */
-   private List<VisitableCommand> modificationList;
+   private List<WriteCommand> modificationList;
    /**
     * A list of modifications that have been encountered with a LOCAL mode option.  These will be removed from the
     * modification list during replication.
     */
-   private List<VisitableCommand> localModifications;
+   private List<WriteCommand> localModifications;
 
    /**
     * A list of dummy uninitialised entries created by the cache loader interceptor to load data for a given entry in
@@ -97,24 +97,24 @@
       lookedUpEntries.putAll(entries);
    }
 
-   public void addModification(VisitableCommand command) {
+   public void addModification(WriteCommand command) {
       if (command == null) return;
-      if (modificationList == null) modificationList = new LinkedList<VisitableCommand>();
+      if (modificationList == null) modificationList = new LinkedList<WriteCommand>();
       modificationList.add(command);
    }
 
-   public List<VisitableCommand> getModifications() {
+   public List<WriteCommand> getModifications() {
       if (modificationList == null) return Collections.emptyList();
       return modificationList;
    }
 
-   public void addLocalModification(VisitableCommand command) {
+   public void addLocalModification(WriteCommand command) {
       if (command == null) throw new NullPointerException("Command is null!");
-      if (localModifications == null) localModifications = new LinkedList<VisitableCommand>();
+      if (localModifications == null) localModifications = new LinkedList<WriteCommand>();
       localModifications.add(command);
    }
 
-   public List<VisitableCommand> getLocalModifications() {
+   public List<WriteCommand> getLocalModifications() {
       if (localModifications == null) return Collections.emptyList();
       return localModifications;
    }

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -31,6 +31,7 @@
 import org.horizon.commands.write.PutMapCommand;
 import org.horizon.commands.write.RemoveCommand;
 import org.horizon.commands.write.ReplaceCommand;
+import org.horizon.commands.write.WriteCommand;
 import org.horizon.config.CacheLoaderManagerConfig;
 import org.horizon.container.MVCCEntry;
 import org.horizon.context.InvocationContext;
@@ -239,14 +240,14 @@
       if (transactionContext == null) {
          throw new Exception("transactionContext for transaction " + gtx + " not found in transaction table");
       }
-      List<VisitableCommand> modifications = transactionContext.getModifications();
+      List<WriteCommand> modifications = transactionContext.getModifications();
       if (modifications.size() == 0) {
          log.trace("Transaction has not logged any modifications!");
          return;
       }
       log.trace("Cache loader modification list: {0}", modifications);
       StoreModificationsBuilder modsBuilder = new StoreModificationsBuilder(getStatisticsEnabled());
-      for (VisitableCommand cacheCommand : modifications) cacheCommand.acceptVisitor(ctx, modsBuilder);
+      for (WriteCommand cacheCommand : modifications) cacheCommand.acceptVisitor(ctx, modsBuilder);
       int numMods = modsBuilder.modifications.size();
       log.trace("Converted method calls to cache loader modifications.  List size: {0}", numMods);
 

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -30,6 +30,7 @@
 import org.horizon.commands.write.ClearCommand;
 import org.horizon.commands.write.PutKeyValueCommand;
 import org.horizon.commands.write.RemoveCommand;
+import org.horizon.commands.write.WriteCommand;
 import org.horizon.context.InvocationContext;
 import org.horizon.interceptors.base.CommandInterceptor;
 import org.horizon.transaction.GlobalTransaction;
@@ -99,7 +100,7 @@
       return handleAlterCacheMethod(ctx, command);
    }
 
-   private Object handleAlterCacheMethod(InvocationContext ctx, VisitableCommand command)
+   private Object handleAlterCacheMethod(InvocationContext ctx, WriteCommand command)
          throws Throwable {
       Object result = invokeCommand(ctx, command);
       if (ctx.isValidTransaction()) {

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -217,7 +217,8 @@
          // increment invalidations counter if statistics maintained
          incrementInvalidations();
          InvalidateCommand command = commandsFactory.buildInvalidateCommand(keys);
-         if (log.isDebugEnabled()) log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + command);
+         if (log.isDebugEnabled())
+            log.debug("Cache [" + rpcManager.getTransport().getAddress() + "] replicating " + command);
          // voila, invalidated!
          replicateCall(ctx, command, synchronous);
       }

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -195,7 +195,7 @@
     * @return true if the gtx is remote, false if it originated locally.
     */
    private boolean isRemoteGlobalTx(GlobalTransaction gtx) {
-      return gtx != null && (gtx.getAddress() != null) && (!gtx.getAddress().equals(rpcManager.getAddress()));
+      return gtx != null && (gtx.getAddress() != null) && (!gtx.getAddress().equals(rpcManager.getTransport().getAddress()));
    }
 
    private void copyInvocationScopeOptionsToTxScope(InvocationContext ctx) {

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -112,7 +112,7 @@
       if (command.isSuccessful()) {
          if (ctx.getTransaction() == null && ctx.isOriginLocal()) {
             if (trace) {
-               log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getMembers() + ", mode=" +
+               log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getTransport().getMembers() + ", mode=" +
                      configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
                      configuration.getSyncReplTimeout());
             }
@@ -138,7 +138,7 @@
    protected void runPreparePhase(PrepareCommand prepareMethod, GlobalTransaction gtx, InvocationContext ctx) throws Throwable {
       boolean async = configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
       if (trace) {
-         log.trace("(" + rpcManager.getAddress() + "): running remote prepare for global tx " + gtx + " with async mode=" + async);
+         log.trace("(" + rpcManager.getTransport().getAddress() + "): running remote prepare for global tx " + gtx + " with async mode=" + async);
       }
 
       // this method will return immediately if we're the only member (because exclude_self=true)

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -28,6 +28,7 @@
 import org.horizon.commands.tx.CommitCommand;
 import org.horizon.commands.tx.PrepareCommand;
 import org.horizon.commands.tx.RollbackCommand;
+import org.horizon.commands.write.WriteCommand;
 import org.horizon.context.InvocationContext;
 import org.horizon.context.TransactionContext;
 import org.horizon.factories.ComponentRegistry;
@@ -42,6 +43,7 @@
 import org.horizon.notifications.cachelistener.CacheNotifier;
 import org.horizon.remoting.ReplicationException;
 import org.horizon.transaction.GlobalTransaction;
+import org.horizon.transaction.TransactionLog;
 import org.horizon.transaction.TransactionTable;
 import org.horizon.util.concurrent.ConcurrentHashSet;
 
@@ -74,6 +76,7 @@
    private ComponentRegistry componentRegistry;
    private ContextFactory contextFactory;
    private CacheManager cacheManager;
+   private TransactionLog transactionLog;
 
    /**
     * List <Transaction>that we have registered for
@@ -89,7 +92,8 @@
    @Inject
    public void intialize(CacheManager cacheManager, ContextFactory contextFactory,
                          CacheNotifier notifier, InvocationContextContainer icc,
-                         CommandsFactory factory, ComponentRegistry componentRegistry, LockManager lockManager) {
+                         CommandsFactory factory, ComponentRegistry componentRegistry, LockManager lockManager,
+                         TransactionLog transactionLog) {
       this.contextFactory = contextFactory;
       this.commandsFactory = factory;
       this.cacheManager = cacheManager;
@@ -97,6 +101,7 @@
       this.invocationContextContainer = icc;
       this.componentRegistry = componentRegistry;
       this.lockManager = lockManager;
+      this.transactionLog = transactionLog;
       setStatisticsEnabled(configuration.isExposeManagementStatistics());
    }
 
@@ -226,7 +231,11 @@
    @Override
    public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
       try {
-         return attachGtxAndPassUpChain(ctx, command);
+         Object retval = attachGtxAndPassUpChain(ctx, command);
+         // log non-transactional modification
+         if (command instanceof WriteCommand && ctx.getTransaction() == null)
+            transactionLog.logNoTxWrite((WriteCommand) command);
+         return retval;
       }
       catch (Throwable throwable) {
          throwIfNeeded(ctx, throwable);
@@ -326,7 +335,9 @@
             if (trace)
                log.trace("Using one-phase prepare.  Not propagating the prepare call up the stack until called to do so by the sync handler.");
          } else {
-            // now pass up the prepare method itself.
+            // first log the transaction...
+            transactionLog.logPrepare(command);
+            // then pass up the prepare method itself.
             invokeNextInterceptor(ctx, command);
          }
          // JBCACHE-361 Confirm that the transaction is ACTIVE
@@ -439,20 +450,24 @@
    //   Transaction phase runners
    // --------------------------------------------------------------
 
-   protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List modifications, boolean onePhaseCommit) {
+   protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List<WriteCommand> modifications, boolean onePhaseCommit) {
       return commandsFactory.buildPrepareCommand(gtx, modifications, cacheManager.getAddress(), onePhaseCommit);
    }
 
    /**
     * creates a commit()
     */
-   protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List modifications, boolean onePhaseCommit) {
+   protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List<WriteCommand> modifications, boolean onePhaseCommit) {
       try {
          VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx, modifications, true) : commandsFactory.buildCommitCommand(gtx);
 
          if (trace) log.trace("Running commit for " + gtx);
 
          handleCommitRollback(ctx, commitCommand);
+         if (onePhaseCommit)
+            transactionLog.logOnePhaseCommit(gtx, modifications);
+         else
+            transactionLog.logCommit(gtx);
       }
       catch (Throwable e) {
          log.warn("Commit failed.  Clearing stale locks.");
@@ -487,6 +502,7 @@
          // JBCACHE-457
          VisitableCommand rollbackCommand = commandsFactory.buildRollbackCommand(gtx);
          if (trace) log.trace(" running rollback for {0}", gtx);
+         transactionLog.rollback(gtx);
 
          //JBCACHE-359 Store a lookup for the globalTransaction so a listener
          // callback can find it
@@ -515,10 +531,12 @@
     * Handles a local prepare - invoked by the sync handler.  Tests if the current tx matches the gtx passed in to the
     * method call and passes the prepare() call up the chain.
     */
-   public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List<VisitableCommand> modifications) throws Throwable {
+   public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List<WriteCommand> modifications) throws Throwable {
       // running a 2-phase commit.
-      VisitableCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
+      PrepareCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
 
+      transactionLog.logPrepare(prepareCommand);
+
       Object result;
 
       // Is there a local transaction associated with GTX ?
@@ -624,7 +642,7 @@
    private class RemoteSynchronizationHandler implements Synchronization {
       Transaction tx = null;
       GlobalTransaction gtx = null;
-      List<VisitableCommand> modifications = null;
+      List<WriteCommand> modifications = null;
       TransactionContext transactionContext = null;
       protected InvocationContext ctx; // the context for this call.
 

Modified: core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -307,17 +307,17 @@
 
    public List<Address> getMembers() {
       RPCManager rpcManager = globalComponentRegistry.getComponent(RPCManager.class);
-      return rpcManager == null ? null : rpcManager.getMembers();
+      return rpcManager == null ? null : rpcManager.getTransport().getMembers();
    }
 
    public Address getAddress() {
       RPCManager rpcManager = globalComponentRegistry.getComponent(RPCManager.class);
-      return rpcManager == null ? null : rpcManager.getAddress();
+      return rpcManager == null ? null : rpcManager.getTransport().getAddress();
    }
 
    public boolean isCoordinator() {
       RPCManager rpcManager = globalComponentRegistry.getComponent(RPCManager.class);
-      return rpcManager != null && rpcManager.isCoordinator();
+      return rpcManager != null && rpcManager.getTransport().isCoordinator();
    }
 
    private Cache createCache(String cacheName) {

Modified: core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -25,6 +25,7 @@
 import org.horizon.atomic.DeltaAware;
 import org.horizon.commands.RemoteCommandFactory;
 import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.write.WriteCommand;
 import org.horizon.io.ByteBuffer;
 import org.horizon.io.ExposedByteArrayOutputStream;
 import org.horizon.logging.Log;
@@ -32,6 +33,7 @@
 import org.horizon.remoting.transport.Address;
 import org.horizon.remoting.transport.jgroups.JGroupsAddress;
 import org.horizon.transaction.GlobalTransaction;
+import org.horizon.transaction.TransactionLog;
 import org.horizon.util.FastCopyHashMap;
 import org.horizon.util.Immutables;
 import org.jboss.util.NotImplementedException;
@@ -79,6 +81,7 @@
    protected static final int MAGICNUMBER_OBJECT = 22;
    protected static final int MAGICNUMBER_SINGLETON_LIST = 23;
    protected static final int MAGICNUMBER_COMMAND = 24;
+   protected static final int MAGICNUMBER_TRANSACTION_LOG = 25;
    protected static final int MAGICNUMBER_NULL = 99;
    protected static final int MAGICNUMBER_SERIALIZABLE = 100;
    protected static final int MAGICNUMBER_REF = 101;
@@ -189,6 +192,11 @@
             out.writeByte(MAGICNUMBER_STRING);
             if (useRefs) writeReference(out, createReference(o, refMap));
             marshallString((String) o, out);
+         } else if (o instanceof TransactionLog.LogEntry) {
+            out.writeByte(MAGICNUMBER_TRANSACTION_LOG);
+            TransactionLog.LogEntry le = (TransactionLog.LogEntry) o;
+            marshallObject(le.getTransaction(), out, refMap);
+            marshallObject(le.getModifications(), out, refMap);
          } else if (o instanceof Serializable) {
             if (trace) {
                log.trace("Warning: using object serialization for " + o.getClass());
@@ -305,6 +313,9 @@
          case MAGICNUMBER_JG_ADDRESS:
             retVal = unmarshallJGroupsAddress(in);
             return retVal;
+         case MAGICNUMBER_TRANSACTION_LOG:
+            retVal = new TransactionLog.LogEntry((GlobalTransaction) unmarshallObject(in, refMap), (List<WriteCommand>) unmarshallObject(in, refMap));
+            return retVal;
          case MAGICNUMBER_ARRAY:
             return unmarshallArray(in, refMap);
          case MAGICNUMBER_ARRAY_LIST:

Modified: core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -44,8 +44,7 @@
       }
 
       if (!cr.getStatus().allowInvocations()) {
-         if (log.isInfoEnabled()) log.info("Cache named {0} exists but isn't in a state to handle invocations.  Its state is {1}", cacheName, cr.getStatus());
-         return null;
+         throw new IllegalStateException("Cache named " + cacheName + " exists but isn't in a state to handle invocations.  Its state is " + cr.getStatus());
       }
 
       InterceptorChain ic = cr.getComponent(InterceptorChain.class);
@@ -69,7 +68,7 @@
    private StateTransferManager getStateTransferManager(String cacheName) throws StateTransferException {
       ComponentRegistry cr = gcr.getNamedComponentRegistry(cacheName);
       if (cr == null) {
-         String msg = "Cache named "+cacheName+" does not exist on this cache manager!";
+         String msg = "Cache named " + cacheName + " does not exist on this cache manager!";
          log.info(msg);
          throw new StateTransferException(msg);
       }

Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -27,6 +27,7 @@
 import org.horizon.factories.scopes.Scopes;
 import org.horizon.lifecycle.Lifecycle;
 import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.Transport;
 import org.horizon.statetransfer.StateTransferException;
 
 import java.util.List;
@@ -48,78 +49,76 @@
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
-    * @param recipients       a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the
-    *                         entire cluster.
-    * @param rpcCommand       the cache command to invoke
-    * @param mode             the response mode to use
-    * @param timeout          a timeout after which to throw a replication exception.
-    * @param usePriorityQueue if true, a priority queue is used to deliver messages.  May not be supported by all
-    *                         implementations.
-    * @param responseFilter   a response filter with which to filter out failed/unwanted/invalid responses.
+    * @param recipients           a list of Addresses to invoke the call on.  If this is null, the call is broadcast to
+    *                             the entire cluster.
+    * @param rpcCommand           the cache command to invoke
+    * @param mode                 the response mode to use
+    * @param timeout              a timeout after which to throw a replication exception.
+    * @param usePriorityQueue     if true, a priority queue is used to deliver messages.  May not be supported by all
+    *                             implementations.
+    * @param responseFilter       a response filter with which to filter out failed/unwanted/invalid responses.
+    * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
+    *                             course of a state transfer
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
+   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean stateTransferEnabled) throws Exception;
 
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
-    * @param recipients       a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the
-    *                         entire cluster.
-    * @param rpcCommand       the cache command to invoke
-    * @param mode             the response mode to use
-    * @param timeout          a timeout after which to throw a replication exception.
-    * @param usePriorityQueue if true, a priority queue is used to deliver messages.  May not be supported by all
-    *                         implementations.
+    * @param recipients           a list of Addresses to invoke the call on.  If this is null, the call is broadcast to
+    *                             the entire cluster.
+    * @param rpcCommand           the cache command to invoke
+    * @param mode                 the response mode to use
+    * @param timeout              a timeout after which to throw a replication exception.
+    * @param usePriorityQueue     if true, a priority queue is used to deliver messages.  May not be supported by all
+    *                             implementations.
+    * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
+    *                             course of a state transfer
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception;
+   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, boolean stateTransferEnabled) throws Exception;
 
    /**
     * Invokes an RPC call on other caches in the cluster.
     *
-    * @param recipients a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire
-    *                   cluster.
-    * @param rpcCommand the cache command to invoke
-    * @param mode       the response mode to use
-    * @param timeout    a timeout after which to throw a replication exception.
+    * @param recipients           a list of Addresses to invoke the call on.  If this is null, the call is broadcast to
+    *                             the entire cluster.
+    * @param rpcCommand           the cache command to invoke
+    * @param mode                 the response mode to use
+    * @param timeout              a timeout after which to throw a replication exception.
+    * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
+    *                             course of a state transfer
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout) throws Exception;
+   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean stateTransferEnabled) throws Exception;
 
    /**
-    * @return true if the current Channel is the coordinator of the cluster.
-    */
-   boolean isCoordinator();
-
-   /**
-    * @return the Address of the current coordinator.
-    */
-   Address getCoordinator();
-
-   /**
-    * Retrieves the current cache instance's network address
+    * Initiates a state retrieval process from neighbouring caches.  This method will block until it either times out,
+    * or state is retrieved and applied.
     *
-    * @return an Address
+    * @param cacheName name of cache requesting state
+    * @param timeout   length of time to try to retrieve state on each peer
+    * @throws org.horizon.statetransfer.StateTransferException
+    *          in the event of problems
     */
-   Address getAddress();
+   void retrieveState(String cacheName, long timeout) throws StateTransferException;
 
    /**
-    * Returns a list of  members in the current cluster view.
-    *
-    * @return a list of members.  Typically, this would be defensively copied.
+    * @return a reference to the underlying transport.
     */
-   List<Address> getMembers();
+   Transport getTransport();
 
    /**
-    * Initiates a state retrieval process from neighbouring caches.  This method will block until it either times out,
-    * or state is retrieved and applied.
+    * If {@link #retrieveState(String, long)} has been invoked and hasn't yet returned (i.e., a state transfer is in
+    * progress), this method will return the current Address from which a state transfer is being attempted.  Otherwise,
+    * this method returns a null.
     *
-    * @param cacheName name of cache requesting state
-    * @param timeout length of time to try to retrieve state on each peer
-    * @throws org.horizon.statetransfer.StateTransferException in the event of problems
+    * @return the current Address from which a state transfer is being attempted, if a state transfer is in progress, or
+    *         a null otherwise.
     */
-   void retrieveState(String cacheName, long timeout) throws StateTransferException;
+   Address getCurrentStateTransferSource();
 }
\ No newline at end of file

Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -38,6 +38,7 @@
    private final AtomicLong replicationFailures = new AtomicLong(0);
    boolean statisticsEnabled = false; // by default, don't gather statistics.
    private static final Log log = LogFactory.getLog(RPCManagerImpl.class);
+   private volatile Address currentStateTransferSource;
 
    @Inject
    public void injectDependencies(GlobalConfiguration globalConfiguration, Transport t, InboundInvocationHandler handler,
@@ -45,7 +46,8 @@
                                   @ComponentName(KnownComponentNames.ASYNC_SERIALIZATION_EXECUTOR) ExecutorService e,
                                   CacheManagerNotifier notifier) {
       this.t = t;
-      this.t.initialize(globalConfiguration, globalConfiguration.getTransportProperties(), marshaller, e, handler, notifier);
+      this.t.initialize(globalConfiguration, globalConfiguration.getTransportProperties(), marshaller, e, handler,
+                        notifier, globalConfiguration.getDistributedSyncTimeout());
    }
 
    @Start(priority = 10)
@@ -58,36 +60,20 @@
       t.stop();
    }
 
-   public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception {
-      return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter);
+   public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean stateTransferEnabled) throws Exception {
+      return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter, stateTransferEnabled);
    }
 
-   public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception {
-      return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, null);
+   public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, boolean stateTransferEnabled) throws Exception {
+      return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, null, stateTransferEnabled);
    }
 
-   public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout) throws Exception {
-      return t.invokeRemotely(recipients, rpcCommand, mode, timeout, false, null);
+   public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean stateTransferEnabled) throws Exception {
+      return t.invokeRemotely(recipients, rpcCommand, mode, timeout, false, null, stateTransferEnabled);
    }
 
-   public boolean isCoordinator() {
-      return t.isCoordinator();
-   }
-
-   public Address getCoordinator() {
-      return t.getCoordinator();
-   }
-
-   public Address getAddress() {
-      return t.getAddress();
-   }
-
-   public List<Address> getMembers() {
-      return t.getMembers();
-   }
-
    public void retrieveState(String cacheName, long timeout) throws StateTransferException {
-      List<Address> members = getMembers();
+      List<Address> members = t.getMembers();
       if (members.size() < 2) {
          if (log.isDebugEnabled())
             log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
@@ -95,38 +81,56 @@
       }
 
       boolean success = false;
-      outer:
-      for (int i = 0, wait = 1000; i < 5; i++) {
-         for (Address member : members) {
-            if (!member.equals(getAddress())) {
-               try {
-                  if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
-                  if (t.retrieveState(cacheName, member, timeout)) {
-                     if (log.isInfoEnabled()) log.info("Successfully retrieved and applied state from {0}", member);
-                     success = true;
-                     break outer;
+
+      try {
+
+         outer:
+         for (int i = 0, wait = 1000; i < 5; i++) {
+            for (Address member : members) {
+               if (!member.equals(t.getAddress())) {
+                  try {
+                     if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
+                     currentStateTransferSource = member;
+                     if (t.retrieveState(cacheName, member, timeout)) {
+                        if (log.isInfoEnabled()) log.info("Successfully retrieved and applied state from {0}", member);
+                        success = true;
+                        break outer;
+                     }
+                  } catch (StateTransferException e) {
+                     if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
+                  } finally {
+                     currentStateTransferSource = null;
                   }
-               } catch (StateTransferException e) {
-                  if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
                }
-            }
 
-            if (!success) {
-               if (log.isWarnEnabled()) log.warn("Could not find available peer for state, backing off and retrying");
+               if (!success) {
+                  if (log.isWarnEnabled())
+                     log.warn("Could not find available peer for state, backing off and retrying");
 
-               try {
-                  Thread.sleep(wait <<= 2);
+                  try {
+                     Thread.sleep(wait <<= 2);
+                  }
+                  catch (InterruptedException e) {
+                     Thread.currentThread().interrupt();
+                  }
                }
-               catch (InterruptedException e) {
-                  Thread.currentThread().interrupt();
-               }
             }
          }
+      } finally {
+         currentStateTransferSource = null;
       }
 
       if (!success) throw new StateTransferException("Unable to fetch state on startup");
    }
 
+   public Transport getTransport() {
+      return t;
+   }
+
+   public Address getCurrentStateTransferSource() {
+      return currentStateTransferSource;
+   }
+
    // -------------------------------------------- JMX information -----------------------------------------------
 
    @ManagedOperation

Modified: core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -67,6 +67,7 @@
    private Configuration configuration;
    private boolean enabled;
    private CommandsFactory commandsFactory;
+   private boolean stateTransferEnabled;
 
    public boolean isEnabled() {
       return enabled;
@@ -86,6 +87,7 @@
     */
    @Start
    public synchronized void start() {
+      stateTransferEnabled = configuration.isStateTransferEnabled();
       long interval = configuration.getReplQueueInterval();
       log.trace("Starting replication queue, with interval {0} and maxElements {1}", interval, maxElements);
       this.maxElements = configuration.getReplQueueMaxElements();
@@ -142,7 +144,7 @@
             log.trace("Flushing {0} elements", toReplicateSize);
             ReplicateCommand replicateCommand = commandsFactory.buildReplicateCommand(toReplicate);
             // send to all live caches in the cluster
-            rpcManager.invokeRemotely(null, replicateCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout());
+            rpcManager.invokeRemotely(null, replicateCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout(), stateTransferEnabled);
          }
          catch (Throwable t) {
             log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);

Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -0,0 +1,68 @@
+package org.horizon.remoting.transport;
+
+import net.jcip.annotations.ThreadSafe;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is an abstraction of a cluster-wide synchronization.  Its purpose is to maintain a set of locks that are aware
+ * of block and unblock commands issued across a cluster.  In addition to these block and unblock phases, sub-phases
+ * such as a start processing
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+ at ThreadSafe
+public interface DistributedSync {
+
+   /**
+    * @return the number of syncs that have occured
+    */
+   int getSyncCount();
+
+   /**
+    * Blocks until the start of a sync - either by the current RPCManager instance or a remote one - is received.  This
+    * should return immediately if sync is already in progress.
+    *
+    * @param timeout  timeout after which to give up
+    * @param timeUnit time unit
+    * @throws TimeoutException if waiting for the sync times out.
+    */
+   void blockUntilAcquired(long timeout, TimeUnit timeUnit) throws TimeoutException;
+
+   /**
+    * Blocks until an ongoing sync ends.  This is returns immediately if there is no ongoing sync.
+    *
+    * @param timeout  timeout after which to give up
+    * @param timeUnit time unit
+    * @throws TimeoutException if waiting for an ongoing sync to end times out.
+    */
+   void blockUntilReleased(long timeout, TimeUnit timeUnit) throws TimeoutException;
+
+   /**
+    * Acquires the sync.  This could be from a local or remote source.
+    */
+   void acquireSync();
+
+   /**
+    * Releases the sync.  This could be from a local or remote source.
+    */
+   void releaseSync();
+
+   /**
+    * Acquires a processing lock.  This is typically acquired after the sync is acquired, and is meant for local (not
+    * remote) use.
+    *
+    * @param exclusive whether the lock is exclusive
+    * @param timeout   timeout after which to give up
+    * @param timeUnit  time unit
+    * @throws TimeoutException if waiting for the lock times out
+    */
+   void acquireProcessingLock(boolean exclusive, long timeout, TimeUnit timeUnit) throws TimeoutException;
+
+   /**
+    * Releases any processing locks that may be held by the current thread.
+    */
+   void releaseProcessingLock();
+}

Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -31,14 +31,16 @@
    /**
     * Initializes the transport with global cache configuration and transport-specific properties.
     *
-    * @param c             global cache-wide configuration
-    * @param p             properties to set
-    * @param marshaller    marshaller to use for marshalling and unmarshalling
-    * @param asyncExecutor executor to use for asynchronous calls
-    * @param handler       handler for invoking remotely originating calls on the local cache
+    * @param c                      global cache-wide configuration
+    * @param p                      properties to set
+    * @param marshaller             marshaller to use for marshalling and unmarshalling
+    * @param asyncExecutor          executor to use for asynchronous calls
+    * @param handler                handler for invoking remotely originating calls on the local cache
+    * @param notifier               notifier to use
+    * @param distributedSyncTimeout timeout to wait for distributed syncs
     */
    void initialize(GlobalConfiguration c, Properties p, Marshaller marshaller, ExecutorService asyncExecutor,
-                   InboundInvocationHandler handler, CacheManagerNotifier notifier);
+                   InboundInvocationHandler handler, CacheManagerNotifier notifier, long distributedSyncTimeout);
 
    /**
     * Invokes an RPC call on other caches in the cluster.
@@ -51,10 +53,11 @@
     * @param usePriorityQueue if true, a priority queue is used to deliver messages.  May not be supported by all
     *                         implementations.
     * @param responseFilter   a response filter with which to filter out failed/unwanted/invalid responses.
+    * @param supportReplay    whether replays of missed messages is supported
     * @return a list of responses from each member contacted.
     * @throws Exception in the event of problems.
     */
-   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
+   List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean supportReplay) throws Exception;
 
    /**
     * @return true if the current Channel is the coordinator of the cluster.
@@ -81,14 +84,34 @@
    List<Address> getMembers();
 
    /**
-    * Initiates a state retrieval from a specific cache (by typically invoking {@link org.horizon.remoting.InboundInvocationHandler#generateState(String, java.io.OutputStream)}),
-    * and applies this state to the current cache via the  {@link InboundInvocationHandler#applyState(String, java.io.InputStream)} callback.
+    * Initiates a state retrieval from a specific cache (by typically invoking {@link
+    * org.horizon.remoting.InboundInvocationHandler#generateState(String, java.io.OutputStream)}), and applies this
+    * state to the current cache via the  {@link InboundInvocationHandler#applyState(String, java.io.InputStream)}
+    * callback.
     *
     * @param cacheName name of cache for which to retrieve state
-    * @param address address of remote cache from which to retrieve state
-    * @param timeout state retrieval timeout in milliseconds 
-    * @throws org.horizon.statetransfer.StateTransferException if state cannot be retrieved from the specific cache
+    * @param address   address of remote cache from which to retrieve state
+    * @param timeout   state retrieval timeout in milliseconds
     * @return true if state was transferred and applied successfully, false if it timed out.
+    * @throws org.horizon.statetransfer.StateTransferException
+    *          if state cannot be retrieved from the specific cache
     */
    boolean retrieveState(String cacheName, Address address, long timeout) throws StateTransferException;
+
+   /**
+    * @return an instance of a DistributedSync that can be used to wait for synchronization events across a cluster.
+    */
+   DistributedSync getDistributedSync();
+
+   /**
+    * Blocks all RPC calls to and between a set of Addresses.  If a null is passed in, the entire cluster is blocked.
+    *
+    * @param addresses addresses to block
+    */
+   void blockRPC(Address... addresses);
+
+   /**
+    * Releases a block performed by calling {@link #blockRPC(Address[])}
+    */
+   void unblockRPC();
 }

Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -27,6 +27,7 @@
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
 import org.horizon.remoting.InboundInvocationHandler;
+import org.horizon.remoting.transport.DistributedSync;
 import org.jgroups.Address;
 import org.jgroups.Channel;
 import org.jgroups.Message;
@@ -38,10 +39,12 @@
 import org.jgroups.util.RspList;
 
 import java.io.NotSerializableException;
+import java.util.Map;
 import java.util.Vector;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 /**
  * A JGroups RPC dispatcher that knows how to deal with {@link ReplicableCommand}s.
@@ -53,7 +56,10 @@
    protected boolean trace;
    ExecutorService asyncExecutor;
    InboundInvocationHandler inboundInvocationHandler;
+   DistributedSync distributedSync;
+   long distributedSyncTimeout;
    private Log log = LogFactory.getLog(CommandAwareRpcDispatcher.class);
+   private static final RequestIgnoredResponse REQUEST_IGNORED_RESPONSE = new RequestIgnoredResponse();
 
    public CommandAwareRpcDispatcher() {
    }
@@ -61,12 +67,14 @@
    public CommandAwareRpcDispatcher(Channel channel,
                                     JGroupsTransport transport,
                                     ExecutorService asyncExecutor,
-                                    InboundInvocationHandler inboundInvocationHandler) {
+                                    InboundInvocationHandler inboundInvocationHandler,
+                                    DistributedSync distributedSync, long distributedSyncTimeout) {
       super(channel, transport, transport, transport);
       this.asyncExecutor = asyncExecutor;
       this.inboundInvocationHandler = inboundInvocationHandler;
-
+      this.distributedSync = distributedSync;
       trace = log.isTraceEnabled();
+      this.distributedSyncTimeout = distributedSyncTimeout;
    }
 
    protected boolean isValid(Message req) {
@@ -83,9 +91,9 @@
     * org.jgroups.blocks.RspFilter)} except that this version is aware of {@link ReplicableCommand} objects.
     */
    public RspList invokeRemoteCommands(Vector<Address> dests, ReplicableCommand command, int mode, long timeout,
-                                       boolean anycasting, boolean oob, RspFilter filter)
+                                       boolean anycasting, boolean oob, RspFilter filter, boolean supportReplay)
          throws NotSerializableException, ExecutionException, InterruptedException {
-      ReplicationTask task = new ReplicationTask(command, oob, dests, mode, timeout, anycasting, filter);
+      ReplicationTask task = new ReplicationTask(command, oob, dests, mode, timeout, anycasting, filter, supportReplay);
 
       if (mode == GroupRequest.GET_NONE) {
          asyncExecutor.submit(task);
@@ -131,8 +139,40 @@
 
    protected Object executeCommand(RPCCommand cmd, Message req) throws Throwable {
       if (cmd == null) throw new NullPointerException("Unable to execute a null command!  Message was " + req);
-      if (trace) log.trace("Executing command: {0} [sender={1}]", cmd, req.getSrc());
-      return inboundInvocationHandler.handle(cmd);
+      if (trace) log.trace("Attempting to execute command: {0} [sender={1}]", cmd, req.getSrc());
+
+      boolean unlock = false;
+      try {
+
+         int flushCount = distributedSync.getSyncCount();
+         distributedSync.acquireProcessingLock(false, distributedSyncTimeout, MILLISECONDS);
+         unlock = true;
+
+         distributedSync.blockUntilReleased(distributedSyncTimeout, MILLISECONDS);
+
+         // If this thread blocked during a NBST flush, then inform the sender
+         // it needs to replay ignored messages
+         boolean replayIgnored = distributedSync.getSyncCount() != flushCount;
+
+         Object retval;
+         try {
+            retval = inboundInvocationHandler.handle(cmd);
+         } catch (IllegalStateException ise) {
+            if (trace) log.trace("Unable to execute command, cache not in a receptive state");
+            // cache not in a started state, request replay
+            return REQUEST_IGNORED_RESPONSE;
+         }
+
+         if (replayIgnored) {
+            ExtendedResponse extended = new ExtendedResponse(retval);
+            extended.setReplayIgnoredRequests(true);
+            retval = extended;
+         }
+         return retval;
+
+      } finally {
+         if (unlock) distributedSync.releaseProcessingLock();
+      }
    }
 
    @Override
@@ -149,10 +189,11 @@
       private long timeout;
       private boolean anycasting;
       private RspFilter filter;
+      boolean supportReplay = false;
 
       private ReplicationTask(ReplicableCommand command, boolean oob, Vector<Address> dests,
                               int mode, long timeout,
-                              boolean anycasting, RspFilter filter) {
+                              boolean anycasting, RspFilter filter, boolean supportReplay) {
          this.command = command;
          this.oob = oob;
          this.dests = dests;
@@ -160,6 +201,7 @@
          this.timeout = timeout;
          this.anycasting = anycasting;
          this.filter = filter;
+         this.supportReplay = supportReplay;
       }
 
       public RspList call() throws Exception {
@@ -174,6 +216,8 @@
          Message msg = new Message();
          msg.setBuffer(buf);
          if (oob) msg.setFlag(Message.OOB);
+         // Replay capability requires responses from all members!
+         int mode = supportReplay ? GroupRequest.GET_ALL : this.mode;
          RspList retval = castMessage(dests, msg, mode, timeout, anycasting, filter);
          if (trace) log.trace("responses: {0}", retval);
 
@@ -183,6 +227,30 @@
          if (retval == null)
             throw new NotSerializableException("RpcDispatcher returned a null.  This is most often caused by args for "
                   + command.getClass().getSimpleName() + " not being serializable.");
+
+         if (supportReplay) {
+            boolean replay = false;
+            Vector<Address> ignorers = new Vector<Address>();
+            for (Map.Entry<Address, Rsp> entry : retval.entrySet()) {
+               Object value = entry.getValue().getValue();
+               if (value instanceof RequestIgnoredResponse) {
+                  ignorers.add(entry.getKey());
+               } else if (value instanceof ExtendedResponse) {
+                  ExtendedResponse extended = (ExtendedResponse) value;
+                  replay |= extended.isReplayIgnoredRequests();
+                  entry.getValue().setValue(extended.getResponse());
+               }
+            }
+
+            if (replay && ignorers.size() > 0) {
+               if (trace)
+                  log.trace("Replaying message to ignoring senders: " + ignorers);
+               RspList responses = castMessage(ignorers, msg, GroupRequest.GET_ALL, timeout, anycasting, filter);
+               if (responses != null)
+                  retval.putAll(responses);
+            }
+         }
+
          return retval;
       }
    }

Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -0,0 +1,50 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.horizon.remoting.transport.jgroups;
+
+import java.io.Serializable;
+
+/**
+ * A response with extended information
+ *
+ * @author Jason T. Greene
+ */
+public class ExtendedResponse implements Serializable {
+   private boolean replayIgnoredRequests;
+   private final Object response;
+
+   public ExtendedResponse(Object response) {
+      this.response = response;
+   }
+
+   public boolean isReplayIgnoredRequests() {
+      return replayIgnoredRequests;
+   }
+
+   public void setReplayIgnoredRequests(boolean replayIgnoredRequests) {
+      this.replayIgnoredRequests = replayIgnoredRequests;
+   }
+
+   public Object getResponse() {
+      return response;
+   }
+}

Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -0,0 +1,98 @@
+package org.horizon.remoting.transport.jgroups;
+
+import net.jcip.annotations.ThreadSafe;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.remoting.transport.DistributedSync;
+import org.horizon.util.Util;
+import org.horizon.util.concurrent.ReclosableLatch;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A DistributedSync based on JGroups' FLUSH protocol
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+ at ThreadSafe
+public class FlushBasedDistributedSync implements DistributedSync {
+
+   private final ReentrantReadWriteLock processingLock = new ReentrantReadWriteLock();
+   private final ReclosableLatch flushBlockGate = new ReclosableLatch();
+   private final AtomicInteger flushCompletionCount = new AtomicInteger();
+   private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
+   private static final Log log = LogFactory.getLog(FlushBasedDistributedSync.class);
+
+   public int getSyncCount() {
+      return flushCompletionCount.get();
+   }
+
+   public void blockUntilAcquired(long timeout, TimeUnit timeUnit) throws TimeoutException {
+      while (true) {
+         try {
+            if (!flushWaitGate.await(timeout, timeUnit))
+               throw new TimeoutException("Timed out waiting for a cluster-wide sync to be acquired. (timeout = " + Util.prettyPrintTime(timeout) + ")");
+            return;
+         }
+         catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   public void blockUntilReleased(long timeout, TimeUnit timeUnit) throws TimeoutException {
+      while (true) {
+         try {
+            if (!flushBlockGate.await(timeout, timeUnit))
+               throw new TimeoutException("Timed out waiting for a cluster-wide sync to be released. (timeout = " + Util.prettyPrintTime(timeout) + ")");
+            return;
+         }
+         catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   public void acquireSync() {
+      flushBlockGate.close();
+      flushWaitGate.open();
+   }
+
+   public void releaseSync() {
+      flushWaitGate.close();
+      flushCompletionCount.incrementAndGet();
+      flushBlockGate.open();
+   }
+
+   public void acquireProcessingLock(boolean exclusive, long timeout, TimeUnit timeUnit) throws TimeoutException {
+      Lock lock = exclusive ? processingLock.writeLock() : processingLock.readLock();
+      while (true) {
+         try {
+            if (!lock.tryLock(timeout, timeUnit))
+               throw new TimeoutException("Could not obtain " + (exclusive ? "exclusive" : "shared") + " processing lock");
+
+            return;
+         }
+         catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   public void releaseProcessingLock() {
+      try {
+         if (processingLock.isWriteLockedByCurrentThread()) {
+            processingLock.writeLock().unlock();
+         } else {
+            processingLock.readLock().unlock();
+         }
+      } catch (IllegalMonitorStateException imse) {
+         if (log.isTraceEnabled()) log.trace("Did not own lock!");
+      }
+   }
+}

Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -14,15 +14,16 @@
 import org.horizon.remoting.ResponseFilter;
 import org.horizon.remoting.ResponseMode;
 import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.DistributedSync;
 import org.horizon.remoting.transport.Transport;
 import org.horizon.statetransfer.StateTransferException;
 import org.horizon.util.FileLookup;
 import org.horizon.util.Util;
 import org.jgroups.Channel;
 import org.jgroups.ChannelException;
+import org.jgroups.ExtendedMembershipListener;
 import org.jgroups.ExtendedMessageListener;
 import org.jgroups.JChannel;
-import org.jgroups.MembershipListener;
 import org.jgroups.Message;
 import org.jgroups.View;
 import org.jgroups.blocks.GroupRequest;
@@ -40,6 +41,8 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * An encapsulation of a JGroups transport
@@ -47,7 +50,7 @@
  * @author Manik Surtani
  * @since 1.0
  */
-public class JGroupsTransport implements Transport, MembershipListener, ExtendedMessageListener {
+public class JGroupsTransport implements Transport, ExtendedMembershipListener, ExtendedMessageListener {
    public static final String CONFIGURATION_STRING = "configurationString";
    public static final String CONFIGURATION_XML = "configurationXml";
    public static final String CONFIGURATION_FILE = "configurationFile";
@@ -68,26 +71,24 @@
    ExecutorService asyncExecutor;
    CacheManagerNotifier notifier;
    final ConcurrentMap<String, StateTransferMonitor> stateTransfersInProgress = new ConcurrentHashMap<String, StateTransferMonitor>();
+   private final FlushBasedDistributedSync flushTracker = new FlushBasedDistributedSync();
+   volatile List<org.jgroups.Address> membersBlocked;
+   AtomicBoolean flushInProgress = new AtomicBoolean(false);
+   long distributedSyncTimeout;
 
-   /**
-    * Reference to an exception that was raised during state installation on this node.
-    */
-   protected volatile Exception setStateException;
-   private final Object stateLock = new Object();
-
-
    // ------------------------------------------------------------------------------------------------------------------
    // Lifecycle and setup stuff
    // ------------------------------------------------------------------------------------------------------------------
 
    public void initialize(GlobalConfiguration c, Properties p, Marshaller marshaller, ExecutorService asyncExecutor,
-                          InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier) {
+                          InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier, long distributedSyncTimeout) {
       this.c = c;
       this.p = p;
       this.marshaller = marshaller;
       this.asyncExecutor = asyncExecutor;
       this.inboundInvocationHandler = inboundInvocationHandler;
       this.notifier = notifier;
+      this.distributedSyncTimeout = distributedSyncTimeout;
    }
 
    public void start() {
@@ -137,7 +138,7 @@
       channel.setOpt(Channel.AUTO_GETSTATE, false);
       channel.setOpt(Channel.BLOCK, true);
       dispatcher = new CommandAwareRpcDispatcher(channel, this,
-                                                 asyncExecutor, inboundInvocationHandler);
+                                                 asyncExecutor, inboundInvocationHandler, flushTracker, distributedSyncTimeout);
       MarshallerAdapter adapter = new MarshallerAdapter(marshaller);
       dispatcher.setRequestMarshaller(adapter);
       dispatcher.setResponseMarshaller(adapter);
@@ -228,7 +229,7 @@
          cleanup = true;
          ((JChannel) channel).getState(toJGroupsAddress(address), cacheName, timeout, false);
          mon.waitForState();
-         return true;
+         return mon.getSetStateException() == null;
       } catch (StateTransferException ste) {
          throw ste;
       } catch (Exception e) {
@@ -239,6 +240,72 @@
       }
    }
 
+   public DistributedSync getDistributedSync() {
+      return flushTracker;
+   }
+
+   public void blockRPC(Address... addresses) {
+      if (flushInProgress.compareAndSet(false, true)) {
+         // TODO make these configurable!!
+         int retries = 5;
+         int sleepBetweenRetries = 250;
+         int sleepIncreaseFactor = 2;
+         if (trace) log.trace("Attempting a partial flush on members {0} with up to {1} retries.", members, retries);
+
+         boolean success = false;
+         int i;
+         for (i = 1; i <= retries; i++) {
+            if (trace) log.trace("Attempt number " + i);
+            try {
+
+               if (addresses == null) {
+                  success = channel.startFlush(false);
+               } else {
+                  membersBlocked = toJGroupsAddressList(addresses);
+                  success = channel.startFlush(membersBlocked, false);
+               }
+
+               if (success) break;
+               if (trace) log.trace("Channel.startFlush() returned false!");
+            } catch (Exception e) {
+               if (trace) log.trace("Caught exception attempting a partial flush", e);
+            }
+            try {
+               if (trace)
+                  log.trace("Partial state transfer failed.  Backing off for " + sleepBetweenRetries + " millis and retrying");
+               Thread.sleep(sleepBetweenRetries);
+               sleepBetweenRetries *= sleepIncreaseFactor;
+            } catch (InterruptedException ie) {
+               Thread.currentThread().interrupt();
+            }
+         }
+
+         if (success) {
+            if (log.isDebugEnabled()) log.debug("Partial flush between {0} succeeded!", membersBlocked);
+         } else {
+            flushInProgress.set(false);
+            throw new CacheException("Could initiate partial flush between " + membersBlocked + "!");
+         }
+      } else {
+         throw new CacheException("Cannot block RPC; a block is already in progress!");
+      }
+   }
+
+   public void unblockRPC() {
+      if (flushInProgress.get()) {
+         try {
+            if (membersBlocked == null) {
+               channel.stopFlush();
+            } else {
+               channel.stopFlush(membersBlocked);
+               membersBlocked = null;
+            }
+         } finally {
+            flushInProgress.set(false);
+         }
+      }
+   }
+
    public Address getAddress() {
       if (address == null) {
          address = new JGroupsAddress(channel.getLocalAddress());
@@ -251,7 +318,8 @@
    // outbound RPC
    // ------------------------------------------------------------------------------------------------------------------
 
-   public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter)
+   public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout,
+                                      boolean usePriorityQueue, ResponseFilter responseFilter, boolean supportReplay)
          throws Exception {
 
       if (recipients != null && recipients.isEmpty()) {
@@ -262,39 +330,51 @@
 
       log.trace("dests={0}, command={1}, mode={2}, timeout={3}", recipients, rpcCommand, mode, timeout);
 
-      RspList rsps = dispatcher.invokeRemoteCommands(toJGroupsAddressVector(recipients), rpcCommand, toJGroupsMode(mode),
-                                                     timeout, false, usePriorityQueue,
-                                                     toJGroupsFilter(responseFilter));
+      // Acquire a "processing" lock so that any other code is made aware of a network call in progress
+      // make sure this is non-exclusive since concurrent network calls are valid for most situations.
+      flushTracker.acquireProcessingLock(false, distributedSyncTimeout, MILLISECONDS);
+      boolean unlock = true;
+      // if there is a FLUSH in progress, block till it completes
+      flushTracker.blockUntilReleased(distributedSyncTimeout, MILLISECONDS);
 
-      if (mode == ResponseMode.ASYNCHRONOUS) return Collections.emptyList();// async case
+      try {
+         RspList rsps = dispatcher.invokeRemoteCommands(toJGroupsAddressVector(recipients), rpcCommand, toJGroupsMode(mode),
+                                                        timeout, false, usePriorityQueue,
+                                                        toJGroupsFilter(responseFilter), supportReplay);
 
-      if (trace)
-         log.trace("Cache [{0}]: responses for command {1}:\n{2}", getAddress(), rpcCommand.getClass().getSimpleName(), rsps);
+         if (mode == ResponseMode.ASYNCHRONOUS) return Collections.emptyList();// async case
 
-      // short-circuit no-return-value calls.
-      if (rsps == null) return Collections.emptyList();
-      List<Object> retval = new ArrayList<Object>(rsps.size());
+         if (trace)
+            log.trace("Cache [{0}]: responses for command {1}:\n{2}", getAddress(), rpcCommand.getClass().getSimpleName(), rsps);
 
-      for (Rsp rsp : rsps.values()) {
-         if (rsp.wasSuspected() || !rsp.wasReceived()) {
-            CacheException ex;
-            if (rsp.wasSuspected()) {
-               ex = new SuspectException("Suspected member: " + rsp.getSender());
+         // short-circuit no-return-value calls.
+         if (rsps == null) return Collections.emptyList();
+         List<Object> retval = new ArrayList<Object>(rsps.size());
+
+         for (Rsp rsp : rsps.values()) {
+            if (rsp.wasSuspected() || !rsp.wasReceived()) {
+               CacheException ex;
+               if (rsp.wasSuspected()) {
+                  ex = new SuspectException("Suspected member: " + rsp.getSender());
+               } else {
+                  ex = new TimeoutException("Replication timeout for " + rsp.getSender());
+               }
+               retval.add(new ReplicationException("rsp=" + rsp, ex));
             } else {
-               ex = new TimeoutException("Replication timeout for " + rsp.getSender());
+               Object value = rsp.getValue();
+               if (value instanceof Exception && !(value instanceof ReplicationException)) {
+                  // if we have any application-level exceptions make sure we throw them!!
+                  if (trace) log.trace("Recieved exception'" + value + "' from " + rsp.getSender());
+                  throw (Exception) value;
+               }
+               retval.add(value);
             }
-            retval.add(new ReplicationException("rsp=" + rsp, ex));
-         } else {
-            Object value = rsp.getValue();
-            if (value instanceof Exception && !(value instanceof ReplicationException)) {
-               // if we have any application-level exceptions make sure we throw them!!
-               if (trace) log.trace("Recieved exception'" + value + "' from " + rsp.getSender());
-               throw (Exception) value;
-            }
-            retval.add(value);
          }
+         return retval;
+      } finally {
+         // release the "processing" lock so that other threads are aware of the network call having completed
+         if (unlock) flushTracker.releaseProcessingLock();
       }
-      return retval;
    }
 
    private int toJGroupsMode(ResponseMode mode) {
@@ -360,9 +440,13 @@
    }
 
    public void block() {
-      // a no-op
+      flushTracker.acquireSync();
    }
 
+   public void unblock() {
+      flushTracker.releaseSync();
+   }
+
    public void receive(Message msg) {
       // no-op
    }
@@ -388,13 +472,14 @@
    }
 
    public void getState(String cacheName, OutputStream ostream) {
-      if (trace) log.trace("Received request to generate state for cache {0}.  Attempting to generate state.", cacheName);
+      if (trace)
+         log.trace("Received request to generate state for cache named '{0}'.  Attempting to generate state.", cacheName);
       try {
          inboundInvocationHandler.generateState(cacheName, ostream);
       } catch (StateTransferException e) {
          log.error("Caught while responding to state transfer request", e);
       } finally {
-         Util.closeStream(ostream);
+         Util.flushAndCloseStream(ostream);
       }
    }
 
@@ -403,14 +488,15 @@
    }
 
    public void setState(String cacheName, InputStream istream) {
-      if (trace) log.trace("Received state for cache {0}.  Attempting to apply state.", cacheName);
-      StateTransferMonitor mon = stateTransfersInProgress.get(cacheName);
+      StateTransferMonitor mon = null;
       try {
+         if (trace) log.trace("Received state for cache named '{0}'.  Attempting to apply state.", cacheName);
+         mon = stateTransfersInProgress.get(cacheName);
          inboundInvocationHandler.applyState(cacheName, istream);
          mon.notifyStateReceiptSucceeded();
-      } catch (StateTransferException e) {
+      } catch (Exception e) {
          log.error("Failed setting state", e);
-         mon.notifyStateReceiptFailed(e);
+         mon.notifyStateReceiptFailed(e instanceof StateTransferException ? (StateTransferException) e : new StateTransferException(e));
       } finally {
          Util.closeStream(istream);
       }
@@ -433,6 +519,18 @@
       return retval;
    }
 
+   private List<org.jgroups.Address> toJGroupsAddressList(Address... addresses) {
+      if (addresses == null) return null;
+      if (addresses.length == 0) return Collections.emptyList();
+
+      List<org.jgroups.Address> retval = new ArrayList<org.jgroups.Address>(addresses.length);
+      for (Address a : addresses) {
+         JGroupsAddress ja = (JGroupsAddress) a;
+         retval.add(ja.address);
+      }
+      return retval;
+   }
+
    private org.jgroups.Address toJGroupsAddress(Address a) {
       return ((JGroupsAddress) a).address;
    }

Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -0,0 +1,36 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.horizon.remoting.transport.jgroups;
+
+import java.io.Serializable;
+
+/**
+ * Indicates that the request was ignored,
+ *
+ * @author Jason T. Greene
+ */
+public class RequestIgnoredResponse implements Serializable {
+   @Override
+   public String toString() {
+      return "RequestIgnoredResponse";
+   }
+}

Modified: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -22,12 +22,18 @@
 package org.horizon.statetransfer;
 
 import org.horizon.AdvancedCache;
-import org.horizon.transaction.TransactionLog;
+import org.horizon.commands.tx.PrepareCommand;
+import org.horizon.commands.write.WriteCommand;
 import org.horizon.config.Configuration;
 import org.horizon.container.DataContainer;
+import org.horizon.context.InvocationContext;
 import org.horizon.factories.annotations.Inject;
 import org.horizon.factories.annotations.Start;
+import org.horizon.interceptors.InterceptorChain;
+import org.horizon.invocation.InvocationContextContainer;
 import org.horizon.invocation.Options;
+import org.horizon.io.UnclosableObjectInputStream;
+import org.horizon.io.UnclosableObjectOutputStream;
 import org.horizon.loader.CacheLoaderException;
 import org.horizon.loader.CacheLoaderManager;
 import org.horizon.loader.CacheStore;
@@ -36,6 +42,9 @@
 import org.horizon.logging.LogFactory;
 import org.horizon.marshall.Marshaller;
 import org.horizon.remoting.RPCManager;
+import org.horizon.remoting.transport.DistributedSync;
+import org.horizon.remoting.transport.Transport;
+import org.horizon.transaction.TransactionLog;
 import org.horizon.util.Util;
 
 import java.io.IOException;
@@ -44,8 +53,9 @@
 import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
 public class StateTransferManagerImpl implements StateTransferManager {
 
@@ -57,13 +67,18 @@
    CacheStore cs;
    Marshaller marshaller;
    TransactionLog transactionLog;
+   InvocationContextContainer invocationContextContainer;
+   InterceptorChain interceptorChain;
    private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
+   private static final boolean trace = log.isTraceEnabled();
    private static final Delimiter DELIMITER = new Delimiter();
 
+   boolean transientState, persistentState;
+
    @Inject
    public void injectDependencies(RPCManager rpcManager, AdvancedCache cache, Configuration configuration,
                                   DataContainer dataContainer, CacheLoaderManager clm, Marshaller marshaller,
-                                  TransactionLog transactionLog) {
+                                  TransactionLog transactionLog, InterceptorChain interceptorChain, InvocationContextContainer invocationContextContainer) {
       this.rpcManager = rpcManager;
       this.cache = cache;
       this.configuration = configuration;
@@ -71,12 +86,17 @@
       this.clm = clm;
       this.marshaller = marshaller;
       this.transactionLog = transactionLog;
+      this.invocationContextContainer = invocationContextContainer;
+      this.interceptorChain = interceptorChain;
    }
 
    @Start(priority = 14)
    // it is imperative that this starts *after* the RPCManager does.
    public void start() throws StateTransferException {
-      cs = clm == null || !clm.isEnabled() || !clm.isFetchPersistentState() ? null : clm.getCacheStore();
+      log.trace("Data container is {0}", System.identityHashCode(dataContainer));
+      cs = clm == null ? null : clm.getCacheStore();
+      transientState = configuration.isFetchInMemoryState();
+      persistentState = cs != null && clm.isEnabled() && clm.isFetchPersistentState() && !clm.isShared();
 
       long startTime = 0;
       if (log.isDebugEnabled()) {
@@ -93,43 +113,162 @@
    }
 
    public void generateState(OutputStream out) throws StateTransferException {
-      if (log.isDebugEnabled()) log.debug("Generating state");
+      ObjectOutputStream oos = null;
+      boolean txLogActivated = false;
+      try {
+         boolean canProvideState = (transientState || persistentState)
+               && (txLogActivated = transactionLog.activate());
+         if (log.isDebugEnabled()) log.debug("Generating state.  Can provide? {0}", canProvideState);
+         oos = new ObjectOutputStream(out);
+         marshaller.objectToObjectStream(canProvideState, oos);
 
+         if (canProvideState) {
+            delimit(oos);
+            if (transientState) generateInMemoryState(oos);
+            delimit(oos);
+            if (persistentState) generatePersistentState(oos);
+            delimit(oos);
+            generateTransactionLog(oos);
+
+            if (log.isDebugEnabled()) log.debug("State generated, closing object stream");
+         } else {
+            if (log.isDebugEnabled()) log.debug("Not providing state!");
+         }
+
+      } catch (StateTransferException ste) {
+         throw ste;
+      } catch (Exception e) {
+         throw new StateTransferException(e);
+      } finally {
+         Util.flushAndCloseStream(oos);
+         if (txLogActivated) transactionLog.deactivate();
+      }
+   }
+
+   private void generateTransactionLog(ObjectOutputStream oos) throws Exception {
+      // todo this should be configurable
+      int maxNonProgressingLogWrites = 100;
+      int flushTimeout = 60000;
+
+      DistributedSync distributedSync = rpcManager.getTransport().getDistributedSync();
+
       try {
-         ObjectOutputStream oos = new ObjectOutputStream(out);
+         if (trace) log.trace("Transaction log size is {0}", transactionLog.size());
+         for (int nonProgress = 0, size = transactionLog.size(); size > 0;) {
+            if (trace) log.trace("Tx Log remaining entries = " + size);
+            transactionLog.writeCommitLog(marshaller, oos);
+            int newSize = transactionLog.size();
+
+            // If size did not decrease then we did not make progress, and could be wasting
+            // our time. Limit this to the specified max.
+            if (newSize >= size && ++nonProgress >= maxNonProgressingLogWrites)
+               break;
+
+            size = newSize;
+         }
+
+         // Wait on incoming and outgoing threads to line-up in front of
+         // the distributed sync.
+         distributedSync.acquireProcessingLock(true, configuration.getStateRetrievalTimeout(), MILLISECONDS);
+
+         // Signal to sender that we need a flush to get a consistent view
+         // of the remaining transactions.
          delimit(oos);
-         generateInMemoryState(oos);
+         oos.flush();
+         if (trace) log.trace("Waiting for a distributed sync block");
+         distributedSync.blockUntilAcquired(flushTimeout, MILLISECONDS);
+         if (trace) log.trace("Distributed sync block received, proceeding with writing commit log");
+         // Write remaining transactions
+         transactionLog.writeCommitLog(marshaller, oos);
          delimit(oos);
-         generatePersistentState(oos);
+
+         // Write all non-completed prepares
+         transactionLog.writePendingPrepares(marshaller, oos);
          delimit(oos);
          oos.flush();
-         oos.close();
-         if (log.isDebugEnabled()) log.debug("State generated, closing object stream");
-         // just close the object stream but do NOT close the underlying stream
-      } catch (StateTransferException ste) {
-         throw ste;
-      } catch (Exception e) {
-         throw new StateTransferException(e);
       }
+      finally {
+         distributedSync.releaseProcessingLock();
+      }
    }
 
+   private void processCommitLog(ObjectInputStream ois) throws Exception {
+      Object object = marshaller.objectFromObjectStream(ois);
+      while (object instanceof TransactionLog.LogEntry) {
+         List<WriteCommand> mods = ((TransactionLog.LogEntry) object).getModifications();
+         if (trace) log.trace("Mods = {0}", mods);
+         for (WriteCommand mod : mods) {
+            InvocationContext ctx = invocationContextContainer.get();
+            ctx.setOriginLocal(false);
+            ctx.setOptions(Options.CACHE_MODE_LOCAL, Options.SKIP_CACHE_STATUS_CHECK);
+            interceptorChain.invoke(ctx, mod);
+         }
+
+         object = marshaller.objectFromObjectStream(ois);
+      }
+
+      assertDelimited(object);
+   }
+
+   private void applyTransactionLog(ObjectInputStream ois) throws Exception {
+      if (trace) log.trace("Integrating transaction log");
+
+      processCommitLog(ois);
+      Transport t = rpcManager.getTransport();
+      t.blockRPC(rpcManager.getTransport().getAddress(), rpcManager.getCurrentStateTransferSource());
+
+      try {
+         if (trace)
+            log.trace("Retrieving/Applying post-flush commits");
+         processCommitLog(ois);
+
+         if (trace)
+            log.trace("Retrieving/Applying pending prepares");
+         Object object = marshaller.objectFromObjectStream(ois);
+         while (object instanceof PrepareCommand) {
+            PrepareCommand command = (PrepareCommand) object;
+            if (!transactionLog.hasPendingPrepare(command)) {
+               InvocationContext ctx = invocationContextContainer.get();
+               ctx.setOriginLocal(false);
+               ctx.setOptions(Options.CACHE_MODE_LOCAL, Options.SKIP_CACHE_STATUS_CHECK);
+               interceptorChain.invoke(ctx, command);
+            }
+            object = marshaller.objectFromObjectStream(ois);
+         }
+         assertDelimited(object);
+      }
+      finally {
+         if (trace) log.trace("Stopping partial flush");
+         t.unblockRPC();
+      }
+   }
+
    public void applyState(InputStream in) throws StateTransferException {
       if (log.isDebugEnabled()) log.debug("Applying state");
-
+      ObjectInputStream ois = null;
       try {
-         ObjectInputStream ois = new ObjectInputStream(in);
-         assertDelimited(ois);
-         applyInMemoryState(ois);
-         assertDelimited(ois);
-         applyPersistentState(ois);
-         assertDelimited(ois);
-         if (log.isDebugEnabled()) log.debug("State applied, closing object stream");
-         ois.close();
-         // just close the object stream but do NOT close the underlying stream
+         ois = new ObjectInputStream(in);
+         boolean canProvideState = (Boolean) marshaller.objectFromObjectStream(ois);
+         if (canProvideState) {
+            assertDelimited(ois);
+            if (transientState) applyInMemoryState(ois);
+            assertDelimited(ois);
+            if (persistentState) applyPersistentState(ois);
+            assertDelimited(ois);
+            applyTransactionLog(ois);
+            if (log.isDebugEnabled()) log.debug("State applied, closing object stream");
+         } else {
+            String msg = "Provider cannot provide state!";
+            if (log.isDebugEnabled()) log.debug(msg);
+            throw new StateTransferException(msg);
+         }
       } catch (StateTransferException ste) {
          throw ste;
       } catch (Exception e) {
          throw new StateTransferException(e);
+      } finally {
+         // just close the object stream but do NOT close the underlying stream
+         Util.closeStream(ois);
       }
    }
 
@@ -137,7 +276,8 @@
       dataContainer.clear();
       try {
          Set<StoredEntry> set = (Set<StoredEntry>) marshaller.objectFromObjectStream(i);
-         for (StoredEntry se: set) cache.put(se.getKey(), se.getValue(), se.getLifespan(), TimeUnit.MILLISECONDS, Options.CACHE_MODE_LOCAL);
+         for (StoredEntry se : set)
+            cache.put(se.getKey(), se.getValue(), se.getLifespan(), MILLISECONDS, Options.CACHE_MODE_LOCAL);
       } catch (Exception e) {
          dataContainer.clear();
          throw new StateTransferException(e);
@@ -149,6 +289,7 @@
       // TODO is it safe enough to get these from the data container directly?
       try {
          Set<StoredEntry> s = dataContainer.getAllEntriesForStorage();
+         if (log.isDebugEnabled()) log.debug("Writing {0} StoredEntries to stream", s.size());
          marshaller.objectToObjectStream(s, o);
       } catch (Exception e) {
          throw new StateTransferException(e);
@@ -156,40 +297,39 @@
    }
 
    private void applyPersistentState(ObjectInputStream i) throws StateTransferException {
-      if (cs == null) {
-         if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent state, or no cache store configured.  Skipping applying persistent state.");
-      } else {
-         try {
-            cs.fromStream(i);
-         } catch (CacheLoaderException cle) {
-            throw new StateTransferException(cle);
-         }
+      try {
+         // always use the unclosable stream delegate to ensure the impl doesn't close the stream
+         cs.fromStream(new UnclosableObjectInputStream(i));
+      } catch (CacheLoaderException cle) {
+         throw new StateTransferException(cle);
       }
    }
 
    private void generatePersistentState(ObjectOutputStream o) throws StateTransferException {
-      if (cs == null) {
-         if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent state, or no cache store configured.  Skipping generating persistent state.");
-      } else {
-         try {
-            cs.toStream(o);
-         } catch (CacheLoaderException cle) {
-            throw new StateTransferException(cle);
-         }
+      try {
+         // always use the unclosable stream delegate to ensure the impl doesn't close the stream
+         cs.toStream(new UnclosableObjectOutputStream(o));
+      } catch (CacheLoaderException cle) {
+         throw new StateTransferException(cle);
       }
    }
 
    private void delimit(ObjectOutputStream o) throws IOException {
-      o.writeObject(DELIMITER);
+      marshaller.objectToObjectStream(DELIMITER, o);
    }
 
    private void assertDelimited(ObjectInputStream i) throws StateTransferException {
       Object o;
       try {
-         o = i.readObject();
+         o = marshaller.objectFromObjectStream(i);
       } catch (Exception e) {
          throw new StateTransferException(e);
       }
+      assertDelimited(o);
+   }
+
+   private void assertDelimited(Object o) throws StateTransferException {
+      if (o instanceof Exception) throw new StateTransferException((Exception) o);
       if ((o == null) || !(o instanceof Delimiter))
          throw new StateTransferException("Expected a delimiter, recieved " + o);
    }

Modified: core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -41,43 +41,36 @@
  *
  * @author Jason T. Greene
  */
-public class TransactionLog
-{
+public class TransactionLog {
    private final Map<GlobalTransaction, PrepareCommand> pendingPrepares = new ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
    private final BlockingQueue<LogEntry> entries = new LinkedBlockingQueue<LogEntry>();
    private AtomicBoolean active = new AtomicBoolean();
 
-   public static class LogEntry
-   {
+   public static class LogEntry {
       private final GlobalTransaction transaction;
       private final List<WriteCommand> modifications;
 
-      public LogEntry(GlobalTransaction transaction, List<WriteCommand> modifications)
-      {
+      public LogEntry(GlobalTransaction transaction, List<WriteCommand> modifications) {
          this.transaction = transaction;
          this.modifications = modifications;
       }
 
-      public GlobalTransaction getTransaction()
-      {
+      public GlobalTransaction getTransaction() {
          return transaction;
       }
 
-      public List<WriteCommand> getModifications()
-      {
+      public List<WriteCommand> getModifications() {
          return modifications;
       }
    }
 
    private static Log log = LogFactory.getLog(TransactionLog.class);
 
-   public void logPrepare(PrepareCommand command)
-   {
+   public void logPrepare(PrepareCommand command) {
       pendingPrepares.put(command.getGlobalTransaction(), command);
    }
 
-   public void logCommit(GlobalTransaction gtx)
-   {
+   public void logCommit(GlobalTransaction gtx) {
       PrepareCommand command = pendingPrepares.remove(gtx);
       // it is perfectly normal for a prepare not to be logged for this gtx, for example if a transaction did not
       // modify anything, then beforeCompletion() is not invoked and logPrepare() will not be called to register the
@@ -85,38 +78,32 @@
       if (command != null) addEntry(new LogEntry(gtx, command.getModifications()));
    }
 
-   private void addEntry(LogEntry entry)
-   {
-      if (! isActive())
+   private void addEntry(LogEntry entry) {
+      if (!isActive())
          return;
 
-      for (;;)
-      {
-         try
-         {
+      for (; ;) {
+         try {
             if (log.isTraceEnabled())
                log.trace("Added commit entry to tx log" + entry);
 
             entries.put(entry);
             break;
          }
-         catch (InterruptedException e)
-         {
+         catch (InterruptedException e) {
             Thread.currentThread().interrupt();
          }
       }
    }
 
-   public void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand> modifications)
-   {
+   public final void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand> modifications) {
       // Just in case...
       if (gtx != null) pendingPrepares.remove(gtx);
       if (!modifications.isEmpty()) addEntry(new LogEntry(gtx, modifications));
    }
 
-   public void logNoTxWrite(WriteCommand write)
-   {
-      if (! isActive())
+   public final void logNoTxWrite(WriteCommand write) {
+      if (!isActive())
          return;
 
       ArrayList<WriteCommand> list = new ArrayList<WriteCommand>();
@@ -124,55 +111,46 @@
       addEntry(new LogEntry(null, list));
    }
 
-   public void rollback(GlobalTransaction gtx)
-   {
+   public void rollback(GlobalTransaction gtx) {
       pendingPrepares.remove(gtx);
    }
 
-   public boolean isActive()
-   {
+   public final boolean isActive() {
       return active.get();
    }
 
-   public boolean activate()
-   {
+   public final boolean activate() {
       return active.compareAndSet(false, true);
    }
 
-   public void deactivate()
-   {
+   public final void deactivate() {
       active.set(false);
       if (entries.size() > 0)
-         log.error("Unprocessed Transaction Log Entries! = " + entries.size());
+         log.error("Unprocessed Transaction Log Entries! = {0}", entries.size());
       entries.clear();
    }
 
-   public int size()
-   {
+   public int size() {
       return entries.size();
    }
 
-   public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws Exception
-   {
-     List<LogEntry> buffer = new ArrayList<LogEntry>(10);
+   public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws Exception {
+      List<LogEntry> buffer = new ArrayList<LogEntry>(10);
 
-     while (entries.drainTo(buffer, 10) > 0)
-     {
-        for (LogEntry entry : buffer)
-           marshaller.objectToObjectStream(entry, out);
+      while (entries.drainTo(buffer, 10) > 0) {
+         for (LogEntry entry : buffer)
+            marshaller.objectToObjectStream(entry, out);
 
-        buffer.clear();
-     }
+         buffer.clear();
+      }
    }
 
-   public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws Exception
-   {
+   public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws Exception {
       for (PrepareCommand entry : pendingPrepares.values())
          marshaller.objectToObjectStream(entry, out);
    }
 
-   public boolean hasPendingPrepare(PrepareCommand command)
-   {
+   public boolean hasPendingPrepare(PrepareCommand command) {
       return pendingPrepares.containsKey(command.getGlobalTransaction());
    }
 }

Modified: core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -26,11 +26,13 @@
 import org.horizon.context.TransactionContext;
 import org.horizon.factories.annotations.Inject;
 import org.horizon.factories.annotations.NonVolatile;
+import org.horizon.factories.annotations.Start;
 import org.horizon.factories.context.ContextFactory;
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
 import org.horizon.remoting.RPCManager;
 import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.Transport;
 
 import javax.transaction.Status;
 import javax.transaction.SystemException;
@@ -64,6 +66,7 @@
    private TransactionManager transactionManager = null;
 
    private RPCManager rpcManager;
+   private Transport transport;
 
    private ContextFactory contextFactory;
 
@@ -74,6 +77,13 @@
       this.contextFactory = contextFactory;
    }
 
+   @Start(priority = 12)
+   // needs to happen after RPCManager
+   public void start() {
+      transport = rpcManager == null ? null : rpcManager.getTransport();
+   }
+
+
    /**
     * Returns the number of local transactions.
     */
@@ -312,7 +322,7 @@
    }
 
    private Address getAddress() {
-      return rpcManager == null ? null : rpcManager.getAddress();
+      return transport == null ? null : transport.getAddress();
    }
 
    public TransactionContext getTransactionContext(GlobalTransaction gtx) {

Modified: core/branches/flat/src/main/java/org/horizon/util/Util.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/util/Util.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/util/Util.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -157,18 +157,26 @@
    }
 
    public static void closeStream(InputStream i) {
+      if (i == null) return;
       try {
-         if (i != null) i.close();
+         i.close();
       } catch (Exception e) {
 
       }
    }
 
-   public static void closeStream(OutputStream o) {
-       try {
-         if (o != null) o.close();
+   public static void flushAndCloseStream(OutputStream o) {
+      if (o == null) return;
+      try {
+         o.flush();
       } catch (Exception e) {
 
       }
+
+      try {
+         o.close();
+      } catch (Exception e) {
+
+      }
    }
 }

Modified: core/branches/flat/src/main/resources/config-samples/all.xml
===================================================================
--- core/branches/flat/src/main/resources/config-samples/all.xml	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/resources/config-samples/all.xml	2009-03-03 14:56:48 UTC (rev 7829)
@@ -33,7 +33,8 @@
          There is no added cost to defining a transport but not creating a cache that uses one, since the transport
          is created and initialized lazily.
       -->
-      <transport transportClass="org.horizon.remoting.transport.jgroups.JGroupsTransport" clusterName="horizon-cluster">
+      <transport transportClass="org.horizon.remoting.transport.jgroups.JGroupsTransport" clusterName="horizon-cluster"
+                 distributedSyncTimeout="50000">
          <!-- Note that the JGroups transport uses sensible defaults if no configuration property is defined. -->
          <property name="configurationFile" value="udp.xml"/>
          <!-- See the JGroupsTransport javadocs for more options -->

Modified: core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd
===================================================================
--- core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd	2009-03-03 14:56:48 UTC (rev 7829)
@@ -139,6 +139,7 @@
       </xs:sequence>
       <xs:attribute name="transportClass" type="xs:string"/>
       <xs:attribute name="clusterName" type="xs:string"/>
+      <xs:attribute name="distributedSyncTimeout" type="xs:long"/>
    </xs:complexType>
 
    <xs:complexType name="syncType">

Modified: core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -106,7 +106,7 @@
          // specify what we expectWithTx called on the mock Rpc Manager.  For params we don't care about, just use ANYTHING.
          // setting the mock object to expectWithTx the "sync" param to be false.
          expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(),
-                                             eq(ResponseMode.ASYNCHRONOUS), anyLong(), anyBoolean(), (ResponseFilter) isNull())).andReturn(null);
+                                             eq(ResponseMode.ASYNCHRONOUS), anyLong(), anyBoolean(), (ResponseFilter) isNull(), anyBoolean())).andReturn(null);
 
          replay(mockAddress1, mockAddress2, mockTransport);
 
@@ -160,7 +160,7 @@
 
 
          expect(mockTransport.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(), anyResponseMode(),
-                                             anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
+                                             anyLong(), anyBoolean(), (ResponseFilter) anyObject(), anyBoolean()))
                .andThrow(new RuntimeException("Barf!")).anyTimes();
 
          replay(mockTransport);

Modified: core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -182,7 +182,7 @@
          expect(mockTransport.getAddress()).andReturn(addressOne).anyTimes();
          expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(),
                                              eq(isSync ? ResponseMode.SYNCHRONOUS : ResponseMode.ASYNCHRONOUS),
-                                             anyLong(), anyBoolean(), (ResponseFilter) anyObject())).andReturn(null).anyTimes();
+                                             anyLong(), anyBoolean(), (ResponseFilter) anyObject(), anyBoolean())).andReturn(null).anyTimes();
          replay(mockTransport);
 
          cache1.put("k", "v");

Modified: core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -145,7 +145,7 @@
          rpcManager.setTransport(mockTransport);
 
          expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(), eq(ResponseMode.SYNCHRONOUS),
-                                             anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
+                                             anyLong(), anyBoolean(), (ResponseFilter) anyObject(), anyBoolean()))
                .andReturn(Collections.emptyList()).once();
 
          replay(mockTransport);
@@ -157,7 +157,7 @@
          expect(mockTransport.getAddress()).andReturn(mockAddressOne).anyTimes();
          expect(mockTransport.getMembers()).andReturn(addresses).anyTimes();
          expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(), eq(ResponseMode.ASYNCHRONOUS),
-                                             anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
+                                             anyLong(), anyBoolean(), (ResponseFilter) anyObject(), anyBoolean()))
                .andReturn(Collections.emptyList()).once();
 
          replay(mockTransport);

Added: core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -0,0 +1,88 @@
+package org.horizon.statetransfer;
+
+import org.horizon.Cache;
+import org.horizon.config.CacheLoaderManagerConfig;
+import org.horizon.loader.CacheLoader;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.dummy.DummyInMemoryCacheStore;
+import org.horizon.manager.CacheManager;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.Test;
+
+ at Test(groups = "functional", testName = "statetransfer.StateTransferCacheLoaderFunctionalTest", enabled = false)
+public class StateTransferCacheLoaderFunctionalTest extends StateTransferFunctionalTest {
+   int id;
+   ThreadLocal<Boolean> sharedCacheLoader = new ThreadLocal<Boolean>() {
+      protected Boolean initialValue() {
+         return false;
+      }
+   };
+
+   @Override
+   protected CacheManager createCacheManager() {
+      // increment the DIMCS store id
+      CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
+      CacheLoaderConfig clc = new DummyInMemoryCacheStore.Cfg("store number " + id++);
+      clmc.addCacheLoaderConfig(clc);
+      clc.setFetchPersistentState(true);
+      clmc.setShared(sharedCacheLoader.get());
+      config.setCacheLoaderManagerConfig(clmc);
+      return super.createCacheManager();
+   }
+
+   @Override
+   protected void writeInitialData(final Cache<Object, Object> c) {
+      super.writeInitialData(c);
+      c.evict(A_B_NAME);
+      c.evict(A_B_AGE);
+      c.evict(A_C_NAME);
+      c.evict(A_C_AGE);
+      c.evict(A_D_NAME);
+      c.evict(A_D_AGE);
+   }
+
+   protected void verifyInitialDataOnLoader(Cache<Object, Object> c) throws Exception {
+      CacheLoader l = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheLoader();
+      assert l.containsKey(A_B_AGE);
+      assert l.containsKey(A_B_NAME);
+      assert l.containsKey(A_C_AGE);
+      assert l.containsKey(A_C_NAME);
+      assert l.load(A_B_AGE).getValue().equals(TWENTY);
+      assert l.load(A_B_NAME).getValue().equals(JOE);
+      assert l.load(A_C_AGE).getValue().equals(FORTY);
+      assert l.load(A_C_NAME).getValue().equals(BOB);
+   }
+
+   protected void verifyNoData(Cache<Object, Object> c) {
+      assert c.isEmpty() : "Cache should be empty!";
+   }
+
+   protected void verifyNoDataOnLoader(Cache<Object, Object> c) throws Exception {
+      CacheLoader l = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheLoader();
+      assert !l.containsKey(A_B_AGE);
+      assert !l.containsKey(A_B_NAME);
+      assert !l.containsKey(A_C_AGE);
+      assert !l.containsKey(A_C_NAME);
+      assert !l.containsKey(A_D_AGE);
+      assert !l.containsKey(A_D_NAME);
+   }
+
+
+   public void testSharedLoader() throws Exception {
+      sharedCacheLoader.set(true);
+      Cache<Object, Object> c1 = createCacheManager().getCache(cacheName);
+      writeInitialData(c1);
+
+      // starting the second cache would initialize an in-memory state transfer but not a persistent one since the loader is shared
+      Cache<Object, Object> c2 = createCacheManager().getCache(cacheName);
+
+      TestingUtil.blockUntilViewsReceived(60000, c1, c2);
+
+      verifyInitialDataOnLoader(c1);
+      verifyInitialData(c1);
+
+      verifyNoDataOnLoader(c2);
+      verifyNoData(c2);
+   }
+}

Modified: core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java	2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java	2009-03-03 14:56:48 UTC (rev 7829)
@@ -1,13 +1,13 @@
 package org.horizon.statetransfer;
 
 import org.horizon.Cache;
-import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.config.Configuration;
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
 import org.horizon.manager.CacheManager;
 import org.horizon.test.MultipleCacheManagersTest;
 import org.horizon.test.TestingUtil;
+import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.testng.annotations.Test;
 
 import javax.transaction.TransactionManager;
@@ -34,7 +34,7 @@
    public static final Integer FORTY = 40;
 
    Configuration config;
-   private static final String cacheName = "nbst";
+   protected static final String cacheName = "nbst";
 
    private volatile int testCount = 0;
 
@@ -54,9 +54,9 @@
       config.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
    }
 
-   private CacheManager createCacheManager() {
+   protected CacheManager createCacheManager() {
       CacheManager cm = addClusterEnabledCacheManager();
-      cm.defineCache(cacheName, config);
+      cm.defineCache(cacheName, config.clone());
       return cm;
    }
 
@@ -86,17 +86,18 @@
 
    }
 
-   private static class WritingRunner implements Runnable {
+   private static class WritingThread extends Thread {
       private final Cache<Object, Object> cache;
       private final boolean tx;
       private volatile boolean stop;
       private volatile int result;
       private TransactionManager tm;
 
-      WritingRunner(Cache<Object, Object> cache, boolean tx) {
+      WritingThread(Cache<Object, Object> cache, boolean tx) {
          this.cache = cache;
          this.tx = tx;
          if (tx) tm = TestingUtil.getTransactionManager(cache);
+         setDaemon(true);
       }
 
       public int result() {
@@ -107,19 +108,27 @@
          int c = 0;
          while (!stop) {
             try {
-               if (tx) tm.begin();
-               cache.put("test" + c, c++);
-               if (tx) tm.commit();
+               if (c % 1000 == 0) {
+                  if (tx) tm.begin();
+                  for (int i = 0; i < 1000; i++) cache.remove("test" + c);
+                  if (tx) tm.commit();
+                  c = 0;
+               } else {
+                  if (tx) tm.begin();
+                  cache.put("test" + c, c++);
+                  if (tx) tm.commit();
+               }
             }
             catch (Exception e) {
-               e.printStackTrace();
-               log.error(e);
+//               e.printStackTrace();
+//               log.error(e);
+               stopThread();
             }
          }
          result = c;
       }
 
-      public void stop() {
+      public void stopThread() {
          stop = true;
       }
    }
@@ -163,6 +172,7 @@
             cm3.getCache(cacheName);
          }
       });
+      t1.setName("CacheStarter-Cache3");
       t1.start();
 
       Thread t2 = new Thread(new Runnable() {
@@ -170,6 +180,7 @@
             cm4.getCache(cacheName);
          }
       });
+      t2.setName("CacheStarter-Cache4");
       t2.start();
 
       t1.join();
@@ -178,7 +189,7 @@
       cache3 = cm3.getCache(cacheName);
       cache4 = cm4.getCache(cacheName);
 
-      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3, cache4);
+      TestingUtil.blockUntilViewsReceived(120000, cache1, cache2, cache3, cache4);
       verifyInitialData(cache3);
       verifyInitialData(cache4);
       log.info("testConcurrentStateTransfer end - " + testCount);
@@ -205,6 +216,7 @@
       log.info("testSTWithWritingNonTxThread end - " + testCount);
    }
 
+   @Test(invocationCount = 10)
    public void testSTWithWritingTxThread() throws Exception {
       testCount++;
       log.info("testSTWithWritingTxThread start - " + testCount);
@@ -222,8 +234,7 @@
       // Delay the transient copy, so that we get a more thorough log test
       cache1.put("delay", new DelayTransfer());
 
-      WritingRunner writer = new WritingRunner(cache3, tx);
-      Thread writerThread = new Thread(writer);
+      WritingThread writerThread = new WritingThread(cache3, tx);
       writerThread.start();
 
       cache2 = createCacheManager().getCache(cacheName);
@@ -231,25 +242,25 @@
       // Pause to give caches time to see each other
       TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3);
 
-      writer.stop();
+      writerThread.stopThread();
       writerThread.join();
 
       verifyInitialData(cache2);
 
-      int count = writer.result();
+      int count = writerThread.result();
 
       for (int c = 0; c < count; c++)
          assert cache2.get("test" + c).equals(c);
    }
 
-   private void verifyInitialData(Cache<Object, Object> c) {
+   protected void verifyInitialData(Cache<Object, Object> c) {
       assert JOE.equals(c.get(A_B_NAME)) : "Incorrect value for key " + A_B_NAME;
       assert TWENTY.equals(c.get(A_B_AGE)) : "Incorrect value for key " + A_B_AGE;
       assert BOB.equals(c.get(A_C_NAME)) : "Incorrect value for key " + A_C_NAME;
       assert FORTY.equals(c.get(A_C_AGE)) : "Incorrect value for key " + A_C_AGE;
    }
 
-   private void writeInitialData(final Cache<Object, Object> c) {
+   protected void writeInitialData(final Cache<Object, Object> c) {
       c.put(A_B_NAME, JOE);
       c.put(A_B_AGE, TWENTY);
       c.put(A_C_NAME, BOB);
@@ -261,25 +272,24 @@
       cache1 = createCacheManager().getCache(cacheName);
 
       writeInitialData(cache1);
-
       // Delay the transient copy, so that we get a more thorough log test
       cache1.put("delay", new DelayTransfer());
 
-      WritingRunner writer = new WritingRunner(cache1, tx);
-      Thread writerThread = new Thread(writer);
+      WritingThread writerThread = new WritingThread(cache1, tx);
       writerThread.start();
-
+      verifyInitialData(cache1);
       cache2 = createCacheManager().getCache(cacheName);
 
       // Pause to give caches time to see each other
       TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
 
-      writer.stop();
+      writerThread.stopThread();
       writerThread.join();
 
+      verifyInitialData(cache1);
       verifyInitialData(cache2);
 
-      int count = writer.result();
+      int count = writerThread.result();
 
       for (int c = 0; c < count; c++)
          assert cache2.get("test" + c).equals(c);




More information about the jbosscache-commits mailing list