[jbosscache-commits] JBoss Cache SVN: r7526 - in core/trunk/src: test/java/org/jboss/cache/buddyreplication and 2 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Jan 20 06:27:44 EST 2009


Author: mircea.markus
Date: 2009-01-20 06:27:44 -0500 (Tue, 20 Jan 2009)
New Revision: 7526

Added:
   core/trunk/src/main/java/org/jboss/cache/marshall/ReplicationObserver.java
   core/trunk/src/test/java/org/jboss/cache/util/SingleBuddyGravitationHelper.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/Buddy3NodesNoPoolNoDataGravitationTest.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationWithCacheLoaderTest.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java
Log:
more strict replication control

Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2009-01-20 11:26:59 UTC (rev 7525)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java	2009-01-20 11:27:44 UTC (rev 7526)
@@ -69,6 +69,7 @@
    private ExecutorService replicationProcessor;
    private AtomicInteger replicationProcessorCount;
    private boolean asyncSerial;
+   private ReplicationObserver replicationObserver;
 
    public CommandAwareRpcDispatcher()
    {
@@ -114,6 +115,13 @@
       }
    }
 
+   public ReplicationObserver setReplicationObserver(ReplicationObserver replicationObserver)
+   {
+      ReplicationObserver result = this.replicationObserver;
+      this.replicationObserver = replicationObserver;
+      return result;
+   }
+
    /**
     * Serial(sync) marshalling should be enabled for async optimistic caches. That is because optimistic async is a 2PC,
     * which might cause the Commit command to be send before the Prepare command, so replication will fail. This is not
@@ -240,33 +248,41 @@
 
    protected Object executeCommand(ReplicableCommand cmd, Message req) throws Throwable
    {
-      if (cmd == null) throw new NullPointerException("Unable to execute a null command!  Message was " + req);
-      if (trace) log.trace("Executing command: " + cmd + " [sender=" + req.getSrc() + "]");
+      try
+      {
+         if (cmd == null) throw new NullPointerException("Unable to execute a null command!  Message was " + req);
+         if (trace) log.trace("Executing command: " + cmd + " [sender=" + req.getSrc() + "]");
 
-      if (cmd instanceof VisitableCommand)
-      {
-         InvocationContext ctx = invocationContextContainer.get();
-         ctx.setOriginLocal(false);
-         if (!componentRegistry.invocationsAllowed(false))
+         if (cmd instanceof VisitableCommand)
          {
-            return null;
+            InvocationContext ctx = invocationContextContainer.get();
+            ctx.setOriginLocal(false);
+            if (!componentRegistry.invocationsAllowed(false))
+            {
+               return null;
+            }
+            return interceptorChain.invoke(ctx, (VisitableCommand) cmd);
          }
-         return interceptorChain.invoke(ctx, (VisitableCommand) cmd);
-      }
-      else
-      {
-         if (trace) log.trace("This is a non-visitable command - so performing directly and not via the invoker.");
+         else
+         {
+            if (trace) log.trace("This is a non-visitable command - so performing directly and not via the invoker.");
 
-         // need to check cache status for all except buddy replication commands.
-         if (!(cmd instanceof AnnounceBuddyPoolNameCommand ||
-               cmd instanceof AssignToBuddyGroupCommand ||
-               cmd instanceof RemoveFromBuddyGroupCommand)
-               && !componentRegistry.invocationsAllowed(false))
-         {
-            return null;
+            // need to check cache status for all except buddy replication commands.
+            if (!(cmd instanceof AnnounceBuddyPoolNameCommand ||
+                  cmd instanceof AssignToBuddyGroupCommand ||
+                  cmd instanceof RemoveFromBuddyGroupCommand)
+                  && !componentRegistry.invocationsAllowed(false))
+            {
+               return null;
+            }
+            return cmd.perform(null);
          }
-         return cmd.perform(null);
       }
+      finally
+      {
+         if (replicationObserver != null)
+            replicationObserver.afterExecutingCommand(cmd);
+      }
    }
 
    @Override

Added: core/trunk/src/main/java/org/jboss/cache/marshall/ReplicationObserver.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/ReplicationObserver.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/ReplicationObserver.java	2009-01-20 11:27:44 UTC (rev 7526)
@@ -0,0 +1,13 @@
+package org.jboss.cache.marshall;
+
+import org.jboss.cache.commands.ReplicableCommand;
+
+/**
+ * This is a hook for observing remotely replicated commands on this instance. Mainly used for unit testing.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public interface ReplicationObserver
+{
+   public void afterExecutingCommand(ReplicableCommand command);
+}

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/Buddy3NodesNoPoolNoDataGravitationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/Buddy3NodesNoPoolNoDataGravitationTest.java	2009-01-20 11:26:59 UTC (rev 7525)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/Buddy3NodesNoPoolNoDataGravitationTest.java	2009-01-20 11:27:44 UTC (rev 7526)
@@ -1,22 +1,17 @@
 package org.jboss.cache.buddyreplication;
 
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.Fqn;
 import org.jboss.cache.Cache;
-import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
+import org.jboss.cache.Fqn;
 import org.jboss.cache.commands.write.PutKeyValueCommand;
-import org.jboss.cache.commands.write.PutDataMapCommand;
-import org.jboss.cache.util.CachePrinter;
 import org.jboss.cache.util.TestingUtil;
 import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
-import org.testng.annotations.*;
-import static org.testng.AssertJUnit.assertEquals;
-import static org.testng.AssertJUnit.assertNull;
-import static org.testng.AssertJUnit.assertFalse;
-import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import static org.jboss.cache.util.SingleBuddyGravitationHelper.*;
 
-import java.util.List;
 import java.util.ArrayList;
+import java.util.List;
 
 /**
  * @author Mircea.Markus at jboss.com
@@ -158,16 +153,12 @@
       assertNoLocks(caches);
 
       backupFqn = fqnTransformer.getBackupFqn(caches.get(1).getLocalAddress(), fqn);
-      replListener.get(0).expect(DataGravitationCleanupCommand.class);
-      replListener.get(2).expect(DataGravitationCleanupCommand.class);
-      replListener.get(2).expect(PutDataMapCommand.class);
 
-
+      inReplicationListeners(replListener).dataWillGravitateFrom(0).to(1);
       caches.get(1).getInvocationContext().getOptionOverrides().setForceDataGravitation(true);
       log.info("Before gravitation call...");
       assertEquals("value", caches.get(1).get(fqn, key));
-      replListener.get(0).waitForReplicationToOccur(); // cleanup commands are async
-      replListener.get(2).waitForReplicationToOccur(); // cleanup commands are async
+      expectGravitation();
 
       TestingUtil.dumpCacheContents(caches);
 

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationWithCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationWithCacheLoaderTest.java	2009-01-20 11:26:59 UTC (rev 7525)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationWithCacheLoaderTest.java	2009-01-20 11:27:44 UTC (rev 7526)
@@ -10,11 +10,14 @@
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.UnitTestCacheFactory;
-import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
+import org.jboss.cache.commands.write.PutKeyValueCommand;
 import org.jboss.cache.commands.write.PutDataMapCommand;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.loader.CacheLoader;
+import static org.jboss.cache.util.SingleBuddyGravitationHelper.expectGravitation;
+import static org.jboss.cache.util.SingleBuddyGravitationHelper.*;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.SingleBuddyGravitationHelper;
 import static org.jboss.cache.util.TestingUtil.dumpCacheContents;
 import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import static org.testng.AssertJUnit.assertEquals;
@@ -60,14 +63,15 @@
       dataGravitationDefaultTest(false);
    }
 
-   private void dataGravitationDefaultTest(boolean autoGravitate)
-         throws Exception
+   private void dataGravitationDefaultTest(boolean autoGravitate) throws Exception
    {
       // create 3 cachePool
       List<CacheSPI<Object, Object>> caches = createCachesWithCacheLoader(3, autoGravitate, true, passivation);
       cachesTL.set(caches);
-      ReplicationListener replListener0 = ReplicationListener.getReplicationListener(caches.get(0));
-      ReplicationListener replListener1 = ReplicationListener.getReplicationListener(caches.get(1));
+      List<ReplicationListener> replicationListeners = new ArrayList<ReplicationListener>();
+      replicationListeners.add(ReplicationListener.getReplicationListener(caches.get(0)));
+      replicationListeners.add(ReplicationListener.getReplicationListener(caches.get(1)));
+      replicationListeners.add(ReplicationListener.getReplicationListener(caches.get(2)));
 
       CacheLoader[] loaders = getLoaders(caches);
 
@@ -75,7 +79,9 @@
       for (int i = 0; i < 3; i++) loaders[i].remove(Fqn.ROOT);
 
       // put stuff in cache0
+      replicationListeners.get(1).expect(PutKeyValueCommand.class);
       caches.get(0).put(fqn, key, value);
+      replicationListeners.get(1).waitForReplicationToOccur();
 
       // make sure there are no locks.
       assertNoLocks(caches);
@@ -85,12 +91,10 @@
       if (!autoGravitate)
          caches.get(2).getInvocationContext().getOptionOverrides().setForceDataGravitation(true);
 
-      replListener0.expect(DataGravitationCleanupCommand.class);
-      replListener1.expect(DataGravitationCleanupCommand.class);
+      inReplicationListeners(replicationListeners).dataWillGravitateFrom(0).to(2);
       // should cause a gravitation event
       assertEquals(value, caches.get(2).get(fqn, key));
-      replListener0.waitForReplicationToOccur();
-      replListener1.waitForReplicationToOccur();
+      expectGravitation();
 
       assertNoLocks(caches);
 
@@ -157,40 +161,38 @@
     * Tests data gravitation when auto-gravitation is disabled and
     * "removeOnFind=false"; i.e. nodes from which data is gravitated
     * evict it instead of removing it.
-    *
-    * @throws Exception
     */
    public void testWithDataGravitationEvictOnFindNoAuto() throws Exception
    {
       dataGravitationEvictionTest(false);
    }
 
