[jbosscache-commits] JBoss Cache SVN: r7184 - in core/trunk/src: main/java/org/jboss/cache/interceptors and 4 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Fri Nov 21 11:35:19 EST 2008


Author: mircea.markus
Date: 2008-11-21 11:35:19 -0500 (Fri, 21 Nov 2008)
New Revision: 7184

Modified:
   core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java
   core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
   core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java
   core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
   core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java
Log:
test concurrency

Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-11-21 16:28:39 UTC (rev 7183)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java	2008-11-21 16:35:19 UTC (rev 7184)
@@ -120,7 +120,7 @@
    private InterceptorChain interceptorChain;
 
    private boolean isUsingBuddyReplication;
-   private boolean isInLocalMode;
+   private volatile boolean isInLocalMode;
    private ComponentRegistry componentRegistry;
    private LockManager lockManager;
 

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java	2008-11-21 16:28:39 UTC (rev 7183)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/LegacyDataGravitatorInterceptor.java	2008-11-21 16:35:19 UTC (rev 7184)
@@ -322,7 +322,7 @@
       if (trace) log.trace("Requesting data gravitation for Fqn " + fqn);
 
       List<Address> mbrs = rpcManager.getMembers();
-      Boolean searchSubtrees = buddyManager.isDataGravitationSearchBackupTrees() ? Boolean.TRUE : Boolean.FALSE;
+      Boolean searchSubtrees = buddyManager.isDataGravitationSearchBackupTrees();
       GravitateDataCommand command = commandsFactory.buildGravitateDataCommand(fqn, searchSubtrees);
       // doing a GET_ALL is crappy but necessary since JGroups' GET_FIRST could return null results from nodes that do
       // not have either the primary OR backup, and stop polling other valid nodes.

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java	2008-11-21 16:28:39 UTC (rev 7183)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyGroupAssignmentTest.java	2008-11-21 16:35:19 UTC (rev 7184)
@@ -94,8 +94,8 @@
       assertNoLocks(caches);
    }
 
-   public void testRemovalFromCluster2Buddies() throws Exception
-   {
+   public void testRemovalFromCluster2Buddies() throws Throwable
+{
       log.debug("Running testRemovalFromCluster2Buddies");
       List<CacheSPI<Object, Object>> caches = createCaches(2, 4, false);
       cachesTL.set(caches);

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java	2008-11-21 16:28:39 UTC (rev 7183)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationFailoverTest.java	2008-11-21 16:35:19 UTC (rev 7184)
@@ -8,7 +8,9 @@
 
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
+import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
 import org.jboss.cache.util.TestingUtil;
+import org.jboss.cache.util.internals.replicationlisteners.ReplicationListener;
 import org.jgroups.JChannel;
 import org.jgroups.protocols.DISCARD;
 import static org.testng.AssertJUnit.*;
@@ -88,11 +90,13 @@
 
       System.out.println("***** Killed original data owner, about to call a get on a different cache instance.  *****");
 
+      ReplicationListener replListener = ReplicationListener.getReplicationListener(caches.get(1));
+      replListener.expect(DataGravitationCleanupCommand.class);
+
       // according to data gravitation, a call to *any* cache should retrieve the data, and move the data to the new cache.
       assertEquals("Value should have gravitated", value, caches.get(2).get(fqn, key));
+      replListener.waitForReplicationToOccur();
 
-      delay(); // cleanup commands are async
-
       TestingUtil.dumpCacheContents(caches);
 
       // now lets test the eviction part of gravitation
@@ -173,9 +177,12 @@
 
       System.out.println("*** Calling get() on cache[1] with force option");
 
+      ReplicationListener replicationListener0 = ReplicationListener.getReplicationListener(caches.get(0));
+      replicationListener0.expect(DataGravitationCleanupCommand.class);
+
       caches.get(1).getInvocationContext().getOptionOverrides().setForceDataGravitation(true);
       assertEquals("value", caches.get(1).get(fqn, key));
-      delay(); // cleanup commands are async
+      replicationListener0.waitForReplicationToOccur(); // cleanup commands are async
 
       TestingUtil.dumpCacheContents(caches);
 
@@ -218,8 +225,10 @@
       assertNoLocks(caches);
 
       // gravitate to 2:
+      ReplicationListener replListener1 = ReplicationListener.getReplicationListener(caches.get(0));
+      replListener1.expect(DataGravitationCleanupCommand.class);
       caches.get(2).getNode(fqn);  // expectWithTx entire subtree to gravitate.
-      delay(); // cleanup commands are async
+      replListener1.waitForReplicationToOccur(); // cleanup commands are async
 
       Fqn newBackupFqn = fqnTransformer.getBackupFqn(caches.get(2).getLocalAddress(), fqn);
       Fqn newBackupFqn2 = fqnTransformer.getBackupFqn(caches.get(2).getLocalAddress(), fqn2);
@@ -248,9 +257,4 @@
 
       assertNoLocks(caches);
    }
-
-   protected void delay()
-   {
-      TestingUtil.sleepThread(250);
-   }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java	2008-11-21 16:28:39 UTC (rev 7183)
+++ core/trunk/src/test/java/org/jboss/cache/buddyreplication/BuddyReplicationTestsBase.java	2008-11-21 16:35:19 UTC (rev 7184)
@@ -95,9 +95,9 @@
       }
       cachesTL.set(null);
       System.gc();
