[jbosscache-commits] JBoss Cache SVN: r6897 - in core/branches/flat/src: main/java/org/jboss/starobrno/commands/read and 9 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Oct 9 09:22:09 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-10-09 09:22:09 -0400 (Thu, 09 Oct 2008)
New Revision: 6897

Added:
   core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntry.java
   core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
   core/branches/flat/src/main/java/org/jboss/starobrno/container/NullMarkerEntry.java
   core/branches/flat/src/main/java/org/jboss/starobrno/container/ReadCommittedEntry.java
   core/branches/flat/src/main/java/org/jboss/starobrno/container/RepeatableReadEntry.java
   core/branches/flat/src/test/java/org/jboss/starobrno/profiling/
   core/branches/flat/src/test/java/org/jboss/starobrno/profiling/MemConsumptionTest.java
   core/branches/flat/src/test/java/org/jboss/starobrno/util/
   core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
Removed:
   core/branches/flat/src/main/java/org/jboss/starobrno/DataContainer.java
   core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
   core/branches/flat/src/main/java/org/jboss/starobrno/ReplicationException.java
   core/branches/flat/src/main/java/org/jboss/starobrno/TxInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/UnsortedDataContainer.java
   core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/
Modified:
   core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
   core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ClearCommand.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/EvictCommand.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutMapCommand.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/RemoveCommand.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java
   core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java
   core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java
   core/branches/flat/src/main/java/org/jboss/starobrno/context/EntryLookup.java
   core/branches/flat/src/main/java/org/jboss/starobrno/context/InvocationContextImpl.java
   core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactory.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InterceptorChain.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
   core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java
Log:
Working mem tests

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,10 +24,10 @@
 import org.jboss.cache.CacheStatus;
 import org.jboss.cache.buddyreplication.BuddyManager;
 import org.jboss.cache.buddyreplication.GravitateResult;
-import org.jboss.cache.interceptors.base.CommandInterceptor;
 import org.jboss.cache.loader.CacheLoaderManager;
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.starobrno.batch.BatchContainer;
 import org.jboss.starobrno.commands.CommandsFactory;
 import org.jboss.starobrno.commands.read.GetKeyValueCommand;
 import org.jboss.starobrno.commands.read.SizeCommand;
@@ -44,11 +44,12 @@
 import org.jboss.starobrno.factories.annotations.Inject;
 import org.jboss.starobrno.factories.annotations.NonVolatile;
 import org.jboss.starobrno.interceptors.InterceptorChain;
+import org.jboss.starobrno.interceptors.base.CommandInterceptor;
 import org.jboss.starobrno.invocation.InvocationContextContainer;
 import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.remoting.RPCManager;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jboss.starobrno.transaction.TransactionTable;
-import org.jboss.starobrno.batch.BatchContainer;
 
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
@@ -295,7 +296,7 @@
       return null;  //TODO: Autogenerated.  Implement me properly
    }
 
-   public Object getRPCManager()
+   public RPCManager getRPCManager()
    {
       return null;  //TODO: Autogenerated.  Implement me properly
    }
@@ -305,7 +306,7 @@
       return null;  //TODO: Autogenerated.  Implement me properly
    }
 
-   public org.jboss.cache.notifications.Notifier getNotifier()
+   public Notifier getNotifier()
    {
       return null;  //TODO: Autogenerated.  Implement me properly
    }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,14 +24,14 @@
 import net.jcip.annotations.ThreadSafe;
 import org.jboss.cache.buddyreplication.BuddyManager;
 import org.jboss.cache.buddyreplication.GravitateResult;
-import org.jboss.cache.interceptors.base.CommandInterceptor;
-import org.jboss.cache.loader.CacheLoader;
 import org.jboss.cache.loader.CacheLoaderManager;
 import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.notifications.Notifier;
 import org.jboss.cache.statetransfer.StateTransferManager;
 import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.interceptors.base.CommandInterceptor;
+import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.remoting.RPCManager;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jboss.starobrno.transaction.TransactionTable;
 
@@ -168,7 +168,7 @@
     *
     * @return the {@link org.jboss.cache.RPCManager} configured.
     */
