[jbosscache-commits] JBoss Cache SVN: r5660 - in core/trunk/src/test/java/org/jboss/cache: transaction and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Apr 24 08:05:20 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-04-24 08:05:19 -0400 (Thu, 24 Apr 2008)
New Revision: 5660

Modified:
   core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
   core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
Log:
Added test

Modified: core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java	2008-04-24 11:10:33 UTC (rev 5659)
+++ core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java	2008-04-24 12:05:19 UTC (rev 5660)
@@ -20,6 +20,7 @@
 import org.jboss.cache.invocation.InterceptorChain;
 import org.jboss.cache.loader.CacheLoader;
 import org.jboss.cache.loader.CacheLoaderManager;
+import org.jgroups.JChannel;
 
 import java.io.File;
 import java.lang.reflect.Field;
@@ -559,4 +560,30 @@
    {
       return (CommandsFactory) extractField(cache, "commandsFactory");
    }
+
+   /** Looks up {@code attribute} of {@code protocol} in the channel's property string; null if either is absent. */
+   public static String getJGroupsAttribute(Cache cache, String protocol, String attribute)
+   {
+      String s = ((JChannel) ((CacheSPI) cache).getRPCManager().getChannel()).getProperties();
+      // parsed as ':'-separated protocols, each optionally followed by "(key=value;key=value)"
+      for (String p : s.split(":"))
+      {
+         int open = p.indexOf('(');
+         String name = open < 0 ? p : p.substring(0, open);
+         // only the requested protocol may be searched: falling through with the last
+         // protocol's attributes would return a value belonging to the wrong protocol
+         if (!name.equalsIgnoreCase(protocol)) continue;
+         if (open < 0) return null; // protocol is present but has no attribute list
+
+         for (String a : p.substring(open + 1, p.length() - 1).split(";"))
+         {
+            // limit 2: a key without '=' yields one element, and '=' inside a value is kept
+            String[] kvPair = a.split("=", 2);
+            if (kvPair.length == 2 && kvPair[0].equalsIgnoreCase(attribute)) return kvPair[1];
+         }
+         return null; // attribute is not set on the first protocol with a matching name
+      }
+      // no protocol with the requested name was found
+      return null;
+   }
 }

Modified: core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java	2008-04-24 11:10:33 UTC (rev 5659)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java	2008-04-24 12:05:19 UTC (rev 5660)
@@ -4,24 +4,24 @@
 import org.jboss.cache.DefaultCacheFactory;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.RPCManager;
-import org.jboss.cache.RPCManagerImpl;
 import org.jboss.cache.commands.CacheCommand;
 import org.jboss.cache.commands.remote.ReplicateCommand;
 import org.jboss.cache.commands.tx.CommitCommand;
 import org.jboss.cache.commands.tx.PrepareCommand;
-import org.jboss.cache.config.Configuration;
+import static org.jboss.cache.config.Configuration.CacheMode.REPL_SYNC;
 import org.jboss.cache.factories.ComponentRegistry;
+import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
 import org.jboss.cache.misc.TestingUtil;
 import org.jgroups.Address;