-      
+
       new UnitTestCacheFactory().cleanUp();
-      
+
    }
 
    protected final static int VIEW_BLOCK_TIMEOUT = 5000;
@@ -131,7 +131,7 @@
    {
       CacheSPI<Object, Object> c = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(UnitTestCacheConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC, false, false, true), false);
 
-      String threadId = Thread.currentThread().getName();      
+      String threadId = Thread.currentThread().getName();
       //c.getConfiguration().setClusterName("BuddyReplicationTest-" + threadId);
 
       BuddyReplicationConfig brc = new BuddyReplicationConfig();
@@ -321,7 +321,7 @@
 
    public void waitForBuddy(Cache dataOwner, Cache buddy, boolean onlyBuddy) throws Exception
    {
-      waitForBuddy(dataOwner, buddy, onlyBuddy, 60000);   
+      waitForBuddy(dataOwner, buddy, onlyBuddy, 60000);
    }
 
    /**
@@ -333,7 +333,7 @@
       long timeout = 60000 + caches.length;
       for (int i = 0; i < caches.length - 1; i++)
       {
-         waitForBuddy(caches[i], caches[i+1], true, timeout);
+         waitForBuddy(caches[i], caches[i + 1], true, timeout);
       }
       waitForBuddy(caches[caches.length - 1], caches[0], true, timeout);
    }
@@ -367,7 +367,7 @@
       BuddyGroup group = buddyBuddyManager.buddyGroupsIParticipateIn.get(dataOwnerLocalAddress);
       result = result & buddyBuddyManager.buddyGroupsIParticipateIn.containsKey(dataOwnerLocalAddress);
       if (onlyBuddy) result = result && group.getBuddies().size() == 1;
-      result = result & group!= null && group.getBuddies() != null && group.getBuddies().contains(buddyLocalAddress);
+      result = result & group != null && group.getBuddies() != null && group.getBuddies().contains(buddyLocalAddress);
       return result;
    }
 

Modified: core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java	2008-11-21 16:28:39 UTC (rev 7183)
+++ core/trunk/src/test/java/org/jboss/cache/loader/ClusteredCacheLoaderTest.java	2008-11-21 16:35:19 UTC (rev 7184)
@@ -53,9 +53,7 @@
 
       Configuration c1 = new Configuration();
       Configuration c2 = new Configuration();
-      //c1.setClusterName("CCL-Test-" + Thread.currentThread().getName());
       c1.setStateRetrievalTimeout(2000);
-      //c2.setClusterName("CCL-Test-" + Thread.currentThread().getName());
       c2.setStateRetrievalTimeout(2000);
       c1.setCacheMode(Configuration.CacheMode.REPL_SYNC);
       c2.setCacheMode(Configuration.CacheMode.REPL_SYNC);
@@ -69,6 +67,8 @@
       
       cache1 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(c1, false);
       cache2 = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(c2, false);
+      cache1.getConfiguration().setSerializationExecutorPoolSize(0);
+      cache2.getConfiguration().setSerializationExecutorPoolSize(0);
 
 
       if (useRegionBasedMarshalling)
@@ -225,7 +225,7 @@
       assertTrue("should exist", loader2.exists(fqn));
    }
 
-   public void testCacheLoaderThreadSafety() throws Exception
+   public void testCacheLoaderThreadSafety() throws Throwable
    {
       threadSafetyTest(true);
    }

Modified: core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java	2008-11-21 16:28:39 UTC (rev 7183)
+++ core/trunk/src/test/java/org/jboss/cache/options/cachemodelocal/CacheModeLocalTestBase.java	2008-11-21 16:35:19 UTC (rev 7184)
@@ -724,10 +724,8 @@
          assertNull("should be invalidated", cache2.get(fqn, key));
       }
 
-      // now cache2
-      replListener1.expect(PutDataMapCommand.class);
+      //do not expect replication for this one as the node is already thre
       Node node2 = root2.addChild(fqn);
-      replListener1.waitForReplicationToOccur();
 
       mgr = cache2.getTransactionManager();
       replListener1.expectWithTx(PutKeyValueCommand.class);

Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java	2008-11-21 16:28:39 UTC (rev 7183)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/OptimisticReplicationListener.java	2008-11-21 16:35:19 UTC (rev 7184)
@@ -27,10 +27,17 @@
     */
    public void expect(Class<? extends ReplicableCommand>... expectedCommands)
    {
-      //in the case of optimistic replication, an prepare and an commit is expected foe each node 
+      //in the case of optimistic replication, an prepare and an commit is expected foe each node
       for (Class<? extends ReplicableCommand> command : expectedCommands)
       {
-         internalExpect(OptimisticPrepareCommand.class, CommitCommand.class);
+         if (isRemoteCommand(command))
+         {
+            internalExpect(command);
+         }
+         else
+         {
+            internalExpect(OptimisticPrepareCommand.class, CommitCommand.class);
+         }
       }
    }
 