-   Object getRPCManager();
+   RPCManager getRPCManager();
 
    /**
     * Retrieves the current StateTransferManager instance associated with the current Cache instance.

Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/DataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/DataContainer.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/DataContainer.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -1,48 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * // TODO: MANIK: Document this
- *
- * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
- * @since 3.0
- */
-public interface DataContainer<K, V>
-{
-   Set<Entry<K, V>> getEntries();
-
-   Entry<K, V> getEntry(K k);
-
-   void putEntry(Entry<K, V> entry);
-
-   boolean exists(Entry<K, V> entry);
-
-   int size();
-
-   void clear();
-
-   void removeEntry(K key);
-}

Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -1,154 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.Fqn;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.blocks.RspFilter;
-
-import java.util.List;
-import java.util.Vector;
-
-/**
- * Provides a mechanism for communicating with other caches in the cluster.  For now this is based on JGroups as an underlying
- * transport, and in future more transport options may become available.
- * <p/>
- * Implementations have a simple lifecycle:
- * <ul>
- * <li>start() - starts the underlying channel based on configuration options injected, and connects the channel</li>
- * <li>disconnect() - disconnects the channel</li>
- * <li>stop() - stops the dispatcher and releases resources</li>
- * </ul>
- *
- * @author Manik Surtani
- * @since 2.1.0
- */
-public interface RPCManager
-{
-   /**
-    * Disconnects and closes the underlying JGroups channel.
-    */
-   void disconnect();
-
-   /**
-    * Stops the RPCDispatcher and frees resources.  Closes and disconnects the underlying JGroups channel if this is
-    * still open/connected.
-    */
-   void stop();
-
-   /**
-    * Starts the RPCManager by connecting the underlying JGroups channel (if configured for replication).  Connecting
-    * the channel may also involve state transfer (if configured) so the interceptor chain should be started and
-    * available before this method is called.
-    */
-   void start();
-
-   /**
-    * Invokes an RPC call on other caches in the cluster.
-    *
-    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param cacheCommand        the cache command to invoke
-    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
-    * @param timeout             a timeout after which to throw a replication exception.
-    * @param responseFilter      a response filter with which to filter out failed/unwanted/invalid responses.
-    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
-    * @return a list of responses from each member contacted.
-    * @throws Exception in the event of problems.
-    */
-   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
-
-   /**
-    * Invokes an RPC call on other caches in the cluster.
-    *
-    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param cacheCommand        the cache command to invoke
-    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
-    * @param timeout             a timeout after which to throw a replication exception.
-    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
-    * @return a list of responses from each member contacted.
-    * @throws Exception in the event of problems.
-    */
-   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage) throws Exception;
-
-   /**
-    * Invokes an RPC call on other caches in the cluster.
-    *
-    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param cacheCommand        the cache command to invoke
-    * @param synchronous         if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
-    * @param timeout             a timeout after which to throw a replication exception.
-    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
-    * @return a list of responses from each member contacted.
-    * @throws Exception in the event of problems.
-    */
-   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception;
-
-   /**
-    * @return true if the current Channel is the coordinator of the cluster.
-    */
-   boolean isCoordinator();
-
-   /**
-    * @return the Address of the current coordinator.
-    */
-   Address getCoordinator();
-
-   /**
-    * Retrieves the local JGroups channel's address
-    *
-    * @return an Address
-    */
-   Address getLocalAddress();
-
-   /**
-    * Returns a defensively copied list of  members in the current cluster view.
-    */
-   List<Address> getMembers();
-
-   /**
-    * Retrieves partial state from remote instances.
-    *
-    * @param sources           sources to consider for a state transfer
-    * @param sourceTarget      Fqn on source to retrieve state for
-    * @param integrationTarget integration point on local cache to apply state
-    * @throws Exception in the event of problems
-    */
-   void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception;
-
-   /**
-    * Retrieves partial state from remote instances.
-    *
-    * @param sources sources to consider for a state transfer
-    * @param subtree Fqn subtree to retrieve.  Will be integrated at the same point.
-    * @throws Exception in the event of problems
-    */
-   void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception;
-
-   /**
-    * Retrieves the Channel
-    *
-    * @return a channel
-    */
-   Channel getChannel();
-}
\ No newline at end of file

Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/ReplicationException.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/ReplicationException.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/ReplicationException.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -1,57 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-/**
- * Thrown when a replication problem occurred
- */
-public class ReplicationException extends CacheException
-{
-
-   private static final long serialVersionUID = 33172388691879866L;
-
-   public ReplicationException()
-   {
-      super();
-   }
-
-   public ReplicationException(Throwable cause)
-   {
-      super(cause);
-   }
-
-   public ReplicationException(String msg)
-   {
-      super(msg);
-   }
-
-   public ReplicationException(String msg, Throwable cause)
-   {
-      super(msg, cause);
-   }
-
-   @Override
-   public String toString()
-   {
-      return super.toString();
-   }
-}
\ No newline at end of file

Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/TxInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/TxInterceptor.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/TxInterceptor.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -1,1041 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-import org.jboss.cache.util.concurrent.ConcurrentHashSet;
-import org.jboss.starobrno.commands.CommandsFactory;
-import org.jboss.starobrno.commands.ReplicableCommand;
-import org.jboss.starobrno.commands.VisitableCommand;
-import org.jboss.starobrno.commands.tx.CommitCommand;
-import org.jboss.starobrno.commands.tx.PrepareCommand;
-import org.jboss.starobrno.commands.tx.RollbackCommand;
-import org.jboss.starobrno.config.Option;
-import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.context.TransactionContext;
-import org.jboss.starobrno.factories.ComponentRegistry;
-import org.jboss.starobrno.factories.annotations.Inject;
-import org.jboss.starobrno.factories.context.ContextFactory;
-import org.jboss.starobrno.interceptors.BaseTransactionalContextInterceptor;
-import org.jboss.starobrno.invocation.InvocationContextContainer;
-import org.jboss.starobrno.jmx.annotations.ManagedAttribute;
-import org.jboss.starobrno.jmx.annotations.ManagedOperation;
-import org.jboss.starobrno.lock.LockManager;
-import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.transaction.GlobalTransaction;
-import org.jboss.starobrno.transaction.OrderedSynchronizationHandler;
-import org.jboss.starobrno.transaction.TransactionTable;
-
-import javax.transaction.*;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * This interceptor is the new default at the head of all interceptor chains,
- * and makes transactional attributes available to all interceptors in the chain.
- * This interceptor is also responsible for registering for synchronisation on
- * transaction completion.
- *
- * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
- * @author <a href="mailto:stevew at jofti.com">Steve Woodcock (stevew at jofti.com)</a>
- */
-public class TxInterceptor extends BaseTransactionalContextInterceptor
-{
-   protected CommandsFactory commandsFactory;
-   protected RPCManager rpcManager;
-   private Notifier notifier;
-   private InvocationContextContainer invocationContextContainer;
-   private ComponentRegistry componentRegistry;
-   private ContextFactory contextFactory;
-
-   /**
-    * List <Transaction>that we have registered for
-    */
-   private final Set<Transaction> transactions = new ConcurrentHashSet<Transaction>();
-   private final Map<Transaction, GlobalTransaction> rollbackTransactions = new ConcurrentHashMap<Transaction, GlobalTransaction>(16);
-   private long prepares = 0;
-   private long commits = 0;
-   private long rollbacks = 0;
-   protected boolean optimistic = false;
-   private LockManager lockManager;
-   private boolean statsEnabled;
-
-   @Inject
-   public void intialize(RPCManager rpcManager, ContextFactory contextFactory,
-                         Notifier notifier, InvocationContextContainer icc,
-                         CommandsFactory factory, ComponentRegistry componentRegistry, LockManager lockManager)
-   {
-      this.contextFactory = contextFactory;
-      this.commandsFactory = factory;
-      this.rpcManager = rpcManager;
-      this.notifier = notifier;
-      this.invocationContextContainer = icc;
-      this.componentRegistry = componentRegistry;
-      this.lockManager = lockManager;
-      setStatisticsEnabled(configuration.isExposeManagementStatistics());
-   }
-
-   @Override
-   public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
-   {
-      Object result = null;
-
-      // this is a prepare, commit, or rollback.
-      if (log.isDebugEnabled()) log.debug("Got gtx from invocation context " + ctx.getGlobalTransaction());
-      try
-      {
-         if (ctx.getGlobalTransaction().isRemote())
-         {
-            result = handleRemotePrepare(ctx, command);
-            if (getStatisticsEnabled()) prepares++;
-         }
-         else
-         {
-            if (trace) log.trace("received my own message (discarding it)");
-            result = null;
-         }
-      }
-      catch (Throwable e)
-      {
-         ctx.throwIfNeeded(e);
-      }
-
-      return result;
-   }
-
-   @Override
-   public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
-   {
-      if (!ctx.getGlobalTransaction().isRemote())
-      {
-         if (trace) log.trace("received my own message (discarding it)");
-         return null;
-      }
-      try
-      {
-         if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call on command [" + command + "]");
-         GlobalTransaction gtx = ctx.getGlobalTransaction();
-         Transaction ltx = txTable.getLocalTransaction(gtx, true);
-         // disconnect if we have a current tx associated
-         Transaction currentTx = txManager.getTransaction();
-         boolean resumeCurrentTxOnCompletion = false;
-         try
-         {
-            if (!ltx.equals(currentTx))
-            {
-               currentTx = txManager.suspend();
-               resumeCurrentTxOnCompletion = true;
-               txManager.resume(ltx);
-               // make sure we set this in the ctx
-               ctx.setTransaction(ltx);
-            }
-            if (log.isDebugEnabled()) log.debug(" executing commit() with local TX " + ltx + " under global tx " + gtx);
-            txManager.commit();
-            if (getStatisticsEnabled()) commits++;
-         }
-         finally
-         {
-            //resume the old transaction if we suspended it
-            if (resumeCurrentTxOnCompletion)
-            {
-               resumeTransactionOnCompletion(ctx, currentTx);
-            }
-            // remove from local lists.
-            transactions.remove(ltx);
-            // this tx has completed.  Clean up in the tx table.
-            txTable.remove(gtx, ltx);
-         }
-         if (log.isDebugEnabled()) log.debug("Finished remote rollback method for " + gtx);
-      }
-      catch (Throwable throwable)
-      {
-         ctx.throwIfNeeded(throwable);
-      }
-      return null;
-   }
-
-   @Override
-   public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
-   {
-      if (!ctx.getGlobalTransaction().isRemote())
-      {
-         if (trace) log.trace("received my own message (discarding it)");
-         return null;
-      }
-      try
-      {
-         if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call on command [" + command + "]");
-         GlobalTransaction gtx = ctx.getGlobalTransaction();
-         Transaction ltx = txTable.getLocalTransaction(gtx);
-         if (ltx == null)
-         {
-            log.warn("No local transaction for this remotely originating rollback.  Possibly rolling back before a prepare call was broadcast?");
-            txTable.remove(gtx);
-            return null;
-         }
-         // disconnect if we have a current tx associated
-         Transaction currentTx = txManager.getTransaction();
-         boolean resumeCurrentTxOnCompletion = false;
-         try
-         {
-            if (!ltx.equals(currentTx))
-            {
-               currentTx = txManager.suspend();
-               resumeCurrentTxOnCompletion = true;
-               txManager.resume(ltx);
-               // make sure we set this in the ctx
-               ctx.setTransaction(ltx);
-            }
-            if (log.isDebugEnabled()) log.debug("executing with local TX " + ltx + " under global tx " + gtx);
-            txManager.rollback();
-            if (getStatisticsEnabled()) rollbacks++;
-         }
-         finally
-         {
-            //resume the old transaction if we suspended it
-            if (resumeCurrentTxOnCompletion)
-            {
-               resumeTransactionOnCompletion(ctx, currentTx);
-            }
-
-            // remove from local lists.
-            transactions.remove(ltx);
-
-            // this tx has completed.  Clean up in the tx table.
-            txTable.remove(gtx, ltx);
-         }
-         if (log.isDebugEnabled()) log.debug("Finished remote commit/rollback method for " + gtx);
-      }
-      catch (Throwable throwable)
-      {
-         ctx.throwIfNeeded(throwable);
-      }
-
-      return null;
-   }
-
-   @Override
-   public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
-   {
-      try
-      {
-         return attachGtxAndPassUpChain(ctx, command);
-      }
-      catch (Throwable throwable)
-      {
-         ctx.throwIfNeeded(throwable);
-         return null;
-      }
-   }
-
-   protected Object attachGtxAndPassUpChain(InvocationContext ctx, VisitableCommand command) throws Throwable
-   {
-      Transaction tx = ctx.getTransaction();
-      if (tx != null) attachGlobalTransaction(ctx, tx, command);
-      return invokeNextInterceptor(ctx, command);
-   }
-
-   // ------------------------------------------------------------------------
-   // JMX statistics
-   // ------------------------------------------------------------------------
-
-   // --------------------------------------------------------------
-
-   /**
-    * Handles a remotely originating prepare call, by creating a local transaction for the remote global transaction
-    * and replaying modifications in this new local transaction.
-    *
-    * @param ctx     invocation context
-    * @param command prepare command
-    * @return result of the prepare, typically a null.
-    * @throws Throwable in the event of problems.
-    */
-   private Object handleRemotePrepare(InvocationContext ctx, PrepareCommand command) throws Throwable
-   {
-      // the InvocationContextInterceptor would have set this for us
-      GlobalTransaction gtx = ctx.getGlobalTransaction();
-
-      // Is there a local transaction associated with GTX?  (not the current tx associated with the thread, which may be
-      // in the invocation context
-      Transaction ltx = txTable.getLocalTransaction(gtx);
-      Transaction currentTx = txManager.getTransaction();
-
-      Object retval = null;
-      boolean success = false;
-      try
-      {
-         if (ltx == null)
-         {
-            if (currentTx != null) txManager.suspend();
-            // create a new local transaction
-            ltx = createLocalTx();
-            // associate this with a global tx
-            txTable.put(ltx, gtx);
-            if (trace) log.trace("Created new tx for gtx " + gtx);
-
-            if (log.isDebugEnabled())
-               log.debug("Started new local tx as result of remote prepare: local tx=" + ltx + " (status=" + ltx.getStatus() + "), gtx=" + gtx);
-         }
-         else
-         {
-            //this should be valid
-            if (!TransactionTable.isValid(ltx))
-               throw new CacheException("Transaction " + ltx + " not in correct state to be prepared");
-
-            //associate this thread with the local transaction associated with the global transaction, IF the localTx is NOT the current tx.
-            if (currentTx == null || !ltx.equals(currentTx))
-            {
-               if (trace) log.trace("Suspending current tx " + currentTx);
-               txManager.suspend();
-               txManager.resume(ltx);
-            }
-         }
-         if (trace) log.trace("Resuming existing tx " + ltx + ", global tx=" + gtx);
-
-         // at this point we have a non-null ltx
-
-         // Asssociate the local TX with the global TX. Create new
-         // transactionContext for TX in txTable, the modifications
-         // below will need this transactionContext to add their modifications
-         // under the GlobalTx key
-         TransactionContext transactionContext = txTable.get(gtx);
-         if (transactionContext == null)
-         {
-            // create a new transaction transactionContext
-            if (log.isDebugEnabled()) log.debug("creating new tx transactionContext");
-            transactionContext = contextFactory.createTransactionContext(ltx);
-            txTable.put(gtx, transactionContext);
-         }
-
-         setTransactionalContext(ltx, gtx, transactionContext, ctx);
-
-         // register a sync handler for this tx.
-         registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, transactionContext), ctx);
-
-         success = false;
-
-         // replay modifications
-         replayModifications(ctx, ltx, command);
-
-         success = true; // no exceptions were thrown above!!
-
-         // now pass the prepare command up the chain as well.
-         if (command.isOnePhaseCommit())
-         {
-            if (trace)
-               log.trace("Using one-phase prepare.  Not propagating the prepare call up the stack until called to do so by the sync handler.");
-         }
-         else
-         {
-            // now pass up the prepare method itself.
-            invokeNextInterceptor(ctx, command);
-         }
-         // JBCACHE-361 Confirm that the transaction is ACTIVE
-         assertTxIsStillValid(ltx);
-      }
-      finally
-      {
-         // if we are running a one-phase commit, perform a commit or rollback now.
-         if (trace) log.trace("Are we running a 1-phase commit? " + command.isOnePhaseCommit());
-
-         if (command.isOnePhaseCommit())
-         {
-            try
-            {
-               if (success)
-               {
-                  ltx.commit();
-               }
-               else
-               {
-                  ltx.rollback();
-               }
-            }
-            catch (Throwable t)
-            {
-               log.error("Commit/rollback failed.", t);
-               if (success)
-               {
-                  // try another rollback...
-                  try
-                  {
-                     log.info("Attempting anotehr rollback");
-                     //invokeOnePhaseCommitMethod(globalTransaction, modifications.size() > 0, false);
-                     ltx.rollback();
-                  }
-                  catch (Throwable t2)
-                  {
-                     log.error("Unable to rollback", t2);
-                  }
-               }
-            }
-            finally
-            {
-               transactions.remove(ltx);// JBAS-298
-            }
-         }
-
-         txManager.suspend();// suspends ltx - could be null
-         // resume whatever else we had going.
-         if (currentTx != null) txManager.resume(currentTx);
-         if (log.isDebugEnabled()) log.debug("Finished remote prepare " + gtx);
-      }
-
-      return retval;
-   }
-
-   private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction tx, VisitableCommand command) throws Throwable
-   {
-      if (trace)
-      {
-         log.trace(" local transaction exists - registering global tx if not present for " + Thread.currentThread());
-      }
-      if (trace)
-      {
-         GlobalTransaction tempGtx = txTable.get(tx);
-         log.trace("Associated gtx in txTable is " + tempGtx);
-      }
-
-      // register a sync handler for this tx - only if the globalTransaction is not remotely initiated.
-      GlobalTransaction gtx = registerTransaction(tx, ctx);
-      if (gtx == null)
-      {
-         // get the current globalTransaction from the txTable.
-         gtx = txTable.get(tx);
-      }
-
-      // make sure we attach this globalTransaction to the invocation context.
-      ctx.setGlobalTransaction(gtx);
-
-      return command;
-   }
-
-   /**
-    * Replays modifications
-    */
-   protected void replayModifications(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Throwable
-   {
-      try
-      {
-         // replay modifications
-         for (VisitableCommand modification : command.getModifications())
-         {
-            invokeNextInterceptor(ctx, modification);
-            assertTxIsStillValid(ltx);
-         }
-      }
-      catch (Throwable th)
-      {
-         log.error("prepare failed!", th);
-         throw th;
-      }
-   }
-
-   private void resumeTransactionOnCompletion(InvocationContext ctx, Transaction currentTx)
-         throws SystemException, InvalidTransactionException
-   {
-      if (trace) log.trace("Resuming suspended transaction " + currentTx);
-      txManager.suspend();
-      if (currentTx != null)
-      {
-         txManager.resume(currentTx);
-         ctx.setTransaction(currentTx);
-      }
-   }
-
-   /**
-    * Handles a commit or a rollback.  Called by the synch handler.  Simply tests that we are in the correct tx and
-    * passes the method call up the interceptor chain.
-    *
-    * @throws Throwable
-    */
-   @SuppressWarnings("deprecation")
-   private Object handleCommitRollback(InvocationContext ctx, VisitableCommand command) throws Throwable
-   {
-      GlobalTransaction gtx = ctx.getGlobalTransaction();
-      Object result;
-      VisitableCommand originalCommand = ctx.getCommand();
-      ctx.setCommand(command);
-      try
-      {
-         result = invokeNextInterceptor(ctx, command);
-      }
-      finally
-      {
-         ctx.setCommand(originalCommand);
-      }
-      if (log.isDebugEnabled()) log.debug("Finished local commit/rollback method for " + gtx);
-      return result;
-   }
-
-   // --------------------------------------------------------------
-   //   Transaction phase runners
-   // --------------------------------------------------------------
-
-   protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
-   {
-      return commandsFactory.buildPrepareCommand(gtx, modifications, rpcManager.getLocalAddress(), onePhaseCommit);
-   }
-
-   /**
-    * creates a commit()
-    */
-   protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
-   {
-      try
-      {
-         VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx, modifications, true) : commandsFactory.buildCommitCommand(gtx);
-
-         if (trace) log.trace("Running commit for " + gtx);
-
-         handleCommitRollback(ctx, commitCommand);
-      }
-      catch (Throwable e)
-      {
-         log.warn("Commit failed.  Clearing stale locks.");
-         try
-         {
-            cleanupStaleLocks(ctx);
-         }
-         catch (RuntimeException re)
-         {
-            log.error("Unable to clear stale locks", re);
-            throw re;
-         }
-         catch (Throwable e2)
-         {
-            log.error("Unable to clear stale locks", e2);
-            throw new RuntimeException(e2);
-         }
-         if (e instanceof RuntimeException)
-            throw (RuntimeException) e;
-         else
-            throw new RuntimeException("Commit failed.", e);
-      }
-   }
-
-   protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable
-   {
-      TransactionContext transactionContext = ctx.getTransactionContext();
-      if (transactionContext != null) lockManager.unlock(ctx);
-   }
-
-   /**
-    * creates a rollback()
-    */
-   protected void runRollbackPhase(InvocationContext ctx, GlobalTransaction gtx, Transaction tx)
-   {
-      try
-      {
-         // JBCACHE-457
-         VisitableCommand rollbackCommand = commandsFactory.buildRollbackCommand(gtx);
-         if (trace) log.trace(" running rollback for " + gtx);
-
-         //JBCACHE-359 Store a lookup for the globalTransaction so a listener
-         // callback can find it
-         rollbackTransactions.put(tx, gtx);
-
-         handleCommitRollback(ctx, rollbackCommand);
-      }
-      catch (Throwable e)
-      {
-         log.warn("Rollback had a problem", e);
-      }
-      finally
-      {
-         rollbackTransactions.remove(tx);
-      }
-   }
-
-   private boolean isOnePhaseCommit()
-   {
-      if (!configuration.getCacheMode().isSynchronous())
-      {
-         // this is a REPL_ASYNC call - do 1-phase commit.  break!
-         if (trace) log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()");
-         return true;
-      }
-      return false;
-   }
-
-   /**
-    * Handles a local prepare - invoked by the sync handler.  Tests if the current tx matches the gtx passed in to the
-    * method call and passes the prepare() call up the chain.
-    */
-   @SuppressWarnings("deprecation")
-   public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List<VisitableCommand> modifications) throws Throwable
-   {
-      // running a 2-phase commit.
-      VisitableCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
-
-      Object result;
-
-      // Is there a local transaction associated with GTX ?
-      Transaction ltx = ctx.getTransaction();
-
-      //if ltx is not null and it is already running
-      Transaction currentTransaction = txManager.getTransaction();
-      if (currentTransaction != null && ltx != null && currentTransaction.equals(ltx))
-      {
-         VisitableCommand originalCommand = ctx.getCommand();
-         ctx.setCommand(prepareCommand);
-         try
-         {
-            result = invokeNextInterceptor(ctx, prepareCommand);
-         }
-         finally
-         {
-            ctx.setCommand(originalCommand);
-         }
-      }
-      else
-      {
-         log.warn("Local transaction does not exist or does not match expected transaction " + gtx);
-         throw new CacheException(" local transaction " + ltx + " does not exist or does not match expected transaction " + gtx);
-      }
-      return result;
-   }
-
-   // --------------------------------------------------------------
-   //   Private helper methods
-   // --------------------------------------------------------------
-
-   protected void assertTxIsStillValid(Transaction tx)
-   {
-      if (!TransactionTable.isActive(tx))
-      {
-         try
-         {
-            throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE; is " + tx.getStatus());
-         }
-         catch (SystemException e)
-         {
-            throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE; Unable to retrieve transaction status.");
-         }
-      }
-   }
-
-   /**
-    * Creates a gtx (if one doesn't exist), a sync handler, and registers the tx.
-    */
-   private GlobalTransaction registerTransaction(Transaction tx, InvocationContext ctx) throws Exception
-   {
-      GlobalTransaction gtx;
-
-      if (TransactionTable.isValid(tx) && transactions.add(tx))
-      {
-         gtx = txTable.getCurrentTransaction(tx, true);
-         TransactionContext transactionContext;
-         if (ctx.getGlobalTransaction() == null)
-         {
-            ctx.setGlobalTransaction(gtx);
-            transactionContext = txTable.get(gtx);
-            ctx.setTransactionContext(transactionContext);
-         }
-         else
-         {
-            transactionContext = ctx.getTransactionContext();
-         }
-         if (gtx.isRemote())
-         {
-            // should be no need to register a handler since this is a remotely initiated globalTransaction
-            if (trace) log.trace("is a remotely initiated gtx so no need to register a tx for it");
-         }
-         else
-         {
-            if (trace) log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);
-
-            // see the comment in the LocalSyncHandler for the last isOriginLocal param.
-            LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, transactionContext, !ctx.isOriginLocal());
-            registerHandler(tx, myHandler, ctx);
-         }
-      }
-      else if ((gtx = rollbackTransactions.get(tx)) != null)
-      {
-         if (trace) log.trace("Transaction " + tx + " is already registered and is rolling back.");
-      }
-      else
-      {
-         if (trace) log.trace("Transaction " + tx + " is already registered.");
-      }
-      return gtx;
-   }
-
-   /**
-    * Registers a sync handler against a tx.
-    */
-   private void registerHandler(Transaction tx, Synchronization handler, InvocationContext ctx) throws Exception
-   {
-      OrderedSynchronizationHandler orderedHandler = ctx.getTransactionContext().getOrderedSynchronizationHandler(); //OrderedSynchronizationHandler.getInstance(tx);
-
-      if (trace) log.trace("registering for TX completion: SynchronizationHandler(" + handler + ")");
-
-      orderedHandler.registerAtHead(handler);// needs to be invoked first on TX commit
-
-      notifier.notifyTransactionRegistered(tx, ctx);
-   }
-
-   /**
-    * Creates and starts a local tx
-    *
-    * @throws Exception
-    */
-   protected Transaction createLocalTx() throws Exception
-   {
-      if (trace)
-      {
-         log.trace("Creating transaction for thread " + Thread.currentThread());
-      }
-      Transaction localTx;
-      if (txManager == null) throw new Exception("Failed to create local transaction; TransactionManager is null");
-      txManager.begin();
-      localTx = txManager.getTransaction();
-      return localTx;
-   }
-
-   // ------------------------------------------------------------------------
-   // Synchronization classes
-   // ------------------------------------------------------------------------
-
-   // this controls the whole transaction
-
-   private class RemoteSynchronizationHandler implements Synchronization
-   {
-      Transaction tx = null;
-      GlobalTransaction gtx = null;
-      List<VisitableCommand> modifications = null;
-      TransactionContext transactionContext = null;
-      protected InvocationContext ctx; // the context for this call.
-
-      RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx, TransactionContext entry)
-      {
-         this.gtx = gtx;
-         this.tx = tx;
-         this.transactionContext = entry;
-      }
-
-      public void beforeCompletion()
-      {
-         if (trace) log.trace("Running beforeCompletion on gtx " + gtx);
-
-         if (transactionContext == null)
-         {
-            log.error("Transaction has a null transaction entry - beforeCompletion() will fail.");
-            throw new IllegalStateException("cannot find transaction entry for " + gtx);
-         }
-
-         modifications = transactionContext.getModifications();
-         ctx = invocationContextContainer.get();
-         setTransactionalContext(tx, gtx, transactionContext, ctx);
-
-         if (ctx.isOptionsUninitialised() && transactionContext.getOption() != null)
-            ctx.setOptionOverrides(transactionContext.getOption());
-
-         assertCanContinue();
-
-         ctx.setOriginLocal(false);
-      }
-
-      // this should really not be done here -
-      // it is supposed to be post commit not actually run the commit
-      public void afterCompletion(int status)
-      {
-         // could happen if a rollback is called and beforeCompletion() doesn't get called.
-         if (ctx == null)
-         {
-            ctx = invocationContextContainer.get();
-            setTransactionalContext(tx, gtx, transactionContext, ctx);
-
-            if (ctx.isOptionsUninitialised() && transactionContext != null && transactionContext.getOption() != null)
-            {
-               // use the options from the transaction entry instead
-               ctx.setOptionOverrides(transactionContext.getOption());
-            }
-         }
-
-         try
-         {
-            assertCanContinue();
-
-            try
-            {
-               if (txManager.getTransaction() != null && !txManager.getTransaction().equals(tx)) txManager.resume(tx);
-            }
-            catch (Exception e)
-            {
-               log.error("afterCompletion error: " + status, e);
-            }
-
-            if (trace) log.trace("calling aftercompletion for " + gtx);
-
-            // set any transaction wide options as current for this thread.
-            if (transactionContext != null)
-            {
-               // this should ideally be set in beforeCompletion(), after compacting the list.
-               if (modifications == null) modifications = transactionContext.getModifications();
-               ctx.setOptionOverrides(transactionContext.getOption());
-            }
-            if (tx != null) transactions.remove(tx);
-
-            switch (status)
-            {
-               case Status.STATUS_COMMITTED:
-                  boolean onePhaseCommit = isOnePhaseCommit();
-                  if (log.isDebugEnabled()) log.debug("Running commit phase.  One phase? " + onePhaseCommit);
-                  runCommitPhase(ctx, gtx, modifications, onePhaseCommit);
-                  log.debug("Finished commit phase");
-                  break;
-               case Status.STATUS_UNKNOWN:
-                  log.warn("Received JTA STATUS_UNKNOWN in afterCompletion()!  XA resources may not be in sync.  The app should manually clean up resources at this point.");
-               case Status.STATUS_MARKED_ROLLBACK:
-               case Status.STATUS_ROLLEDBACK:
-                  log.debug("Running rollback phase");
-                  runRollbackPhase(ctx, gtx, tx);
-                  log.debug("Finished rollback phase");
-                  break;
-
-               default:
-                  throw new IllegalStateException("illegal status: " + status);
-            }
-         }
-         catch (Exception th)
-         {
-            log.trace("Caught exception ", th);
-
-         }
-         finally
-         {
-            // clean up the tx table
-            txTable.remove(gtx);
-            txTable.remove(tx);
-            setTransactionalContext(null, null, null, ctx);
-            cleanupInternalState();
-         }
-      }
-
-      private void assertCanContinue()
-      {
-         if (!componentRegistry.invocationsAllowed(true) && (ctx.getOptionOverrides() == null || !ctx.getOptionOverrides().isSkipCacheStatusCheck()))
-            throw new IllegalStateException("Cache not in STARTED state!");
-      }
-
-      /**
-       * Cleans out (nullifies) member variables held by the sync object for easier gc.  Could be (falsely) seen as a mem
-       * leak if the TM implementation hangs on to the synchronizations for an unnecessarily long time even after the tx
-       * completes.  See JBCACHE-1007.
-       */
-      private void cleanupInternalState()
-      {
-         tx = null;
-         gtx = null;
-         modifications = null;
-         if (transactionContext != null) transactionContext.reset();
-         transactionContext = null;
-      }
-
-      @Override
-      public String toString()
-      {
-         return "TxInterceptor.RemoteSynchronizationHandler(gtx=" + gtx + ", tx=" + getTxAsString() + ")";
-      }
-
-      protected String getTxAsString()
-      {
-         // JBCACHE-1114 -- don't call toString() on tx or it can lead to stack overflow
-         if (tx == null)
-            return null;
-
-         return tx.getClass().getName() + "@" + System.identityHashCode(tx);
-      }
-   }
-
-   private class LocalSynchronizationHandler extends RemoteSynchronizationHandler
-   {
-      private boolean localRollbackOnly = true;
-      // a VERY strange situation where a tx has remote origins, but since certain buddy group org methods perform local
-      // cleanups even when remotely triggered, and optimistic locking is used, you end up with an implicit local tx.
-      // This is STILL remotely originating though and this needs to be made explicit here.
-      // this can be checked by inspecting the InvocationContext.isOriginLocal() at the time of registering the sync.
-      private boolean remoteLocal = false;
-      private Option originalOptions, transactionalOptions;
-
-      /**
-       * A Synchronization for locally originating txs.
-       * <p/>
-       * a VERY strange situation where a tx has remote origins, but since certain buddy group org methods perform local
-       * cleanups even when remotely triggered, and optimistic locking is used, you end up with an implicit local tx.
-       * This is STILL remotely originating though and this needs to be made explicit here.
-       * this can be checked by inspecting the InvocationContext.isOriginLocal() at the time of registering the sync.
-       *
-       * @param gtx
-       * @param tx
-       * @param remoteLocal
-       */
-      LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, TransactionContext transactionContext, boolean remoteLocal)
-      {
-         super(gtx, tx, transactionContext);
-         this.remoteLocal = remoteLocal;
-      }
-
-      @Override
-      public void beforeCompletion()
-      {
-         super.beforeCompletion();
-         ctx.setOriginLocal(!remoteLocal); // this is the LOCAL sync handler after all!
-         // fetch the modifications before the transaction is committed
-         // (and thus removed from the txTable)
-         setTransactionalContext(tx, gtx, transactionContext, ctx);
-         if (!transactionContext.hasModifications())
-         {
-            if (trace) log.trace("No modifications in this tx.  Skipping beforeCompletion()");
-            modifications = Collections.emptyList();
-            return;
-         }
-         else
-         {
-            modifications = transactionContext.getModifications();
-         }
-
-         // set any transaction wide options as current for this thread, caching original options that would then be reset
-         originalOptions = ctx.getOptionOverrides();
-         transactionalOptions = transactionContext.getOption();
-         ctx.setOptionOverrides(transactionalOptions);
-
-         try
-         {
-            switch (tx.getStatus())
-            {
-               // if we are active or preparing then we can go ahead
-               case Status.STATUS_ACTIVE:
-               case Status.STATUS_PREPARING:
-                  // run a prepare call.
-
-                  Object result = isOnePhaseCommit() ? null : runPreparePhase(ctx, gtx, modifications);
-
-                  if (result instanceof Throwable)
-                  {
-                     if (log.isDebugEnabled())
-                        log.debug("Transaction needs to be rolled back - the cache returned an instance of Throwable for this prepare call (tx=" + tx + " and gtx=" + gtx + ")", (Throwable) result);
-                     tx.setRollbackOnly();
-                     throw (Throwable) result;
-                  }
-                  break;
-               default:
-                  throw new CacheException("transaction " + tx + " in status " + tx.getStatus() + " unable to start transaction");
-            }
-         }
-         catch (Throwable t)
-         {
-            if (log.isWarnEnabled()) log.warn("Caught exception, will now set transaction to roll back", t);
-            try
-            {
-               tx.setRollbackOnly();
-            }
-            catch (SystemException se)
-            {
-               throw new RuntimeException("setting tx rollback failed ", se);
-            }
-            if (t instanceof RuntimeException)
-               throw (RuntimeException) t;
-            else
-               throw new RuntimeException("", t);
-         }
-         finally
-         {
-            localRollbackOnly = false;
-            setTransactionalContext(null, null, null, ctx);
-            ctx.setOptionOverrides(originalOptions);
-         }
-      }
-
-      @Override
-      public void afterCompletion(int status)
-      {
-         // could happen if a rollback is called and beforeCompletion() doesn't get called.
-         if (ctx == null) ctx = invocationContextContainer.get();
-         ctx.setLocalRollbackOnly(localRollbackOnly);
-         setTransactionalContext(tx, gtx, transactionContext, ctx);
-         if (transactionalOptions != null) ctx.setOptionOverrides(transactionalOptions);
-         try
-         {
-            super.afterCompletion(status);
-         }
-         finally
-         {
-            ctx.setOptionOverrides(originalOptions);
-         }
-      }
-
-      @Override
-      public String toString()
-      {
-         return "TxInterceptor.LocalSynchronizationHandler(gtx=" + gtx + ", tx=" + getTxAsString() + ")";
-      }
-   }
-
-   @ManagedOperation
-   public void resetStatistics()
-   {
-      prepares = 0;
-      commits = 0;
-      rollbacks = 0;
-   }
-
-   @ManagedOperation
-   public Map<String, Object> dumpStatistics()
-   {
-      Map<String, Object> retval = new HashMap<String, Object>(3);
-      retval.put("Prepares", prepares);
-      retval.put("Commits", commits);
-      retval.put("Rollbacks", rollbacks);
-      return retval;
-   }
-
-   @ManagedAttribute
-   public boolean getStatisticsEnabled()
-   {
-      return this.statsEnabled;
-   }
-
-   @ManagedAttribute
-   public void setStatisticsEnabled(boolean enabled)
-   {
-      this.statsEnabled = enabled;
-   }
-
-   @ManagedAttribute(description = "number of transaction prepares")
-   public long getPrepares()
-   {
-      return prepares;
-   }
-
-   @ManagedAttribute(description = "number of transaction commits")
-   public long getCommits()
-   {
-      return commits;
-   }
-
-   @ManagedAttribute(description = "number of transaction rollbacks")
-   public long getRollbacks()
-   {
-      return rollbacks;
-   }
-}
\ No newline at end of file

Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/UnsortedDataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/UnsortedDataContainer.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/UnsortedDataContainer.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -1,78 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * // TODO: crappy and inefficient - but just a placeholder for now.
- *
- * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
- * @since 3.0
- */
-public class UnsortedDataContainer<K, V> implements DataContainer<K, V>
-{
-   private final ConcurrentMap<K, V> data = new ConcurrentHashMap<K, V>();
-
-   public Set<Entry<K, V>> getEntries()
-   {
-      return data.entrySet();
-   }
-
-   public Entry<K, V> getEntry(K k)
-   {
-      if (k == null) throw new NullPointerException("I don't like nulls!");
-      for (Entry<K, V> e : data.entrySet())
-      {
-         if (k.equals(e.getKey())) return e;
-      }
-      return null;
-   }
-
-   public void putEntry(Entry<K, V> kvEntry)
-   {
-      data.put(kvEntry.getKey(), kvEntry.getValue());
-   }
-
-   public boolean exists(Entry<K, V> kvEntry)
-   {
-      return data.containsKey(kvEntry.getKey());
-   }
-
-   public int size()
-   {
-      return data.size();
-   }
-
-   public void clear()
-   {
-      data.clear();
-   }
-
-   public void removeEntry(K key)
-   {
-      data.remove(key);
-   }
-}

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,8 +24,8 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.starobrno.commands.Visitor;
+import org.jboss.starobrno.container.MVCCEntry;
 import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
 import org.jboss.starobrno.notifications.Notifier;
 
 /**

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ClearCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ClearCommand.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ClearCommand.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
 
 import org.jboss.starobrno.commands.VisitableCommand;
 import org.jboss.starobrno.commands.Visitor;
+import org.jboss.starobrno.container.MVCCEntry;
 import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
 
 /**
  * @author Mircea.Markus at jboss.com

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/EvictCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/EvictCommand.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/EvictCommand.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
 
 import org.jboss.starobrno.commands.Visitor;
 import org.jboss.starobrno.commands.read.AbstractDataCommand;
+import org.jboss.starobrno.container.MVCCEntry;
 import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
 
 /**
  * @author Mircea.Markus at jboss.com

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
 
 import org.jboss.starobrno.commands.Visitor;
 import org.jboss.starobrno.commands.read.AbstractDataCommand;
+import org.jboss.starobrno.container.MVCCEntry;
 import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
 
 /**
  * Implements functionality defined by {@link Cache#put(org.jboss.cache.Fqn, Object, Object)}.

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutMapCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutMapCommand.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutMapCommand.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
 
 import org.jboss.starobrno.commands.VisitableCommand;
 import org.jboss.starobrno.commands.Visitor;
+import org.jboss.starobrno.container.MVCCEntry;
 import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
 
 import java.util.Map;
 import java.util.Map.Entry;

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/RemoveCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/RemoveCommand.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/RemoveCommand.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
 
 import org.jboss.starobrno.commands.Visitor;
 import org.jboss.starobrno.commands.read.AbstractDataCommand;
+import org.jboss.starobrno.container.MVCCEntry;
 import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
 
 
 /**

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
 
 import org.jboss.starobrno.commands.Visitor;
 import org.jboss.starobrno.commands.read.AbstractDataCommand;
+import org.jboss.starobrno.container.MVCCEntry;
 import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
 
 
 /**

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -21,7 +21,6 @@
  */
 package org.jboss.starobrno.container;
 