+import org.jgroups.Channel;
 import org.jgroups.blocks.RspFilter;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import javax.transaction.TransactionManager;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.Map;
 
 /**
  * This is to test the scenario described in http://jira.jboss.org/jira/browse/JBCACHE-1270
@@ -40,186 +40,118 @@
 @Test(groups = "functional")
 public class PrepareCommitContentionTest
 {
-   CacheSPI<Object, Object> c1, c2;
+   CacheSPI<Object, Object> c1;
 
    @BeforeMethod
    public void setUp() throws CloneNotSupportedException
    {
-      c1 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object, Object>().createCache(false);
-      c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
-      c1.getConfiguration().setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      c1.getConfiguration().setLockAcquisitionTimeout(5000);
-      c2 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object, Object>().createCache(c1.getConfiguration().clone(), false);
+      c1 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object, Object>().createCache(UnitTestCacheConfigurationFactory.createConfiguration(REPL_SYNC));
    }
 
    @AfterMethod
    public void tearDown()
    {
-      TestingUtil.killCaches(c1, c2);
+      TestingUtil.killCaches(c1);
    }
 
-   public void testWithSyncCommitPhase() throws Exception
+   public void testOOBFlag() throws Exception
    {
-      doTest(true, false);
-   }
+      DelegatingRPCManager delegatingRPCManager = new DelegatingRPCManager();
+      ComponentRegistry cr = TestingUtil.extractComponentRegistry(c1);
+      RPCManager origRpcManager = cr.getComponent(RPCManager.class);
+      delegatingRPCManager.delegate = origRpcManager;
+      cr.registerComponent(delegatingRPCManager, RPCManager.class);
 
-   public void testWithDefautCommitPhase() throws Exception
-   {
-      doTest(false, false);
+      c1.getTransactionManager().begin();
+      c1.put("/a", "k", "v");
+      c1.getTransactionManager().commit();
+
+      // now check what we have gathered:
+
+      assert delegatingRPCManager.log.get(CommitCommand.class) : "Commit commands should be sent using OOB!";
+      assert !delegatingRPCManager.log.get(PrepareCommand.class) : "Prepare commands should NOT be sent using OOB!";
    }
 
-   public void testControl() throws Exception
+   private static class DelegatingRPCManager implements RPCManager
    {
-      try
+      RPCManager delegate;
+      Map<Class<? extends CacheCommand>, Boolean> log = new HashMap<Class<? extends CacheCommand>, Boolean>();
+
+      public void disconnect()
       {
-         doTest(false, true);
-         assert false : "Should fail if we don't use out of band messages for non-sync commits";
+         delegate.disconnect();
       }
-      catch (AssertionError expected)
+
+      public void stop()
       {
-         // should fail
+         delegate.stop();
       }
-   }
 
-   private void doTest(final boolean syncCommit, boolean noOutOfBandMessages) throws Exception
-   {
-      c1.getConfiguration().setSyncCommitPhase(syncCommit);
-      c2.getConfiguration().setSyncCommitPhase(syncCommit);
+      public void start()
+      {
+         delegate.start();
+      }
 
-      final CountDownLatch mainThreadCommitLatch = new CountDownLatch(1);
-      final CountDownLatch secondThreadPrepareLatch = new CountDownLatch(1);
-      final Fqn fqn = Fqn.fromString("/a/b/c");
-
-      DelayingRPCManager delayingRPCManager = new DelayingRPCManager(mainThreadCommitLatch, secondThreadPrepareLatch, syncCommit, noOutOfBandMessages);
-      ComponentRegistry cr = TestingUtil.extractComponentRegistry(c1);
-      cr.registerComponent(RPCManager.class.getName(), delayingRPCManager, RPCManager.class);
-
-      c1.start();
-      c2.start();
-
-      TestingUtil.blockUntilViewsReceived(60000, c1, c2);
-
-      TransactionManager tm = c1.getTransactionManager();
-
-      Thread secondTransaction = new Thread("SecondThread")
+      void logCall(CacheCommand command, boolean oob)
       {
-         public void run()
+         if (command instanceof ReplicateCommand)
          {
-            // wait until thread1 finishes the prepare.
-            try
-            {
-               secondThreadPrepareLatch.await();
-            }
-            catch (InterruptedException e)
-            {
-               // do nothing
-            }
-
-            try
-            {
-               // now replicate a put on the SAME node so there is lock contention
-               TransactionManager tm = c1.getTransactionManager();
-               if (syncCommit)
-                  mainThreadCommitLatch.countDown(); // we need to release the main thread commit latch first otherwise it will deadlock!
-
-               tm.begin();
-               c1.put(fqn, "k", "v2");
-               tm.commit();
-            }
-            catch (Exception e)
-            {
-               e.printStackTrace();
-            }
+            CacheCommand cmd = ((ReplicateCommand) command).getSingleModification();
+            log.put(cmd.getClass(), oob);
          }
-      };
-      secondTransaction.start();
+      }
 
+      public List<Object> callRemoteMethods(List<Address> recipients, CacheCommand cacheCommand, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception
+      {
+         logCall(cacheCommand, useOutOfBandMessage);
+         return delegate.callRemoteMethods(recipients, cacheCommand, mode, excludeSelf, timeout, responseFilter, useOutOfBandMessage);
+      }
 
-      tm.begin();
-      c1.put(fqn, "k", "v");
-      tm.commit();
+      public List<Object> callRemoteMethods(List<Address> recipients, CacheCommand cacheCommand, int mode, boolean excludeSelf, long timeout, boolean useOutOfBandMessage) throws Exception
+      {
+         logCall(cacheCommand, useOutOfBandMessage);
+         return delegate.callRemoteMethods(recipients, cacheCommand, mode, excludeSelf, timeout, useOutOfBandMessage);
+      }
 
-      secondTransaction.join();
+      public List<Object> callRemoteMethods(List<Address> recipients, CacheCommand command, boolean synchronous, boolean excludeSelf, int timeout, boolean useOutOfBandMessage) throws Exception
+      {
+         logCall(command, useOutOfBandMessage);
+         return delegate.callRemoteMethods(recipients, command, synchronous, excludeSelf, timeout, useOutOfBandMessage);
+      }
 
-      // now assert that both transactions have succeeded
-      assert c1.get(fqn, "k").equals("v2");
-      assert c2.get(fqn, "k").equals("v2");
-   }
+      public boolean isCoordinator()
+      {
+         return delegate.isCoordinator();
+      }
 
-   public static class DelayingRPCManager extends RPCManagerImpl
-   {
-      CountDownLatch mainThreadLatch, secondThreadLatch;
-      boolean syncCommit;
-      boolean noOOBMessages = false;
+      public Address getCoordinator()
+      {
+         return delegate.getCoordinator();
+      }
 
-      public DelayingRPCManager(CountDownLatch latch1, CountDownLatch latch2, boolean syncCommit, boolean noOOBMessages)
+      public Address getLocalAddress()
       {
-         mainThreadLatch = latch1;
-         secondThreadLatch = latch2;
-         this.syncCommit = syncCommit;
-         this.noOOBMessages = noOOBMessages;
+         return delegate.getLocalAddress();
       }
 
-      @Override
-      public List<Object> callRemoteMethods(final List<Address> recipients, final CacheCommand command, final int mode, final boolean excludeSelf, final long timeout, final RspFilter responseFilter, final boolean oob) throws Exception
+      public List<Address> getMembers()
       {
-         if (isPrepareMethod(command) && Thread.currentThread().getName().equals("SecondThread"))
-         {
-            if (!syncCommit) mainThreadLatch.countDown();
-         }
-         else if (isCommitMethod(command) && !Thread.currentThread().getName().equals("SecondThread"))
-         {
-            Thread th = new Thread()
-            {
-               public void run()
-               {
-                  try
-                  {
-                     secondThreadLatch.countDown(); // let the secondTransaction start it's prepapre
-                     mainThreadLatch.await(); // and block arbitrarily until the secondTransaction informs us to proceed
-                     Thread.sleep(1000);
-                     DelayingRPCManager.super.callRemoteMethods(recipients, command, mode, excludeSelf, timeout, responseFilter, !noOOBMessages && oob);
-                  }
-                  catch (Exception e)
-                  {
-                     e.printStackTrace();
-                  }
-               }
-            };
-            th.start();
-            if (syncCommit) th.join();
+         return delegate.getMembers();
+      }
 
-            return Collections.emptyList();
-         }
-         return super.callRemoteMethods(recipients, command, mode, excludeSelf, timeout, responseFilter, !noOOBMessages && oob);
+      public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception
+      {
+         delegate.fetchPartialState(sources, sourceTarget, integrationTarget);
       }
 
-      private boolean isCommitMethod(CacheCommand call)
+      public void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception
       {
-         if (call instanceof CommitCommand) return true;
-         if (call instanceof ReplicateCommand)
-         {
-            ReplicateCommand rCom = (ReplicateCommand) call;
-            if (rCom.isSingleCommand())
-            {
-               return rCom.getSingleModification() instanceof CommitCommand;
-            }
-         }
-         return false;
+         delegate.fetchPartialState(sources, subtree);
       }
 
-      private boolean isPrepareMethod(CacheCommand call)
+      public Channel getChannel()
       {
-         if (call instanceof PrepareCommand) return true;
-         if (call instanceof ReplicateCommand)
-         {
-            ReplicateCommand rCom = (ReplicateCommand) call;
-            if (rCom.isSingleCommand())
-            {
-               return rCom.getSingleModification() instanceof PrepareCommand;
-            }
-         }
-         return false;
+         return delegate.getChannel();
       }
    }
 }




More information about the jbosscache-commits mailing list