[jbosscache-commits] JBoss Cache SVN: r7606 - in core/branches/flat/src: main/java/org/horizon/executors and 10 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Jan 28 13:26:32 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-01-28 13:26:32 -0500 (Wed, 28 Jan 2009)
New Revision: 7606

Added:
   core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java
Removed:
   core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java
   core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java
Modified:
   core/branches/flat/src/main/java/org/horizon/Cache.java
   core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
   core/branches/flat/src/main/java/org/horizon/executors/DefaultExecutorFactory.java
   core/branches/flat/src/main/java/org/horizon/executors/DefaultScheduledExecutorFactory.java
   core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java
   core/branches/flat/src/main/java/org/horizon/interceptors/InterceptorChain.java
   core/branches/flat/src/main/java/org/horizon/transaction/DummyTransactionManager.java
   core/branches/flat/src/main/java/org/horizon/tree/TreeCache.java
   core/branches/flat/src/main/java/org/horizon/tree/TreeCacheImpl.java
   core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java
   core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
   core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java
   core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java
   core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTest.java
   core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTxTest.java
   core/branches/flat/src/test/java/org/horizon/api/tree/TreeCacheAPITest.java
   core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
   core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
Log:
replication code + tests on shared transport

Modified: core/branches/flat/src/main/java/org/horizon/Cache.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/Cache.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/Cache.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -81,13 +81,48 @@
 
    String getVersion();
 
+   /**
+    * Adds a custom interceptor to the interceptor chain, at specified position, where the first interceptor in the
+    * chain is at position 0 and the last one at NUM_INTERCEPTORS - 1.
+    *
+    * @param i        the interceptor to add
+    * @param position the position to add the interceptor
+    */
    void addInterceptor(CommandInterceptor i, int position);
 
-   void addInterceptor(CommandInterceptor i, Class<? extends CommandInterceptor> afterInterceptor);
+   /**
+    * Adds a custom interceptor to the interceptor chain, after an instance of the specified interceptor type. Throws a
+    * cache exception if it cannot find an interceptor of the specified type.
+    *
+    * @param i                interceptor to add
+    * @param afterInterceptor interceptor type after which to place custom interceptor
+    */
+   void addInterceptorAfter(CommandInterceptor i, Class<? extends CommandInterceptor> afterInterceptor);
 
+   /**
+    * Adds a custom interceptor to the interceptor chain, before an instance of the specified interceptor type. Throws a
+    * cache exception if it cannot find an interceptor of the specified type.
+    *
+    * @param i                 interceptor to add
+    * @param beforeInterceptor interceptor type before which to place custom interceptor
+    */
+   void addInterceptorBefore(CommandInterceptor i, Class<? extends CommandInterceptor> beforeInterceptor);
+
+   /**
+    * Removes the interceptor at a specified position, where the first interceptor in the chain is at position 0 and the
+    * last one at getInterceptorChain().size() - 1.
+    *
+    * @param position the position at which to remove an interceptor
+    */
    void removeInterceptor(int position);
 
+   /**
+    * Removes the interceptor of specified type.
+    *
+    * @param interceptorType type of interceptor to remove
+    */
    void removeInterceptor(Class<? extends CommandInterceptor> interceptorType);
 
+
    CacheManager getCacheManager();
 }

Modified: core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/CacheDelegate.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/CacheDelegate.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -285,19 +285,23 @@
    }
 
    public void addInterceptor(CommandInterceptor i, int position) {
-      throw new IllegalStateException();//todo Implement me properly
+      invoker.addInterceptor(i, position);
    }
 
-   public void addInterceptor(CommandInterceptor i, Class<? extends CommandInterceptor> afterInterceptor) {
-      throw new IllegalStateException();//todo Implement me properly
+   public void addInterceptorAfter(CommandInterceptor i, Class<? extends CommandInterceptor> afterInterceptor) {
+      invoker.addInterceptorAfter(i, afterInterceptor);
    }
 
+   public void addInterceptorBefore(CommandInterceptor i, Class<? extends CommandInterceptor> beforeInterceptor) {
+      invoker.addInterceptorBefore(i, beforeInterceptor);
+   }
+
    public void removeInterceptor(int position) {
-      throw new IllegalStateException();//todo Implement me properly
+      invoker.removeInterceptor(position);
    }
 
    public void removeInterceptor(Class<? extends CommandInterceptor> interceptorType) {
-      throw new IllegalStateException();//todo Implement me properly
+      invoker.removeInterceptor(interceptorType);
    }
 
    public CacheLoaderManager getCacheLoaderManager() {

Modified: core/branches/flat/src/main/java/org/horizon/executors/DefaultExecutorFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/executors/DefaultExecutorFactory.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/executors/DefaultExecutorFactory.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -26,7 +26,7 @@
 
       ThreadFactory tf = new ThreadFactory() {
          public Thread newThread(Runnable r) {
-            return new Thread(threadNamePrefix + "-" + counter.getAndIncrement());
+            return new Thread(r, threadNamePrefix + "-" + counter.getAndIncrement());
          }
       };
 

Modified: core/branches/flat/src/main/java/org/horizon/executors/DefaultScheduledExecutorFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/executors/DefaultScheduledExecutorFactory.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/executors/DefaultScheduledExecutorFactory.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -18,7 +18,7 @@
       final AtomicInteger counter = new AtomicInteger(0);
       return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
          public Thread newThread(Runnable r) {
-            return new Thread(threadNamePrefix + "-" + counter.getAndIncrement());
+            return new Thread(r, threadNamePrefix + "-" + counter.getAndIncrement());
          }
       });
    }

Modified: core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -171,7 +171,7 @@
                throw new ConfigurationException("Cannot add after class: " + config.getAfterClass()
                      + " as no such iterceptor exists in the default chain");
             }
-            interceptorChain.addAfterInterceptor(config.getInterceptor(), withClassName.get(0).getClass());
+            interceptorChain.addInterceptorAfter(config.getInterceptor(), withClassName.get(0).getClass());
          }
          if (config.getBeforeClass() != null) {
             List<CommandInterceptor> withClassName = interceptorChain.getInterceptorsWithClassName(config.getBeforeClass());
@@ -179,7 +179,7 @@
                throw new ConfigurationException("Cannot add before class: " + config.getAfterClass()
                      + " as no such iterceptor exists in the default chain");
             }
-            interceptorChain.addBeforeInterceptor(config.getInterceptor(), withClassName.get(0).getClass());
+            interceptorChain.addInterceptorBefore(config.getInterceptor(), withClassName.get(0).getClass());
          }
       }
    }

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InterceptorChain.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/InterceptorChain.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/InterceptorChain.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -182,7 +182,7 @@
     *
     * @return true if the interceptor was added; i.e. the afterInterceptor exists
     */