-import java.util.Map.Entry;
 import java.util.Set;
 
 /**
@@ -32,17 +31,17 @@
  */
 public interface DataContainer<K, V>
 {
-   Set<Entry<K, V>> getEntries();
+   V get(K k);
 
-   Entry<K, V> getEntry(K k);
+   void put(K k, V v);
 
-   void putEntry(Entry<K, V> entry);
+   boolean containsKey(K k);
 
-   boolean exists(Entry<K, V> entry);
+   V remove(K k);
 
    int size();
 
    void clear();
 
-   void removeEntry(K key);
+   Set<K> keySet();
 }

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntry.java (from rev 6894, core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntry.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntry.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntry.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+import org.jboss.starobrno.context.InvocationContext;
+
+import java.util.Map.Entry;
+
+/**
+ * // TODO: MANIK: Document this
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public interface MVCCEntry extends Entry
+{
+   boolean isNullEntry();
+
+   void copyForUpdate(DataContainer container, boolean writeSkewCheck);
+
+   void commitUpdate(InvocationContext ctx, DataContainer container);
+
+   void rollbackUpdate();
+
+   boolean isChanged();
+
+   boolean isCreated();
+
+   void setCreated(boolean created);
+
+   boolean isDeleted();
+
+   void setDeleted(boolean deleted);
+
+   boolean isValid();
+
+   void setValid(boolean valid);
+}

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntryWrapper.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,179 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.lock.TimeoutException;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.factories.EntryFactory;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.lock.LockManager;
+
+/**
+ * Wraps mvcc entries.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class MVCCEntryCreator
+{
+   DataContainer container;
+   boolean writeSkewCheck;
+   LockManager lockManager;
+   Configuration configuration;
+   long defaultLockAcquisitionTimeout;
+   EntryFactory entryFactory;
+
+   private static final Log log = LogFactory.getLog(MVCCEntryCreator.class);
+   private static final boolean trace = log.isTraceEnabled();
+
+
+   @Inject
+   public void injectDependencies(DataContainer dataContainer, LockManager lockManager, Configuration configuration, EntryFactory entryFactory)
+   {
+      this.container = dataContainer;
+      this.configuration = configuration;
+      this.lockManager = lockManager;
+      this.entryFactory = entryFactory;
+   }
+
+   @Start
+   public void start()
+   {
+      defaultLockAcquisitionTimeout = configuration.getLockAcquisitionTimeout();
+      writeSkewCheck = configuration.isWriteSkewCheck();
+   }
+
+   public MVCCEntry wrapEntryForReading(InvocationContext ctx, Object key, boolean putInContext) throws InterruptedException
+   {
+      return wrapEntryForReading(ctx, key, putInContext, false);
+   }
+
+   public MVCCEntry wrapEntryForReading(InvocationContext ctx, Object key, boolean putInContext, boolean forceWriteLock) throws InterruptedException
+   {
+      // TODO: Do we need to wrap for reading if we are not in a TX?
+      // TODO: Also, do we need to wrap for reading even IN a TX if we are using read-committed?
+
+      MVCCEntry mvccEntry;
+      if (forceWriteLock)
+      {
+         if (trace) log.trace("Forcing lock on reading key " + key);
+         return wrapEntryForWriting(ctx, key, false, false);
+      }
+      else if ((mvccEntry = ctx.lookupEntry(key)) == null)
+      {
+         if (trace) log.trace("Key " + key + " is not in context, fetching from container.");
+         // simple implementation.  Peek the node, wrap it, put wrapped node in the context.
+         Object value = container.get(key);
+         mvccEntry = entryFactory.createWrappedEntry(key, value, false);
+         if (mvccEntry != null && putInContext) ctx.putLookedUpEntry(mvccEntry);
+         return mvccEntry;
+      }
+      else
+      {
+         if (trace) log.trace("Key " + key + " is already in context.");
+         return mvccEntry;
+      }
+   }
+
+   public MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean createIfAbsent, boolean forceLockIfAbsent) throws InterruptedException
+   {
+      MVCCEntry mvccEntry = ctx.lookupEntry(key);
+      if (createIfAbsent && mvccEntry != null && mvccEntry.isNullEntry()) mvccEntry = null;
+      if (mvccEntry != null) // exists in context!  Just acquire lock if needed, and wrap.
+      {
+         // acquire lock if needed
+         if (acquireLock(ctx, key))
+         {
+            // create a copy of the underlying node
+            mvccEntry.copyForUpdate(container, writeSkewCheck);
+         }
+         if (trace) log.trace("Retrieving wrapped node " + key);
+         if (mvccEntry.isDeleted() && createIfAbsent)
+         {
+            if (trace) log.trace("Node is deleted in current scope.  Need to un-delete.");
+            mvccEntry.setDeleted(false);
+            mvccEntry.setValid(true);
+         }
+      }
+      else
+      {
+         // else, fetch from dataContainer.
+         Object value = container.get(key);
+         if (value != null)
+         {
+            // exists in cache!  Just acquire lock if needed, and wrap.
+            // do we need a lock?
+            boolean needToCopy = false;
+            if (acquireLock(ctx, key)) needToCopy = true;
+            mvccEntry = entryFactory.createWrappedEntry(key, value, false);
+            ctx.putLookedUpEntry(mvccEntry);
+            if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck);
+         }
+         else if (createIfAbsent) // else, do we need to create one?
+         {
+            // now to lock and create the node.  Lock first to prevent concurrent creation!
+            acquireLock(ctx, key);
+            mvccEntry = entryFactory.createWrappedEntry(key, value, true);
+            mvccEntry.setCreated(true);
+            ctx.putLookedUpEntry(mvccEntry);
+            mvccEntry.copyForUpdate(container, writeSkewCheck);
+         }
+      }
+
+      // see if we need to force the lock on nonexistent entries.
+      if (mvccEntry == null && forceLockIfAbsent) acquireLock(ctx, key);
+
+      return mvccEntry;
+   }
+
+   /**
+    * Attempts to lock a node if the lock isn't already held in the current scope, and records the lock in the context.
+    *
+    * @param ctx context
+    * @param key key to lock
+    * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was already held)
+    * @throws InterruptedException if interrupted
+    * @throws org.jboss.cache.lock.TimeoutException
+    *                              if we are unable to acquire the lock after a specified timeout.
+    */
+   private boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException
+   {
+      // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
+      // lock which may be shared with another Fqn that we have a lock for already.
+      // nothing wrong, just means that we fail to record the lock.  And that is a problem.
+      // Better to check our records and lock again if necessary.
+      if (!ctx.hasLockedKey(key))
+      {
+         if (!lockManager.lockAndRecord(key, ctx))
+         {
+            Object owner = lockManager.getOwner(key);
+            throw new TimeoutException("Unable to acquire lock on key [" + key + "] after [" + ctx.getLockAcquisitionTimeout(defaultLockAcquisitionTimeout) + "] milliseconds for requestor [" + lockManager.getLockOwner(ctx) + "]! Lock held by [" + owner + "]");
+         }
+         return true;
+      }
+      return false;
+   }
+}

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/NullMarkerEntry.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/NullMarkerEntry.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/NullMarkerEntry.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/NullMarkerEntry.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+/**
+ * A marker node to represent a null node for repeatable read, so that a read that returns a null can continue to return
+ * null.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class NullMarkerEntry extends ReadCommittedEntry
+{
+   /**
+    * @return always returns true
+    */
+   @Override
+   public boolean isNullEntry()
+   {
+      return true;
+   }
+
+   /**
+    * @return always returns true so that any get commands, upon getting this node, will ignore the node as though it were removed.
+    */
+   @Override
+   public boolean isDeleted()
+   {
+      return true;
+   }
+
+   /**
+    * @return always returns false so that any get commands, upon getting this node, will ignore the node as though it were invalid.
+    */
+   @Override
+   public boolean isValid()
+   {
+      return false;
+   }
+
+   /**
+    * A no-op.
+    */
+   @Override
+   public void copyForUpdate(DataContainer d, boolean b)
+   {
+      // no op
+   }
+}
\ No newline at end of file

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/ReadCommittedEntry.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/ReadCommittedEntry.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/ReadCommittedEntry.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/ReadCommittedEntry.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,205 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.jboss.starobrno.container.ReadCommittedEntry.Flags.*;
+import org.jboss.starobrno.context.InvocationContext;
+
+/**
+ * A node delegate that encapsulates read committed semantics when writes are initiated, committed or rolled back.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class ReadCommittedEntry implements MVCCEntry
+{
+   private static final Log log = LogFactory.getLog(ReadCommittedEntry.class);
+   private static final boolean trace = log.isTraceEnabled();
+
+   protected Object key, value, oldValue;
+   protected byte flags = 0;
+
+
+   protected ReadCommittedEntry()
+   {
+      setValid(true);
+   }
+
+   public ReadCommittedEntry(Object key, Object value)
+   {
+      setValid(true);
+      this.key = key;
+      this.value = value;
+   }
+
+   public Object getKey()
+   {
+      return key;
+   }
+
+   public Object getValue()
+   {
+      return value;
+   }
+
+   public Object setValue(Object value)
+   {
+      return this.value = value;
+   }
+
+   protected static enum Flags
+   {
+      CHANGED(0x1), CREATED(0x2), DELETED(0x4), VALID(0x8);
+      final byte mask;
+
+      Flags(int mask)
+      {
+         this.mask = (byte) mask;
+      }
+   }
+
+   /**
+    * Tests whether a flag is set.
+    *
+    * @param flag flag to test
+    * @return true if set, false otherwise.
+    */
+   protected final boolean isFlagSet(Flags flag)
+   {
+      return (flags & flag.mask) != 0;
+   }
+
+   /**
+    * Utility method that sets the value of the given flag to true.
+    *
+    * @param flag flag to set
+    */
+   protected final void setFlag(Flags flag)
+   {
+      flags |= flag.mask;
+   }
+
+   /**
+    * Utility method that sets the value of the flag to false.
+    *
+    * @param flag flag to unset
+    */
+   protected final void unsetFlag(Flags flag)
+   {
+      flags &= ~flag.mask;
+   }
+
+   public boolean isNullEntry()
+   {
+      return false;
+   }
+
+   public void copyForUpdate(DataContainer container, boolean writeSkewCheck)
+   {
+      if (isFlagSet(CHANGED)) return; // already copied
+
+      setFlag(CHANGED);  // mark as changed
+
+      // if newly created, then nothing to copy.
+      if (!isFlagSet(CREATED)) oldValue = value;
+   }
+
+   @SuppressWarnings("unchecked")
+   public void commitUpdate(InvocationContext ctx, DataContainer container)
+   {
+      // only do stuff if there are changes.
+      if (isFlagSet(CHANGED))
+      {
+         if (trace)
+            log.trace("Updating entry [" + getKey() + "].  deleted=" + isDeleted() + " valid=" + isValid() + " changed=" + isChanged() + " created=" + isFlagSet(CREATED));
+         if (isFlagSet(DELETED))
+         {
+            container.remove(key);
+
+         }
+         else
+         {
+            container.put(key, value);
+         }
+         reset();
+      }
+   }
+
+   private void reset()
+   {
+      oldValue = null;
+      flags = 0;
+      setValid(true);
+   }
+
+   public void rollbackUpdate()
+   {
+      value = oldValue;
+      reset();
+   }
+
+   public boolean isChanged()
+   {
+      return isFlagSet(CHANGED);
+   }
+
+   public boolean isValid()
+   {
+      return isFlagSet(VALID);
+   }
+
+   public void setValid(boolean valid)
+   {
+      if (valid)
+         setFlag(VALID);
+      else
+         unsetFlag(VALID);
+   }
+
+   public boolean isCreated()
+   {
+      return isFlagSet(CREATED);
+   }
+
+   public void setCreated(boolean created)
+   {
+      if (created)
+         setFlag(CREATED);
+      else
+         unsetFlag(CREATED);
+   }
+
+   public boolean isDeleted()
+   {
+      return isFlagSet(DELETED);
+   }
+
+   public void setDeleted(boolean deleted)
+   {
+      if (deleted)
+         setFlag(DELETED);
+      else
+         unsetFlag(DELETED);
+   }
+}
\ No newline at end of file

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/RepeatableReadEntry.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/RepeatableReadEntry.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/RepeatableReadEntry.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/RepeatableReadEntry.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.mvcc.RepeatableReadNode;
+import org.jboss.starobrno.CacheException;
+import static org.jboss.starobrno.container.ReadCommittedEntry.Flags.CHANGED;
+
+/**
+ * A node delegate that encapsulates repeatable read semantics when writes are initiated, committed or rolled back.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class RepeatableReadEntry extends ReadCommittedEntry
+{
+   private static final Log log = LogFactory.getLog(RepeatableReadNode.class);
+
+   public RepeatableReadEntry(Object key, Object value)
+   {
+      super(key, value);
+   }
+
+   @Override
+   public void copyForUpdate(DataContainer container, boolean writeSkewCheck)
+   {
+      if (isFlagSet(CHANGED)) return; // already copied
+
+      // mark node as changed.
+      setFlag(CHANGED);
+
+      if (writeSkewCheck)
+      {
+         // check for write skew.
+         Object actualValue = container.get(key);
+
+         if (actualValue != null && actualValue != value)
+         {
+            String errormsg = new StringBuilder().append("Detected write skew on key [").append(getKey()).append("].  Another process has changed the entry since we last read it!").toString();
+            if (log.isWarnEnabled()) log.warn(errormsg + ".  Unable to copy entry for update.");
+            throw new CacheException(errormsg);
+         }
+      }
+
+      // make a backup copy
+      oldValue = value;
+   }
+}
\ No newline at end of file

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -21,7 +21,6 @@
  */
 package org.jboss.starobrno.container;
 
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -36,29 +35,24 @@
 {
    private final ConcurrentMap<K, V> data = new ConcurrentHashMap<K, V>();
 
-   public Set<Entry<K, V>> getEntries()
+   public V get(K k)
    {
-      return data.entrySet();
+      return data.get(k);
    }
 
-   public Entry<K, V> getEntry(K k)
+   public void put(K k, V v)
    {
-      if (k == null) throw new NullPointerException("I don't like nulls!");
-      for (Entry<K, V> e : data.entrySet())
-      {
-         if (k.equals(e.getKey())) return e;
-      }
-      return null;
+      data.put(k, v);
    }
 
-   public void putEntry(Entry<K, V> kvEntry)
+   public boolean containsKey(K k)
    {
-      data.put(kvEntry.getKey(), kvEntry.getValue());
+      return data.containsKey(k);
    }
 
-   public boolean exists(Entry<K, V> kvEntry)
+   public V remove(K k)
    {
-      return data.containsKey(kvEntry.getKey());
+      return data.remove(k);
    }
 
    public int size()
@@ -71,8 +65,8 @@
       data.clear();
    }
 