-   private void dataGravitationEvictionTest(boolean autoGravitate)
-         throws Exception
+   private void dataGravitationEvictionTest(boolean autoGravitate) throws Exception
    {
       // create 3 cachePool
       List<CacheSPI<Object, Object>> caches = createCachesWithCacheLoader(3, autoGravitate, false, passivation);
       ReplicationListener replListener0 = ReplicationListener.getReplicationListener(caches.get(0));
       ReplicationListener replListener1 = ReplicationListener.getReplicationListener(caches.get(1));
-      
+      ReplicationListener replListener2 = ReplicationListener.getReplicationListener(caches.get(2));
+
       cachesTL.set(caches);
       CacheLoader[] loaders = getLoaders(caches);
       Fqn b1 = fqnTransformer.getBackupFqn(caches.get(0).getLocalAddress(), fqn);
       Fqn b2 = fqnTransformer.getBackupFqn(caches.get(2).getLocalAddress(), fqn);
+
       // put stuff in cache0
+      replListener1.expect(PutKeyValueCommand.class);
       caches.get(0).put(fqn, key, value);
+      replListener1.waitForReplicationToOccur();
 
       // request data from cache2
       if (!autoGravitate)
          caches.get(2).getInvocationContext().getOptionOverrides().setForceDataGravitation(true);
 
       // should cause a gravitation event
-      replListener0.expect(PutDataMapCommand.class);
-      replListener0.expect(DataGravitationCleanupCommand.class);
-      replListener1.expect(DataGravitationCleanupCommand.class);
+      SingleBuddyGravitationHelper.inReplicationListeners(replListener0, replListener1, replListener2).dataWillGravitateFrom(0).to(2);
       assertEquals(value, caches.get(2).get(fqn, key));
-      replListener0.waitForReplicationToOccur();
-      replListener1.waitForReplicationToOccur();            
+      expectGravitation();
 
 //      USE REPLICATION LISTENERS!!!!
 

Added: core/trunk/src/test/java/org/jboss/cache/util/SingleBuddyGravitationHelper.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/SingleBuddyGravitationHelper.java	                        (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/util/SingleBuddyGravitationHelper.java	2009-01-20 11:27:44 UTC (rev 7526)
@@ -0,0 +1,208 @@
+package org.jboss.cache.util;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.commands.read.GravitateDataCommand;
+import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
+import org.jboss.cache.commands.write.PutDataMapCommand;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
+
+import java.util.List;
+
+/**
+ * Helper class for monitoring replication between caches during data gravitation.
+ * Usage:
+ *  <pre>
+ *   import static org.jboss.cache.util.SingleBuddyGravitationHelper
+ *    ....
+ *
+ *    inCaches(cache1, cache2, cache3).dataWillGravitateFrom(0).to(1);
+      assertEquals("value", caches.get(1).get(fqn, key)); //this call will cause data gravitation
+      expectGravitation();   //here is where the failure will be if gravitation fails
+ *
+ *  </pre>
+ *
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public class SingleBuddyGravitationHelper
+{
+   private Cache[] caches;
+   private ReplicationListener[] replicationListeners;
+   private int fromIndex = -1;
+   private int toIndex = -1;
+   static ThreadLocal<SingleBuddyGravitationHelper> perThread = new ThreadLocal<SingleBuddyGravitationHelper>();
+   boolean strict = true;
+
+
+   /**
+    * Creates an <tt>SingleBuddyGravitationHelper</tt> based on the list of caches passed in.
+    * The caches sequence is important, caches[i+1] must be a for caches[i].
+    * Important: it is assumed  that caches do not have replication listeners already built. If so use
+    * {@link SingleBuddyGravitationHelper#inReplicationListeners(java.util.List)} methods, as a cache cannot have 2
+    * replication listener instances. 
+    */
+   public static SingleBuddyGravitationHelper inCaches(Cache... caches)
+   {
+      SingleBuddyGravitationHelper gravitationHelper = new SingleBuddyGravitationHelper(caches);
+      perThread.set(gravitationHelper);
+      return gravitationHelper;
+   }
+
+   /**
+    * Transforms the List in an array and calls {@link SingleBuddyGravitationHelper#inCaches(org.jboss.cache.Cache[])}
+    */
+   public static SingleBuddyGravitationHelper inCaches(List caches)
+   {
+      Cache[] cachesArray = (Cache[]) caches.toArray(new Cache[caches.size()]);
+      return inCaches(cachesArray);
+   }
+
+   /**
+    * Insted of creating a replication listener for each cache, use the already build ReplicationListeners.
+    */
+   public static SingleBuddyGravitationHelper inReplicationListeners(ReplicationListener... caches)
+   {
+      SingleBuddyGravitationHelper gravitationHelper = new SingleBuddyGravitationHelper(caches);
+      perThread.set(gravitationHelper);
+      return gravitationHelper;
+   }
+
+   /**
+    * Transforms the suplied list in an array and calls
+    * {@link SingleBuddyGravitationHelper#inReplicationListeners(org.jboss.cache.util.internals.replicationlisteners.ReplicationListener[])}
+    */
+   public static SingleBuddyGravitationHelper inReplicationListeners(List replListeners)
+   {
+      ReplicationListener[] listeners = (ReplicationListener[]) replListeners.toArray(new ReplicationListener[replListeners.size()]);
+      SingleBuddyGravitationHelper gravitationHelper = new SingleBuddyGravitationHelper(listeners);
+      perThread.set(gravitationHelper);
+      return gravitationHelper;
+   }
+
+
+   /**
+    * After the {@link SingleBuddyGravitationHelper#dataWillGravitateFrom(int)} and {@link SingleBuddyGravitationHelper#to(int)}
+    * are called, and after the gravitation call to the cache is done, here we wait for the gravitation commands to replicate.
+    */
+   public static void expectGravitation()
+   {
+      SingleBuddyGravitationHelper gravitationHelper = perThread.get();
+      assert gravitationHelper != null : "replication helper should be created. Use inReplicationListeners before calling this";
+      gravitationHelper.waitForReplication();
+   }
+
+   /**
+    * You specify here the original data owner.
+    */
+   public SingleBuddyGravitationHelper dataWillGravitateFrom(int index)
+   {
+      assertValidIndex(index);
+      this.fromIndex = index;
+      return this;
+   }
+
+   /**
+    * Here you specify where the data will migrate to.
+    */
+   public SingleBuddyGravitationHelper to(int index)
+   {
+      toIndex = index;
+      assert fromIndex >= 0 : "Must call dataWillGravitateFrom before this one";
+
+      //all other caches must receive an GravitateDataCommand
+      expectGravitateData();
+
+      //all other caches will receive an DataGravitationCleanup command
+      expectGravitationCleanup();
+
+      //buddy of the new data owner should also receive the data
+      int cacheCount = caches.length;
+      int newDataOwnerIndex = getIndex(caches[toIndex]);
+      int newBuddyIndex = (newDataOwnerIndex == cacheCount - 1) ? 0 : newDataOwnerIndex + 1;
+      replicationListeners[newBuddyIndex].expect(PutDataMapCommand.class);
+      return this;
+   }
+
+
+   private SingleBuddyGravitationHelper(Cache[] caches)
+   {
+      this.caches = caches;
+      replicationListeners = new ReplicationListener[caches.length];
+      for (int i = 0; i < caches.length; i++)
+      {
+         replicationListeners[i] = ReplicationListener.getReplicationListener(caches[i]);
+      }
+   }
+
+   private SingleBuddyGravitationHelper(ReplicationListener[] replicationListeners)
+   {
+      this.replicationListeners = replicationListeners;
+      caches = new Cache[replicationListeners.length];
+      for (int i = 0; i < replicationListeners.length; i++)
+      {
+         caches[i] = replicationListeners[i].getCache();
+      }
+   }
+
+   private void assertValidIndex(int index)
+   {
+      assert index >= 0 && index < caches.length;
+   }
+
+   //disable for now
+   private void strict()
+   {
+      this.strict = true;
+   }
+
+   private int getIndex(Cache cache)
+   {
+      for (int i = 0; i < caches.length; i++)
+      {
+         if (caches[i] == cache) return i;
+      }
+      throw new RuntimeException("cache not found withis cache instances");
+   }
+
+   private void expectGravitationCleanup()
+   {
+      if (!strict)
+      {
+         replicationListeners[fromIndex].expect(DataGravitationCleanupCommand.class);
+         return;
+      }
+      for (int i = 0; i < caches.length; i++)
+      {
+         if (i != toIndex) replicationListeners[i].expect(DataGravitationCleanupCommand.class);
+      }
+   }
+
+   private void expectGravitateData()
+   {
+      if (!strict) return;
+      //this means that we are a buddy of the node we gravitate from, so local gravitation will take place and
+      // no DataGravitation commands will be send accross
+      if (newOwnerIsBuddyOfOldOwner())
+      {
+         return;
+      }
+      for (int i = 0; i < caches.length; i++)
+      {
+         if (caches[i] != caches[toIndex]) replicationListeners[i].expect(GravitateDataCommand.class);
+      }
+   }
+
+   private boolean newOwnerIsBuddyOfOldOwner()
+   {
+      return (toIndex - 1 == fromIndex) || (toIndex == caches.length - 1 && fromIndex == 0);
+   }
+
+   private void waitForReplication()
+   {
+      for (ReplicationListener listener : replicationListeners)
+      {
+         if (listener.getCache() != caches[toIndex])
+            listener.waitForReplicationToOccur();
+      }
+   }
+}

Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java	2009-01-20 11:26:59 UTC (rev 7525)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java	2009-01-20 11:27:44 UTC (rev 7526)
@@ -2,10 +2,12 @@
 
 import org.jboss.cache.Cache;
 import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.read.GravitateDataCommand;
 import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
 import org.jboss.cache.commands.tx.PrepareCommand;
 import org.jboss.cache.commands.tx.CommitCommand;
 import org.jboss.cache.commands.legacy.write.*;
+import org.jboss.cache.commands.legacy.read.LegacyGravitateDataCommand;
 import org.jboss.cache.commands.write.*;
 
 import java.util.Map;
@@ -31,6 +33,7 @@
       mvcc2PessMap.put(RemoveKeyCommand.class, PessRemoveKeyCommand.class);
       mvcc2PessMap.put(RemoveNodeCommand.class, PessRemoveNodeCommand.class);
       mvcc2PessMap.put(DataGravitationCleanupCommand.class, DataGravitationCleanupCommand.class);
+      mvcc2PessMap.put(GravitateDataCommand.class, LegacyGravitateDataCommand.class);
    }
 
 

Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java	2009-01-20 11:26:59 UTC (rev 7525)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java	2009-01-20 11:27:44 UTC (rev 7526)
@@ -1,27 +1,21 @@
 package org.jboss.cache.util.internals.replicationlisteners;
 