-   public synchronized boolean addAfterInterceptor(CommandInterceptor toAdd, Class<? extends CommandInterceptor> afterInterceptor) {
+   public synchronized boolean addInterceptorAfter(CommandInterceptor toAdd, Class<? extends CommandInterceptor> afterInterceptor) {
       CommandInterceptor it = firstInChain;
       while (it != null) {
          if (it.getClass().equals(afterInterceptor)) {
@@ -200,7 +200,7 @@
     *
     * @return true if the interceptor was added; i.e. the afterInterceptor exists
     */
-   public synchronized boolean addBeforeInterceptor(CommandInterceptor toAdd, Class<? extends CommandInterceptor> beforeInterceptor) {
+   public synchronized boolean addInterceptorBefore(CommandInterceptor toAdd, Class<? extends CommandInterceptor> beforeInterceptor) {
       if (firstInChain.getClass().equals(beforeInterceptor)) {
          toAdd.setNext(firstInChain);
          firstInChain = toAdd;

Modified: core/branches/flat/src/main/java/org/horizon/transaction/DummyTransactionManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/transaction/DummyTransactionManager.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/transaction/DummyTransactionManager.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -27,6 +27,7 @@
 import javax.naming.Context;
 import javax.naming.InitialContext;
 import javax.naming.NamingException;
+import javax.naming.NoInitialContextException;
 import java.util.Properties;
 
 /**
@@ -54,8 +55,10 @@
             Context ctx = new InitialContext(p);
             ctx.bind("java:/TransactionManager", instance);
             ctx.bind("UserTransaction", utx);
-         }
-         catch (NamingException e) {
+         } catch (NoInitialContextException nie) {
+            log.debug(nie.getMessage());
+
+         } catch (NamingException e) {
             log.debug("binding of DummyTransactionManager failed", e);
          }
       }

Modified: core/branches/flat/src/main/java/org/horizon/tree/TreeCache.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/tree/TreeCache.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/tree/TreeCache.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -23,15 +23,8 @@
 
 import org.horizon.Cache;
 import org.horizon.CacheException;
-import org.horizon.ComponentStatus;
-import org.horizon.config.Configuration;
-import org.horizon.context.InvocationContext;
-import org.horizon.interceptors.base.CommandInterceptor;
 import org.horizon.lifecycle.Lifecycle;
-import org.horizon.notifications.Listenable;
-import org.horizon.remoting.transport.Address;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -55,7 +48,7 @@
  * @see Node
  * @since 1.0
  */
-public interface TreeCache<K, V> extends Lifecycle, Listenable {
+public interface TreeCache<K, V> extends Lifecycle {
    /**
     * Returns the root node of this cache.
     *
@@ -236,41 +229,6 @@
    void evict(Fqn fqn);
 
    /**
-    * Gets where the cache currently is its lifecycle transitions.
-    *
-    * @return the CacheStatus. Will not return <code>null</code>.
-    */
-   ComponentStatus getCacheStatus();
-
-   /**
-    * @return the current invocation context for the current invocation and cache instance.
-    * @throws IllegalStateException if the cache has been destroyed.
-    */
-   InvocationContext getInvocationContext();
-
-   /**
-    * Sets the passed in {@link InvocationContext} as current.
-    *
-    * @param ctx invocation context to use
-    * @throws IllegalStateException if the cache has been destroyed.
-    */
-   void setInvocationContext(InvocationContext ctx);
-
-   /**
-    * Returns the local address of this cache in a cluster, or <code>null</code> if running in local mode.
-    *
-    * @return the local address of this cache in a cluster, or <code>null</code> if running in local mode.
-    */
-   Address getAddress();
-
-   /**
-    * Returns a list of members in the cluster, or <code>null</code> if running in local mode.
-    *
-    * @return a {@link List} of members in the cluster, or <code>null</code> if running in local mode.
-    */
-   List<Address> getMembers();
-
-   /**
     * Moves a part of the cache to a different subtree.
     * <p/>
     * E.g.:
@@ -380,77 +338,23 @@
    void clearData(Fqn fqn);
 
    /**
-    * Starts a batch.  This is a lightweight batching mechanism that groups cache writes together and finally performs
-    * the write, persistence and/or replication when {@link #endBatch(boolean)} is called rather than for each
-    * invocation on the cache.
-    * <p/>
-    * Note that if there is an existing transaction in scope and the cache has been configured to use a JTA compliant
-    * transaction manager, calls to {@link #startBatch()} and {@link #endBatch(boolean)} are ignored and treated as
-    * no-ops.
-    * <p/>
-    *
-    * @see #endBatch(boolean)
-    * @since 1.0
+    * @return a reference to the underlying cache instance
     */
-   void startBatch();
+   Cache<K, V> getCache();
 
    /**
-    * Ends an existing ongoing batch.  A no-op if a batch has not been started yet.
-    * <p/>
-    * Note that if there is an existing transaction in scope and the cache has been configured to use a JTA compliant
-    * transaction manager, calls to {@link #startBatch()} and {@link #endBatch(boolean)} are ignored and treated as
-    * no-ops.
-    * <p/>
+    * Tests if an Fqn exists.  Convenience method for {@link #exists(Fqn)}
     *
-    * @param successful if <tt>true</tt>, changes made in the batch are committed.  If <tt>false</tt>, they are
-    *                   discarded.
-    * @see #startBatch()
-    * @since 1.0
+    * @param fqn string representation of an Fqn
+    * @return true if the fqn exists, false otherwise
     */
-   void endBatch(boolean successful);
+   boolean exists(String fqn);
 
    /**
-    * Adds a custom interceptor to the interceptor chain, at specified position, where the first interceptor in the
-    * chain is at position 0 and the last one at getInterceptorChain().size() - 1.
+    * Tests if an Fqn exists.
     *
-    * @param i        the interceptor to add
-    * @param position the position to add the interceptor
-    * @since 1.0
+    * @param fqn Fqn to test
+    * @return true if the fqn exists, false otherwise
     */
-   void addInterceptor(CommandInterceptor i, int position);
-
-   /**
-    * Adds a custom interceptor to the interceptor chain, after an instance of the specified interceptor type.  Throws a
-    * cache exception if it cannot find an interceptor of the specified type.
-    *
-    * @param i                interceptor to add
-    * @param afterInterceptor interceptor type after which to place custom interceptor
-    * @since 1.0
-    */
-   void addInterceptor(CommandInterceptor i, Class<? extends CommandInterceptor> afterInterceptor);
-
-   /**
-    * Removes the interceptor at a specified position, where the first interceptor in the chain is at position 0 and the
-    * last one at getInterceptorChain().size() - 1.
-    *
-    * @param position the position at which to remove an interceptor
-    * @since 1.0
-    */
-   void removeInterceptor(int position);
-
-   /**
-    * Removes the interceptor of specified type.
-    *
-    * @param interceptorType type of interceptor to remove
-    * @since 1.0
-    */
-   void removeInterceptor(Class<? extends CommandInterceptor> interceptorType);
-
-   Configuration getConfiguration();
-
-   Cache<K, V> getCache();
-
-   boolean exists(String fqn);
-
    boolean exists(Fqn fqn);
 }

Modified: core/branches/flat/src/main/java/org/horizon/tree/TreeCacheImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/tree/TreeCacheImpl.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/tree/TreeCacheImpl.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -23,16 +23,10 @@
 
 import org.horizon.Cache;
 import org.horizon.CacheException;
-import org.horizon.ComponentStatus;
 import org.horizon.atomic.AtomicMap;
-import org.horizon.config.Configuration;
-import org.horizon.context.InvocationContext;
-import org.horizon.interceptors.base.CommandInterceptor;
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
-import org.horizon.remoting.transport.Address;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -287,70 +281,10 @@
    }
 
    // ------------------ nothing different; just delegate to the cache
-   public void addListener(Object listener) {
-      cache.addListener(listener);
-   }
-
-   public void removeListener(Object listener) {
-      cache.removeListener(listener);
-   }
-
-   public Set<Object> getListeners() {
-      return cache.getListeners();
-   }
-
-   public void startBatch() {
-      cache.startBatch();
-   }
-
-   public void endBatch(boolean successful) {
-      cache.endBatch(successful);
-   }
-
-   public void addInterceptor(CommandInterceptor i, int position) {
-      cache.addInterceptor(i, position);
-   }
-
-   public void addInterceptor(CommandInterceptor i, Class<? extends CommandInterceptor> afterInterceptor) {
-      cache.addInterceptor(i, afterInterceptor);
-   }
-
-   public void removeInterceptor(int position) {
-      cache.removeInterceptor(position);
-   }
-
-   public void removeInterceptor(Class<? extends CommandInterceptor> interceptorType) {
-      cache.removeInterceptor(interceptorType);
-   }
-
-   public Configuration getConfiguration() {
-      return cache.getConfiguration();
-   }
-
    public Cache getCache() {
       return cache;
    }
 
-   public ComponentStatus getCacheStatus() {
-      return cache.getCacheStatus();
-   }
-
-   public InvocationContext getInvocationContext() {
-      return cache.getInvocationContext();
-   }
-
-   public void setInvocationContext(InvocationContext ctx) {
-      cache.setInvocationContext(ctx);
-   }
-
-   public Address getAddress() {
-      return cache.getCacheManager().getAddress();
-   }
-
-   public List<Address> getMembers() {
-      return cache.getCacheManager().getMembers();
-   }
-
    public void start() throws CacheException {
       cache.start();
       createRoot();

Modified: core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -89,20 +89,6 @@
    }
 
    /**
-    * Injects an interceptor after a specified interceptor in a running cache.  Your new interceptor need not be
-    * initialised with pointers to the next interceptor, etc. as this method does all that for you, including calling
-    * setCache().
-    *
-    * @param cache                         running cache instance
-    * @param interceptorToInject           interceptor instance to inject.
-    * @param interceptorAfterWhichToInject class of interceptor to search for in the chain and after which to add your
-    *                                      interceptor
-    */
-   public static void injectInterceptor(CacheSPI<?, ?> cache, CommandInterceptor interceptorToInject, Class<? extends CommandInterceptor> interceptorAfterWhichToInject) {
-      cache.addInterceptor(interceptorToInject, interceptorAfterWhichToInject);
-   }
-
-   /**
     * Loops, continually calling {@link #areCacheViewsComplete(Cache[])} until it either returns true or
     * <code>timeout</code> ms have elapsed.
     *
@@ -628,4 +614,8 @@
       ComponentRegistry cr = extractComponentRegistry(cache);
       return cr.getComponent(componentType);
    }
+
+   public static TransactionManager getTransactionManager(Cache cache) {
+      return extractComponent(cache, TransactionManager.class);
+   }
 }

Copied: core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java (from rev 7605, core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java)
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -0,0 +1,163 @@
+package org.horizon;
+
+import org.horizon.commands.DataCommand;
+import org.horizon.commands.VisitableCommand;
+import org.horizon.commands.tx.CommitCommand;
+import org.horizon.commands.tx.PrepareCommand;
+import org.horizon.config.Configuration;
+import org.horizon.config.GlobalConfiguration;
+import org.horizon.context.InvocationContext;
+import org.horizon.interceptors.base.CommandInterceptor;
+import org.horizon.manager.CacheManager;
+import org.horizon.manager.DefaultCacheManager;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.AfterMethod;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public abstract class BaseClusteredTest {
+   ThreadLocal<List<CacheManager>> cacheManagerThreadLocal = new ThreadLocal<List<CacheManager>>() {
+      @Override
+      protected List<CacheManager> initialValue() {
+         return new LinkedList<CacheManager>();
+      }
+   };
+
+   /**
+    * @return a list of registered cache managers on the current thread.
+    */
+   protected List<CacheManager> getCacheManagers() {
+      return cacheManagerThreadLocal.get();
+   }
+
+   /**
+    * Creates a new cache manager, starts it, and adds it to the list of known cache managers on the current thread.
+    * Uses a default clustered cache manager global config.
+    *
+    * @return the new CacheManager
+    */
+   protected CacheManager addCacheManager() {
+      return addCacheManager(GlobalConfiguration.getClusteredDefault());
+   }
+
+   /**
+    * Creates a new cache manager, starts it, and adds it to the list of known cache managers on the current thread.
+    *
+    * @param globalConfig config to use
+    * @return the new CacheManager
+    */
+   protected CacheManager addCacheManager(GlobalConfiguration globalConfig) {
+      CacheManager cm = new DefaultCacheManager(globalConfig);
+      cacheManagerThreadLocal.get().add(cm);
+      return cm;
+   }
+
+   protected void defineCacheOnAllManagers(String cacheName, Configuration c) {
+      for (CacheManager cm : cacheManagerThreadLocal.get()) {
+         cm.defineCache(cacheName, c);
+      }
+   }
+
+   protected void assertClusterSize(String message, int size) {
+      for (CacheManager cm : cacheManagerThreadLocal.get()) {
+         assert cm.getMembers() != null && cm.getMembers().size() == size : message;
+      }
+   }
+
+   protected ReplListener attachReplicationListener(Cache c) {
+      return new ReplListener(c);
+   }
+
+   @AfterMethod
+   public void cleanupThreadLocals() {
+      TestingUtil.killCacheManagers(cacheManagerThreadLocal.get().toArray(new CacheManager[cacheManagerThreadLocal.get().size()]));
+      cacheManagerThreadLocal.get().clear();
+   }
+
+   protected static class ReplListener {
+      Cache c;
+      Set<Class<? extends VisitableCommand>> expectedCommands;
+      CountDownLatch latch = new CountDownLatch(1);
+
+      public ReplListener(Cache c) {
+         this.c = c;
+         this.c.addInterceptor(new ReplListenerInterceptor(), 0);
+      }
+
+      public void expectAny() {
+         expect();
+      }
+
+      public void expectWithTx(Class<? extends VisitableCommand>... commands) {
+         expect(PrepareCommand.class);
+         expect(commands);
+         //this is because for async replication we have an 1pc transaction
+         if (c.getConfiguration().getCacheMode().isSynchronous()) expect(CommitCommand.class);
+      }
+
+      public void expectAnyWithTx() {
+         expect(PrepareCommand.class);
+         //this is because for async replication we have an 1pc transaction
+         if (c.getConfiguration().getCacheMode().isSynchronous()) expect(CommitCommand.class);
+      }
+
+      public void expect(Class<? extends VisitableCommand>... expectedCommands) {
+         if (this.expectedCommands == null) {
+            this.expectedCommands = new HashSet<Class<? extends VisitableCommand>>();
+         }
+         this.expectedCommands.addAll(Arrays.asList(expectedCommands));
+      }
+
+      public void waitForReplication() {
+         waitForReplication(600, TimeUnit.SECONDS);
+      }
+
+      public void waitForReplication(long time, TimeUnit unit) {
+         assert expectedCommands != null : "there are no replication expectations; please use ReplListener.expect() before calling this method";
+         try {
+            if (!latch.await(time, unit)) {
+               assert false : "Waiting for more than " + time + " " + unit + " and following commands did not replicate: " + expectedCommands;
+            }
+         }
+         catch (InterruptedException e) {
+            throw new IllegalStateException("unexpected", e);
+         }
+         finally {
+            expectedCommands = null;
+            latch = new CountDownLatch(1);
+         }
+      }
+
+      protected class ReplListenerInterceptor extends CommandInterceptor {
+         @Override
+         protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd) throws Throwable {
+            // first pass up chain
+            Object o = invokeNextInterceptor(ctx, cmd);
+            markAsVisited(cmd);
+            return o;
+         }
+
+         @Override
+         public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand cmd) throws Throwable {
+            // first pass up chain
+            Object o = invokeNextInterceptor(ctx, cmd);
+            markAsVisited(cmd);
+            for (DataCommand mod : cmd.getModifications()) markAsVisited(mod);
+            return o;
+         }
+
+         private void markAsVisited(VisitableCommand cmd) {
+            if (expectedCommands != null) {
+               expectedCommands.remove(cmd.getClass());
+               if (expectedCommands.isEmpty()) latch.countDown();
+            }
+         }
+      }
+   }
+}

Deleted: core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -1,68 +0,0 @@
-package org.horizon;
-
-import org.horizon.config.Configuration;
-import org.horizon.config.GlobalConfiguration;
-import org.horizon.manager.CacheManager;
-import org.horizon.manager.DefaultCacheManager;
-import org.horizon.util.TestingUtil;
-import org.testng.annotations.AfterMethod;
-
-import java.util.LinkedList;
-import java.util.List;
-
-public abstract class BaseReplicatedTest {
-   ThreadLocal<List<CacheManager>> cacheManagerThreadLocal = new ThreadLocal<List<CacheManager>>() {
-      @Override
-      protected List<CacheManager> initialValue() {
-         return new LinkedList<CacheManager>();
-      }
-   };
-
-   /**
-    * @return a list of registered cache managers on the current thread.
-    */
-   protected List<CacheManager> getCacheManagers() {
-      return cacheManagerThreadLocal.get();
-   }
-
-   /**
-    * Creates a new cache manager, starts it, and adds it to the list of known cache managers on the current thread.
-    * Uses a default clustered cache manager global config.
-    *
-    * @return the new CacheManager
-    */
-   protected CacheManager addCacheManager() {
-      return addCacheManager(GlobalConfiguration.getClusteredDefault());
-   }
-
-   /**
-    * Creates a new cache manager, starts it, and adds it to the list of known cache managers on the current thread.
-    *
-    * @param globalConfig config to use
-    * @return the new CacheManager
-    */
-   protected CacheManager addCacheManager(GlobalConfiguration globalConfig) {
-      CacheManager cm = new DefaultCacheManager(globalConfig);
-      cacheManagerThreadLocal.get().add(cm);
-      return cm;
-   }
-
-   protected void defineCacheOnAllManagers(String cacheName, Configuration c) {
-      for (CacheManager cm : cacheManagerThreadLocal.get()) {
-         cm.defineCache(cacheName, c);
-      }
-   }
-
-   protected void assertClusterSize(String message, int size) {
-      for (CacheManager cm : cacheManagerThreadLocal.get()) {
-         assert cm.getMembers() != null && cm.getMembers().size() == size : message;
-      }
-   }
-
-   @AfterMethod
-   public void cleanupThreadLocals() {
-      TestingUtil.killCacheManagers(cacheManagerThreadLocal.get().toArray(new CacheManager[cacheManagerThreadLocal.get().size()]));
-      cacheManagerThreadLocal.get().clear();
-   }
-
-}

Modified: core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -2,23 +2,21 @@
 
 import org.easymock.EasyMock;
 import static org.easymock.EasyMock.*;
+import org.horizon.BaseClusteredTest;
 import org.horizon.Cache;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheFactory;
 import org.horizon.commands.RPCCommand;
 import org.horizon.commands.write.PutKeyValueCommand;
 import org.horizon.commands.write.RemoveCommand;
 import org.horizon.config.Configuration;
 import org.horizon.factories.ComponentRegistry;
+import org.horizon.manager.CacheManager;
 import org.horizon.remoting.RPCManager;
 import org.horizon.remoting.ResponseMode;
 import org.horizon.remoting.transport.Address;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.transaction.TransactionTable;
 import org.horizon.util.TestingUtil;
-import org.horizon.util.internals.ReplicationListener;
 import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -26,52 +24,37 @@
 import javax.transaction.TransactionManager;
 import java.util.List;
 
- at Test(groups = {"functional", "jgroups", "transaction"})
-public class PutForExternalReadTest {
-   protected final String key = "k", value = "v", value2 = "v2";
+ at Test(groups = "functional", sequential = true)
+public class PutForExternalReadTest extends BaseClusteredTest {
+   final String key = "k", value = "v", value2 = "v2";
+   Cache cache1, cache2;
+   TransactionManager tm1, tm2;
+   ReplListener replListener1, replListener2;
 
-   protected CacheSPI<String, String> cache1, cache2;
-
-   ReplicationListener replListener1;
-   ReplicationListener replListener2;
-
-   protected TransactionManager tm1, tm2;
-
-   protected boolean useTx;
-
-
    @BeforeMethod(alwaysRun = true)
    public void setUp() {
-
-      UnitTestCacheFactory<String, String> cf = new UnitTestCacheFactory<String, String>();
       Configuration c = new Configuration();
       c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
-      cache1 = (CacheSPI<String, String>) cf.createCache(c, false);
-      cache1.getConfiguration().setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      // TODO fix this
-//      cache1.getConfiguration().setSerializationExecutorPoolSize(0);//this is very important for async tests!
+      c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      CacheManager cm1 = addCacheManager();
+      CacheManager cm2 = addCacheManager();
+      defineCacheOnAllManagers("replSync", c);
 
-      cache1.start();
-      tm1 = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+      cache1 = cm1.getCache("replSync");
+      cache2 = cm2.getCache("replSync");
 
-      cache2 = (CacheSPI<String, String>) cf.createCache(cache1.getConfiguration().clone());
-
-      tm2 = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
-      replListener1 = new ReplicationListener(cache1);
-      replListener2 = new ReplicationListener(cache2);
-
+      tm1 = TestingUtil.getTransactionManager(cache1);
+      tm2 = TestingUtil.getTransactionManager(cache2);
       TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
-   }
 
-   @AfterMethod(alwaysRun = true)
-   public void tearDown() {
-      TestingUtil.killCaches(cache1, cache2);
+      replListener1 = attachReplicationListener(cache1);
+      replListener2 = attachReplicationListener(cache2);
    }
 
    public void testNoOpWhenKeyPresent() {
       replListener2.expect(PutKeyValueCommand.class);
       cache1.putForExternalRead(key, value);
-      replListener2.waitForReplicationToOccur();
+      replListener2.waitForReplication();
 
 
       assertEquals("PFER should have succeeded", value, cache1.get(key));
@@ -80,14 +63,14 @@
       // reset
       replListener2.expect(RemoveCommand.class);
       cache1.remove(key);
-      replListener2.waitForReplicationToOccur();
+      replListener2.waitForReplication();
 
       assert cache1.isEmpty() : "Should have reset";
       assert cache2.isEmpty() : "Should have reset";
 
       replListener2.expect(PutKeyValueCommand.class);
       cache1.put(key, value);
-      replListener2.waitForReplicationToOccur();
+      replListener2.waitForReplication();
 
       // now this pfer should be a no-op
       cache1.putForExternalRead(key, value2);
@@ -135,7 +118,7 @@
       // create parent node first
       replListener2.expect(PutKeyValueCommand.class);
       cache1.put(key + "0", value);
-      replListener2.waitForReplicationToOccur();
+      replListener2.waitForReplication();
 
       // start a tx and do some stuff.
       replListener2.expect(PutKeyValueCommand.class);
@@ -144,7 +127,7 @@
       cache1.putForExternalRead(key, value); // should have happened in a separate tx and have committed already.
       Transaction t = tm1.suspend();
 
-      replListener2.waitForReplicationToOccur();
+      replListener2.waitForReplication();
       assertEquals("PFER should have completed", value, cache1.get(key));
       assertEquals("PFER should have completed", value, cache2.get(key));
 
@@ -200,7 +183,7 @@
 
       replListener2.expect(PutKeyValueCommand.class);
       cache1.putForExternalRead(key, value);
-      replListener2.waitForReplicationToOccur();
+      replListener2.waitForReplication();
 
       assertEquals("PFER updated cache1", value, cache1.get(key));
       assertEquals("PFER propagated to cache2 as expected", value, cache2.get(key));
@@ -243,7 +226,7 @@
       tm1.begin();
       cache1.putForExternalRead(key, value);
       tm1.commit();
-      replListener2.waitForReplicationToOccur();
+      replListener2.waitForReplication();
 
       TransactionTable tt1 = getTransactionTable(cache1);
       TransactionTable tt2 = getTransactionTable(cache2);
@@ -260,7 +243,7 @@
       cache1.putForExternalRead(key, value);
       cache1.put(key, value);
       tm1.commit();
-      replListener2.waitForReplicationToOccur();
+      replListener2.waitForReplication();
 
       assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
       assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
@@ -272,7 +255,7 @@
       cache1.put(key, value);
       cache1.putForExternalRead(key, value);
       tm1.commit();
-      replListener2.waitForReplicationToOccur();
+      replListener2.waitForReplication();
 
       assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
       assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
@@ -286,7 +269,7 @@
       cache1.putForExternalRead(key, value);
       cache1.put(key, value);
       tm1.commit();
-      replListener2.waitForReplicationToOccur();
+      replListener2.waitForReplication();
 
       assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
       assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";

Modified: core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -1,8 +1,8 @@
 package org.horizon.api.tree;
 
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheFactory;
 import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
+import org.horizon.manager.DefaultCacheManager;
 import org.horizon.transaction.DummyTransactionManager;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.tree.Fqn;
@@ -11,7 +11,6 @@
 import org.horizon.tree.TreeCacheImpl;
 import org.horizon.util.TestingUtil;
 import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -26,35 +25,26 @@
  *
  * @author <a href="mailto:manik AT jboss DOT org">Manik Surtani</a>
  */
- at Test(groups = "functional")
+ at Test(groups = "functional", sequential = true)
 public class NodeAPITest {
-   protected ThreadLocal<TreeCache<Object, Object>> cacheTL = new ThreadLocal<TreeCache<Object, Object>>();
-   protected static final Fqn A = Fqn.fromString("/a"), B = Fqn.fromString("/b"), C = Fqn.fromString("/c"), D = Fqn.fromString("/d");
-   protected Fqn A_B = Fqn.fromRelativeFqn(A, B);
-   protected Fqn A_C = Fqn.fromRelativeFqn(A, C);
-   protected TransactionManager tm;
+   static final Fqn A = Fqn.fromString("/a"), B = Fqn.fromString("/b"), C = Fqn.fromString("/c"), D = Fqn.fromString("/d");
+   Fqn A_B = Fqn.fromRelativeFqn(A, B);
+   Fqn A_C = Fqn.fromRelativeFqn(A, C);
+   TransactionManager tm;
+   TreeCache cache;
 
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
       // start a single cache instance
       Configuration c = new Configuration();
       c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      CacheSPI<Object, Object> cache = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(c, false);
-      cache.getConfiguration().setInvocationBatchingEnabled(true);
-      cache.start();
-      cacheTL.set(new TreeCacheImpl(cache));
-      tm = cache.getTransactionManager();
+      c.setInvocationBatchingEnabled(true);
+      CacheManager cm = new DefaultCacheManager(c);
+      cache = new TreeCacheImpl(cm.getCache());
+      tm = TestingUtil.getTransactionManager(cache.getCache());
    }
 
-   @AfterMethod(alwaysRun = true)
-   public void tearDown() {
-      TreeCache<Object, Object> cache = cacheTL.get();
-      TestingUtil.killTreeCaches(cache);
-      cacheTL.set(null);
-   }
-
    public void testAddingData() {
-      TreeCache<Object, Object> cache = cacheTL.get();
       Node<Object, Object> rootNode = cache.getRoot();
       Node<Object, Object> nodeA = rootNode.addChild(A);
       nodeA.put("key", "value");
@@ -63,7 +53,6 @@
    }
 
    public void testAddingDataTx() throws Exception {
-      TreeCache<Object, Object> cache = cacheTL.get();
       Node<Object, Object> rootNode = cache.getRoot();
       tm.begin();
       Node<Object, Object> nodeA = rootNode.addChild(A);
@@ -74,7 +63,6 @@
    }
 
    public void testOverwritingDataTx() throws Exception {
-      TreeCache<Object, Object> cache = cacheTL.get();
       Node<Object, Object> rootNode = cache.getRoot();
 
       Node<Object, Object> nodeA = rootNode.addChild(A);
@@ -93,7 +81,7 @@
     * Remember, Fqns are relative!!
     */
    public void testParentsAndChildren() {
-      TreeCache<Object, Object> cache = cacheTL.get();
+
       Node<Object, Object> rootNode = cache.getRoot();
 
       Node<Object, Object> nodeA = rootNode.addChild(A);
@@ -138,7 +126,7 @@
 
 
    public void testImmutabilityOfData() {
-      TreeCache<Object, Object> cache = cacheTL.get();
+
       Node<Object, Object> rootNode = cache.getRoot();
 
       rootNode.put("key", "value");
@@ -161,7 +149,7 @@
    }
 
    public void testDefensiveCopyOfData() {
-      TreeCache<Object, Object> cache = cacheTL.get();
+
       Node<Object, Object> rootNode = cache.getRoot();
 
       rootNode.put("key", "value");
@@ -187,7 +175,7 @@
    }
 
    public void testDefensiveCopyOfChildren() {
-      TreeCache<Object, Object> cache = cacheTL.get();
+
       Node<Object, Object> rootNode = cache.getRoot();
 
       Fqn childFqn = Fqn.fromString("/child");
@@ -215,7 +203,7 @@
 
 
    public void testImmutabilityOfChildren() {
-      TreeCache<Object, Object> cache = cacheTL.get();
+
       Node<Object, Object> rootNode = cache.getRoot();
 
       rootNode.addChild(A);
@@ -230,7 +218,7 @@
    }
 
    public void testGetChildAPI() {
-      TreeCache<Object, Object> cache = cacheTL.get();
+
       Node<Object, Object> rootNode = cache.getRoot();
 
       // creates a Node<Object, Object> with fqn /a/b/c
@@ -249,7 +237,7 @@
    }
 
    public void testClearingData() {
-      TreeCache<Object, Object> cache = cacheTL.get();
+
       Node<Object, Object> rootNode = cache.getRoot();
 
       rootNode.put("k", "v");
@@ -261,7 +249,7 @@
    }
 
    public void testClearingDataTx() throws Exception {
-      TreeCache<Object, Object> cache = cacheTL.get();
+
       Node<Object, Object> rootNode = cache.getRoot();
 
       tm.begin();
@@ -276,7 +264,7 @@
    }
 
    public void testPutData() {
-      TreeCache<Object, Object> cache = cacheTL.get();
+
       Node<Object, Object> rootNode = cache.getRoot();
 
       assertTrue(rootNode.getData().isEmpty());
@@ -311,7 +299,7 @@
    }
 
    public void testGetChildrenNames() throws Exception {
-      TreeCache<Object, Object> cache = cacheTL.get();
+
       Node<Object, Object> rootNode = cache.getRoot();
 
       rootNode.addChild(A).put("k", "v");
@@ -334,8 +322,8 @@
    }
 
    public void testDoubleRemovalOfData() throws Exception {
-      TreeCache<Object, Object> cache = cacheTL.get();
 
+
       assert DummyTransactionManager.getInstance().getTransaction() == null;
       cache.put("/foo/1/2/3", "item", 1);
       assert DummyTransactionManager.getInstance().getTransaction() == null;
@@ -356,8 +344,8 @@
    }
 
    public void testDoubleRemovalOfData2() throws Exception {
-      TreeCache<Object, Object> cache = cacheTL.get();
 
+
       cache.put("/foo/1/2", "item", 1);
       tm.begin();
       assertEquals(cache.get("/foo/1", "item"), null);

Modified: core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -6,8 +6,10 @@
  */
 package org.horizon.api.tree;
 
-import org.horizon.UnitTestCacheFactory;
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
 import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.tree.Fqn;
 import org.horizon.tree.Node;
@@ -16,56 +18,46 @@
 import org.horizon.util.TestingUtil;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertNull;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import javax.transaction.TransactionManager;
 
- at Test(groups = {"functional", "jgroups", "pessimistic"}, testName = "api.NodeReplicatedMoveTest")
-public class NodeReplicatedMoveTest {
-   private class NodeReplicatedMoveTestTL {
-      protected TreeCache<Object, Object> cache1;
-      protected TreeCache<Object, Object> cache2;
-      protected TransactionManager tm;
-   }
+ at Test(groups = "functional", sequential = true, testName = "api.NodeReplicatedMoveTest")
+public class NodeReplicatedMoveTest extends BaseClusteredTest {
 
-   protected ThreadLocal<NodeReplicatedMoveTestTL> threadLocal = new ThreadLocal<NodeReplicatedMoveTestTL>();
+   static final Fqn A = Fqn.fromString("/a"), B = Fqn.fromString("/b"), C = Fqn.fromString("/c"), D = Fqn.fromString("/d"), E = Fqn.fromString("/e");
+   static final Object k = "key", vA = "valueA", vB = "valueB", vC = "valueC", vD = "valueD", vE = "valueE";
 
-   protected static final Fqn A = Fqn.fromString("/a"), B = Fqn.fromString("/b"), C = Fqn.fromString("/c"), D = Fqn.fromString("/d"), E = Fqn.fromString("/e");
-   protected static final Object k = "key", vA = "valueA", vB = "valueB", vC = "valueC", vD = "valueD", vE = "valueE";
+   TreeCache<Object, Object> cache1, cache2;
+   TransactionManager tm1;
 
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
-      NodeReplicatedMoveTestTL tl = new NodeReplicatedMoveTestTL();
-      threadLocal.set(tl);
       Configuration c = new Configuration();
       c.setInvocationBatchingEnabled(true);
       c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
       c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
       c.setSyncCommitPhase(true);
       c.setSyncRollbackPhase(true);
-      // start a single cache instance
-      tl.cache1 = new TreeCacheImpl<Object, Object>(new UnitTestCacheFactory<Object, Object>().createCache(c.clone()));
-      tl.cache2 = new TreeCacheImpl<Object, Object>(new UnitTestCacheFactory<Object, Object>().createCache(c.clone()));
-      tl.tm = tl.cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
 
-      TestingUtil.blockUntilViewsReceived(10000, tl.cache1.getCache(), tl.cache2.getCache());
-   }
+      CacheManager cm1 = addCacheManager();
+      CacheManager cm2 = addCacheManager();
 
-   @AfterMethod(alwaysRun = true)
-   public void tearDown() {
-      NodeReplicatedMoveTestTL tl = threadLocal.get();
-      if (tl != null) {
-         TestingUtil.killTreeCaches(tl.cache1, tl.cache2);
-         threadLocal.set(null);
-      }
-   }
+      defineCacheOnAllManagers("replSync", c);
 
+      Cache c1 = cm1.getCache("replSync");
+      Cache c2 = cm2.getCache("replSync");
 
+      tm1 = TestingUtil.getTransactionManager(c1);
+      TestingUtil.blockUntilViewsReceived(10000, cm1, cm2);
+
+      cache1 = new TreeCacheImpl<Object, Object>(c1);
+      cache2 = new TreeCacheImpl<Object, Object>(c2);
+   }
+
    public void testReplicatability() {
-      NodeReplicatedMoveTestTL tl = threadLocal.get();
-      Node<Object, Object> rootNode = tl.cache1.getRoot();
+      Node<Object, Object> rootNode = cache1.getRoot();
 
       Node<Object, Object> nodeA = rootNode.addChild(A);
       Node<Object, Object> nodeB = nodeA.addChild(B);
@@ -73,25 +65,24 @@
       nodeA.put(k, vA);
       nodeB.put(k, vB);
 
-      assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache1.getRoot().getChild(A).getChild(B).get(k));
+      assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache1.getRoot().getChild(A).getChild(B).get(k));
 
-      assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache2.getRoot().getChild(A).getChild(B).get(k));
+      assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache2.getRoot().getChild(A).getChild(B).get(k));
 
       // now move...
-      tl.cache1.move(nodeB.getFqn(), Fqn.ROOT);
+      cache1.move(nodeB.getFqn(), Fqn.ROOT);
 
-      assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache1.getRoot().getChild(B).get(k));
+      assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache1.getRoot().getChild(B).get(k));
 
-      assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache2.getRoot().getChild(B).get(k));
+      assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache2.getRoot().getChild(B).get(k));
    }
 
    public void testReplTxCommit() throws Exception {
-      NodeReplicatedMoveTestTL tl = threadLocal.get();
-      Node<Object, Object> rootNode = tl.cache1.getRoot();
+      Node<Object, Object> rootNode = cache1.getRoot();
       Fqn A_B = Fqn.fromRelativeFqn(A, B);
       Node<Object, Object> nodeA = rootNode.addChild(A);
       Node<Object, Object> nodeB = nodeA.addChild(B);
@@ -99,54 +90,53 @@
       nodeA.put(k, vA);
       nodeB.put(k, vB);
 
-      assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache1.getRoot().getChild(A).getChild(B).get(k));
+      assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache1.getRoot().getChild(A).getChild(B).get(k));
 
-      assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache2.getRoot().getChild(A).getChild(B).get(k));
+      assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache2.getRoot().getChild(A).getChild(B).get(k));
 
       // now move...