-   public void removeEntry(K key)
+   public Set<K> keySet()
    {
-      data.remove(key);
+      return data.keySet();
    }
 }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/context/EntryLookup.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/context/EntryLookup.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/context/EntryLookup.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -21,7 +21,7 @@
  */
 package org.jboss.starobrno.context;
 
-import org.jboss.starobrno.mvcc.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntry;
 
 import java.util.Map;
 import java.util.Set;

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/context/InvocationContextImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/context/InvocationContextImpl.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/context/InvocationContextImpl.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -26,7 +26,7 @@
 import org.jboss.cache.util.Immutables;
 import org.jboss.starobrno.commands.VisitableCommand;
 import org.jboss.starobrno.config.Option;
-import org.jboss.starobrno.mvcc.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntry;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jboss.starobrno.transaction.TransactionTable;
 

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,7 +24,7 @@
 import org.jboss.cache.util.Immutables;
 import org.jboss.starobrno.commands.VisitableCommand;
 import org.jboss.starobrno.config.Option;
-import org.jboss.starobrno.mvcc.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntry;
 import org.jboss.starobrno.transaction.OrderedSynchronizationHandler;
 
 import javax.transaction.RollbackException;

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,7 +23,6 @@
 
 
 import org.jboss.cache.RegionRegistry;