-import org.jboss.cache.*;
-import org.jboss.cache.config.Configuration;
+import org.jboss.cache.Cache;
+import org.jboss.cache.RPCManager;
 import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.commands.VisitableCommand;
 import org.jboss.cache.commands.remote.*;
 import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.config.Configuration;
 import org.jboss.cache.factories.ComponentRegistry;
-import org.jboss.cache.io.ByteBuffer;
-import org.jboss.cache.marshall.AbstractMarshaller;
 import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
-import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
-import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.marshall.RegionalizedMethodCall;
+import org.jboss.cache.marshall.ReplicationObserver;
 import org.jboss.cache.util.TestingUtil;
-import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
 import org.jgroups.Address;
 
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -56,13 +50,14 @@
  * @author Mircea.Markus at jboss.com
  * @since 2.2
  */
-abstract public class ReplicationListener
+abstract public class ReplicationListener implements ReplicationObserver
 {
    public static final long DEFAULT_TIMEOUT = 10000;
    private CountDownLatch latch = new CountDownLatch(1);
    protected List<Class<? extends ReplicableCommand>> expectedCommands;
    protected Configuration config;
    protected final Address localAddress;
+   private Cache cache;
 
    /**
     * Builds a listener that will observe the given cache for recieving replication commands.
@@ -72,21 +67,13 @@
       ComponentRegistry componentRegistry = TestingUtil.extractComponentRegistry(cache);
       RPCManager rpcManager = componentRegistry.getComponent(RPCManager.class);
       CommandAwareRpcDispatcher realDispatcher = (CommandAwareRpcDispatcher) TestingUtil.extractField(rpcManager, "rpcDispatcher");
-      RpcDispatcher.Marshaller2 realMarshaller = (RpcDispatcher.Marshaller2) realDispatcher.getMarshaller();
-      RpcDispatcher.Marshaller2 delegate = null;
-//      if ((realMarshaller instanceof RegionMarshallerDelegate) || (realMarshaller instanceof MarshallerDelegate))
-//      {
-//         throw new RuntimeException("Illegal state");
-//      }
-      if (realDispatcher instanceof InactiveRegionAwareRpcDispatcher)
-         delegate = new RegionMarshallerDelegate((Marshaller) realMarshaller);
-      else
-         delegate = new MarshallerDelegate(realMarshaller);
-      realDispatcher.setMarshaller(delegate);
-      realDispatcher.setRequestMarshaller(delegate);
-      realDispatcher.setResponseMarshaller(delegate);
-      this.config = cache.getConfiguration();
+      if (realDispatcher.setReplicationObserver(this) != null)
+      {
+         throw new RuntimeException("Replication listener already present");
+      }
       this.localAddress = cache.getLocalAddress();
+      this.config = cache.getConfiguration();
+      this.cache = cache;
    }
 
    protected ReplicationListener()
@@ -125,118 +112,41 @@
       }
    }
 
-   private class MarshallerDelegate implements RpcDispatcher.Marshaller2
+   public void afterExecutingCommand(ReplicableCommand realOne)
    {
-      RpcDispatcher.Marshaller2 marshaller;
-
-      private MarshallerDelegate(RpcDispatcher.Marshaller2 marshaller)
+      if (expectedCommands == null)
       {
-         this.marshaller = marshaller;
+         log("skipping command " + realOne);
+         return;
       }
+      log("Processed command: " + realOne);
 
-      public byte[] objectToByteBuffer(Object obj) throws Exception
+      if (realOne instanceof ReplicateCommand)
       {
-         return marshaller.objectToByteBuffer(obj);
+         postReplicateExecution((ReplicateCommand)realOne);
       }
-
-      public Object objectFromByteBuffer(byte bytes[]) throws Exception
+      else
       {
-         Object result = marshaller.objectFromByteBuffer(bytes);
-         return process(result);
+         postNonVisitableExecution(realOne);
       }
-
-      public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
+      if (expectedCommands.isEmpty())
       {
-         Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
-         return process(result);
+         latch.countDown();
       }
 
-      private Object process(Object result)
-      {
-         boolean isReplicate = result instanceof ReplicateCommand;
-         boolean isRemote = result != null && isRemoteCommand(result.getClass());
-         if ((isReplicate || isRemote) && (expectedCommands != null))
-         {
-            ReplicableCommand replicateCommand = (ReplicableCommand) result;
-            result = new ReplicableCommandDelegate(replicateCommand);
-         }
-         return result;
-      }
-
-      public Buffer objectToBuffer(Object o) throws Exception
-      {
-         return marshaller.objectToBuffer(o);
-      }
    }
 
-   /**
-    * We want the notification to be performed only *after* the remote command is executed.
-    */
-   private class ReplicableCommandDelegate implements ReplicableCommand
+   private void log(String s)
    {
-      ReplicableCommand realOne;
-
-      private ReplicableCommandDelegate(ReplicableCommand realOne)
-      {
-         if (realOne instanceof VisitableCommand)
-         {
-            throw new IllegalArgumentException("Visitable commands not allowed!!! ;recieved="
-                  + realOne.getClass().getName());
-         }
-         this.realOne = realOne;
-      }
-
-      public Object perform(InvocationContext ctx) throws Throwable
-      {
-         try
-         {
-            return realOne.perform(ctx);
-         }
-         finally
-         {
-           log("Processed command: " + realOne);
-           if (realOne instanceof ReplicateCommand)
-            {
-               postReplicateExecution((ReplicateCommand)realOne);
-            }
-            else
-            {
-               postNonVisitableExecution(realOne);
-            }
-            if (expectedCommands.isEmpty())
-            {
-               latch.countDown();
-            }
-         }
-      }
-
-     public int getCommandId()
-      {
-         return realOne.getCommandId();
-      }
-
-      public Object[] getParameters()
-      {
-         return realOne.getParameters();
-      }
-
-      public void setParameters(int commandId, Object[] parameters)
-      {
-         realOne.setParameters(commandId, parameters);
-      }
-
-      @Override
-      public String toString()
-      {
-         return "ReplicableCommandDelegate{" +
-               "realOne=" + realOne +
-               '}';
-      }
+      System.out.println("[" + localAddress + "] " + s);
    }
 
    protected void postNonVisitableExecution(ReplicableCommand realOne)
    {
-      expectedCommands.remove(realOne.getClass());
+      if (!expectedCommands.remove(realOne.getClass()))
+      {
+         log("not expecting command " + realOne + " ");
+      }
    }
 
    protected void postReplicateExecution(ReplicateCommand realOne)
@@ -259,74 +169,16 @@
       }
    }
 