-      tl.tm.begin();
-      tl.cache1.move(nodeB.getFqn(), Fqn.ROOT);
+      tm1.begin();
+      cache1.move(nodeB.getFqn(), Fqn.ROOT);
 
-      assertEquals(vA, tl.cache1.get(A, k));
-      assertNull(tl.cache1.get(A_B, k));
-      assertEquals(vB, tl.cache1.get(B, k));
-      tl.tm.commit();
+      assertEquals(vA, cache1.get(A, k));
+      assertNull(cache1.get(A_B, k));
+      assertEquals(vB, cache1.get(B, k));
+      tm1.commit();
 
-      assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache1.getRoot().getChild(B).get(k));
-      assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache2.getRoot().getChild(B).get(k));
+      assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache1.getRoot().getChild(B).get(k));
+      assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache2.getRoot().getChild(B).get(k));
 
    }
 
    public void testReplTxRollback() throws Exception {
-      NodeReplicatedMoveTestTL tl = threadLocal.get();
-      Node<Object, Object> rootNode = tl.cache1.getRoot();
+      Node<Object, Object> rootNode = cache1.getRoot();
       Node<Object, Object> nodeA = rootNode.addChild(A);
       Node<Object, Object> nodeB = nodeA.addChild(B);
 
       nodeA.put(k, vA);
       nodeB.put(k, vB);
 
-      assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache1.getRoot().getChild(A).getChild(B).get(k));
-      assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache2.getRoot().getChild(A).getChild(B).get(k));
+      assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache1.getRoot().getChild(A).getChild(B).get(k));
+      assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache2.getRoot().getChild(A).getChild(B).get(k));
 
       // now move...