-import org.jboss.starobrno.batch.BatchContainer;
 import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
 import org.jboss.cache.invocation.CacheInvocationDelegate;
 import org.jboss.cache.loader.CacheLoaderManager;
@@ -31,12 +30,13 @@
 import org.jboss.cache.marshall.Marshaller;
 import org.jboss.cache.marshall.VersionAwareMarshaller;
 import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
+import org.jboss.starobrno.batch.BatchContainer;
 import org.jboss.starobrno.commands.CommandsFactory;
 import org.jboss.starobrno.config.ConfigurationException;
+import org.jboss.starobrno.container.MVCCEntryCreator;
 import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
 import org.jboss.starobrno.factories.context.ContextFactory;
 import org.jboss.starobrno.invocation.InvocationContextContainer;
-import org.jboss.starobrno.mvcc.MVCCEntryWrapper;
 import org.jboss.starobrno.notifications.Notifier;
 import org.jboss.starobrno.transaction.TransactionTable;
 
@@ -48,7 +48,7 @@
  */
 @DefaultFactoryFor(classes = {Notifier.class, RegionRegistry.class,
       ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class, InvocationContextContainer.class,
-      CacheInvocationDelegate.class, TransactionTable.class, MVCCEntryWrapper.class,
+      CacheInvocationDelegate.class, TransactionTable.class, MVCCEntryCreator.class,
       LockStrategyFactory.class, BuddyFqnTransformer.class, BatchContainer.class,
       ContextFactory.class, EntryFactory.class, CommandsFactory.class})
 public class EmptyConstructorFactory extends ComponentFactory

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactory.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactory.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -21,10 +21,8 @@
  */
 package org.jboss.starobrno.factories;
 