-   /**
-    * Needed for region based marshalling.
-    */
-   private class RegionMarshallerDelegate extends AbstractMarshaller
-   {
-      private Marshaller realOne;
 
-      private RegionMarshallerDelegate(Marshaller realOne)
-      {
-         this.realOne = realOne;
-      }
-
-      public void objectToObjectStream(Object obj, ObjectOutputStream out) throws Exception
-      {
-         realOne.objectToObjectStream(obj, out);
-      }
-
-      public Object objectFromObjectStream(ObjectInputStream in) throws Exception
-      {
-         return realOne.objectFromObjectStream(in);
-      }
-
-      public Object objectFromStream(InputStream is) throws Exception
-      {
-         return realOne.objectFromStream(is);
-      }
-
-      public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception
-      {
-         realOne.objectToObjectStream(obj, out, region);
-      }
-
-      public RegionalizedMethodCall regionalizedMethodCallFromByteBuffer(byte[] buffer) throws Exception
-      {
-         RegionalizedMethodCall result = realOne.regionalizedMethodCallFromByteBuffer(buffer);
-         if (result.command instanceof ReplicateCommand && expectedCommands != null)
-         {
-            ReplicateCommand replicateCommand = (ReplicateCommand) result.command;
-            result.command = new ReplicableCommandDelegate(replicateCommand);
-         }
-         return result;
-      }
-
-      public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception
-      {
-         return realOne.regionalizedMethodCallFromObjectStream(in);
-      }
-
-      public ByteBuffer objectToBuffer(Object o) throws Exception
-      {
-         return realOne.objectToBuffer(o);
-      }
-
-      public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
-      {
-         return realOne.objectFromByteBuffer(bytes, i, i1);
-      }
-   }
-
    /**
     * Blocks for the elements specified through {@link #expect(Class[])} invocations to be replicated in this cache.
     * if replication does not occur in the give timeout then an exception is being thrown.
     */
    public void waitForReplicationToOccur(long timeoutMillis)
    {
-      log("enter... ReplicationListener.waitForReplicationToOccur");
+//      log("enter... ReplicationListener.waitForReplicationToOccur");
       waitForReplicationToOccur(timeoutMillis, TimeUnit.MILLISECONDS);
-      log("exit... ReplicationListener.waitForReplicationToOccur");
+//      log("exit... ReplicationListener.waitForReplicationToOccur");
    }
 
    /**
@@ -350,7 +202,7 @@
       {
          if (!expectedCommands.isEmpty() && !latch.await(timeout, timeUnit))
          {
-            assert false : "waiting for more than " + timeout + " " + timeUnit + " and following commands did not replicate: " + expectedCommands;
+            assert false : "[" +localAddress + "] waiting for more than " + timeout + " " + timeUnit + " and following commands did not replicate: " + expectedCommands;
          }
       }
       catch (InterruptedException e)
@@ -390,8 +242,8 @@
             clazz.equals(RemoveFromBuddyGroupCommand.class) || clazz.equals(ReplicateCommand.class);
    }
 
-   private void log(String msg)
+   public Cache getCache()
    {
-      System.out.println("[" + localAddress + "]" + msg);
+      return cache;
    }
 }




More information about the jbosscache-commits mailing list