-      tl.tm.begin();
-      tl.cache1.move(nodeB.getFqn(), Fqn.ROOT);
+      tm1.begin();
+      cache1.move(nodeB.getFqn(), Fqn.ROOT);
 
-      assertEquals(vA, tl.cache1.get(A, k));
-      assertEquals(vB, tl.cache1.get(B, k));
+      assertEquals(vA, cache1.get(A, k));
+      assertEquals(vB, cache1.get(B, k));
 
-      tl.tm.rollback();
+      tm1.rollback();
 
-      assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache1.getRoot().getChild(A).getChild(B).get(k));
-      assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
-      assertEquals(vB, tl.cache2.getRoot().getChild(A).getChild(B).get(k));
+      assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache1.getRoot().getChild(A).getChild(B).get(k));
+      assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+      assertEquals(vB, cache2.getRoot().getChild(A).getChild(B).get(k));
    }
 }

Modified: core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTest.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTest.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -7,17 +7,16 @@
 
 package org.horizon.api.tree;
 
+import org.horizon.BaseClusteredTest;
 import org.horizon.Cache;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheFactory;
 import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
 import org.horizon.tree.Fqn;
 import org.horizon.tree.Node;
 import org.horizon.tree.TreeCache;
 import org.horizon.tree.TreeCacheImpl;
 import org.horizon.util.TestingUtil;
 import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -27,37 +26,31 @@
 /**
  * @author <a href="mailto:manik AT jboss DOT org">Manik Surtani (manik AT jboss DOT org)</a>
  */