@@ -42,7 +49,7 @@
       internalExpect(OptimisticPrepareCommand.class, CommitCommand.class);
    }
 
-   protected void postCommandExecution(ReplicateCommand realOne)
+   protected void postReplicateExecution(ReplicateCommand realOne)
    {
       int initialExpectations = expectedCommands.size();
       List<ReplicableCommand> mods = getAllModifications(realOne);

Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java	2008-11-21 16:28:39 UTC (rev 7183)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/PessReplicationListener.java	2008-11-21 16:35:19 UTC (rev 7184)
@@ -2,6 +2,7 @@
 
 import org.jboss.cache.Cache;
 import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
 import org.jboss.cache.commands.tx.PrepareCommand;
 import org.jboss.cache.commands.tx.CommitCommand;
 import org.jboss.cache.commands.legacy.write.*;
@@ -29,6 +30,7 @@
       mvcc2PessMap.put(PutKeyValueCommand.class, PessPutKeyValueCommand.class);
       mvcc2PessMap.put(RemoveKeyCommand.class, PessRemoveKeyCommand.class);
       mvcc2PessMap.put(RemoveNodeCommand.class, PessRemoveNodeCommand.class);
+      mvcc2PessMap.put(DataGravitationCleanupCommand.class, DataGravitationCleanupCommand.class);
    }
 
 
@@ -66,6 +68,8 @@
 
    private Class<? extends ReplicableCommand> getPessCommand(Class<? extends ReplicableCommand> command)
    {
-      return mvcc2PessMap.get((Class<? extends ReplicableCommand>) command);
+      Class<? extends ReplicableCommand> result = mvcc2PessMap.get((Class<? extends ReplicableCommand>) command);
+      if (result == null) throw new IllegalStateException("Unknown command: " + command);
+      return result;
    }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java	2008-11-21 16:28:39 UTC (rev 7183)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/replicationlisteners/ReplicationListener.java	2008-11-21 16:35:19 UTC (rev 7184)
@@ -3,7 +3,8 @@
 import org.jboss.cache.*;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.VisitableCommand;
+import org.jboss.cache.commands.remote.*;
 import org.jboss.cache.commands.tx.PrepareCommand;
 import org.jboss.cache.factories.ComponentRegistry;
 import org.jboss.cache.io.ByteBuffer;
@@ -15,6 +16,7 @@
 import org.jboss.cache.util.TestingUtil;
 import org.jgroups.blocks.RpcDispatcher;
 import org.jgroups.util.Buffer;
+import org.jgroups.Address;
 
 import java.io.InputStream;
 import java.io.ObjectInputStream;
@@ -61,6 +63,7 @@
    private CountDownLatch latch = new CountDownLatch(1);
    protected List<Class<? extends ReplicableCommand>> expectedCommands;
    protected Configuration config;
