[jbosscache-commits] JBoss Cache SVN: r7526 - in core/trunk/src: test/java/org/jboss/cache/buddyreplication and 2 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Tue Jan 20 06:27:44 EST 2009
Author: mircea.markus
Date: 2009-01-20 06:27:44 -0500 (Tue, 20 Jan 2009)
New Revision: 7526
Added:
core/trunk/src/main/java/org/jboss/cache/marshall/ReplicationObserver.java
core/trunk/src/test/java/org/jboss/cache/util/SingleBuddyGravitationHelper.java
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/Buddy3NodesNoPoolNoDataGravitationTest.java
core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationWithCacheLoaderTest.java
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java
Log:
more strict replication control
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-01-20 11:26:59 UTC (rev 7525)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-01-20 11:27:44 UTC (rev 7526)
@@ -69,6 +69,7 @@
private ExecutorService replicationProcessor;
private AtomicInteger replicationProcessorCount;
private boolean asyncSerial;
+ private ReplicationObserver replicationObserver;
public CommandAwareRpcDispatcher()
{
@@ -114,6 +115,13 @@
}
}
+ public ReplicationObserver setReplicationObserver(ReplicationObserver replicationObserver)
+ {
+ ReplicationObserver result = this.replicationObserver;
+ this.replicationObserver = replicationObserver;
+ return result;
+ }
+
/**
* Serial(sync) marshalling should be enabled for async optimistic caches. That is because optimistic async is a 2PC,
* which might cause the Commit command to be send before the Prepare command, so replication will fail. This is not
@@ -240,33 +248,41 @@
protected Object executeCommand(ReplicableCommand cmd, Message req) throws Throwable
{
- if (cmd == null) throw new NullPointerException("Unable to execute a null command! Message was " + req);
- if (trace) log.trace("Executing command: " + cmd + " [sender=" + req.getSrc() + "]");
+ try
+ {
+ if (cmd == null) throw new NullPointerException("Unable to execute a null command! Message was " + req);
+ if (trace) log.trace("Executing command: " + cmd + " [sender=" + req.getSrc() + "]");
- if (cmd instanceof VisitableCommand)
- {
- InvocationContext ctx = invocationContextContainer.get();
- ctx.setOriginLocal(false);
- if (!componentRegistry.invocationsAllowed(false))
+ if (cmd instanceof VisitableCommand)
{
- return null;
+ InvocationContext ctx = invocationContextContainer.get();
+ ctx.setOriginLocal(false);
+ if (!componentRegistry.invocationsAllowed(false))
+ {
+ return null;
+ }
+ return interceptorChain.invoke(ctx, (VisitableCommand) cmd);
}
- return interceptorChain.invoke(ctx, (VisitableCommand) cmd);
- }
- else
- {
- if (trace) log.trace("This is a non-visitable command - so performing directly and not via the invoker.");
+ else
+ {
+ if (trace) log.trace("This is a non-visitable command - so performing directly and not via the invoker.");
- // need to check cache status for all except buddy replication commands.
- if (!(cmd instanceof AnnounceBuddyPoolNameCommand ||
- cmd instanceof AssignToBuddyGroupCommand ||
- cmd instanceof RemoveFromBuddyGroupCommand)
- && !componentRegistry.invocationsAllowed(false))
- {
- return null;
+ // need to check cache status for all except buddy replication commands.
+ if (!(cmd instanceof AnnounceBuddyPoolNameCommand ||
+ cmd instanceof AssignToBuddyGroupCommand ||
+ cmd instanceof RemoveFromBuddyGroupCommand)
+ && !componentRegistry.invocationsAllowed(false))
+ {
+ return null;
+ }
+ return cmd.perform(null);
}
- return cmd.perform(null);
}
+ finally
+ {
+ if (replicationObserver != null)
+ replicationObserver.afterExecutingCommand(cmd);
+ }
}
@Override
Added: core/trunk/src/main/java/org/jboss/cache/marshall/ReplicationObserver.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/ReplicationObserver.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/ReplicationObserver.java 2009-01-20 11:27:44 UTC (rev 7526)
@@ -0,0 +1,13 @@
+package org.jboss.cache.marshall;
+
+import org.jboss.cache.commands.ReplicableCommand;
+
+/**
+ * This is a hook for observing remotely replicated commands on this instance. Mainly used for unit testing.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public interface ReplicationObserver
+{
+ public void afterExecutingCommand(ReplicableCommand command);
+}
Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/Buddy3NodesNoPoolNoDataGravitationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/Buddy3NodesNoPoolNoDataGravitationTest.java 2009-01-20 11:26:59 UTC (rev 7525)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/Buddy3NodesNoPoolNoDataGravitationTest.java 2009-01-20 11:27:44 UTC (rev 7526)
@@ -1,22 +1,17 @@
package org.jboss.cache.buddyreplication;
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.Fqn;
import org.jboss.cache.Cache;
-import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
+import org.jboss.cache.Fqn;
import org.jboss.cache.commands.write.PutKeyValueCommand;
-import org.jboss.cache.commands.write.PutDataMapCommand;
-import org.jboss.cache.util.CachePrinter;
import org.jboss.cache.util.TestingUtil;
import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
-import org.testng.annotations.*;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertNull;
-import static org.testng.AssertJUnit.assertFalse;
-import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.jboss.cache.util.SingleBuddyGravitationHelper.*;
-import java.util.List;
import java.util.ArrayList;
+import java.util.List;
/**
* @author Mircea.Markus at jboss.com
@@ -158,16 +153,12 @@
assertNoLocks(caches);
backupFqn = fqnTransformer.getBackupFqn(caches.get(1).getLocalAddress(), fqn);
- replListener.get(0).expect(DataGravitationCleanupCommand.class);
- replListener.get(2).expect(DataGravitationCleanupCommand.class);
- replListener.get(2).expect(PutDataMapCommand.class);
-
+ inReplicationListeners(replListener).dataWillGravitateFrom(0).to(1);
caches.get(1).getInvocationContext().getOptionOverrides().setForceDataGravitation(true);
log.info("Before gravitation call...");
assertEquals("value", caches.get(1).get(fqn, key));
- replListener.get(0).waitForReplicationToOccur(); // cleanup commands are async
- replListener.get(2).waitForReplicationToOccur(); // cleanup commands are async
+ expectGravitation();
TestingUtil.dumpCacheContents(caches);
Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationWithCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationWithCacheLoaderTest.java 2009-01-20 11:26:59 UTC (rev 7525)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationWithCacheLoaderTest.java 2009-01-20 11:27:44 UTC (rev 7526)
@@ -10,11 +10,14 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.UnitTestCacheFactory;
-import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
import org.jboss.cache.commands.write.PutDataMapCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.loader.CacheLoader;
+import static org.jboss.cache.util.SingleBuddyGravitationHelper.expectGravitation;
+import static org.jboss.cache.util.SingleBuddyGravitationHelper.*;
import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.SingleBuddyGravitationHelper;
import static org.jboss.cache.util.TestingUtil.dumpCacheContents;
import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
import static org.testng.AssertJUnit.assertEquals;
@@ -60,14 +63,15 @@
dataGravitationDefaultTest(false);
}
- private void dataGravitationDefaultTest(boolean autoGravitate)
- throws Exception
+ private void dataGravitationDefaultTest(boolean autoGravitate) throws Exception
{
// create 3 cachePool
List<CacheSPI<Object, Object>> caches = createCachesWithCacheLoader(3, autoGravitate, true, passivation);
cachesTL.set(caches);
- ReplicationListener replListener0 = ReplicationListener.getReplicationListener(caches.get(0));
- ReplicationListener replListener1 = ReplicationListener.getReplicationListener(caches.get(1));
+ List<ReplicationListener> replicationListeners = new ArrayList<ReplicationListener>();
+ replicationListeners.add(ReplicationListener.getReplicationListener(caches.get(0)));
+ replicationListeners.add(ReplicationListener.getReplicationListener(caches.get(1)));
+ replicationListeners.add(ReplicationListener.getReplicationListener(caches.get(2)));
CacheLoader[] loaders = getLoaders(caches);
@@ -75,7 +79,9 @@
for (int i = 0; i < 3; i++) loaders[i].remove(Fqn.ROOT);
// put stuff in cache0
+ replicationListeners.get(1).expect(PutKeyValueCommand.class);
caches.get(0).put(fqn, key, value);
+ replicationListeners.get(1).waitForReplicationToOccur();
// make sure there are no locks.
assertNoLocks(caches);
@@ -85,12 +91,10 @@
if (!autoGravitate)
caches.get(2).getInvocationContext().getOptionOverrides().setForceDataGravitation(true);
- replListener0.expect(DataGravitationCleanupCommand.class);
- replListener1.expect(DataGravitationCleanupCommand.class);
+ inReplicationListeners(replicationListeners).dataWillGravitateFrom(0).to(2);
// should cause a gravitation event
assertEquals(value, caches.get(2).get(fqn, key));
- replListener0.waitForReplicationToOccur();
- replListener1.waitForReplicationToOccur();
+ expectGravitation();
assertNoLocks(caches);
@@ -157,40 +161,38 @@
* Tests data gravitation when auto-gravitation is disabled and
* "removeOnFind=false"; i.e. nodes from which data is gravitated
* evict it instead of removing it.
- *
- * @throws Exception
*/
public void testWithDataGravitationEvictOnFindNoAuto() throws Exception
{
dataGravitationEvictionTest(false);
}
- private void dataGravitationEvictionTest(boolean autoGravitate)
- throws Exception
+ private void dataGravitationEvictionTest(boolean autoGravitate) throws Exception
{
// create 3 cachePool
List<CacheSPI<Object, Object>> caches = createCachesWithCacheLoader(3, autoGravitate, false, passivation);
ReplicationListener replListener0 = ReplicationListener.getReplicationListener(caches.get(0));
ReplicationListener replListener1 = ReplicationListener.getReplicationListener(caches.get(1));
-
+ ReplicationListener replListener2 = ReplicationListener.getReplicationListener(caches.get(2));
+
cachesTL.set(caches);
CacheLoader[] loaders = getLoaders(caches);
Fqn b1 = fqnTransformer.getBackupFqn(caches.get(0).getLocalAddress(), fqn);
Fqn b2 = fqnTransformer.getBackupFqn(caches.get(2).getLocalAddress(), fqn);
+
// put stuff in cache0
+ replListener1.expect(PutKeyValueCommand.class);
caches.get(0).put(fqn, key, value);
+ replListener1.waitForReplicationToOccur();
// request data from cache2
if (!autoGravitate)
caches.get(2).getInvocationContext().getOptionOverrides().setForceDataGravitation(true);
// should cause a gravitation event
- replListener0.expect(PutDataMapCommand.class);
- replListener0.expect(DataGravitationCleanupCommand.class);
- replListener1.expect(DataGravitationCleanupCommand.class);
+ SingleBuddyGravitationHelper.inReplicationListeners(replListener0, replListener1, replListener2).dataWillGravitateFrom(0).to(2);
assertEquals(value, caches.get(2).get(fqn, key));
- replListener0.waitForReplicationToOccur();
- replListener1.waitForReplicationToOccur();
+ expectGravitation();
// USE REPLICATION LISTENERS!!!!
Added: core/trunk/src/test/java/org/jboss/cache/util/SingleBuddyGravitationHelper.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/SingleBuddyGravitationHelper.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/SingleBuddyGravitationHelper.java 2009-01-20 11:27:44 UTC (rev 7526)
@@ -0,0 +1,208 @@
+package org.jboss.cache.util;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.commands.read.GravitateDataCommand;
+import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
+import org.jboss.cache.commands.write.PutDataMapCommand;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
+
+import java.util.List;
+
+/**
+ * Helper class for monitoring replication between caches during data gravitation.
+ * Usage:
+ * <pre>
+ * import static org.jboss.cache.util.SingleBuddyGravitationHelper.*;
+ * ....
+ *
+ * inCaches(cache1, cache2, cache3).dataWillGravitateFrom(0).to(1);
+ assertEquals("value", caches.get(1).get(fqn, key)); //this call will cause data gravitation
+ expectGravitation(); //here is where the failure will be if gravitation fails
+ *
+ * </pre>
+ *
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class SingleBuddyGravitationHelper
+{
+ private Cache[] caches;
+ private ReplicationListener[] replicationListeners;
+ private int fromIndex = -1;
+ private int toIndex = -1;
+ static ThreadLocal<SingleBuddyGravitationHelper> perThread = new ThreadLocal<SingleBuddyGravitationHelper>();
+ boolean strict = true;
+
+
+ /**
+ * Creates an <tt>SingleBuddyGravitationHelper</tt> based on the list of caches passed in.
+ * The caches sequence is important: caches[i+1] must be the buddy of caches[i].
+ * Important: it is assumed that the caches do not already have replication listeners built. If they do, use the
+ * {@link SingleBuddyGravitationHelper#inReplicationListeners(java.util.List)} methods instead, as a cache cannot have 2
+ * replication listener instances.
+ */
+ public static SingleBuddyGravitationHelper inCaches(Cache... caches)
+ {
+ SingleBuddyGravitationHelper gravitationHelper = new SingleBuddyGravitationHelper(caches);
+ perThread.set(gravitationHelper);
+ return gravitationHelper;
+ }
+
+ /**
+ * Transforms the List into an array and calls {@link SingleBuddyGravitationHelper#inCaches(org.jboss.cache.Cache[])}
+ */
+ public static SingleBuddyGravitationHelper inCaches(List caches)
+ {
+ Cache[] cachesArray = (Cache[]) caches.toArray(new Cache[caches.size()]);
+ return inCaches(cachesArray);
+ }
+
+ /**
+ * Instead of creating a replication listener for each cache, use the already built ReplicationListeners.
+ */
+ public static SingleBuddyGravitationHelper inReplicationListeners(ReplicationListener... caches)
+ {
+ SingleBuddyGravitationHelper gravitationHelper = new SingleBuddyGravitationHelper(caches);
+ perThread.set(gravitationHelper);
+ return gravitationHelper;
+ }
+
+ /**
+ * Transforms the supplied list into an array and calls
+ * {@link SingleBuddyGravitationHelper#inReplicationListeners(org.jboss.cache.util.internals.replicationlisteners.ReplicationListener[])}
+ */
+ public static SingleBuddyGravitationHelper inReplicationListeners(List replListeners)
+ {
+ ReplicationListener[] listeners = (ReplicationListener[]) replListeners.toArray(new ReplicationListener[replListeners.size()]);
+ SingleBuddyGravitationHelper gravitationHelper = new SingleBuddyGravitationHelper(listeners);
+ perThread.set(gravitationHelper);
+ return gravitationHelper;
+ }
+
+
+ /**
+ * After the {@link SingleBuddyGravitationHelper#dataWillGravitateFrom(int)} and {@link SingleBuddyGravitationHelper#to(int)}
+ * are called, and after the gravitation call to the cache is done, here we wait for the gravitation commands to replicate.
+ */
+ public static void expectGravitation()
+ {
+ SingleBuddyGravitationHelper gravitationHelper = perThread.get();
+ assert gravitationHelper != null : "replication helper should be created. Use inReplicationListeners before calling this";
+ gravitationHelper.waitForReplication();
+ }
+
+ /**
+ * You specify here the original data owner.
+ */
+ public SingleBuddyGravitationHelper dataWillGravitateFrom(int index)
+ {
+ assertValidIndex(index);
+ this.fromIndex = index;
+ return this;
+ }
+
+ /**
+ * Here you specify where the data will migrate to.
+ */
+ public SingleBuddyGravitationHelper to(int index)
+ {
+ toIndex = index;
+ assert fromIndex >= 0 : "Must call dataWillGravitateFrom before this one";
+
+ //all other caches must receive an GravitateDataCommand
+ expectGravitateData();
+
+ //all other caches will receive an DataGravitationCleanup command
+ expectGravitationCleanup();
+
+ //buddy of the new data owner should also receive the data
+ int cacheCount = caches.length;
+ int newDataOwnerIndex = getIndex(caches[toIndex]);
+ int newBuddyIndex = (newDataOwnerIndex == cacheCount - 1) ? 0 : newDataOwnerIndex + 1;
+ replicationListeners[newBuddyIndex].expect(PutDataMapCommand.class);
+ return this;
+ }
+
+
+ private SingleBuddyGravitationHelper(Cache[] caches)
+ {
+ this.caches = caches;
+ replicationListeners = new ReplicationListener[caches.length];
+ for (int i = 0; i < caches.length; i++)
+ {
+ replicationListeners[i] = ReplicationListener.getReplicationListener(caches[i]);
+ }
+ }
+
+ private SingleBuddyGravitationHelper(ReplicationListener[] replicationListeners)
+ {
+ this.replicationListeners = replicationListeners;
+ caches = new Cache[replicationListeners.length];
+ for (int i = 0; i < replicationListeners.length; i++)
+ {
+ caches[i] = replicationListeners[i].getCache();
+ }
+ }
+
+ private void assertValidIndex(int index)
+ {
+ assert index >= 0 && index < caches.length;
+ }
+
+ //disable for now
+ private void strict()
+ {
+ this.strict = true;
+ }
+
+ private int getIndex(Cache cache)
+ {
+ for (int i = 0; i < caches.length; i++)
+ {
+ if (caches[i] == cache) return i;
+ }
+ throw new RuntimeException("cache not found withis cache instances");
+ }
+
+ private void expectGravitationCleanup()
+ {
+ if (!strict)
+ {
+ replicationListeners[fromIndex].expect(DataGravitationCleanupCommand.class);
+ return;
+ }
+ for (int i = 0; i < caches.length; i++)
+ {
+ if (i != toIndex) replicationListeners[i].expect(DataGravitationCleanupCommand.class);
+ }
+ }
+
+ private void expectGravitateData()
+ {
+ if (!strict) return;
+ //this means that we are a buddy of the node we gravitate from, so local gravitation will take place and
+ // no DataGravitation commands will be sent across
+ if (newOwnerIsBuddyOfOldOwner())
+ {
+ return;
+ }
+ for (int i = 0; i < caches.length; i++)
+ {
+ if (caches[i] != caches[toIndex]) replicationListeners[i].expect(GravitateDataCommand.class);
+ }
+ }
+
+ private boolean newOwnerIsBuddyOfOldOwner()
+ {
+ return (toIndex - 1 == fromIndex) || (toIndex == caches.length - 1 && fromIndex == 0);
+ }
+
+ private void waitForReplication()
+ {
+ for (ReplicationListener listener : replicationListeners)
+ {
+ if (listener.getCache() != caches[toIndex])
+ listener.waitForReplicationToOccur();
+ }
+ }
+}
Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java 2009-01-20 11:26:59 UTC (rev 7525)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java 2009-01-20 11:27:44 UTC (rev 7526)
@@ -2,10 +2,12 @@
import org.jboss.cache.Cache;
import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.read.GravitateDataCommand;
import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.legacy.write.*;
+import org.jboss.cache.commands.legacy.read.LegacyGravitateDataCommand;
import org.jboss.cache.commands.write.*;
import java.util.Map;
@@ -31,6 +33,7 @@
mvcc2PessMap.put(RemoveKeyCommand.class, PessRemoveKeyCommand.class);
mvcc2PessMap.put(RemoveNodeCommand.class, PessRemoveNodeCommand.class);
mvcc2PessMap.put(DataGravitationCleanupCommand.class, DataGravitationCleanupCommand.class);
+ mvcc2PessMap.put(GravitateDataCommand.class, LegacyGravitateDataCommand.class);
}
Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java 2009-01-20 11:26:59 UTC (rev 7525)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java 2009-01-20 11:27:44 UTC (rev 7526)
@@ -1,27 +1,21 @@
package org.jboss.cache.util.internals.replicationlisteners;
-import org.jboss.cache.*;
-import org.jboss.cache.config.Configuration;
+import org.jboss.cache.Cache;
+import org.jboss.cache.RPCManager;
import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.commands.VisitableCommand;
import org.jboss.cache.commands.remote.*;
import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.ComponentRegistry;
-import org.jboss.cache.io.ByteBuffer;
-import org.jboss.cache.marshall.AbstractMarshaller;
import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
-import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
-import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.marshall.RegionalizedMethodCall;
+import org.jboss.cache.marshall.ReplicationObserver;
import org.jboss.cache.util.TestingUtil;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
import org.jgroups.Address;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -56,13 +50,14 @@
* @author Mircea.Markus at jboss.com
* @since 2.2
*/
-abstract public class ReplicationListener
+abstract public class ReplicationListener implements ReplicationObserver
{
public static final long DEFAULT_TIMEOUT = 10000;
private CountDownLatch latch = new CountDownLatch(1);
protected List<Class<? extends ReplicableCommand>> expectedCommands;
protected Configuration config;
protected final Address localAddress;
+ private Cache cache;
/**
* Builds a listener that will observe the given cache for recieving replication commands.
@@ -72,21 +67,13 @@
ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
- RpcDispatcher.Marshaller2 realMarshaller = (RpcDispatcher.Marshaller2) realDispatcher.getMarshaller();
- RpcDispatcher.Marshaller2 delegate = null;
-// if ((realMarshaller instanceof RegionMarshallerDelegate) || (realMarshaller instanceof MarshallerDelegate))
-// {
-// throw new RuntimeException("Illegal state");
-// }
- if (realDispatcher instanceof InactiveRegionAwareRpcDispatcher)
- delegate = new RegionMarshallerDelegate((Marshaller) realMarshaller);
- else
- delegate = new MarshallerDelegate(realMarshaller);
- realDispatcher.setMarshaller(delegate);
- realDispatcher.setRequestMarshaller(delegate);
- realDispatcher.setResponseMarshaller(delegate);
- this.config = cache.getConfiguration();
+ if (realDispatcher.setReplicationObserver(this) != null)
+ {
+ throw new RuntimeException("Replication listener already present");
+ }
this.localAddress = cache.getLocalAddress();
+ this.config = cache.getConfiguration();
+ this.cache = cache;
}
protected ReplicationListener()
@@ -125,118 +112,41 @@
}
}
- private class MarshallerDelegate implements RpcDispatcher.Marshaller2
+ public void afterExecutingCommand(ReplicableCommand realOne)
{
- RpcDispatcher.Marshaller2 marshaller;
-
- private MarshallerDelegate(RpcDispatcher.Marshaller2 marshaller)
+ if (expectedCommands == null)
{
- this.marshaller = marshaller;
+ log("skipping command " + realOne);
+ return;
}
+ log("Processed command: " + realOne);
- public byte[] objectToByteBuffer(Object obj) throws Exception
+ if (realOne instanceof ReplicateCommand)
{
- return marshaller.objectToByteBuffer(obj);
+ postReplicateExecution((ReplicateCommand)realOne);
}
-
- public Object objectFromByteBuffer(byte bytes[]) throws Exception
+ else
{
- Object result = marshaller.objectFromByteBuffer(bytes);
- return process(result);
+ postNonVisitableExecution(realOne);
}
-
- public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
+ if (expectedCommands.isEmpty())
{
- Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
- return process(result);
+ latch.countDown();
}
- private Object process(Object result)
- {
- boolean isReplicate = result instanceof ReplicateCommand;
- boolean isRemote = result != null && isRemoteCommand(result.getClass());
- if ((isReplicate || isRemote) && (expectedCommands != null))
- {
- ReplicableCommand replicateCommand = (ReplicableCommand) result;
- result = new ReplicableCommandDelegate(replicateCommand);
- }
- return result;
- }
-
- public Buffer objectToBuffer(Object o) throws Exception
- {
- return marshaller.objectToBuffer(o);
- }
}
- /**
- * We want the notification to be performed only *after* the remote command is executed.
- */
- private class ReplicableCommandDelegate implements ReplicableCommand
+ private void log(String s)
{
- ReplicableCommand realOne;
-
- private ReplicableCommandDelegate(ReplicableCommand realOne)
- {
- if (realOne instanceof VisitableCommand)
- {
- throw new IllegalArgumentException("Visitable commands not allowed!!! ;recieved="
- + realOne.getClass().getName());
- }
- this.realOne = realOne;
- }
-
- public Object perform(InvocationContext ctx) throws Throwable
- {
- try
- {
- return realOne.perform(ctx);
- }
- finally
- {
- log("Processed command: " + realOne);
- if (realOne instanceof ReplicateCommand)
- {
- postReplicateExecution((ReplicateCommand)realOne);
- }
- else
- {
- postNonVisitableExecution(realOne);
- }
- if (expectedCommands.isEmpty())
- {
- latch.countDown();
- }
- }
- }
-
- public int getCommandId()
- {
- return realOne.getCommandId();
- }
-
- public Object[] getParameters()
- {
- return realOne.getParameters();
- }
-
- public void setParameters(int commandId, Object[] parameters)
- {
- realOne.setParameters(commandId, parameters);
- }
-
- @Override
- public String toString()
- {
- return "ReplicableCommandDelegate{" +
- "realOne=" + realOne +
- '}';
- }
+ System.out.println("[" + localAddress + "] " + s);
}
protected void postNonVisitableExecution(ReplicableCommand realOne)
{
- expectedCommands.remove(realOne.getClass());
+ if (!expectedCommands.remove(realOne.getClass()))
+ {
+ log("not expecting command " + realOne + " ");
+ }
}
protected void postReplicateExecution(ReplicateCommand realOne)
@@ -259,74 +169,16 @@
}
}
- /**
- * Needed for region based marshalling.
- */
- private class RegionMarshallerDelegate extends AbstractMarshaller
- {
- private Marshaller realOne;
- private RegionMarshallerDelegate(Marshaller realOne)
- {
- this.realOne = realOne;
- }
-
- public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception
- {
- realOne.objectToObjectStream(obj, out);
- }
-
- public Object objectFromObjectStream(ObjectInputStream in) throws Exception
- {
- return realOne.objectFromObjectStream(in);
- }
-
- public Object objectFromStream(InputStream is) throws Exception
- {
- return realOne.objectFromStream(is);
- }
-
- public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception
- {
- realOne.objectToObjectStream(obj, out, region);
- }
-
- public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception
- {
- RegionalizedMethodCall result = realOne.regionalizedMethodCallFromByteBuffer(buffer);
- if (result.command instanceof ReplicateCommand && expectedCommands != null)
- {
- ReplicateCommand replicateCommand = (ReplicateCommand) result.command;
- result.command = new ReplicableCommandDelegate(replicateCommand);
- }
- return result;
- }
-
- public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception
- {
- return realOne.regionalizedMethodCallFromObjectStream(in);
- }
-
- public ByteBuffer objectToBuffer(Object o) throws Exception
- {
- return realOne.objectToBuffer(o);
- }
-
- public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
- {
- return realOne.objectFromByteBuffer(bytes, i, i1);
- }
- }
-
/**
* Blocks for the elements specified through {@link #expect(Class[])} invocations to be replicated in this cache.
* if replication does not occur in the give timeout then an exception is being thrown.
*/
public void waitForReplicationToOccur(long timeoutMillis)
{
- log("enter... ReplicationListener.waitForReplicationToOccur");
+// log("enter... ReplicationListener.waitForReplicationToOccur");
waitForReplicationToOccur(timeoutMillis, TimeUnit.MILLISECONDS);
- log("exit... ReplicationListener.waitForReplicationToOccur");
+// log("exit... ReplicationListener.waitForReplicationToOccur");
}
/**
@@ -350,7 +202,7 @@
{
if (!expectedCommands.isEmpty() && !latch.await(timeout, timeUnit))
{
- assert false : "waiting for more than " + timeout + " " + timeUnit + " and following commands did not replicate: " + expectedCommands;
+ assert false : "[" +localAddress + "] waiting for more than " + timeout + " " + timeUnit + " and following commands did not replicate: " + expectedCommands;
}
}
catch (InterruptedException e)
@@ -390,8 +242,8 @@
clazz.equals(RemoveFromBuddyGroupCommand.class) || clazz.equals(ReplicateCommand.class);
}
- private void log(String msg)
+ public Cache getCache()
{
- System.out.println("[" + localAddress + "]" + msg);
+ return cache;
}
}
More information about the jbosscache-commits
mailing list