- at Test(groups = {"functional", "jgroups", "pessimistic"}, sequential = true, testName = "api.SyncReplTest")
-public class SyncReplTest {
-   private CacheSPI<Object, Object> c1, c2;
+ at Test(groups = "functional", sequential = true, testName = "api.SyncReplTest")
+public class SyncReplTest extends BaseClusteredTest {
    private TreeCache<Object, Object> cache1, cache2;
 
    @BeforeMethod(alwaysRun = true)
    public void setUp() {
-      System.out.println("*** In setUp()");
       Configuration c = new Configuration();
       c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
       c.setInvocationBatchingEnabled(true);
       c.setFetchInMemoryState(false);
 
-      c1 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(c.clone());
-      c2 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(c.clone());
+      CacheManager cm1 = addCacheManager();
+      CacheManager cm2 = addCacheManager();
 
+      defineCacheOnAllManagers("replSync", c);
+
+      Cache c1 = cm1.getCache("replSync");
+      Cache c2 = cm2.getCache("replSync");
+
       TestingUtil.blockUntilViewsReceived(new Cache[]{c1, c2}, 5000);
 
       cache1 = new TreeCacheImpl<Object, Object>(c1);
       cache2 = new TreeCacheImpl<Object, Object>(c2);
-
-      System.out.println("*** Finished setUp()");
    }
 
-   @AfterMethod(alwaysRun = true)
-   public void tearDown() {
-      TestingUtil.killCaches(c1, c2);
-      cache1 = null;
-      cache2 = null;
-   }
-
    public void testBasicOperation() {
       assertClusterSize("Should only be 2  caches in the cluster!!!", 2);
 
@@ -82,8 +75,8 @@
       assertClusterSize("Should only be 2  caches in the cluster!!!", 2);
 
       Fqn fqn = Fqn.fromString("/JSESSIONID/1010.10.5:3000/1234567890/1");
-      cache1.getConfiguration().setSyncCommitPhase(true);
-      cache2.getConfiguration().setSyncCommitPhase(true);
+      cache1.getCache().getConfiguration().setSyncCommitPhase(true);
+      cache2.getCache().getConfiguration().setSyncCommitPhase(true);
 
 
       cache1.put(fqn, "age", 38);
@@ -100,28 +93,18 @@
       Map<Object, Object> map = new HashMap<Object, Object>();
       map.put("1", "1");
       map.put("2", "2");
-      cache1.getInvocationContext().getOptionOverrides().setSuppressLocking(true);
+      cache1.getCache().getInvocationContext().getOptionOverrides().setSuppressLocking(true);
       cache1.getRoot().addChild(fqn).putAll(map);
-      cache1.getInvocationContext().getOptionOverrides().setSuppressLocking(true);
+      cache1.getCache().getInvocationContext().getOptionOverrides().setSuppressLocking(true);
       assertEquals("Value should be set", "1", cache1.get(fqn, "1"));
 
       map = new HashMap<Object, Object>();
       map.put("3", "3");
       map.put("4", "4");
-      cache1.getInvocationContext().getOptionOverrides().setSuppressLocking(true);
+      cache1.getCache().getInvocationContext().getOptionOverrides().setSuppressLocking(true);
       cache1.getRoot().addChild(fqn1).putAll(map);
 
-      cache1.getInvocationContext().getOptionOverrides().setSuppressLocking(true);
+      cache1.getCache().getInvocationContext().getOptionOverrides().setSuppressLocking(true);
       assertEquals("Value should be set", "2", cache1.get(fqn, "2"));
    }
-
-
-   private void assertClusterSize(String message, int size) {
-      assertClusterSize(message, size, cache1);
-      assertClusterSize(message, size, cache2);
-   }
-
-   private void assertClusterSize(String message, int size, TreeCache c) {
-      assertEquals(message, size, c.getMembers().size());
-   }
 }

Modified: core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTxTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTxTest.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTxTest.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -7,10 +7,10 @@
 
 package org.horizon.api.tree;
 
+import org.horizon.BaseClusteredTest;
 import org.horizon.Cache;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheFactory;
 import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.tree.Fqn;
 import org.horizon.tree.Node;
@@ -18,7 +18,6 @@
 import org.horizon.tree.TreeCacheImpl;
 import org.horizon.util.TestingUtil;
 import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -28,22 +27,16 @@
 import javax.transaction.RollbackException;
 import javax.transaction.SystemException;
 import javax.transaction.TransactionManager;
-import java.util.ArrayList;
-import java.util.List;
 
 /**
  * @author <a href="mailto:manik AT jboss DOT org">Manik Surtani (manik AT jboss DOT org)</a>
  */
- at Test(groups = {"functional", "jgroups", "transaction", "pessimistic"}, sequential = true, testName = "api.SyncReplTxTest")
-public class SyncReplTxTest {
-   private List<CacheSPI<Object, Object>> flatCaches;
-   private List<TreeCache<Object, Object>> caches;
+ at Test(groups = "functional", sequential = true, testName = "api.SyncReplTxTest")
+public class SyncReplTxTest extends BaseClusteredTest {
+   TreeCache<Object, Object> cache1, cache2;
 
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws CloneNotSupportedException {
-      System.out.println("*** In setUp()");
-      caches = new ArrayList<TreeCache<Object, Object>>();
-      flatCaches = new ArrayList<CacheSPI<Object, Object>>();
       Configuration c = new Configuration();
       c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
       c.setFetchInMemoryState(false);
@@ -52,30 +45,22 @@
       c.setSyncRollbackPhase(true);
       c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
 
-      CacheSPI<Object, Object> cache1 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(c.clone());
-      CacheSPI<Object, Object> cache2 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(c.clone());
+      CacheManager cm1 = addCacheManager();
+      CacheManager cm2 = addCacheManager();
 
-      flatCaches.add(cache1);
-      flatCaches.add(cache2);
+      defineCacheOnAllManagers("replSync", c);
 
-      TestingUtil.blockUntilViewsReceived(caches.toArray(new Cache[0]), 10000);
+      Cache c1 = cm1.getCache("replSync");
+      Cache c2 = cm2.getCache("replSync");
 
-      caches.add(new TreeCacheImpl<Object, Object>(cache1));
-      caches.add(new TreeCacheImpl<Object, Object>(cache2));
+      TestingUtil.blockUntilViewsReceived(10000, cm1, cm2);
 
-      System.out.println("*** Finished setUp()");
+      cache1 = new TreeCacheImpl<Object, Object>(c1);
+      cache2 = new TreeCacheImpl<Object, Object>(c2);
    }
 
-   @AfterMethod(alwaysRun = true)
-   public void tearDown() {
-      System.out.println("*** In tearDown()");
-      TestingUtil.killTreeCaches(caches);
-      caches = null;
-      System.out.println("*** Finished tearDown()");
-   }
-
-   private TransactionManager beginTransaction(Cache<Object, Object> cache) throws NotSupportedException, SystemException {
-      TransactionManager mgr = cache.getConfiguration().getRuntimeConfig().getTransactionManager();
+   private TransactionManager beginTransaction(Cache cache) throws NotSupportedException, SystemException {
+      TransactionManager mgr = TestingUtil.getTransactionManager(cache);
       mgr.begin();
       return mgr;
    }
@@ -86,29 +71,19 @@
       Fqn f = Fqn.fromString("/test/data");
       String k = "key", v = "value";
 
-      assertNull("Should be null", caches.get(0).getRoot().getChild(f));
-      assertNull("Should be null", caches.get(1).getRoot().getChild(f));
+      assertNull("Should be null", cache1.getRoot().getChild(f));
+      assertNull("Should be null", cache2.getRoot().getChild(f));
 
-      Node<Object, Object> node = caches.get(0).getRoot().addChild(f);
+      Node<Object, Object> node = cache1.getRoot().addChild(f);
 
       assertNotNull("Should not be null", node);
 
-      TransactionManager tm = beginTransaction(caches.get(0).getCache());
+      TransactionManager tm = beginTransaction(cache1.getCache());
       node.put(k, v);
       tm.commit();
 
       assertEquals(v, node.get(k));
-      assertEquals(v, caches.get(0).get(f, k));
-      assertEquals("Should have replicated", v, caches.get(1).get(f, k));
+      assertEquals(v, cache1.get(f, k));
+      assertEquals("Should have replicated", v, cache2.get(f, k));
    }
-
-   private void assertClusterSize(String message, int size) {
-      for (Cache<Object, Object> c : flatCaches) {
-         assertClusterSize(message, size, c);
-      }
-   }
-
-   private void assertClusterSize(String message, int size, Cache<Object, Object> c) {
-      assertEquals(message, size, c.getCacheManager().getMembers().size());
-   }
 }
\ No newline at end of file

Modified: core/branches/flat/src/test/java/org/horizon/api/tree/TreeCacheAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/TreeCacheAPITest.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/TreeCacheAPITest.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -222,7 +222,7 @@
    }
 
    public void testRpcManagerElements() {
-      assertEquals("CacheMode.LOCAL cache has no address", null, cache.getAddress());
-      assertEquals("CacheMode.LOCAL cache has no members list", null, cache.getMembers());
+      assertEquals("CacheMode.LOCAL cache has no address", null, cache.getCache().getCacheManager().getAddress());
+      assertEquals("CacheMode.LOCAL cache has no members list", null, cache.getCache().getCacheManager().getMembers());
    }
 }