-import org.jboss.starobrno.mvcc.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntry;
 
-import java.util.Map.Entry;
-
 /**
  * // TODO: MANIK: Document this
  *
@@ -33,7 +31,5 @@
  */
 public interface EntryFactory
 {
-   Entry createEntry(Object key, Object value, boolean putInContainer);
-
-   MVCCEntry createWrappedEntry(Entry entry);
+   MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert);
 }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,16 +24,13 @@
 import org.jboss.cache.lock.IsolationLevel;
 import org.jboss.starobrno.config.Configuration;
 import org.jboss.starobrno.container.DataContainer;
+import org.jboss.starobrno.container.MVCCEntry;
+import org.jboss.starobrno.container.NullMarkerEntry;
+import org.jboss.starobrno.container.ReadCommittedEntry;
+import org.jboss.starobrno.container.RepeatableReadEntry;
 import org.jboss.starobrno.factories.annotations.Inject;
 import org.jboss.starobrno.factories.annotations.Start;
-import org.jboss.starobrno.mvcc.EntryImpl;
-import org.jboss.starobrno.mvcc.MVCCEntry;
-import org.jboss.starobrno.mvcc.NullMarkerEntry;
-import org.jboss.starobrno.mvcc.ReadCommittedEntry;
-import org.jboss.starobrno.mvcc.RepeatableReadEntry;
 
-import java.util.Map.Entry;
-
 /**
  * // TODO: MANIK: Document this
  *
@@ -61,18 +58,11 @@
       useRepeatableRead = configuration.getIsolationLevel() == IsolationLevel.REPEATABLE_READ;
    }
 
-   public Entry createEntry(Object key, Object value, boolean putInContainer)
+   public MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert)
    {
-      Entry e = new EntryImpl(key, value);
-      if (putInContainer) dataContainer.putEntry(e);
-      return e;
-   }
+      if (value == null && !isForInsert) return useRepeatableRead ? NULL_MARKER : null;
 
-   public MVCCEntry createWrappedEntry(Entry entry)
-   {
-      if (entry == null) return useRepeatableRead ? NULL_MARKER : null;
-
-      MVCCEntry mvccEntry = useRepeatableRead ? new RepeatableReadEntry(entry) : new ReadCommittedEntry(entry);
+      MVCCEntry mvccEntry = useRepeatableRead ? new RepeatableReadEntry(key, value) : new ReadCommittedEntry(key, value);
       return mvccEntry;
    }
 }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InterceptorChain.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InterceptorChain.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InterceptorChain.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,13 +24,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.commands.VisitableCommand;
 import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.factories.annotations.Inject;
 import org.jboss.starobrno.factories.annotations.Start;
-import org.jboss.starobrno.commands.VisitableCommand;
 import org.jboss.starobrno.interceptors.base.CommandInterceptor;
 import org.jboss.starobrno.invocation.InvocationContextContainer;
-import org.jboss.cache.util.CachePrinter;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -75,6 +74,7 @@
    @Start
    private void printChainInfo()
    {
+      if (log.isDebugEnabled()) log.debug("Interceptor chain size: " + size());
       if (log.isDebugEnabled()) log.debug("Interceptor chain is: " + toString());
    }
 
@@ -358,14 +358,11 @@
    {
       StringBuilder sb = new StringBuilder();
       CommandInterceptor i = firstInChain;
-      if (i != null)
+      while (i != null)
       {
-         if (i.getNext() != null)
-         {
-            sb.append(i.getNext()).append("\n");
-         }
-         sb.append("\t>> ");
+         sb.append("\n\t>> ");
          sb.append(i.getClass().getName());
+         i = i.getNext();
       }
       return sb.toString();
    }

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -21,6 +21,7 @@
  */
 package org.jboss.starobrno.interceptors;
 
+import org.jboss.cache.lock.IsolationLevel;
 import org.jboss.starobrno.commands.VisitableCommand;
 import org.jboss.starobrno.commands.read.GetKeyValueCommand;
 import org.jboss.starobrno.commands.read.GravitateDataCommand;
@@ -35,16 +36,18 @@
 import org.jboss.starobrno.commands.write.RemoveCommand;
 import org.jboss.starobrno.commands.write.ReplaceCommand;
 import org.jboss.starobrno.container.DataContainer;
+import org.jboss.starobrno.container.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntryCreator;
 import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
 import org.jboss.starobrno.interceptors.base.PrePostProcessingCommandInterceptor;
 import org.jboss.starobrno.lock.LockManager;
-import org.jboss.starobrno.mvcc.MVCCEntry;
-import org.jboss.starobrno.mvcc.MVCCEntryWrapper;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.ListIterator;
-import java.util.Map.Entry;
+import java.util.Set;
 
 /**
  * Interceptor to implement <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC</a> functionality.
@@ -57,16 +60,23 @@
 {
    LockManager lockManager;
    DataContainer dataContainer;
-   MVCCEntryWrapper entryWrapper;
+   MVCCEntryCreator entryWrapper;
+   boolean useReadCommitted;
 
    @Inject
-   public void setDependencies(LockManager lockManager, DataContainer dataContainer, MVCCEntryWrapper entryWrapper)
+   public void setDependencies(LockManager lockManager, DataContainer dataContainer, MVCCEntryCreator entryWrapper)
    {
       this.lockManager = lockManager;
       this.dataContainer = dataContainer;
       this.entryWrapper = entryWrapper;
    }
 
+   @Start
+   private void determineIsolationLevel()
+   {
+      useReadCommitted = configuration.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
+   }
+
    @Override
    protected boolean doBeforeCall(InvocationContext ctx, VisitableCommand command)
    {
@@ -138,11 +148,7 @@
    public Object handleClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable
    {
       // get a snapshot of all keys in the data container
-      for (Object o : dataContainer.getEntries())
-      {
-         Object key = ((Entry) o).getKey();
-         entryWrapper.wrapEntryForWriting(ctx, key, false, false);
-      }
+      for (Object key : dataContainer.keySet()) entryWrapper.wrapEntryForWriting(ctx, key, false, false);
 
       return invokeNextInterceptor(ctx, command);
    }
@@ -211,7 +217,22 @@
       }
       else
       {
-         if (trace) log.trace("Nothing to do since there is a transaction in scope.");
+         if (useReadCommitted)
+         {
+            // wipe all unchanged entries from context, to force subsequent reads to go to the container,
+            // hence providing R_C semantics.  Note the negation: only entries that were NOT changed are collected.
+            Set<Object> keysToRemove = new HashSet<Object>();
+            for (MVCCEntry e : ctx.getLookedUpEntries().values())
+            {
+               if (!e.isChanged()) keysToRemove.add(e.getKey());
+            }
+            for (Object k : keysToRemove) ctx.removeKeyLocked(k);
+         }
+         else
+         {
+            if (trace) log.trace("Nothing to do since there is a transaction in scope.");
+         }
+
       }
    }
 

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -36,13 +36,13 @@
 import org.jboss.starobrno.CacheException;
 import org.jboss.starobrno.CacheSPI;
 import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.container.MVCCEntry;
 import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.factories.annotations.Destroy;
 import org.jboss.starobrno.factories.annotations.Inject;
 import org.jboss.starobrno.factories.annotations.NonVolatile;
 import org.jboss.starobrno.factories.annotations.Start;
 import org.jboss.starobrno.factories.annotations.Stop;
-import org.jboss.starobrno.mvcc.MVCCEntry;
 import org.jboss.starobrno.notifications.event.NodeModifiedEvent;
 import org.jgroups.View;
 

Modified: core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java	2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -31,7 +31,7 @@
  * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
  * @since 3.0
  */
