Author: manik.surtani(a)jboss.com
Date: 2009-01-28 13:26:32 -0500 (Wed, 28 Jan 2009)
New Revision: 7606
Added:
core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java
Removed:
core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java
core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java
Modified:
core/branches/flat/src/main/java/org/horizon/Cache.java
core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
core/branches/flat/src/main/java/org/horizon/executors/DefaultExecutorFactory.java
core/branches/flat/src/main/java/org/horizon/executors/DefaultScheduledExecutorFactory.java
core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java
core/branches/flat/src/main/java/org/horizon/interceptors/InterceptorChain.java
core/branches/flat/src/main/java/org/horizon/transaction/DummyTransactionManager.java
core/branches/flat/src/main/java/org/horizon/tree/TreeCache.java
core/branches/flat/src/main/java/org/horizon/tree/TreeCacheImpl.java
core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java
core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java
core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTest.java
core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTxTest.java
core/branches/flat/src/test/java/org/horizon/api/tree/TreeCacheAPITest.java
core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
Log:
replication code + tests on shared transport
Modified: core/branches/flat/src/main/java/org/horizon/Cache.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/Cache.java 2009-01-28 14:34:08 UTC (rev
7605)
+++ core/branches/flat/src/main/java/org/horizon/Cache.java 2009-01-28 18:26:32 UTC (rev
7606)
@@ -81,13 +81,48 @@
String getVersion();
+ /**
+ * Adds a custom interceptor to the interceptor chain, at specified position, where
the first interceptor in the
+ * chain is at position 0 and the last one at NUM_INTERCEPTORS - 1.
+ *
+ * @param i the interceptor to add
+ * @param position the position to add the interceptor
+ */
void addInterceptor(CommandInterceptor i, int position);
- void addInterceptor(CommandInterceptor i, Class<? extends CommandInterceptor>
afterInterceptor);
+ /**
+ * Adds a custom interceptor to the interceptor chain, after an instance of the
specified interceptor type. Throws a
+ * cache exception if it cannot find an interceptor of the specified type.
+ *
+ * @param i interceptor to add
+ * @param afterInterceptor interceptor type after which to place custom interceptor
+ */
+ void addInterceptorAfter(CommandInterceptor i, Class<? extends
CommandInterceptor> afterInterceptor);
+ /**
+ * Adds a custom interceptor to the interceptor chain, before an instance of the
specified interceptor type. Throws a
+ * cache exception if it cannot find an interceptor of the specified type.
+ *
+ * @param i interceptor to add
+ * @param beforeInterceptor interceptor type before which to place custom interceptor
+ */
+ void addInterceptorBefore(CommandInterceptor i, Class<? extends
CommandInterceptor> beforeInterceptor);
+
+ /**
+ * Removes the interceptor at a specified position, where the first interceptor in the
chain is at position 0 and the
+ * last one at getInterceptorChain().size() - 1.
+ *
+ * @param position the position at which to remove an interceptor
+ */
void removeInterceptor(int position);
+ /**
+ * Removes the interceptor of specified type.
+ *
+ * @param interceptorType type of interceptor to remove
+ */
void removeInterceptor(Class<? extends CommandInterceptor> interceptorType);
+
CacheManager getCacheManager();
}
Modified: core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/CacheDelegate.java 2009-01-28 14:34:08
UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/CacheDelegate.java 2009-01-28 18:26:32
UTC (rev 7606)
@@ -285,19 +285,23 @@
}
public void addInterceptor(CommandInterceptor i, int position) {
- throw new IllegalStateException();//todo Implement me properly
+ invoker.addInterceptor(i, position);
}
- public void addInterceptor(CommandInterceptor i, Class<? extends
CommandInterceptor> afterInterceptor) {
- throw new IllegalStateException();//todo Implement me properly
+ public void addInterceptorAfter(CommandInterceptor i, Class<? extends
CommandInterceptor> afterInterceptor) {
+ invoker.addInterceptorAfter(i, afterInterceptor);
}
+ public void addInterceptorBefore(CommandInterceptor i, Class<? extends
CommandInterceptor> beforeInterceptor) {
+ invoker.addInterceptorBefore(i, beforeInterceptor);
+ }
+
public void removeInterceptor(int position) {
- throw new IllegalStateException();//todo Implement me properly
+ invoker.removeInterceptor(position);
}
public void removeInterceptor(Class<? extends CommandInterceptor>
interceptorType) {
- throw new IllegalStateException();//todo Implement me properly
+ invoker.removeInterceptor(interceptorType);
}
public CacheLoaderManager getCacheLoaderManager() {
Modified:
core/branches/flat/src/main/java/org/horizon/executors/DefaultExecutorFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/executors/DefaultExecutorFactory.java 2009-01-28
14:34:08 UTC (rev 7605)
+++
core/branches/flat/src/main/java/org/horizon/executors/DefaultExecutorFactory.java 2009-01-28
18:26:32 UTC (rev 7606)
@@ -26,7 +26,7 @@
ThreadFactory tf = new ThreadFactory() {
public Thread newThread(Runnable r) {
- return new Thread(threadNamePrefix + "-" +
counter.getAndIncrement());
+ return new Thread(r, threadNamePrefix + "-" +
counter.getAndIncrement());
}
};
Modified:
core/branches/flat/src/main/java/org/horizon/executors/DefaultScheduledExecutorFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/executors/DefaultScheduledExecutorFactory.java 2009-01-28
14:34:08 UTC (rev 7605)
+++
core/branches/flat/src/main/java/org/horizon/executors/DefaultScheduledExecutorFactory.java 2009-01-28
18:26:32 UTC (rev 7606)
@@ -18,7 +18,7 @@
final AtomicInteger counter = new AtomicInteger(0);
return Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
public Thread newThread(Runnable r) {
- return new Thread(threadNamePrefix + "-" +
counter.getAndIncrement());
+ return new Thread(r, threadNamePrefix + "-" +
counter.getAndIncrement());
}
});
}
Modified:
core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java 2009-01-28
14:34:08 UTC (rev 7605)
+++
core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java 2009-01-28
18:26:32 UTC (rev 7606)
@@ -171,7 +171,7 @@
throw new ConfigurationException("Cannot add after class: " +
config.getAfterClass()
+ " as no such iterceptor exists in the default chain");
}
- interceptorChain.addAfterInterceptor(config.getInterceptor(),
withClassName.get(0).getClass());
+ interceptorChain.addInterceptorAfter(config.getInterceptor(),
withClassName.get(0).getClass());
}
if (config.getBeforeClass() != null) {
List<CommandInterceptor> withClassName =
interceptorChain.getInterceptorsWithClassName(config.getBeforeClass());
@@ -179,7 +179,7 @@
throw new ConfigurationException("Cannot add before class: " +
config.getAfterClass()
+ " as no such iterceptor exists in the default chain");
}
- interceptorChain.addBeforeInterceptor(config.getInterceptor(),
withClassName.get(0).getClass());
+ interceptorChain.addInterceptorBefore(config.getInterceptor(),
withClassName.get(0).getClass());
}
}
}
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InterceptorChain.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/InterceptorChain.java 2009-01-28
14:34:08 UTC (rev 7605)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/InterceptorChain.java 2009-01-28
18:26:32 UTC (rev 7606)
@@ -182,7 +182,7 @@
*
* @return true if the interceptor was added; i.e. the afterInterceptor exists
*/
- public synchronized boolean addAfterInterceptor(CommandInterceptor toAdd, Class<?
extends CommandInterceptor> afterInterceptor) {
+ public synchronized boolean addInterceptorAfter(CommandInterceptor toAdd, Class<?
extends CommandInterceptor> afterInterceptor) {
CommandInterceptor it = firstInChain;
while (it != null) {
if (it.getClass().equals(afterInterceptor)) {
@@ -200,7 +200,7 @@
*
* @return true if the interceptor was added; i.e. the afterInterceptor exists
*/
- public synchronized boolean addBeforeInterceptor(CommandInterceptor toAdd, Class<?
extends CommandInterceptor> beforeInterceptor) {
+ public synchronized boolean addInterceptorBefore(CommandInterceptor toAdd, Class<?
extends CommandInterceptor> beforeInterceptor) {
if (firstInChain.getClass().equals(beforeInterceptor)) {
toAdd.setNext(firstInChain);
firstInChain = toAdd;
Modified:
core/branches/flat/src/main/java/org/horizon/transaction/DummyTransactionManager.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/transaction/DummyTransactionManager.java 2009-01-28
14:34:08 UTC (rev 7605)
+++
core/branches/flat/src/main/java/org/horizon/transaction/DummyTransactionManager.java 2009-01-28
18:26:32 UTC (rev 7606)
@@ -27,6 +27,7 @@
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
+import javax.naming.NoInitialContextException;
import java.util.Properties;
/**
@@ -54,8 +55,10 @@
Context ctx = new InitialContext(p);
ctx.bind("java:/TransactionManager", instance);
ctx.bind("UserTransaction", utx);
- }
- catch (NamingException e) {
+ } catch (NoInitialContextException nie) {
+ log.debug(nie.getMessage());
+
+ } catch (NamingException e) {
log.debug("binding of DummyTransactionManager failed", e);
}
}
Modified: core/branches/flat/src/main/java/org/horizon/tree/TreeCache.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/tree/TreeCache.java 2009-01-28 14:34:08
UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/tree/TreeCache.java 2009-01-28 18:26:32
UTC (rev 7606)
@@ -23,15 +23,8 @@
import org.horizon.Cache;
import org.horizon.CacheException;
-import org.horizon.ComponentStatus;
-import org.horizon.config.Configuration;
-import org.horizon.context.InvocationContext;
-import org.horizon.interceptors.base.CommandInterceptor;
import org.horizon.lifecycle.Lifecycle;
-import org.horizon.notifications.Listenable;
-import org.horizon.remoting.transport.Address;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -55,7 +48,7 @@
* @see Node
* @since 1.0
*/
-public interface TreeCache<K, V> extends Lifecycle, Listenable {
+public interface TreeCache<K, V> extends Lifecycle {
/**
* Returns the root node of this cache.
*
@@ -236,41 +229,6 @@
void evict(Fqn fqn);
/**
- * Gets where the cache currently is its lifecycle transitions.
- *
- * @return the CacheStatus. Will not return <code>null</code>.
- */
- ComponentStatus getCacheStatus();
-
- /**
- * @return the current invocation context for the current invocation and cache
instance.
- * @throws IllegalStateException if the cache has been destroyed.
- */
- InvocationContext getInvocationContext();
-
- /**
- * Sets the passed in {@link InvocationContext} as current.
- *
- * @param ctx invocation context to use
- * @throws IllegalStateException if the cache has been destroyed.
- */
- void setInvocationContext(InvocationContext ctx);
-
- /**
- * Returns the local address of this cache in a cluster, or
<code>null</code> if running in local mode.
- *
- * @return the local address of this cache in a cluster, or
<code>null</code> if running in local mode.
- */
- Address getAddress();
-
- /**
- * Returns a list of members in the cluster, or <code>null</code> if
running in local mode.
- *
- * @return a {@link List} of members in the cluster, or <code>null</code>
if running in local mode.
- */
- List<Address> getMembers();
-
- /**
* Moves a part of the cache to a different subtree.
* <p/>
* E.g.:
@@ -380,77 +338,23 @@
void clearData(Fqn fqn);
/**
- * Starts a batch. This is a lightweight batching mechanism that groups cache writes
together and finally performs
- * the write, persistence and/or replication when {@link #endBatch(boolean)} is called
rather than for each
- * invocation on the cache.
- * <p/>
- * Note that if there is an existing transaction in scope and the cache has been
configured to use a JTA compliant
- * transaction manager, calls to {@link #startBatch()} and {@link #endBatch(boolean)}
are ignored and treated as
- * no-ops.
- * <p/>
- *
- * @see #endBatch(boolean)
- * @since 1.0
+ * @return a reference to the underlying cache instance
*/
- void startBatch();
+ Cache<K, V> getCache();
/**
- * Ends an existing ongoing batch. A no-op if a batch has not been started yet.
- * <p/>
- * Note that if there is an existing transaction in scope and the cache has been
configured to use a JTA compliant
- * transaction manager, calls to {@link #startBatch()} and {@link #endBatch(boolean)}
are ignored and treated as
- * no-ops.
- * <p/>
+ * Tests if an Fqn exists. Convenience method for {@link #exists(Fqn)}
*
- * @param successful if <tt>true</tt>, changes made in the batch are
committed. If <tt>false</tt>, they are
- * discarded.
- * @see #startBatch()
- * @since 1.0
+ * @param fqn string representation of an Fqn
+ * @return true if the fqn exists, false otherwise
*/
- void endBatch(boolean successful);
+ boolean exists(String fqn);
/**
- * Adds a custom interceptor to the interceptor chain, at specified position, where
the first interceptor in the
- * chain is at position 0 and the last one at getInterceptorChain().size() - 1.
+ * Tests if an Fqn exists.
*
- * @param i the interceptor to add
- * @param position the position to add the interceptor
- * @since 1.0
+ * @param fqn Fqn to test
+ * @return true if the fqn exists, false otherwise
*/
- void addInterceptor(CommandInterceptor i, int position);
-
- /**
- * Adds a custom interceptor to the interceptor chain, after an instance of the
specified interceptor type. Throws a
- * cache exception if it cannot find an interceptor of the specified type.
- *
- * @param i interceptor to add
- * @param afterInterceptor interceptor type after which to place custom interceptor
- * @since 1.0
- */
- void addInterceptor(CommandInterceptor i, Class<? extends CommandInterceptor>
afterInterceptor);
-
- /**
- * Removes the interceptor at a specified position, where the first interceptor in the
chain is at position 0 and the
- * last one at getInterceptorChain().size() - 1.
- *
- * @param position the position at which to remove an interceptor
- * @since 1.0
- */
- void removeInterceptor(int position);
-
- /**
- * Removes the interceptor of specified type.
- *
- * @param interceptorType type of interceptor to remove
- * @since 1.0
- */
- void removeInterceptor(Class<? extends CommandInterceptor> interceptorType);
-
- Configuration getConfiguration();
-
- Cache<K, V> getCache();
-
- boolean exists(String fqn);
-
boolean exists(Fqn fqn);
}
Modified: core/branches/flat/src/main/java/org/horizon/tree/TreeCacheImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/tree/TreeCacheImpl.java 2009-01-28
14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/tree/TreeCacheImpl.java 2009-01-28
18:26:32 UTC (rev 7606)
@@ -23,16 +23,10 @@
import org.horizon.Cache;
import org.horizon.CacheException;
-import org.horizon.ComponentStatus;
import org.horizon.atomic.AtomicMap;
-import org.horizon.config.Configuration;
-import org.horizon.context.InvocationContext;
-import org.horizon.interceptors.base.CommandInterceptor;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
-import org.horizon.remoting.transport.Address;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -287,70 +281,10 @@
}
// ------------------ nothing different; just delegate to the cache
- public void addListener(Object listener) {
- cache.addListener(listener);
- }
-
- public void removeListener(Object listener) {
- cache.removeListener(listener);
- }
-
- public Set<Object> getListeners() {
- return cache.getListeners();
- }
-
- public void startBatch() {
- cache.startBatch();
- }
-
- public void endBatch(boolean successful) {
- cache.endBatch(successful);
- }
-
- public void addInterceptor(CommandInterceptor i, int position) {
- cache.addInterceptor(i, position);
- }
-
- public void addInterceptor(CommandInterceptor i, Class<? extends
CommandInterceptor> afterInterceptor) {
- cache.addInterceptor(i, afterInterceptor);
- }
-
- public void removeInterceptor(int position) {
- cache.removeInterceptor(position);
- }
-
- public void removeInterceptor(Class<? extends CommandInterceptor>
interceptorType) {
- cache.removeInterceptor(interceptorType);
- }
-
- public Configuration getConfiguration() {
- return cache.getConfiguration();
- }
-
public Cache getCache() {
return cache;
}
- public ComponentStatus getCacheStatus() {
- return cache.getCacheStatus();
- }
-
- public InvocationContext getInvocationContext() {
- return cache.getInvocationContext();
- }
-
- public void setInvocationContext(InvocationContext ctx) {
- cache.setInvocationContext(ctx);
- }
-
- public Address getAddress() {
- return cache.getCacheManager().getAddress();
- }
-
- public List<Address> getMembers() {
- return cache.getCacheManager().getMembers();
- }
-
public void start() throws CacheException {
cache.start();
createRoot();
Modified: core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java 2009-01-28 14:34:08
UTC (rev 7605)
+++ core/branches/flat/src/main/java/org/horizon/util/TestingUtil.java 2009-01-28 18:26:32
UTC (rev 7606)
@@ -89,20 +89,6 @@
}
/**
- * Injects an interceptor after a specified interceptor in a running cache. Your new
interceptor need not be
- * initialised with pointers to the next interceptor, etc. as this method does all
that for you, including calling
- * setCache().
- *
- * @param cache running cache instance
- * @param interceptorToInject interceptor instance to inject.
- * @param interceptorAfterWhichToInject class of interceptor to search for in the
chain and after which to add your
- * interceptor
- */
- public static void injectInterceptor(CacheSPI<?, ?> cache, CommandInterceptor
interceptorToInject, Class<? extends CommandInterceptor>
interceptorAfterWhichToInject) {
- cache.addInterceptor(interceptorToInject, interceptorAfterWhichToInject);
- }
-
- /**
* Loops, continually calling {@link #areCacheViewsComplete(Cache[])} until it either
returns true or
* <code>timeout</code> ms have elapsed.
*
@@ -628,4 +614,8 @@
ComponentRegistry cr = extractComponentRegistry(cache);
return cr.getComponent(componentType);
}
+
+ public static TransactionManager getTransactionManager(Cache cache) {
+ return extractComponent(cache, TransactionManager.class);
+ }
}
Copied: core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java (from rev
7605, core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java)
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java
(rev 0)
+++ core/branches/flat/src/test/java/org/horizon/BaseClusteredTest.java 2009-01-28
18:26:32 UTC (rev 7606)
@@ -0,0 +1,163 @@
+package org.horizon;
+
+import org.horizon.commands.DataCommand;
+import org.horizon.commands.VisitableCommand;
+import org.horizon.commands.tx.CommitCommand;
+import org.horizon.commands.tx.PrepareCommand;
+import org.horizon.config.Configuration;
+import org.horizon.config.GlobalConfiguration;
+import org.horizon.context.InvocationContext;
+import org.horizon.interceptors.base.CommandInterceptor;
+import org.horizon.manager.CacheManager;
+import org.horizon.manager.DefaultCacheManager;
+import org.horizon.util.TestingUtil;
+import org.testng.annotations.AfterMethod;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public abstract class BaseClusteredTest {
+ ThreadLocal<List<CacheManager>> cacheManagerThreadLocal = new
ThreadLocal<List<CacheManager>>() {
+ @Override
+ protected List<CacheManager> initialValue() {
+ return new LinkedList<CacheManager>();
+ }
+ };
+
+ /**
+ * @return a list of registered cache managers on the current thread.
+ */
+ protected List<CacheManager> getCacheManagers() {
+ return cacheManagerThreadLocal.get();
+ }
+
+ /**
+ * Creates a new cache manager, starts it, and adds it to the list of known cache
managers on the current thread.
+ * Uses a default clustered cache manager global config.
+ *
+ * @return the new CacheManager
+ */
+ protected CacheManager addCacheManager() {
+ return addCacheManager(GlobalConfiguration.getClusteredDefault());
+ }
+
+ /**
+ * Creates a new cache manager, starts it, and adds it to the list of known cache
managers on the current thread.
+ *
+ * @param globalConfig config to use
+ * @return the new CacheManager
+ */
+ protected CacheManager addCacheManager(GlobalConfiguration globalConfig) {
+ CacheManager cm = new DefaultCacheManager(globalConfig);
+ cacheManagerThreadLocal.get().add(cm);
+ return cm;
+ }
+
+ protected void defineCacheOnAllManagers(String cacheName, Configuration c) {
+ for (CacheManager cm : cacheManagerThreadLocal.get()) {
+ cm.defineCache(cacheName, c);
+ }
+ }
+
+ protected void assertClusterSize(String message, int size) {
+ for (CacheManager cm : cacheManagerThreadLocal.get()) {
+ assert cm.getMembers() != null && cm.getMembers().size() == size :
message;
+ }
+ }
+
+ protected ReplListener attachReplicationListener(Cache c) {
+ return new ReplListener(c);
+ }
+
+ @AfterMethod
+ public void cleanupThreadLocals() {
+ TestingUtil.killCacheManagers(cacheManagerThreadLocal.get().toArray(new
CacheManager[cacheManagerThreadLocal.get().size()]));
+ cacheManagerThreadLocal.get().clear();
+ }
+
+ protected static class ReplListener {
+ Cache c;
+ Set<Class<? extends VisitableCommand>> expectedCommands;
+ CountDownLatch latch = new CountDownLatch(1);
+
+ public ReplListener(Cache c) {
+ this.c = c;
+ this.c.addInterceptor(new ReplListenerInterceptor(), 0);
+ }
+
+ public void expectAny() {
+ expect();
+ }
+
+ public void expectWithTx(Class<? extends VisitableCommand>... commands) {
+ expect(PrepareCommand.class);
+ expect(commands);
+ //async replication uses a 1PC (one-phase commit) transaction, so a CommitCommand is only expected in synchronous mode
+ if (c.getConfiguration().getCacheMode().isSynchronous())
expect(CommitCommand.class);
+ }
+
+ public void expectAnyWithTx() {
+ expect(PrepareCommand.class);
+ //async replication uses a 1PC (one-phase commit) transaction, so a CommitCommand is only expected in synchronous mode
+ if (c.getConfiguration().getCacheMode().isSynchronous())
expect(CommitCommand.class);
+ }
+
+ public void expect(Class<? extends VisitableCommand>... expectedCommands) {
+ if (this.expectedCommands == null) {
+ this.expectedCommands = new HashSet<Class<? extends
VisitableCommand>>();
+ }
+ this.expectedCommands.addAll(Arrays.asList(expectedCommands));
+ }
+
+ public void waitForReplication() {
+ waitForReplication(600, TimeUnit.SECONDS);
+ }
+
+ public void waitForReplication(long time, TimeUnit unit) {
+ assert expectedCommands != null : "there are no replication expectations;
please use ReplListener.expect() before calling this method";
+ try {
+ if (!latch.await(time, unit)) {
+ assert false : "Waiting for more than " + time + " " +
unit + " and following commands did not replicate: " + expectedCommands;
+ }
+ }
+ catch (InterruptedException e) {
+ throw new IllegalStateException("unexpected", e);
+ }
+ finally {
+ expectedCommands = null;
+ latch = new CountDownLatch(1);
+ }
+ }
+
+ protected class ReplListenerInterceptor extends CommandInterceptor {
+ @Override
+ protected Object handleDefault(InvocationContext ctx, VisitableCommand cmd)
throws Throwable {
+ // first pass up chain
+ Object o = invokeNextInterceptor(ctx, cmd);
+ markAsVisited(cmd);
+ return o;
+ }
+
+ @Override
+ public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand cmd)
throws Throwable {
+ // first pass up chain
+ Object o = invokeNextInterceptor(ctx, cmd);
+ markAsVisited(cmd);
+ for (DataCommand mod : cmd.getModifications()) markAsVisited(mod);
+ return o;
+ }
+
+ private void markAsVisited(VisitableCommand cmd) {
+ if (expectedCommands != null) {
+ expectedCommands.remove(cmd.getClass());
+ if (expectedCommands.isEmpty()) latch.countDown();
+ }
+ }
+ }
+ }
+}
Deleted: core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java 2009-01-28
14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/BaseReplicatedTest.java 2009-01-28
18:26:32 UTC (rev 7606)
@@ -1,68 +0,0 @@
-package org.horizon;
-
-import org.horizon.config.Configuration;
-import org.horizon.config.GlobalConfiguration;
-import org.horizon.manager.CacheManager;
-import org.horizon.manager.DefaultCacheManager;
-import org.horizon.util.TestingUtil;
-import org.testng.annotations.AfterMethod;
-
-import java.util.LinkedList;
-import java.util.List;
-
-public abstract class BaseReplicatedTest {
- ThreadLocal<List<CacheManager>> cacheManagerThreadLocal = new
ThreadLocal<List<CacheManager>>() {
- @Override
- protected List<CacheManager> initialValue() {
- return new LinkedList<CacheManager>();
- }
- };
-
- /**
- * @return a list of registered cache managers on the current thread.
- */
- protected List<CacheManager> getCacheManagers() {
- return cacheManagerThreadLocal.get();
- }
-
- /**
- * Creates a new cache manager, starts it, and adds it to the list of known cache
managers on the current thread.
- * Uses a default clustered cache manager global config.
- *
- * @return the new CacheManager
- */
- protected CacheManager addCacheManager() {
- return addCacheManager(GlobalConfiguration.getClusteredDefault());
- }
-
- /**
- * Creates a new cache manager, starts it, and adds it to the list of known cache
managers on the current thread.
- *
- * @param globalConfig config to use
- * @return the new CacheManager
- */
- protected CacheManager addCacheManager(GlobalConfiguration globalConfig) {
- CacheManager cm = new DefaultCacheManager(globalConfig);
- cacheManagerThreadLocal.get().add(cm);
- return cm;
- }
-
- protected void defineCacheOnAllManagers(String cacheName, Configuration c) {
- for (CacheManager cm : cacheManagerThreadLocal.get()) {
- cm.defineCache(cacheName, c);
- }
- }
-
- protected void assertClusterSize(String message, int size) {
- for (CacheManager cm : cacheManagerThreadLocal.get()) {
- assert cm.getMembers() != null && cm.getMembers().size() == size :
message;
- }
- }
-
- @AfterMethod
- public void cleanupThreadLocals() {
- TestingUtil.killCacheManagers(cacheManagerThreadLocal.get().toArray(new
CacheManager[cacheManagerThreadLocal.get().size()]));
- cacheManagerThreadLocal.get().clear();
- }
-
-}
Modified:
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-01-28
14:34:08 UTC (rev 7605)
+++
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-01-28
18:26:32 UTC (rev 7606)
@@ -2,23 +2,21 @@
import org.easymock.EasyMock;
import static org.easymock.EasyMock.*;
+import org.horizon.BaseClusteredTest;
import org.horizon.Cache;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheFactory;
import org.horizon.commands.RPCCommand;
import org.horizon.commands.write.PutKeyValueCommand;
import org.horizon.commands.write.RemoveCommand;
import org.horizon.config.Configuration;
import org.horizon.factories.ComponentRegistry;
+import org.horizon.manager.CacheManager;
import org.horizon.remoting.RPCManager;
import org.horizon.remoting.ResponseMode;
import org.horizon.remoting.transport.Address;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.transaction.TransactionTable;
import org.horizon.util.TestingUtil;
-import org.horizon.util.internals.ReplicationListener;
import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -26,52 +24,37 @@
import javax.transaction.TransactionManager;
import java.util.List;
-@Test(groups = {"functional", "jgroups", "transaction"})
-public class PutForExternalReadTest {
- protected final String key = "k", value = "v", value2 =
"v2";
+@Test(groups = "functional", sequential = true)
+public class PutForExternalReadTest extends BaseClusteredTest {
+ final String key = "k", value = "v", value2 = "v2";
+ Cache cache1, cache2;
+ TransactionManager tm1, tm2;
+ ReplListener replListener1, replListener2;
- protected CacheSPI<String, String> cache1, cache2;
-
- ReplicationListener replListener1;
- ReplicationListener replListener2;
-
- protected TransactionManager tm1, tm2;
-
- protected boolean useTx;
-
-
@BeforeMethod(alwaysRun = true)
public void setUp() {
-
- UnitTestCacheFactory<String, String> cf = new UnitTestCacheFactory<String,
String>();
Configuration c = new Configuration();
c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
- cache1 = (CacheSPI<String, String>) cf.createCache(c, false);
-
cache1.getConfiguration().setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- // TODO fix this
-// cache1.getConfiguration().setSerializationExecutorPoolSize(0);//this is very
important for async tests!
+ c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ CacheManager cm1 = addCacheManager();
+ CacheManager cm2 = addCacheManager();
+ defineCacheOnAllManagers("replSync", c);
- cache1.start();
- tm1 = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+ cache1 = cm1.getCache("replSync");
+ cache2 = cm2.getCache("replSync");
- cache2 = (CacheSPI<String, String>)
cf.createCache(cache1.getConfiguration().clone());
-
- tm2 = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
- replListener1 = new ReplicationListener(cache1);
- replListener2 = new ReplicationListener(cache2);
-
+ tm1 = TestingUtil.getTransactionManager(cache1);
+ tm2 = TestingUtil.getTransactionManager(cache2);
TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
- }
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- TestingUtil.killCaches(cache1, cache2);
+ replListener1 = attachReplicationListener(cache1);
+ replListener2 = attachReplicationListener(cache2);
}
public void testNoOpWhenKeyPresent() {
replListener2.expect(PutKeyValueCommand.class);
cache1.putForExternalRead(key, value);
- replListener2.waitForReplicationToOccur();
+ replListener2.waitForReplication();
assertEquals("PFER should have succeeded", value, cache1.get(key));
@@ -80,14 +63,14 @@
// reset
replListener2.expect(RemoveCommand.class);
cache1.remove(key);
- replListener2.waitForReplicationToOccur();
+ replListener2.waitForReplication();
assert cache1.isEmpty() : "Should have reset";
assert cache2.isEmpty() : "Should have reset";
replListener2.expect(PutKeyValueCommand.class);
cache1.put(key, value);
- replListener2.waitForReplicationToOccur();
+ replListener2.waitForReplication();
// now this pfer should be a no-op
cache1.putForExternalRead(key, value2);
@@ -135,7 +118,7 @@
// create parent node first
replListener2.expect(PutKeyValueCommand.class);
cache1.put(key + "0", value);
- replListener2.waitForReplicationToOccur();
+ replListener2.waitForReplication();
// start a tx and do some stuff.
replListener2.expect(PutKeyValueCommand.class);
@@ -144,7 +127,7 @@
cache1.putForExternalRead(key, value); // should have happened in a separate tx and
have committed already.
Transaction t = tm1.suspend();
- replListener2.waitForReplicationToOccur();
+ replListener2.waitForReplication();
assertEquals("PFER should have completed", value, cache1.get(key));
assertEquals("PFER should have completed", value, cache2.get(key));
@@ -200,7 +183,7 @@
replListener2.expect(PutKeyValueCommand.class);
cache1.putForExternalRead(key, value);
- replListener2.waitForReplicationToOccur();
+ replListener2.waitForReplication();
assertEquals("PFER updated cache1", value, cache1.get(key));
assertEquals("PFER propagated to cache2 as expected", value,
cache2.get(key));
@@ -243,7 +226,7 @@
tm1.begin();
cache1.putForExternalRead(key, value);
tm1.commit();
- replListener2.waitForReplicationToOccur();
+ replListener2.waitForReplication();
TransactionTable tt1 = getTransactionTable(cache1);
TransactionTable tt2 = getTransactionTable(cache2);
@@ -260,7 +243,7 @@
cache1.putForExternalRead(key, value);
cache1.put(key, value);
tm1.commit();
- replListener2.waitForReplicationToOccur();
+ replListener2.waitForReplication();
assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale
global TXs";
assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale
local TXs";
@@ -272,7 +255,7 @@
cache1.put(key, value);
cache1.putForExternalRead(key, value);
tm1.commit();
- replListener2.waitForReplicationToOccur();
+ replListener2.waitForReplication();
assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale
global TXs";
assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale
local TXs";
@@ -286,7 +269,7 @@
cache1.putForExternalRead(key, value);
cache1.put(key, value);
tm1.commit();
- replListener2.waitForReplicationToOccur();
+ replListener2.waitForReplication();
assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale
global TXs";
assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale
local TXs";
Modified: core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java 2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/NodeAPITest.java 2009-01-28 18:26:32 UTC (rev 7606)
@@ -1,8 +1,8 @@
package org.horizon.api.tree;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheFactory;
import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
+import org.horizon.manager.DefaultCacheManager;
import org.horizon.transaction.DummyTransactionManager;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.tree.Fqn;
@@ -11,7 +11,6 @@
import org.horizon.tree.TreeCacheImpl;
import org.horizon.util.TestingUtil;
import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -26,35 +25,26 @@
*
* @author <a href="mailto:manik AT jboss DOT org">Manik Surtani</a>
*/
-@Test(groups = "functional")
+@Test(groups = "functional", sequential = true)
public class NodeAPITest {
- protected ThreadLocal<TreeCache<Object, Object>> cacheTL = new
ThreadLocal<TreeCache<Object, Object>>();
- protected static final Fqn A = Fqn.fromString("/a"), B =
Fqn.fromString("/b"), C = Fqn.fromString("/c"), D =
Fqn.fromString("/d");
- protected Fqn A_B = Fqn.fromRelativeFqn(A, B);
- protected Fqn A_C = Fqn.fromRelativeFqn(A, C);
- protected TransactionManager tm;
+ static final Fqn A = Fqn.fromString("/a"), B =
Fqn.fromString("/b"), C = Fqn.fromString("/c"), D =
Fqn.fromString("/d");
+ Fqn A_B = Fqn.fromRelativeFqn(A, B);
+ Fqn A_C = Fqn.fromRelativeFqn(A, C);
+ TransactionManager tm;
+ TreeCache cache;
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
// start a single cache instance
Configuration c = new Configuration();
c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- CacheSPI<Object, Object> cache = (CacheSPI<Object, Object>) new
UnitTestCacheFactory<Object, Object>().createCache(c, false);
- cache.getConfiguration().setInvocationBatchingEnabled(true);
- cache.start();
- cacheTL.set(new TreeCacheImpl(cache));
- tm = cache.getTransactionManager();
+ c.setInvocationBatchingEnabled(true);
+ CacheManager cm = new DefaultCacheManager(c);
+ cache = new TreeCacheImpl(cm.getCache());
+ tm = TestingUtil.getTransactionManager(cache.getCache());
}
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- TreeCache<Object, Object> cache = cacheTL.get();
- TestingUtil.killTreeCaches(cache);
- cacheTL.set(null);
- }
-
public void testAddingData() {
- TreeCache<Object, Object> cache = cacheTL.get();
Node<Object, Object> rootNode = cache.getRoot();
Node<Object, Object> nodeA = rootNode.addChild(A);
nodeA.put("key", "value");
@@ -63,7 +53,6 @@
}
public void testAddingDataTx() throws Exception {
- TreeCache<Object, Object> cache = cacheTL.get();
Node<Object, Object> rootNode = cache.getRoot();
tm.begin();
Node<Object, Object> nodeA = rootNode.addChild(A);
@@ -74,7 +63,6 @@
}
public void testOverwritingDataTx() throws Exception {
- TreeCache<Object, Object> cache = cacheTL.get();
Node<Object, Object> rootNode = cache.getRoot();
Node<Object, Object> nodeA = rootNode.addChild(A);
@@ -93,7 +81,7 @@
* Remember, Fqns are relative!!
*/
public void testParentsAndChildren() {
- TreeCache<Object, Object> cache = cacheTL.get();
+
Node<Object, Object> rootNode = cache.getRoot();
Node<Object, Object> nodeA = rootNode.addChild(A);
@@ -138,7 +126,7 @@
public void testImmutabilityOfData() {
- TreeCache<Object, Object> cache = cacheTL.get();
+
Node<Object, Object> rootNode = cache.getRoot();
rootNode.put("key", "value");
@@ -161,7 +149,7 @@
}
public void testDefensiveCopyOfData() {
- TreeCache<Object, Object> cache = cacheTL.get();
+
Node<Object, Object> rootNode = cache.getRoot();
rootNode.put("key", "value");
@@ -187,7 +175,7 @@
}
public void testDefensiveCopyOfChildren() {
- TreeCache<Object, Object> cache = cacheTL.get();
+
Node<Object, Object> rootNode = cache.getRoot();
Fqn childFqn = Fqn.fromString("/child");
@@ -215,7 +203,7 @@
public void testImmutabilityOfChildren() {
- TreeCache<Object, Object> cache = cacheTL.get();
+
Node<Object, Object> rootNode = cache.getRoot();
rootNode.addChild(A);
@@ -230,7 +218,7 @@
}
public void testGetChildAPI() {
- TreeCache<Object, Object> cache = cacheTL.get();
+
Node<Object, Object> rootNode = cache.getRoot();
// creates a Node<Object, Object> with fqn /a/b/c
@@ -249,7 +237,7 @@
}
public void testClearingData() {
- TreeCache<Object, Object> cache = cacheTL.get();
+
Node<Object, Object> rootNode = cache.getRoot();
rootNode.put("k", "v");
@@ -261,7 +249,7 @@
}
public void testClearingDataTx() throws Exception {
- TreeCache<Object, Object> cache = cacheTL.get();
+
Node<Object, Object> rootNode = cache.getRoot();
tm.begin();
@@ -276,7 +264,7 @@
}
public void testPutData() {
- TreeCache<Object, Object> cache = cacheTL.get();
+
Node<Object, Object> rootNode = cache.getRoot();
assertTrue(rootNode.getData().isEmpty());
@@ -311,7 +299,7 @@
}
public void testGetChildrenNames() throws Exception {
- TreeCache<Object, Object> cache = cacheTL.get();
+
Node<Object, Object> rootNode = cache.getRoot();
rootNode.addChild(A).put("k", "v");
@@ -334,8 +322,8 @@
}
public void testDoubleRemovalOfData() throws Exception {
- TreeCache<Object, Object> cache = cacheTL.get();
+
assert DummyTransactionManager.getInstance().getTransaction() == null;
cache.put("/foo/1/2/3", "item", 1);
assert DummyTransactionManager.getInstance().getTransaction() == null;
@@ -356,8 +344,8 @@
}
public void testDoubleRemovalOfData2() throws Exception {
- TreeCache<Object, Object> cache = cacheTL.get();
+
cache.put("/foo/1/2", "item", 1);
tm.begin();
assertEquals(cache.get("/foo/1", "item"), null);
Modified: core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java 2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java 2009-01-28 18:26:32 UTC (rev 7606)
@@ -6,8 +6,10 @@
*/
package org.horizon.api.tree;
-import org.horizon.UnitTestCacheFactory;
+import org.horizon.BaseClusteredTest;
+import org.horizon.Cache;
import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.tree.Fqn;
import org.horizon.tree.Node;
@@ -16,56 +18,46 @@
import org.horizon.util.TestingUtil;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNull;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.transaction.TransactionManager;
-@Test(groups = {"functional", "jgroups", "pessimistic"},
testName = "api.NodeReplicatedMoveTest")
-public class NodeReplicatedMoveTest {
- private class NodeReplicatedMoveTestTL {
- protected TreeCache<Object, Object> cache1;
- protected TreeCache<Object, Object> cache2;
- protected TransactionManager tm;
- }
+@Test(groups = "functional", sequential = true, testName =
"api.NodeReplicatedMoveTest")
+public class NodeReplicatedMoveTest extends BaseClusteredTest {
- protected ThreadLocal<NodeReplicatedMoveTestTL> threadLocal = new
ThreadLocal<NodeReplicatedMoveTestTL>();
+ static final Fqn A = Fqn.fromString("/a"), B =
Fqn.fromString("/b"), C = Fqn.fromString("/c"), D =
Fqn.fromString("/d"), E = Fqn.fromString("/e");
+ static final Object k = "key", vA = "valueA", vB =
"valueB", vC = "valueC", vD = "valueD", vE =
"valueE";
- protected static final Fqn A = Fqn.fromString("/a"), B =
Fqn.fromString("/b"), C = Fqn.fromString("/c"), D =
Fqn.fromString("/d"), E = Fqn.fromString("/e");
- protected static final Object k = "key", vA = "valueA", vB =
"valueB", vC = "valueC", vD = "valueD", vE =
"valueE";
+ TreeCache<Object, Object> cache1, cache2;
+ TransactionManager tm1;
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
- NodeReplicatedMoveTestTL tl = new NodeReplicatedMoveTestTL();
- threadLocal.set(tl);
Configuration c = new Configuration();
c.setInvocationBatchingEnabled(true);
c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
c.setSyncCommitPhase(true);
c.setSyncRollbackPhase(true);
- // start a single cache instance
- tl.cache1 = new TreeCacheImpl<Object, Object>(new
UnitTestCacheFactory<Object, Object>().createCache(c.clone()));
- tl.cache2 = new TreeCacheImpl<Object, Object>(new
UnitTestCacheFactory<Object, Object>().createCache(c.clone()));
- tl.tm = tl.cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
- TestingUtil.blockUntilViewsReceived(10000, tl.cache1.getCache(),
tl.cache2.getCache());
- }
+ CacheManager cm1 = addCacheManager();
+ CacheManager cm2 = addCacheManager();
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- NodeReplicatedMoveTestTL tl = threadLocal.get();
- if (tl != null) {
- TestingUtil.killTreeCaches(tl.cache1, tl.cache2);
- threadLocal.set(null);
- }
- }
+ defineCacheOnAllManagers("replSync", c);
+ Cache c1 = cm1.getCache("replSync");
+ Cache c2 = cm2.getCache("replSync");
+ tm1 = TestingUtil.getTransactionManager(c1);
+ TestingUtil.blockUntilViewsReceived(10000, cm1, cm2);
+
+ cache1 = new TreeCacheImpl<Object, Object>(c1);
+ cache2 = new TreeCacheImpl<Object, Object>(c2);
+ }
+
public void testReplicatability() {
- NodeReplicatedMoveTestTL tl = threadLocal.get();
- Node<Object, Object> rootNode = tl.cache1.getRoot();
+ Node<Object, Object> rootNode = cache1.getRoot();
Node<Object, Object> nodeA = rootNode.addChild(A);
Node<Object, Object> nodeB = nodeA.addChild(B);
@@ -73,25 +65,24 @@
nodeA.put(k, vA);
nodeB.put(k, vB);
- assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache1.getRoot().getChild(A).getChild(B).get(k));
+ assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache1.getRoot().getChild(A).getChild(B).get(k));
- assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache2.getRoot().getChild(A).getChild(B).get(k));
+ assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache2.getRoot().getChild(A).getChild(B).get(k));
// now move...
- tl.cache1.move(nodeB.getFqn(), Fqn.ROOT);
+ cache1.move(nodeB.getFqn(), Fqn.ROOT);
- assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache1.getRoot().getChild(B).get(k));
+ assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache1.getRoot().getChild(B).get(k));
- assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache2.getRoot().getChild(B).get(k));
+ assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache2.getRoot().getChild(B).get(k));
}
public void testReplTxCommit() throws Exception {
- NodeReplicatedMoveTestTL tl = threadLocal.get();
- Node<Object, Object> rootNode = tl.cache1.getRoot();
+ Node<Object, Object> rootNode = cache1.getRoot();
Fqn A_B = Fqn.fromRelativeFqn(A, B);
Node<Object, Object> nodeA = rootNode.addChild(A);
Node<Object, Object> nodeB = nodeA.addChild(B);
@@ -99,54 +90,53 @@
nodeA.put(k, vA);
nodeB.put(k, vB);
- assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache1.getRoot().getChild(A).getChild(B).get(k));
+ assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache1.getRoot().getChild(A).getChild(B).get(k));
- assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache2.getRoot().getChild(A).getChild(B).get(k));
+ assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache2.getRoot().getChild(A).getChild(B).get(k));
// now move...
- tl.tm.begin();
- tl.cache1.move(nodeB.getFqn(), Fqn.ROOT);
+ tm1.begin();
+ cache1.move(nodeB.getFqn(), Fqn.ROOT);
- assertEquals(vA, tl.cache1.get(A, k));
- assertNull(tl.cache1.get(A_B, k));
- assertEquals(vB, tl.cache1.get(B, k));
- tl.tm.commit();
+ assertEquals(vA, cache1.get(A, k));
+ assertNull(cache1.get(A_B, k));
+ assertEquals(vB, cache1.get(B, k));
+ tm1.commit();
- assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache1.getRoot().getChild(B).get(k));
- assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache2.getRoot().getChild(B).get(k));
+ assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache1.getRoot().getChild(B).get(k));
+ assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache2.getRoot().getChild(B).get(k));
}
public void testReplTxRollback() throws Exception {
- NodeReplicatedMoveTestTL tl = threadLocal.get();
- Node<Object, Object> rootNode = tl.cache1.getRoot();
+ Node<Object, Object> rootNode = cache1.getRoot();
Node<Object, Object> nodeA = rootNode.addChild(A);
Node<Object, Object> nodeB = nodeA.addChild(B);
nodeA.put(k, vA);
nodeB.put(k, vB);
- assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache1.getRoot().getChild(A).getChild(B).get(k));
- assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache2.getRoot().getChild(A).getChild(B).get(k));
+ assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache1.getRoot().getChild(A).getChild(B).get(k));
+ assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache2.getRoot().getChild(A).getChild(B).get(k));
// now move...
- tl.tm.begin();
- tl.cache1.move(nodeB.getFqn(), Fqn.ROOT);
+ tm1.begin();
+ cache1.move(nodeB.getFqn(), Fqn.ROOT);
- assertEquals(vA, tl.cache1.get(A, k));
- assertEquals(vB, tl.cache1.get(B, k));
+ assertEquals(vA, cache1.get(A, k));
+ assertEquals(vB, cache1.get(B, k));
- tl.tm.rollback();
+ tm1.rollback();
- assertEquals(vA, tl.cache1.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache1.getRoot().getChild(A).getChild(B).get(k));
- assertEquals(vA, tl.cache2.getRoot().getChild(A).get(k));
- assertEquals(vB, tl.cache2.getRoot().getChild(A).getChild(B).get(k));
+ assertEquals(vA, cache1.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache1.getRoot().getChild(A).getChild(B).get(k));
+ assertEquals(vA, cache2.getRoot().getChild(A).get(k));
+ assertEquals(vB, cache2.getRoot().getChild(A).getChild(B).get(k));
}
}
Modified: core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTest.java 2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTest.java 2009-01-28 18:26:32 UTC (rev 7606)
@@ -7,17 +7,16 @@
package org.horizon.api.tree;
+import org.horizon.BaseClusteredTest;
import org.horizon.Cache;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheFactory;
import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
import org.horizon.tree.Fqn;
import org.horizon.tree.Node;
import org.horizon.tree.TreeCache;
import org.horizon.tree.TreeCacheImpl;
import org.horizon.util.TestingUtil;
import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -27,37 +26,31 @@
/**
* @author <a href="mailto:manik AT jboss DOT org">Manik Surtani (manik AT jboss DOT org)</a>
*/
-@Test(groups = {"functional", "jgroups", "pessimistic"},
sequential = true, testName = "api.SyncReplTest")
-public class SyncReplTest {
- private CacheSPI<Object, Object> c1, c2;
+@Test(groups = "functional", sequential = true, testName =
"api.SyncReplTest")
+public class SyncReplTest extends BaseClusteredTest {
private TreeCache<Object, Object> cache1, cache2;
@BeforeMethod(alwaysRun = true)
public void setUp() {
- System.out.println("*** In setUp()");
Configuration c = new Configuration();
c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
c.setInvocationBatchingEnabled(true);
c.setFetchInMemoryState(false);
- c1 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object,
Object>().createCache(c.clone());
- c2 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object,
Object>().createCache(c.clone());
+ CacheManager cm1 = addCacheManager();
+ CacheManager cm2 = addCacheManager();
+ defineCacheOnAllManagers("replSync", c);
+
+ Cache c1 = cm1.getCache("replSync");
+ Cache c2 = cm2.getCache("replSync");
+
TestingUtil.blockUntilViewsReceived(new Cache[]{c1, c2}, 5000);
cache1 = new TreeCacheImpl<Object, Object>(c1);
cache2 = new TreeCacheImpl<Object, Object>(c2);
-
- System.out.println("*** Finished setUp()");
}
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- TestingUtil.killCaches(c1, c2);
- cache1 = null;
- cache2 = null;
- }
-
public void testBasicOperation() {
assertClusterSize("Should only be 2 caches in the cluster!!!", 2);
@@ -82,8 +75,8 @@
assertClusterSize("Should only be 2 caches in the cluster!!!", 2);
Fqn fqn = Fqn.fromString("/JSESSIONID/1010.10.5:3000/1234567890/1");
- cache1.getConfiguration().setSyncCommitPhase(true);
- cache2.getConfiguration().setSyncCommitPhase(true);
+ cache1.getCache().getConfiguration().setSyncCommitPhase(true);
+ cache2.getCache().getConfiguration().setSyncCommitPhase(true);
cache1.put(fqn, "age", 38);
@@ -100,28 +93,18 @@
Map<Object, Object> map = new HashMap<Object, Object>();
map.put("1", "1");
map.put("2", "2");
- cache1.getInvocationContext().getOptionOverrides().setSuppressLocking(true);
+
cache1.getCache().getInvocationContext().getOptionOverrides().setSuppressLocking(true);
cache1.getRoot().addChild(fqn).putAll(map);
- cache1.getInvocationContext().getOptionOverrides().setSuppressLocking(true);
+
cache1.getCache().getInvocationContext().getOptionOverrides().setSuppressLocking(true);
assertEquals("Value should be set", "1", cache1.get(fqn,
"1"));
map = new HashMap<Object, Object>();
map.put("3", "3");
map.put("4", "4");
- cache1.getInvocationContext().getOptionOverrides().setSuppressLocking(true);
+
cache1.getCache().getInvocationContext().getOptionOverrides().setSuppressLocking(true);
cache1.getRoot().addChild(fqn1).putAll(map);
- cache1.getInvocationContext().getOptionOverrides().setSuppressLocking(true);
+
cache1.getCache().getInvocationContext().getOptionOverrides().setSuppressLocking(true);
assertEquals("Value should be set", "2", cache1.get(fqn,
"2"));
}
-
-
- private void assertClusterSize(String message, int size) {
- assertClusterSize(message, size, cache1);
- assertClusterSize(message, size, cache2);
- }
-
- private void assertClusterSize(String message, int size, TreeCache c) {
- assertEquals(message, size, c.getMembers().size());
- }
}
Modified: core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTxTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTxTest.java 2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/SyncReplTxTest.java 2009-01-28 18:26:32 UTC (rev 7606)
@@ -7,10 +7,10 @@
package org.horizon.api.tree;
+import org.horizon.BaseClusteredTest;
import org.horizon.Cache;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheFactory;
import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.tree.Fqn;
import org.horizon.tree.Node;
@@ -18,7 +18,6 @@
import org.horizon.tree.TreeCacheImpl;
import org.horizon.util.TestingUtil;
import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -28,22 +27,16 @@
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
-import java.util.ArrayList;
-import java.util.List;
/**
* @author <a href="mailto:manik AT jboss DOT org">Manik Surtani (manik AT jboss DOT org)</a>
*/
-@Test(groups = {"functional", "jgroups", "transaction",
"pessimistic"}, sequential = true, testName = "api.SyncReplTxTest")
-public class SyncReplTxTest {
- private List<CacheSPI<Object, Object>> flatCaches;
- private List<TreeCache<Object, Object>> caches;
+@Test(groups = "functional", sequential = true, testName =
"api.SyncReplTxTest")
+public class SyncReplTxTest extends BaseClusteredTest {
+ TreeCache<Object, Object> cache1, cache2;
@BeforeMethod(alwaysRun = true)
public void setUp() throws CloneNotSupportedException {
- System.out.println("*** In setUp()");
- caches = new ArrayList<TreeCache<Object, Object>>();
- flatCaches = new ArrayList<CacheSPI<Object, Object>>();
Configuration c = new Configuration();
c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
c.setFetchInMemoryState(false);
@@ -52,30 +45,22 @@
c.setSyncRollbackPhase(true);
c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- CacheSPI<Object, Object> cache1 = (CacheSPI<Object, Object>) new
UnitTestCacheFactory<Object, Object>().createCache(c.clone());
- CacheSPI<Object, Object> cache2 = (CacheSPI<Object, Object>) new
UnitTestCacheFactory<Object, Object>().createCache(c.clone());
+ CacheManager cm1 = addCacheManager();
+ CacheManager cm2 = addCacheManager();
- flatCaches.add(cache1);
- flatCaches.add(cache2);
+ defineCacheOnAllManagers("replSync", c);
- TestingUtil.blockUntilViewsReceived(caches.toArray(new Cache[0]), 10000);
+ Cache c1 = cm1.getCache("replSync");
+ Cache c2 = cm2.getCache("replSync");
- caches.add(new TreeCacheImpl<Object, Object>(cache1));
- caches.add(new TreeCacheImpl<Object, Object>(cache2));
+ TestingUtil.blockUntilViewsReceived(10000, cm1, cm2);
- System.out.println("*** Finished setUp()");
+ cache1 = new TreeCacheImpl<Object, Object>(c1);
+ cache2 = new TreeCacheImpl<Object, Object>(c2);
}
- @AfterMethod(alwaysRun = true)
- public void tearDown() {
- System.out.println("*** In tearDown()");
- TestingUtil.killTreeCaches(caches);
- caches = null;
- System.out.println("*** Finished tearDown()");
- }
-
- private TransactionManager beginTransaction(Cache<Object, Object> cache) throws
NotSupportedException, SystemException {
- TransactionManager mgr =
cache.getConfiguration().getRuntimeConfig().getTransactionManager();
+ private TransactionManager beginTransaction(Cache cache) throws NotSupportedException,
SystemException {
+ TransactionManager mgr = TestingUtil.getTransactionManager(cache);
mgr.begin();
return mgr;
}
@@ -86,29 +71,19 @@
Fqn f = Fqn.fromString("/test/data");
String k = "key", v = "value";
- assertNull("Should be null", caches.get(0).getRoot().getChild(f));
- assertNull("Should be null", caches.get(1).getRoot().getChild(f));
+ assertNull("Should be null", cache1.getRoot().getChild(f));
+ assertNull("Should be null", cache2.getRoot().getChild(f));
- Node<Object, Object> node = caches.get(0).getRoot().addChild(f);
+ Node<Object, Object> node = cache1.getRoot().addChild(f);
assertNotNull("Should not be null", node);
- TransactionManager tm = beginTransaction(caches.get(0).getCache());
+ TransactionManager tm = beginTransaction(cache1.getCache());
node.put(k, v);
tm.commit();
assertEquals(v, node.get(k));
- assertEquals(v, caches.get(0).get(f, k));
- assertEquals("Should have replicated", v, caches.get(1).get(f, k));
+ assertEquals(v, cache1.get(f, k));
+ assertEquals("Should have replicated", v, cache2.get(f, k));
}
-
- private void assertClusterSize(String message, int size) {
- for (Cache<Object, Object> c : flatCaches) {
- assertClusterSize(message, size, c);
- }
- }
-
- private void assertClusterSize(String message, int size, Cache<Object, Object>
c) {
- assertEquals(message, size, c.getCacheManager().getMembers().size());
- }
}
\ No newline at end of file
Modified: core/branches/flat/src/test/java/org/horizon/api/tree/TreeCacheAPITest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/TreeCacheAPITest.java 2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/TreeCacheAPITest.java 2009-01-28 18:26:32 UTC (rev 7606)
@@ -222,7 +222,7 @@
}
public void testRpcManagerElements() {
- assertEquals("CacheMode.LOCAL cache has no address", null,
cache.getAddress());
- assertEquals("CacheMode.LOCAL cache has no members list", null,
cache.getMembers());
+ assertEquals("CacheMode.LOCAL cache has no address", null,
cache.getCache().getCacheManager().getAddress());
+ assertEquals("CacheMode.LOCAL cache has no members list", null,
cache.getCache().getCacheManager().getMembers());
}
}
Modified: core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java 2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/replication/AsyncReplTest.java 2009-01-28 18:26:32 UTC (rev 7606)
@@ -8,99 +8,89 @@
package org.horizon.replication;
+import org.horizon.BaseClusteredTest;
import org.horizon.Cache;
-import org.horizon.CacheSPI;
-import org.horizon.UnitTestCacheManager;
import org.horizon.config.Configuration;
+import org.horizon.manager.CacheManager;
import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.util.TestingUtil;
-import org.horizon.util.internals.ReplicationListener;
-import static org.testng.AssertJUnit.*;
-import org.testng.annotations.AfterMethod;
+import static org.testng.AssertJUnit.assertEquals;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.transaction.TransactionManager;
+import java.util.concurrent.TimeUnit;
/**
* Unit test for replicated async CacheSPI. Use locking and multiple threads to test concurrent access to the tree.
*/
-@Test(groups = {"functional", "jgroups"})
-public class AsyncReplTest {
+@Test(groups = "functional", sequential = true)
+public class AsyncReplTest extends BaseClusteredTest {
- private class AsyncReplTestTL {
- private Configuration configuration;
- private CacheSPI<Object, Object> cache1, cache2;
- private UnitTestCacheManager cacheManager1, cacheManager2;
- private ReplicationListener replListener1, replListener2;
- }
+ Cache cache1, cache2;
- private ThreadLocal<AsyncReplTestTL> threadLocal = new
ThreadLocal<AsyncReplTestTL>();
-
@BeforeMethod(alwaysRun = true)
public void setUp() throws Exception {
- AsyncReplTestTL tl = new AsyncReplTestTL();
- threadLocal.set(tl);
+ Configuration asyncConfiguration = new Configuration();
+ asyncConfiguration.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+ asyncConfiguration.setSyncCommitPhase(true);
+ asyncConfiguration.setSyncRollbackPhase(true);
+
asyncConfiguration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- tl.configuration = new Configuration();
- tl.configuration.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
-
tl.configuration.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ CacheManager cm1 = addCacheManager();
+ CacheManager cm2 = addCacheManager();
- log("creating cache1");
- tl.cacheManager1 = new UnitTestCacheManager(tl.configuration);
- tl.cache1 = (CacheSPI) tl.cacheManager1.createCache("testCache");
- tl.replListener1 = new ReplicationListener(tl.cache1);
+ defineCacheOnAllManagers("asyncRepl", asyncConfiguration);
- log("creating cache2");
- tl.cacheManager2 = new UnitTestCacheManager(tl.configuration);
- tl.cache2 = (CacheSPI) tl.cacheManager2.createCache("testCache");
- tl.replListener2 = new ReplicationListener(tl.cache2);
+ cache1 = cm1.getCache("asyncRepl");
+ cache2 = cm2.getCache("asyncRepl");
}
- /**
- * Provides a hook for multiplexer integration. This default implementation is a no-op; subclasses that test mux
- * integration would override to integrate the given cache with a multiplexer.
- * <p/>
- * param cache a cache that has been configured but not yet created.
- */
- protected void configureMultiplexer(Cache cache) throws Exception {
- // default does nothing
- }
+ public void testWithNoTx() throws Exception {
+ ReplListener replListener2 = attachReplicationListener(cache2);
- @AfterMethod(alwaysRun = true)
- public void tearDown() throws Exception {
- AsyncReplTestTL tl = threadLocal.get();
- TestingUtil.killCaches(tl.cache1, tl.cache2);
- threadLocal.set(null);
+ String key = "key";
+
+ replListener2.expectAny();
+ cache1.put(key, "value1");
+ // allow for replication
+ replListener2.waitForReplication(60, TimeUnit.SECONDS);
+ assertEquals("value1", cache1.get(key));
+ assertEquals("value1", cache2.get(key));
+
+ replListener2.expectAny();
+ cache1.put(key, "value2");
+ assertEquals("value2", cache1.get(key));
+
+ replListener2.waitForReplication(60, TimeUnit.SECONDS);
+
+ assertEquals("value2", cache1.get(key));
+ assertEquals("value2", cache2.get(key));
}
- public void testTxCompletion() throws Exception {
- AsyncReplTestTL tl = threadLocal.get();
- CacheSPI<Object, Object> cache1 = tl.cache1;
- CacheSPI<Object, Object> cache2 = tl.cache2;
- ReplicationListener replListener1 = tl.replListener1;
- ReplicationListener replListener2 = tl.replListener2;
+ public void testWithTx() throws Exception {
+ ReplListener replListener2 = attachReplicationListener(cache2);
String key = "key";
replListener2.expectAny();
cache1.put(key, "value1");
// allow for replication
- replListener2.waitForReplicationToOccur(500);
+ replListener2.waitForReplication(60, TimeUnit.SECONDS);
assertEquals("value1", cache1.get(key));
assertEquals("value1", cache2.get(key));
- TransactionManager mgr = cache1.getTransactionManager();
+ TransactionManager mgr = TestingUtil.getTransactionManager(cache1);
mgr.begin();
- replListener2.expectAny();
+ replListener2.expectAnyWithTx();
cache1.put(key, "value2");
assertEquals("value2", cache1.get(key));
assertEquals("value1", cache2.get(key));
mgr.commit();
- replListener2.waitForReplicationToOccur(500);
+ replListener2.waitForReplication(60, TimeUnit.SECONDS);
assertEquals("value2", cache1.get(key));
assertEquals("value2", cache2.get(key));
@@ -115,86 +105,4 @@
assertEquals("value2", cache1.get(key));
assertEquals("value2", cache2.get(key));
}
-
- public void testPutShouldNotReplicateToDifferentCluster() {
- AsyncReplTestTL tl = threadLocal.get();
- CacheSPI<Object, Object> cache1 = tl.cache1;
- CacheSPI<Object, Object> cache2 = tl.cache2;
- ReplicationListener replListener1 = tl.replListener1;
- ReplicationListener replListener2 = tl.replListener2;
-
- CacheSPI<Object, Object> cache3 = null, cache4 = null;
- try {
- // TODO: fix this
-// tl.configuration.setClusterName("otherTest");
- cache3 = (CacheSPI<Object, Object>) new UnitTestCacheManager(tl.configuration).createCache("testCache");
- cache4 = (CacheSPI<Object, Object>) new UnitTestCacheManager(tl.configuration).createCache("testCache");
- replListener2.expectAny();
- cache1.put("age", 38);
- // because we use async repl, modfication may not yet have been propagated to cache2, so
- // we have to wait a little
- replListener2.waitForReplicationToOccur(500);
- assertNull("Should not have replicated", cache3.get("age"));
- }
- catch (Exception e) {
- fail(e.toString());
- }
- finally {
- if (cache3 != null) {
- cache3.stop();
- }
- if (cache4 != null) {
- cache4.stop();
- }
- }
- }
-
- public void testAsyncReplDelay() {
- Integer age;
- AsyncReplTestTL tl = threadLocal.get();
- CacheSPI<Object, Object> cache1 = tl.cache1;
- CacheSPI<Object, Object> cache2 = tl.cache2;
- ReplicationListener replListener1 = tl.replListener1;
- ReplicationListener replListener2 = tl.replListener2;
-
- try {
- cache1.put("age", 38);
-
- // value on cache2 may be 38 or not yet replicated
- age = (Integer) cache2.get("age");
- log("attr \"age\" of \"/a/b/c\" on cache2=" + age);
- assertTrue("should be either null or 38", age == null || age == 38);
- }
- catch (Exception e) {
- fail(e.toString());
- }
- }
-
- public void testAsyncReplTxDelay() {
- Integer age;
- AsyncReplTestTL tl = threadLocal.get();
- CacheSPI<Object, Object> cache1 = tl.cache1;
- CacheSPI<Object, Object> cache2 = tl.cache2;
- ReplicationListener replListener1 = tl.replListener1;
- ReplicationListener replListener2 = tl.replListener2;
-
- try {
- TransactionManager tm = cache1.getTransactionManager();
- tm.begin();
- cache1.put("age", 38);
- tm.commit();
-
- // value on cache2 may be 38 or not yet replicated
- age = (Integer) cache2.get("age");
- log("attr \"age\" of \"/a/b/c\" on cache2=" + age);
- assertTrue("should be either null or 38", age == null || age == 38);
- }
- catch (Exception e) {
- fail(e.toString());
- }
- }
-
- private void log(String msg) {
- System.out.println("-- [" + Thread.currentThread() + "]: " + msg);
- }
}
Modified: core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java 2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java 2009-01-28 18:26:32 UTC (rev 7606)
@@ -7,7 +7,7 @@
package org.horizon.replication;
import static org.easymock.EasyMock.*;
-import org.horizon.BaseReplicatedTest;
+import org.horizon.BaseClusteredTest;
import org.horizon.Cache;
import org.horizon.commands.RPCCommand;
import org.horizon.config.Configuration;
@@ -32,7 +32,7 @@
* @author <a href="mailto:manik@jboss.org">Manik Surtani (manik(a)jboss.org)</a>
*/
@Test(groups = "functional", sequential = true)
-public class SyncReplTest extends BaseReplicatedTest {
+public class SyncReplTest extends BaseClusteredTest {
Cache cache1, cache2;
String k = "key", v = "value";
Deleted:
core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java 2009-01-28 14:34:08 UTC (rev 7605)
+++ core/branches/flat/src/test/java/org/horizon/util/internals/ReplicationListener.java 2009-01-28 18:26:32 UTC (rev 7606)
@@ -1,254 +0,0 @@
-package org.horizon.util.internals;
-
-import org.horizon.Cache;
-import org.horizon.commands.ReplicableCommand;
-import org.horizon.commands.remote.ReplicateCommand;
-import org.horizon.commands.tx.CommitCommand;
-import org.horizon.commands.tx.PrepareCommand;
-import org.horizon.config.Configuration;
-import org.horizon.context.InvocationContext;
-import org.horizon.factories.ComponentRegistry;
-import org.horizon.io.ByteBuffer;
-import org.horizon.logging.Log;
-import org.horizon.logging.LogFactory;
-import org.horizon.marshall.HorizonMarshaller;
-import org.horizon.marshall.Marshaller;
-import org.horizon.remoting.RPCManager;
-import org.horizon.remoting.transport.jgroups.CommandAwareRpcDispatcher;
-import org.horizon.util.TestingUtil;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
-
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-
-/*
-* Utility class that notifies when certain commands were asynchronously replicated on secondary cache.
- * Especially useful for avaoiding Thread.sleep() statements.
- * <p/>
- * Usage:
- * <pre>
- * Cache c1, c2; //these being two async caches
- * AsyncReplicationListener listener2 = new AsyncReplicationListener(c2);
- * listener2.expect(PutKeyValueCommand.class);
- * c1.put(fqn, key, value);
- * listener2.waitForReplicationToOccur(1000); // -this will block here untill c2 recieves the PutKeyValueCommand command
- * </pre>
- * Lifecycle - after being used (i.e. waitForReplicationToOccur returns sucessfully) the object returns to the
- * non-initialized state and *can* be reused through expect-wait cycle.
- * <b>Note</b>: this class might be used aswell for sync caches, e.g. a test could have subclasses which use sync and
- * async replication
- *
- * @author Mircea.Markus(a)jboss.com
- * @since 1.0
- */
-public class ReplicationListener {
- private CountDownLatch latch = new CountDownLatch(1);
- private Set<Class<? extends ReplicableCommand>> expectedCommands;
- Configuration configuration;
- private static final Log log = LogFactory.getLog(ReplicationListener.class);
-
- /**
- * Builds a listener that will observe the given cache for recieving replication commands.
- */
- public ReplicationListener(Cache cache) {
- ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
- RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
- CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
- RpcDispatcher.Marshaller2 realMarshaller = (RpcDispatcher.Marshaller2) realDispatcher.getMarshaller();
- RpcDispatcher.Marshaller2 delegate = null;
- delegate = new MarshallerDelegate(realMarshaller);
- realDispatcher.setMarshaller(delegate);
- realDispatcher.setRequestMarshaller(delegate);
- realDispatcher.setResponseMarshaller(delegate);
- configuration = cache.getConfiguration();
- }
-
- private class MarshallerDelegate implements RpcDispatcher.Marshaller2 {
- RpcDispatcher.Marshaller2 marshaller;
-
- private MarshallerDelegate(RpcDispatcher.Marshaller2 marshaller) {
- this.marshaller = marshaller;
- }
-
- public byte[] objectToByteBuffer(Object obj) throws Exception {
- return marshaller.objectToByteBuffer(obj);
- }
-
- public Object objectFromByteBuffer(byte bytes[]) throws Exception {
- Object result = marshaller.objectFromByteBuffer(bytes);
- if (result instanceof ReplicateCommand && expectedCommands != null) {
- ReplicateCommand replicateCommand = (ReplicateCommand) result;
- return new ReplicateCommandDelegate(replicateCommand);
- }
- return result;
- }
-
- public Buffer objectToBuffer(Object o) throws Exception {
- return marshaller.objectToBuffer(o);
- }
-
- public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception {
- Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
- if (result instanceof ReplicateCommand && expectedCommands != null) {
- ReplicateCommand replicateCommand = (ReplicateCommand) result;
- return new ReplicateCommandDelegate(replicateCommand);
- }
- return result;
- }
- }
-
- /**
- * We want the notification to be performed only *after* the remote command is executed.
- */
- private class ReplicateCommandDelegate extends ReplicateCommand {
- ReplicateCommand realOne;
-
- private ReplicateCommandDelegate(ReplicateCommand realOne) {
- this.realOne = realOne;
- }
-
- @Override
- public Object perform(InvocationContext ctx) throws Throwable {
- try {
- return realOne.perform(ctx);
- }
- finally {
- log.trace("Processed command: " + realOne);
- Iterator<Class<? extends ReplicableCommand>> it = expectedCommands.iterator();
- while (it.hasNext()) {
- Class<? extends ReplicableCommand> replicableCommandClass = it.next();
- if (realOne.containsCommandType(replicableCommandClass)) {
- it.remove();
- } else if (realOne.getSingleCommand() instanceof PrepareCommand) //explicit transaction
- {
- PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleCommand();
- if (prepareCommand.containsModificationType(replicableCommandClass)) {
- it.remove();
- }
- }
- }
- if (expectedCommands.isEmpty()) {
- latch.countDown();
- }
- }
- }
- }
-
- /**
- * Needed for region based marshalling.
- */
- private class RegionMarshallerDelegate extends HorizonMarshaller {
- private Marshaller realOne;
-
- private RegionMarshallerDelegate(Marshaller realOne) {
- this.realOne = realOne;
- }
-
- @Override
- public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception {
- realOne.objectToObjectStream(obj, out);
- }
-
- @Override
- public Object objectFromObjectStream(ObjectInputStream in) throws Exception {
- return realOne.objectFromObjectStream(in);
- }
-
- @Override
- public Object objectFromStream(InputStream is) throws Exception {
- return realOne.objectFromStream(is);
- }
-
- public Object objectFromByteBuffer(byte[] bytes) throws Exception {
- return this.objectFromByteBuffer(bytes, 0, bytes.length);
- }
-
-
- public ByteBuffer objectToBuffer(Object o) throws Exception {
- return realOne.objectToBuffer(o);
- }
-
- public Object objectFromByteBuffer(byte[] buffer, int i, int i1) throws Exception {
- Object result = realOne.objectFromByteBuffer(buffer, i, i1);
- if (result instanceof ReplicateCommand && expectedCommands != null) {
- ReplicateCommand replicateCommand = (ReplicateCommand) result;
- result = new ReplicateCommandDelegate(replicateCommand);
- }
- return result;
- }
- }
-
- /**
- * Waits for 1 minute
- */
- public void waitForReplicationToOccur() {
- waitForReplicationToOccur(60000);
- }
-
- /**
- * Blocks for the elements specified through {@link #expect(Class[])} invocations to be replicated in this cache. if
- * replication does not occur in the give timeout then an exception is being thrown.
- */
- public void waitForReplicationToOccur(long timeoutMillis) {
- log.trace("enter... ReplicationListener.waitForReplicationToOccur");
- waitForReplicationToOccur(timeoutMillis, TimeUnit.MILLISECONDS);
- log.trace("exit... ReplicationListener.waitForReplicationToOccur");
- }
-
- /**
- * Similar to {@link #waitForReplicationToOccur(long)} except that this method provides more flexibility in time
- * units.
- *
- * @param timeout the maximum time to wait
- * @param timeUnit the time unit of the <tt>timeout</tt> argument.
- */
- public void waitForReplicationToOccur(long timeout, TimeUnit timeUnit) {
- assert expectedCommands != null : "there are no replication expectations; please use AsyncReplicationListener.expect(...) before calling this method";
- try {
- if (!latch.await(timeout, timeUnit)) {
- assert false : "waiting for more than " + timeout + " " + timeUnit + " and following commands did not replicate: " + expectedCommands;
- }
- }
- catch (InterruptedException e) {
- throw new IllegalStateException("unexpected", e);
- }
- finally {
- expectedCommands = null;
- latch = new CountDownLatch(1);
- }
- }
-
- /**
- * {@link #waitForReplicationToOccur(long)} will block untill all the commands specified here are being replicated to
- * this cache. The method can be called several times with various arguments.
- */
- public void expect(Class<? extends ReplicableCommand>... expectedCommands) {
- if (this.expectedCommands == null) {
- this.expectedCommands = new HashSet<Class<? extends ReplicableCommand>>();
- }
- this.expectedCommands.addAll(Arrays.asList(expectedCommands));
- }
-
- /**
- * Waits untill first command is replicated.
- */
- public void expectAny() {
- expect();
- }
-
- public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands) {
- expect(PrepareCommand.class);
- //this is because for async replication we have an 1pc transaction
- if (configuration.getCacheMode().isSynchronous()) expect(CommitCommand.class);
- }
-
-}
\ No newline at end of file