Modified: core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -8,99 +8,89 @@
 
 package org.horizon.replication;
 
+import org.horizon.BaseClusteredTest;
 import org.horizon.Cache;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheManager;
 import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
 import org.horizon.transaction.DummyTransactionManagerLookup;
 import org.horizon.util.TestingUtil;
-import org.horizon.util.internals.ReplicationListener;
-import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
+import static org.testng.AssertJUnit.assertEquals;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import javax.transaction.TransactionManager;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Unit test for replicated async CacheSPI. Use locking and multiple threads to test concurrent access to the tree.
  */
- at Test(groups = {"functional", "jgroups"})
-public class AsyncReplTest {
+ at Test(groups = "functional", sequential = true)
+public class AsyncReplTest extends BaseClusteredTest {
 
-   private class AsyncReplTestTL {
-      private Configuration configuration;
-      private CacheSPI<Object, Object> cache1, cache2;
-      private UnitTestCacheManager cacheManager1, cacheManager2;
-      private ReplicationListener replListener1, replListener2;
-   }
+   Cache cache1, cache2;
 
-   private ThreadLocal<AsyncReplTestTL> threadLocal = new ThreadLocal<AsyncReplTestTL>();
-
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
-      AsyncReplTestTL tl = new AsyncReplTestTL();
-      threadLocal.set(tl);
+      Configuration asyncConfiguration = new Configuration();
+      asyncConfiguration.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+      asyncConfiguration.setSyncCommitPhase(true);
+      asyncConfiguration.setSyncRollbackPhase(true);
+      asyncConfiguration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
 
-      tl.configuration = new Configuration();
-      tl.configuration.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
-      tl.configuration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      CacheManager cm1 = addCacheManager();
+      CacheManager cm2 = addCacheManager();
 
-      log("creating cache1");
-      tl.cacheManager1 = new UnitTestCacheManager(tl.configuration);
-      tl.cache1 = (CacheSPI) tl.cacheManager1.createCache("testCache");
-      tl.replListener1 = new ReplicationListener(tl.cache1);
+      defineCacheOnAllManagers("asyncRepl", asyncConfiguration);
 
-      log("creating cache2");
-      tl.cacheManager2 = new UnitTestCacheManager(tl.configuration);
-      tl.cache2 = (CacheSPI) tl.cacheManager2.createCache("testCache");
-      tl.replListener2 = new ReplicationListener(tl.cache2);
+      cache1 = cm1.getCache("asyncRepl");
+      cache2 = cm2.getCache("asyncRepl");
    }
 
-   /**
-    * Provides a hook for multiplexer integration. This default implementation is a no-op; subclasses that test mux
-    * integration would override to integrate the given cache with a multiplexer.
-    * <p/>
-    * param cache a cache that has been configured but not yet created.
-    */
-   protected void configureMultiplexer(Cache cache) throws Exception {
-      // default does nothing
-   }
+   public void testWithNoTx() throws Exception {
+      ReplListener replListener2 = attachReplicationListener(cache2);
 
-   @AfterMethod(alwaysRun = true)
-   public void tearDown() throws Exception {
-      AsyncReplTestTL tl = threadLocal.get();
-      TestingUtil.killCaches(tl.cache1, tl.cache2);
-      threadLocal.set(null);
+      String key = "key";
+
+      replListener2.expectAny();
+      cache1.put(key, "value1");
+      // allow for replication
+      replListener2.waitForReplication(60, TimeUnit.SECONDS);
+      assertEquals("value1", cache1.get(key));
+      assertEquals("value1", cache2.get(key));
+
+      replListener2.expectAny();
+      cache1.put(key, "value2");
+      assertEquals("value2", cache1.get(key));
+
+      replListener2.waitForReplication(60, TimeUnit.SECONDS);
+
+      assertEquals("value2", cache1.get(key));
+      assertEquals("value2", cache2.get(key));
    }
 
-   public void testTxCompletion() throws Exception {
-      AsyncReplTestTL tl = threadLocal.get();
-      CacheSPI<Object, Object> cache1 = tl.cache1;
-      CacheSPI<Object, Object> cache2 = tl.cache2;
-      ReplicationListener replListener1 = tl.replListener1;
-      ReplicationListener replListener2 = tl.replListener2;
+   public void testWithTx() throws Exception {
+      ReplListener replListener2 = attachReplicationListener(cache2);
 
       String key = "key";
 
       replListener2.expectAny();
       cache1.put(key, "value1");
       // allow for replication
-      replListener2.waitForReplicationToOccur(500);
+      replListener2.waitForReplication(60, TimeUnit.SECONDS);
       assertEquals("value1", cache1.get(key));
       assertEquals("value1", cache2.get(key));
 
-      TransactionManager mgr = cache1.getTransactionManager();
+      TransactionManager mgr = TestingUtil.getTransactionManager(cache1);
       mgr.begin();
 
-      replListener2.expectAny();
+      replListener2.expectAnyWithTx();
       cache1.put(key, "value2");
       assertEquals("value2", cache1.get(key));
       assertEquals("value1", cache2.get(key));
 
       mgr.commit();
 
-      replListener2.waitForReplicationToOccur(500);
+      replListener2.waitForReplication(60, TimeUnit.SECONDS);
 
       assertEquals("value2", cache1.get(key));
       assertEquals("value2", cache2.get(key));
@@ -115,86 +105,4 @@
       assertEquals("value2", cache1.get(key));
       assertEquals("value2", cache2.get(key));
    }
-
-   public void testPutShouldNotReplicateToDifferentCluster() {
-      AsyncReplTestTL tl = threadLocal.get();
-      CacheSPI<Object, Object> cache1 = tl.cache1;
-      CacheSPI<Object, Object> cache2 = tl.cache2;
-      ReplicationListener replListener1 = tl.replListener1;
-      ReplicationListener replListener2 = tl.replListener2;
-
-      CacheSPI<Object, Object> cache3 = null, cache4 = null;
-      try {
-         // TODO: fix this
-//         tl.configuration.setClusterName("otherTest");
-         cache3 = (CacheSPI<Object, Object>) new UnitTestCacheManager(tl.configuration).createCache("testCache");
-         cache4 = (CacheSPI<Object, Object>) new UnitTestCacheManager(tl.configuration).createCache("testCache");
-         replListener2.expectAny();
-         cache1.put("age", 38);
-         // because we use async repl, modfication may not yet have been propagated to cache2, so
-         // we have to wait a little
-         replListener2.waitForReplicationToOccur(500);
-         assertNull("Should not have replicated", cache3.get("age"));
-      }
-      catch (Exception e) {
-         fail(e.toString());
-      }
-      finally {
-         if (cache3 != null) {
-            cache3.stop();
-         }
-         if (cache4 != null) {
-            cache4.stop();
-         }
-      }
-   }
-
-   public void testAsyncReplDelay() {
-      Integer age;
-      AsyncReplTestTL tl = threadLocal.get();
-      CacheSPI<Object, Object> cache1 = tl.cache1;
-      CacheSPI<Object, Object> cache2 = tl.cache2;
-      ReplicationListener replListener1 = tl.replListener1;
-      ReplicationListener replListener2 = tl.replListener2;
-
-      try {
-         cache1.put("age", 38);
-
-         // value on cache2 may be 38 or not yet replicated
-         age = (Integer) cache2.get("age");
-         log("attr \"age\" of \"/a/b/c\" on cache2=" + age);
-         assertTrue("should be either null or 38", age == null || age == 38);
-      }
-      catch (Exception e) {
-         fail(e.toString());
-      }
-   }
-
-   public void testAsyncReplTxDelay() {
-      Integer age;
-      AsyncReplTestTL tl = threadLocal.get();
-      CacheSPI<Object, Object> cache1 = tl.cache1;
-      CacheSPI<Object, Object> cache2 = tl.cache2;
-      ReplicationListener replListener1 = tl.replListener1;
-      ReplicationListener replListener2 = tl.replListener2;
-
-      try {
-         TransactionManager tm = cache1.getTransactionManager();
-         tm.begin();
-         cache1.put("age", 38);
-         tm.commit();
-
-         // value on cache2 may be 38 or not yet replicated
-         age = (Integer) cache2.get("age");
-         log("attr \"age\" of \"/a/b/c\" on cache2=" + age);
-         assertTrue("should be either null or 38", age == null || age == 38);
-      }
-      catch (Exception e) {
-         fail(e.toString());
-      }
-   }
-
-   private void log(String msg) {
-      System.out.println("-- [" + Thread.currentThread() + "]: " + msg);
-   }
 }

Modified: core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -7,7 +7,7 @@
 package org.horizon.replication;
 
 import static org.easymock.EasyMock.*;
-import org.horizon.BaseReplicatedTest;
+import org.horizon.BaseClusteredTest;
 import org.horizon.Cache;
 import org.horizon.commands.RPCCommand;
 import org.horizon.config.Configuration;
@@ -32,7 +32,7 @@
  * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
  */
 @Test(groups = "functional", sequential = true)
-public class SyncReplTest extends BaseReplicatedTest {
+public class SyncReplTest extends BaseClusteredTest {
    Cache cache1, cache2;
    String k = "key", v = "value";
 

Deleted: core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java	2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java	2009-01-28 18:26:32 UTC (rev 7606)
@@ -1,254 +0,0 @@
-package org.horizon.util.internals;
-
-import org.horizon.Cache;
-import org.horizon.commands.ReplicableCommand;
-import org.horizon.commands.remote.ReplicateCommand;
-import org.horizon.commands.tx.CommitCommand;
-import org.horizon.commands.tx.PrepareCommand;
-import org.horizon.config.Configuration;
-import org.horizon.context.InvocationContext;
-import org.horizon.factories.ComponentRegistry;
-import org.horizon.io.ByteBuffer;
-import org.horizon.logging.Log;
-import org.horizon.logging.LogFactory;
-import org.horizon.marshall.HorizonMarshaller;
-import org.horizon.marshall.Marshaller;
-import org.horizon.remoting.RPCManager;
-import org.horizon.remoting.transport.jgroups.CommandAwareRpcDispatcher;
-import org.horizon.util.TestingUtil;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
-
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-
-/*
-* Utility class that notifies when certain commands were asynchronously replicated on secondary cache.
- * Especially useful for avaoiding Thread.sleep() statements.
- * <p/>
- * Usage:
- * <pre>
- *   Cache c1, c2; //these being two async caches
- *   AsyncReplicationListener listener2 = new AsyncReplicationListener(c2);
- *   listener2.expect(PutKeyValueCommand.class);
- *   c1.put(fqn, key, value);
- *   listener2.waitForReplicationToOccur(1000); // -this will block here untill c2 recieves the PutKeyValueCommand command
- * </pre>
- * Lifecycle - after being used (i.e. waitForReplicationToOccur returns sucessfully) the object returns to the
- * non-initialized state and *can* be reused through expect-wait cycle.
- * <b>Note</b>:  this class might be used aswell for sync caches, e.g. a test could have subclasses which use sync and
- * async replication
- *
- * @author Mircea.Markus at jboss.com
- * @since 1.0
- */
-public class ReplicationListener {
-   private CountDownLatch latch = new CountDownLatch(1);
-   private Set<Class<? extends ReplicableCommand>> expectedCommands;
-   Configuration configuration;
-   private static final Log log = LogFactory.getLog(ReplicationListener.class);
-
-   /**
-    * Builds a listener that will observe the given cache for recieving replication commands.
-    */
-   public ReplicationListener(Cache cache) {
-      ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
-      RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
-      CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
-      RpcDispatcher.Marshaller2 realMarshaller = (RpcDispatcher.Marshaller2) realDispatcher.getMarshaller();
-      RpcDispatcher.Marshaller2 delegate = null;
-      delegate = new MarshallerDelegate(realMarshaller);
-      realDispatcher.setMarshaller(delegate);
-      realDispatcher.setRequestMarshaller(delegate);
-      realDispatcher.setResponseMarshaller(delegate);
-      configuration = cache.getConfiguration();
-   }
-
-   private class MarshallerDelegate implements RpcDispatcher.Marshaller2 {
-      RpcDispatcher.Marshaller2 marshaller;
-
-      private MarshallerDelegate(RpcDispatcher.Marshaller2 marshaller) {
-         this.marshaller = marshaller;
-      }
-
-      public byte[] objectToByteBuffer(Object obj) throws Exception {
-         return marshaller.objectToByteBuffer(obj);
-      }
-
-      public Object objectFromByteBuffer(byte bytes[]) throws Exception {
-         Object result = marshaller.objectFromByteBuffer(bytes);
-         if (result instanceof ReplicateCommand && expectedCommands != null) {
-            ReplicateCommand replicateCommand = (ReplicateCommand) result;
-            return new ReplicateCommandDelegate(replicateCommand);
-         }
-         return result;
-      }
-
-      public Buffer objectToBuffer(Object o) throws Exception {
-         return marshaller.objectToBuffer(o);
-      }
-
-      public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception {
-         Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
-         if (result instanceof ReplicateCommand && expectedCommands != null) {
-            ReplicateCommand replicateCommand = (ReplicateCommand) result;
-            return new ReplicateCommandDelegate(replicateCommand);
-         }
-         return result;
-      }
-   }
-
-   /**
-    * We want the notification to be performed only *after* the remote command is executed.
-    */
-   private class ReplicateCommandDelegate extends ReplicateCommand {
-      ReplicateCommand realOne;
-
-      private ReplicateCommandDelegate(ReplicateCommand realOne) {
-         this.realOne = realOne;
-      }
-
-      @Override
-      public Object perform(InvocationContext ctx) throws Throwable {
-         try {
-            return realOne.perform(ctx);
-         }
-         finally {
-            log.trace("Processed command: " + realOne);
-            Iterator<Class<? extends ReplicableCommand>> it = expectedCommands.iterator();
-            while (it.hasNext()) {
-               Class<? extends ReplicableCommand> replicableCommandClass = it.next();
-               if (realOne.containsCommandType(replicableCommandClass)) {
-                  it.remove();
-               } else if (realOne.getSingleCommand() instanceof PrepareCommand) //explicit transaction
-               {
-                  PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleCommand();
-                  if (prepareCommand.containsModificationType(replicableCommandClass)) {
-                     it.remove();
-                  }
-               }
-            }
-            if (expectedCommands.isEmpty()) {
-               latch.countDown();
-            }
-         }
-      }
-   }
-
-   /**
-    * Needed for region based marshalling.
-    */
-   private class RegionMarshallerDelegate extends HorizonMarshaller {
-      private Marshaller realOne;
-
-      private RegionMarshallerDelegate(Marshaller realOne) {
-         this.realOne = realOne;
-      }
-
-      @Override
-      public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception {
-         realOne.objectToObjectStream(obj, out);
-      }
-
-      @Override
-      public Object objectFromObjectStream(ObjectInputStream in) throws Exception {
-         return realOne.objectFromObjectStream(in);
-      }
-
-      @Override
-      public Object objectFromStream(InputStream is) throws Exception {
-         return realOne.objectFromStream(is);
-      }
-
-      public Object objectFromByteBuffer(byte[] bytes) throws Exception {
-         return this.objectFromByteBuffer(bytes, 0, bytes.length);
-      }
-
-
-      public ByteBuffer objectToBuffer(Object o) throws Exception {
-         return realOne.objectToBuffer(o);
-      }
-
-      public Object objectFromByteBuffer(byte[] buffer, int i, int i1) throws Exception {
-         Object result = realOne.objectFromByteBuffer(buffer, i, i1);
-         if (result instanceof ReplicateCommand && expectedCommands != null) {
-            ReplicateCommand replicateCommand = (ReplicateCommand) result;
-            result = new ReplicateCommandDelegate(replicateCommand);
-         }
-         return result;
-      }
-   }
-
-   /**
-    * Waits for 1 minute
-    */
-   public void waitForReplicationToOccur() {
-      waitForReplicationToOccur(60000);
-   }
-
-   /**
-    * Blocks for the elements specified through {@link #expect(Class[])} invocations to be replicated in this cache. if
-    * replication does not occur in the give timeout then an exception is being thrown.
-    */
-   public void waitForReplicationToOccur(long timeoutMillis) {
-      log.trace("enter... ReplicationListener.waitForReplicationToOccur");
-      waitForReplicationToOccur(timeoutMillis, TimeUnit.MILLISECONDS);
-      log.trace("exit... ReplicationListener.waitForReplicationToOccur");
-   }
-
-   /**
-    * Similar to {@link #waitForReplicationToOccur(long)} except that this method provides more flexibility in time
-    * units.
-    *
-    * @param timeout  the maximum time to wait
-    * @param timeUnit the time unit of the <tt>timeout</tt> argument.
-    */
-   public void waitForReplicationToOccur(long timeout, TimeUnit timeUnit) {
-      assert expectedCommands != null : "there are no replication expectations; please use AsyncReplicationListener.expect(...) before calling this method";
-      try {
-         if (!latch.await(timeout, timeUnit)) {
-            assert false : "waiting for more than " + timeout + " " + timeUnit + " and following commands did not replicate: " + expectedCommands;
-         }
-      }
-      catch (InterruptedException e) {
-         throw new IllegalStateException("unexpected", e);
-      }
-      finally {
-         expectedCommands = null;
-         latch = new CountDownLatch(1);
-      }
-   }
-
-   /**
-    * {@link #waitForReplicationToOccur(long)} will block untill all the commands specified here are being replicated to
-    * this cache. The method can be called several times with various arguments.
-    */
-   public void expect(Class<? extends ReplicableCommand>... expectedCommands) {
-      if (this.expectedCommands == null) {
-         this.expectedCommands = new HashSet<Class<? extends ReplicableCommand>>();
-      }
-      this.expectedCommands.addAll(Arrays.asList(expectedCommands));
-   }
-
-   /**
-    * Waits untill first command is replicated.
-    */
-   public void expectAny() {
-      expect();
-   }
-
-   public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands) {
-      expect(PrepareCommand.class);
-      //this is because for async replication we have an 1pc transaction
-      if (configuration.getCacheMode().isSynchronous()) expect(CommitCommand.class);
-   }
-
-}
\ No newline at end of file




More information about the jbosscache-commits mailing list