+   protected final Address localAddress;
 
    /**
     * Builds a listener that will observe the given cache for recieving replication commands.
@@ -84,10 +87,12 @@
       realDispatcher.setRequestMarshaller(delegate);
       realDispatcher.setResponseMarshaller(delegate);
       this.config = cache.getConfiguration();
+      this.localAddress = cache.getLocalAddress();
    }
 
    protected ReplicationListener()
    {
+      localAddress = null;
    }
 
    abstract public void expect(Class<? extends ReplicableCommand>... expectedCommands);
@@ -138,44 +143,51 @@
       public Object objectFromByteBuffer(byte bytes[]) throws Exception
       {
          Object result = marshaller.objectFromByteBuffer(bytes);
-         if (result instanceof ReplicateCommand && expectedCommands != null)
-         {
-            ReplicateCommand replicateCommand = (ReplicateCommand) result;
-            return new ReplicateCommandDelegate(replicateCommand);
-         }
-         return result;
+         return process(result);
       }
 
-      public Buffer objectToBuffer(Object o) throws Exception
+      public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
       {
-         return marshaller.objectToBuffer(o);
+         Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
+         return process(result);
       }
 
-      public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
+      private Object process(Object result)
       {
-         Object result = marshaller.objectFromByteBuffer(bytes, i, i1);
-         if (result instanceof ReplicateCommand && expectedCommands != null)
+         System.out.println(localAddress + " >>>> " + result );
+         boolean isReplicate = result instanceof ReplicateCommand;
+         boolean isRemote = result != null && isRemoteCommand(result.getClass());
+         if ((isReplicate || isRemote) && (expectedCommands != null))
          {
-            ReplicateCommand replicateCommand = (ReplicateCommand) result;
-            return new ReplicateCommandDelegate(replicateCommand);
+            ReplicableCommand replicateCommand = (ReplicableCommand) result;
+            result = new ReplicableCommandDelegate(replicateCommand);
          }
          return result;
       }
+
+      public Buffer objectToBuffer(Object o) throws Exception
+      {
+         return marshaller.objectToBuffer(o);
+      }
    }
 
    /**
     * We want the notification to be performed only *after* the remote command is executed.
     */
-   private class ReplicateCommandDelegate extends ReplicateCommand
+   private class ReplicableCommandDelegate implements ReplicableCommand
    {
-      ReplicateCommand realOne;
+      ReplicableCommand realOne;
 
-      private ReplicateCommandDelegate(ReplicateCommand realOne)
+      private ReplicableCommandDelegate(ReplicableCommand realOne)
       {
+         if (realOne instanceof VisitableCommand)
+         {
+            throw new IllegalArgumentException("Visitable commands not allowed!!! ;recieved="
+                  + realOne.getClass().getName());
+         }
          this.realOne = realOne;
       }
 
-      @Override
       public Object perform(InvocationContext ctx) throws Throwable
       {
          try
@@ -184,19 +196,45 @@
          }
          finally
          {
-            postCommandExecution(realOne);
+            if (realOne instanceof ReplicateCommand)
+            {
+               postReplicateExecution((ReplicateCommand)realOne);
+            }
+            else
+            {
+               postNonVisitableExecution(realOne);
+            }
             if (expectedCommands.isEmpty())
             {
                latch.countDown();
             }
          }
       }
+
+      public int getCommandId()
+      {
+         return realOne.getCommandId();
+      }
+
+      public Object[] getParameters()
+      {
+         return realOne.getParameters();
+      }
+
+      public void setParameters(int commandId, Object[] parameters)
+      {
+         realOne.setParameters(commandId, parameters);
+      }
    }
 
-
-   protected void postCommandExecution(ReplicateCommand realOne)
+   protected void postNonVisitableExecution(ReplicableCommand realOne)
    {
-      System.out.println("Processed command: " + realOne + "; expecting - " + expectedCommands.size() + " commands");
+      expectedCommands.remove(realOne.getClass());
+   }
+   
+   protected void postReplicateExecution(ReplicateCommand realOne)
+   {
+      System.out.println("Processed command: " + realOne + "; expecting " + expectedCommands.size() + " commands");
       Iterator<Class<? extends ReplicableCommand>> it = expectedCommands.iterator();
       while (it.hasNext())
       {
@@ -254,7 +292,7 @@
          if (result.command instanceof ReplicateCommand && expectedCommands != null)
          {
             ReplicateCommand replicateCommand = (ReplicateCommand) result.command;
-            result.command = new ReplicateCommandDelegate(replicateCommand);
+            result.command = new ReplicableCommandDelegate(replicateCommand);
          }
          return result;
       }
@@ -333,4 +371,11 @@
       }
       this.expectedCommands.addAll(Arrays.asList(expectedCommands));
    }
+
+   protected boolean isRemoteCommand(Class clazz)
+   {
+      return clazz.equals(AnnounceBuddyPoolNameCommand.class) || clazz.equals(AssignToBuddyGroupCommand.class) ||
+            clazz.equals(ClusteredGetCommand.class) || clazz.equals(DataGravitationCleanupCommand.class) ||
+            clazz.equals(RemoveFromBuddyGroupCommand.class) || clazz.equals(ReplicateCommand.class);
+   }
 }




More information about the jbosscache-commits mailing list