-@Test
+@Test(groups = "functional")
 public class BasicTest
 {
    public void basicTest()

Added: core/branches/flat/src/test/java/org/jboss/starobrno/profiling/MemConsumptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/profiling/MemConsumptionTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/profiling/MemConsumptionTest.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.profiling;
+
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.util.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+@Test(groups = "profiling")
+public class MemConsumptionTest
+{
+   // adjust the next 3 numbers
+   int numEntries = 1000000;
+   int payloadSize = 20; // bytes
+   int keySize = 10; // bytes
+   int bytesPerChar = 2;
+
+   Random r = new Random();
+
+   public void testMemConsumption() throws IOException
+   {
+      int kBytesCached = (bytesPerChar * numEntries * (payloadSize + keySize)) / 1024;
+      System.out.println("Bytes to be cached: " + NumberFormat.getIntegerInstance().format(kBytesCached) + " kb");
+
+//      Cache<byte[], byte[]> c = new DefaultCacheFactory<byte[], byte[]>().createCache(); // default LOCAL cache
+      Cache<String, String> c = new DefaultCacheFactory<String, String>().createCache(); // default LOCAL cache
+//      Map<String, String> c = new ConcurrentHashMap<String, String>();
+      long start = System.nanoTime();
+      for (int i = 0; i < numEntries; i++)
+      {
+//         c.put(generateUniqueKey(i, keySize), generateBytePayload(payloadSize));
+         c.put(generateUniqueString(i, keySize), generateRandomString(payloadSize));
+         if (i % 1000 == 0) System.out.println("Added " + i + " entries");
+      }
+      long duration = System.nanoTime() - start;
+      System.out.println("Done, took " + TimeUnit.NANOSECONDS.toSeconds(duration) + " seconds!");
+
+      System.out.println("Calling System.gc()");
+      System.gc();  // clear any unnecessary objects
+
+      TestingUtil.sleepThread(1000); // wait for gc
+
+      // wait for manual test exit
+      System.out.println("Cache populated; check mem usage using jconsole, etc.!");
+      System.in.read();
+   }
+
+   private String generateUniqueString(int runNumber, int keySize)
+   {
+      // string size should be exactly equal to key size but also be unique.
+      // start by creating a string from the run number
+      StringBuilder sb = new StringBuilder();
+      // append the run number
+      sb.append(runNumber);
+      for (int i = sb.length(); i < keySize; i++) sb.append("_");
+      return sb.toString();
+   }
+
+   private byte[] generateUniqueKey(int runNumber, int keySize)
+   {
+      byte[] b = new byte[keySize];
+      b[0] = (byte) (runNumber >>> 0);
+      b[1] = (byte) (runNumber >>> 8);
+      b[2] = (byte) (runNumber >>> 16);
+      b[3] = (byte) (runNumber >>> 24);
+
+      for (int i = 4; i < keySize; i++) b[i] = 0;
+      return b;
+   }
+
+   private byte[] generateBytePayload(int payloadSize)
+   {
+      byte[] b = new byte[payloadSize];
+      Arrays.fill(b, (byte) 0);
+      return b;
+   }
+
+   /** Builds a string of exactly <code>stringSize</code> characters, each one a random decimal digit. */
+   private String generateRandomString(int stringSize)
+   {
+      StringBuilder sb = new StringBuilder();
+      // bound of 10 so that every digit 0-9 can occur; a bound of 9 would never yield '9'
+      for (int i = 0; i < stringSize; i++) sb.append(r.nextInt(10));
+      // sanity check, only active when the JVM runs with assertions enabled (-ea)
+      assert sb.length() == stringSize;
+      return sb.toString();
+   }
+}

Added: core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java	2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,641 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.starobrno.util;
+
+import org.jboss.cache.CacheStatus;
+import org.jboss.cache.invocation.CacheInvocationDelegate;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.interceptors.InterceptorChain;
+import org.jboss.starobrno.interceptors.base.CommandInterceptor;
+import org.jboss.starobrno.lock.LockManager;
+import org.jgroups.JChannel;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Utilities for unit testing JBossCache.
+ *
+ * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
+ * @version $Revision$
+ */
+public class TestingUtil
+{
+   private static Random random = new Random();
+
+   /**
+    * Extracts the value of a field in a given target instance using reflection, able to extract private fields as well.
+    *
+    * @param target    object to extract field from
+    * @param fieldName name of field to extract
+    * @return field value
+    */
+   public static Object extractField(Object target, String fieldName)
+   {
+      return extractField(target.getClass(), target, fieldName);
+   }
+
+   public static void replaceField(Object newValue, String fieldName, Object owner, Class baseType)
+   {
+      Field field;
+      try
+      {
+         field = baseType.getDeclaredField(fieldName); // looked up on baseType only; superclasses are not searched
+         field.setAccessible(true); // lets non-public fields be written as well
+         field.set(owner, newValue);
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e);//just to simplify exception handling: callers get an unchecked wrapper
+      }
+   }
+
+
+   /**
+    * Reads a field (private ones included) by reflection, trying <code>type</code> first and then each superclass;
+    * the walk stops once there is no superclass left, so interface types end in the null result rather than an NPE.
+    * Returns null, after printing the last failure's stack trace, when no class in that chain yields the field.
+    */
+   public static Object extractField(Class type, Object target, String fieldName)
+   {
+      Exception lastFailure = null;
+      for (Class c = type; c != null; c = c.getSuperclass())
+      {
+         try
+         {
+            Field field = c.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            return field.get(target);
+         }
+         catch (Exception e)
+         {
+            lastFailure = e;
+         }
+      }
+      if (lastFailure != null) lastFailure.printStackTrace();
+      return null;
+   }
+
+   public static <T extends CommandInterceptor> T findInterceptor(CacheSPI<?, ?> cache, Class<T> interceptorToFind)
+   {
+      for (CommandInterceptor i : cache.getInterceptorChain())
+      {
+         if (interceptorToFind.isInstance(i)) return interceptorToFind.cast(i);
+      }
+      return null;
+   }
+
+   /**
+    * Injects an interceptor after a specified interceptor in a running cache.  Your new interceptor need not be
+    * initialised with pointers to the next interceptor, etc. as this method does all that for you, including calling
+    * setCache().
+    *
+    * @param cache                         running cache instance
+    * @param interceptorToInject           interceptor instance to inject.
+    * @param interceptorAfterWhichToInject class of interceptor to search for in the chain and after which to add your interceptor
+    */
+   public static void injectInterceptor(CacheSPI<?, ?> cache, CommandInterceptor interceptorToInject, Class<? extends CommandInterceptor> interceptorAfterWhichToInject)
+   {
+      cache.addInterceptor(interceptorToInject, interceptorAfterWhichToInject);
+   }
+
+   /**
+    * Loops, continually calling {@link #areCacheViewsComplete(org.jboss.starobrno.Cache[])}
+    * until it either returns true or <code>timeout</code> ms have elapsed.
+    *
+    * @param caches  caches which must all have consistent views
+    * @param timeout max number of ms to loop
+    * @throws RuntimeException if <code>timeout</code> ms have elapsed without
+    *                          all caches having the same number of members.
+    */
+   public static void blockUntilViewsReceived(Cache[] caches, long timeout)
+   {
+      long failTime = System.currentTimeMillis() + timeout;
+
+      while (System.currentTimeMillis() < failTime)
+      {
+         sleepThread(100);
+         if (areCacheViewsComplete(caches))
+         {
+            return;
+         }
+      }
+
+      throw new RuntimeException("timed out before caches had complete views");
+   }
+
+   /**
+    * Version of blockUntilViewsReceived that uses varargs
+    */
+   public static void blockUntilViewsReceived(long timeout, Cache... caches)
+   {
+      blockUntilViewsReceived(caches, timeout);
+   }
+
+   /**
+    * Loops, continually calling {@link #areCacheViewsComplete(org.jboss.starobrno.CacheSPI[])}
+    * until it either returns true or <code>timeout</code> ms have elapsed.
+    *
+    * @param caches  caches which must all have consistent views
+    * @param timeout max number of ms to loop
+    * @throws RuntimeException if <code>timeout</code> ms have elapsed without
+    *                          all caches having the same number of members.
+    */
+   public static void blockUntilViewsReceived(CacheSPI[] caches, long timeout)
+   {
+      long failTime = System.currentTimeMillis() + timeout;
+
+      while (System.currentTimeMillis() < failTime)
+      {
+         sleepThread(100);
+         if (areCacheViewsComplete(caches))
+         {
+            return;
+         }
+      }
+
+      throw new RuntimeException("timed out before caches had complete views");
+   }
+
+
+   /**
+    * An overloaded version of {@link #blockUntilViewsReceived(long,org.jboss.starobrno.Cache[])} that allows for 'shrinking' clusters.
+    * I.e., the usual method barfs if there are more members than expected.  This one takes a param (barfIfTooManyMembers) which,
+    * if false, will NOT barf but will wait until the cluster 'shrinks' to the desired size.  Useful if in tests, you kill
+    * a member and want to wait until this fact is known across the cluster.
+    *
+    * @param timeout              max number of ms to loop
+    * @param barfIfTooManyMembers if true, a view with more members than expected raises an exception instead of being waited out
+    * @param caches               caches which must all have consistent views
+    */
+   public static void blockUntilViewsReceived(long timeout, boolean barfIfTooManyMembers, Cache... caches)
+   {
+      long failTime = System.currentTimeMillis() + timeout;
+
+      while (System.currentTimeMillis() < failTime)
+      {
+         sleepThread(100);
+         if (areCacheViewsComplete(caches, barfIfTooManyMembers))
+         {
+            return;
+         }
+      }
+
+      throw new RuntimeException("timed out before caches had complete views");
+   }
+
+   /**
+    * Loops, continually calling {@link #isCacheViewComplete(org.jboss.starobrno.Cache, int, boolean)}
+    * until it either returns true or <code>timeout</code> ms have elapsed.
+    *
+    * @param cache     cache whose view is checked
+    * @param groupSize number of caches expected in the group
+    * @param timeout   max number of ms to loop
+    * @throws RuntimeException if <code>timeout</code> ms have elapsed without the view reaching that size
+    */
+   public static void blockUntilViewReceived(CacheSPI cache, int groupSize, long timeout)
+   {
+      blockUntilViewReceived(cache, groupSize, timeout, true);
+   }
+
+   public static void blockUntilViewReceived(CacheSPI cache, int groupSize, long timeout, boolean barfIfTooManyMembersInView)
+   {
+      long failTime = System.currentTimeMillis() + timeout;
+
+      while (System.currentTimeMillis() < failTime)
+      {
+         sleepThread(100);
+         if (isCacheViewComplete(cache, groupSize, barfIfTooManyMembersInView))
+         {
+            return;
+         }
+      }
+
+      throw new RuntimeException("timed out before caches had complete views");
+   }
+
+   /**
+    * Checks each cache to see if the number of elements in the array
+    * returned by {@link CacheSPI#getMembers()} matches the size of
+    * the <code>caches</code> parameter.
+    *
+    * @param caches caches that should form a View
+    * @return <code>true</code> if all caches have
+    *         <code>caches.length</code> members; false otherwise
+    * @throws IllegalStateException if any of the caches have MORE view
+    *                               members than caches.length
+    */
+   public static boolean areCacheViewsComplete(Cache[] caches)
+   {
+      return areCacheViewsComplete(caches, true);
+   }
+
+   public static boolean areCacheViewsComplete(Cache[] caches, boolean barfIfTooManyMembers)
+   {
+      int memberCount = caches.length;
+
+      for (int i = 0; i < memberCount; i++)
+      {
+         if (!isCacheViewComplete(caches[i], memberCount, barfIfTooManyMembers))
+         {
+            return false;
+         }
+      }
+
+      return true;
+   }
+
+   /**
+    * Checks each cache to see if the number of elements in the array
+    * returned by {@link org.jboss.cache.RPCManager#getMembers()} matches the size of
+    * the <code>caches</code> parameter.
+    *
+    * @param caches caches that should form a View
+    * @return <code>true</code> if all caches have
+    *         <code>caches.length</code> members; false otherwise
+    * @throws IllegalStateException if any of the caches have MORE view
+    *                               members than caches.length
+    */
+   public static boolean areCacheViewsComplete(CacheSPI[] caches)
+   {
+      if (caches == null) throw new NullPointerException("Cache impl array is null");
+      Cache[] c = new Cache[caches.length];
+      for (int i = 0; i < caches.length; i++) c[i] = caches[i];
+      return areCacheViewsComplete(c);
+   }
+
+   /**
+    * @param cache
+    * @param memberCount
+    */
+   public static boolean isCacheViewComplete(CacheSPI cache, int memberCount)
+   {
+      List members = cache.getRPCManager().getMembers();
+      if (members == null || memberCount > members.size())
+      {
+         return false;
+      }
+      else if (memberCount < members.size())
+      {
+         // This is an exceptional condition
+         StringBuilder sb = new StringBuilder("Cache at address ");
+         sb.append(cache.getRPCManager().getLocalAddress());
+         sb.append(" had ");
+         sb.append(members.size());
+         sb.append(" members; expecting ");
+         sb.append(memberCount);
+         sb.append(". Members were (");
+         for (int j = 0; j < members.size(); j++)
+         {
+            if (j > 0)
+            {
+               sb.append(", ");
+            }
+            sb.append(members.get(j));
+         }
+         sb.append(')');
+
+         throw new IllegalStateException(sb.toString());
+      }
+
+      return true;
+   }
+
+   /**
+    * @param c
+    * @param memberCount
+    */
+   public static boolean isCacheViewComplete(Cache c, int memberCount)
+   {
+      return isCacheViewComplete(c, memberCount, true);
+   }
+
+   public static boolean isCacheViewComplete(Cache c, int memberCount, boolean barfIfTooManyMembers)
+   {
+      CacheSPI cache = (CacheSPI) c;
+      List members = cache.getRPCManager().getMembers();
+      if (members == null || memberCount > members.size())
+      {
+         return false;
+      }
+      else if (memberCount < members.size())
+      {
+         if (barfIfTooManyMembers)
+         {
+            // This is an exceptional condition
+            StringBuilder sb = new StringBuilder("Cache at address ");
+            sb.append(cache.getRPCManager().getLocalAddress());
+            sb.append(" had ");
+            sb.append(members.size());
+            sb.append(" members; expecting ");
+            sb.append(memberCount);
+            sb.append(". Members were (");
+            for (int j = 0; j < members.size(); j++)
+            {
+               if (j > 0)
+               {
+                  sb.append(", ");
+               }
+               sb.append(members.get(j));
+            }
+            sb.append(')');
+
+            throw new IllegalStateException(sb.toString());
+         }
+         else return false;
+      }
+
+      return true;
+   }
+
+
+   /**
+    * Puts the current thread to sleep for the desired number of ms.  An interruption ends the sleep early and is
+    * not propagated as an exception; the thread's interrupt flag is restored so that callers can still observe it.
+    * @param sleeptime number of ms to sleep
+    */
+   public static void sleepThread(long sleeptime)
+   {
+      try
+      {
+         Thread.sleep(sleeptime);
+      }
+      catch (InterruptedException ie)
+      {
+         Thread.currentThread().interrupt(); // Thread.sleep cleared the flag when it threw; set it again
+      }
+   }
+
+   public static void sleepRandom(int maxTime)
+   {
+      sleepThread(random.nextInt(maxTime));
+   }
+
+   public static void recursiveFileRemove(String directoryName)
+   {
+      File file = new File(directoryName);
+      recursiveFileRemove(file);
+   }
+
+   public static void recursiveFileRemove(File file)
+   {
+      if (file.exists())
+      {
+         System.out.println("Deleting file " + file);
+         recursivedelete(file);
+      }
+   }
+
+   private static void recursivedelete(File f)
+   {
+      if (f.isDirectory())
+      {
+         File[] files = f.listFiles();
+         for (File file : files)
+         {
+            recursivedelete(file);
+         }
+      }
+      //System.out.println("File " + f.toURI() + " deleted = " + f.delete());
+      f.delete();
+   }
+
+   /**
+    * Kills a cache - stops it, clears any data in any cache loaders, and rolls back any associated txs
+    */
+   public static void killCaches(Cache... caches)
+   {
+      for (Cache c : caches)
+      {
+         try
+         {
+            if (c != null && c.getCacheStatus() == CacheStatus.STARTED)
+            {
+               CacheSPI spi = (CacheSPI) c;
+               if (spi.getTransactionManager() != null)
+               {
+                  try
+                  {
+                     spi.getTransactionManager().rollback();
+                  }
+                  catch (Exception e)
+                  {
+                     // don't care
+                  }
+               }
+
+               // TODO fix cache loader stuff
+               /*
+               CacheLoaderManager clm = spi.getCacheLoaderManager();
+               CacheLoader cl = clm == null ? null : clm.getCacheLoader();
+               if (cl != null)
+               {
+                  try
+                  {
+                     cl.remove(Fqn.ROOT);
+                  }
+                  catch (Exception e)
+                  {
+                     // don't care
+                  }
+               }
+               */
+
+               spi.stop();
+            }
+         }
+         catch (Throwable t)
+         {
+
+         }
+      }
+   }
+
+   /**
+    * Clears any associated transactions with the current thread in the caches' transaction managers.
+    */
+   public static void killTransactions(Cache... caches)
+   {
+      for (Cache c : caches)
+      {
+         if (c != null && c.getCacheStatus() == CacheStatus.STARTED)
+         {
+            CacheSPI ci = (CacheSPI) c;
+            if (ci.getTransactionManager() != null)
+            {
+               try
+               {
+                  ci.getTransactionManager().rollback();
+               }
+               catch (Exception e)
+               {
+                  // don't care
+               }
+            }
+         }
+      }
+   }
+
+   /**
+    * For testing only - introspects a cache and extracts the ComponentRegistry
+    *
+    * @param cache cache to introspect
+    * @return component registry
+    */
+   public static ComponentRegistry extractComponentRegistry(Cache cache)
+   {
+      return (ComponentRegistry) extractField(cache, "componentRegistry");
+   }
+
+   public static LockManager extractLockManager(Cache cache)
+   {
+      return extractComponentRegistry(cache).getComponent(LockManager.class);
+   }
+
+   /**
+    * For testing only - introspects an interceptor chain and extracts the ComponentRegistry
+    *
+    * @param ci interceptor chain to introspect
+    * @return component registry
+    */
+   public static ComponentRegistry extractComponentRegistry(InterceptorChain ci)
+   {
+      return (ComponentRegistry) extractField(ci, "componentRegistry");
+   }
+
+
+   /**
+    * Replaces the existing interceptor chain in the cache with one represented by the interceptor passed in.  This
+    * utility updates dependencies on all components that rely on the interceptor chain as well.
+    *
+    * @param cache       cache that needs to be altered
+    * @param interceptor the first interceptor in the new chain.
+    */
+   public static void replaceInterceptorChain(CacheSPI<?, ?> cache, CommandInterceptor interceptor)
+   {
+      ComponentRegistry cr = extractComponentRegistry(cache);
+      // make sure all interceptors here are wired.
+      CommandInterceptor i = interceptor;
+      do
+      {
+         cr.wireDependencies(i);
+      }
+      while ((i = i.getNext()) != null);
+
+      InterceptorChain inch = cr.getComponent(InterceptorChain.class);
+      inch.setFirstInChain(interceptor);
+   }
+
+   /**
+    * Retrieves the remote delegate for a given cache.  It is on this remote delegate that the JGroups RPCDispatcher
+    * invokes remote methods.
+    *
+    * @param cache cache instance for which a remote delegate is to be retrieved
+    * @return remote delegate, or null if the cache is not configured for replication.
+    */
+   public static CacheInvocationDelegate getInvocationDelegate(CacheSPI cache)
+   {
+      ComponentRegistry cr = extractComponentRegistry(cache);
+      return cr.getComponent(CacheInvocationDelegate.class);
+   }
+
+   /**
+    * Blocks until the cache has reached a specified state.
+    *
+    * @param cache       cache to watch
+    * @param cacheStatus status to wait for
+    * @param timeout     timeout to wait for
+    */
+   public static void blockUntilCacheStatusAchieved(Cache cache, CacheStatus cacheStatus, long timeout)
+   {
+      CacheSPI spi = (CacheSPI) cache;
+      long killTime = System.currentTimeMillis() + timeout;
+      while (System.currentTimeMillis() < killTime)
+      {
+         if (spi.getCacheStatus() == cacheStatus) return;
+         sleepThread(50);
+      }
+      throw new RuntimeException("Timed out waiting for condition");
+   }
+
+   public static void replicateCommand(CacheSPI cache, VisitableCommand command) throws Throwable
+   {
+      ComponentRegistry cr = extractComponentRegistry(cache);
+      InterceptorChain ic = cr.getComponent(InterceptorChain.class);
+      ic.invoke(command);
+   }
+
+   public static void blockUntilViewsReceived(int timeout, List caches)
+   {
+      blockUntilViewsReceived((Cache[]) caches.toArray(new Cache[]{}), timeout);
+   }
+
+
+   public static CommandsFactory extractCommandsFactory(CacheSPI<Object, Object> cache)
+   {
+      return (CommandsFactory) extractField(cache, "commandsFactory");
+   }
+
+   /**
+    * Looks up one attribute of one protocol in the properties string of the cache's JGroups channel.
+    * @return the attribute's value, or null if the protocol is not listed or does not carry the attribute
+    */
+   public static String getJGroupsAttribute(Cache cache, String protocol, String attribute)
+   {
+      String s = ((JChannel) ((CacheSPI) cache).getRPCManager().getChannel()).getProperties();
+      String attribs = null;
+      for (String p : s.split(":"))
+      {
+         int paren = p.indexOf('(');
+         String name = paren < 0 ? p : p.substring(0, paren);
+         // skip non-matching protocols, so that an unknown name cannot pick up the last protocol's attributes
+         if (!name.equalsIgnoreCase(protocol)) continue;
+         if (paren >= 0) attribs = p.substring(paren + 1, p.length() - 1);
+         break;
+      }
+      if (attribs == null) return null;
+      for (String a : attribs.split(";"))
+      {
+         String[] kvPairs = a.split("=");
+         if (kvPairs.length > 1 && kvPairs[0].equalsIgnoreCase(attribute)) return kvPairs[1];
+      }
+      return null;
+   }
+
+   public static void dumpCacheContents(List caches)
+   {
+      System.out.println("**** START: Cache Contents ****");
+      int count = 1;
+      for (Object o : caches)
+      {
+         CacheSPI c = (CacheSPI) o;
+         if (c == null)
+         {
+            System.out.println("  ** Cache " + count + " is null!");
+         }
+         else
+         {
+            System.out.println("  ** Cache " + count + " is " + c.getRPCManager().getLocalAddress());
+//            System.out.println("    " + CachePrinter.printCacheDetails(c));
+         }
+         count++;
+      }
+      System.out.println("**** END: Cache Contents ****");
+   }
+
+   public static void dumpCacheContents(Cache... caches)
+   {
+      dumpCacheContents(Arrays.asList(caches));
+   }
+}




More information about the jbosscache-commits mailing list