[jbosscache-commits] JBoss Cache SVN: r5660 - in core/trunk/src/test/java/org/jboss/cache: transaction and 1 other directory.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Thu Apr 24 08:05:20 EDT 2008
Author: manik.surtani at jboss.com
Date: 2008-04-24 08:05:19 -0400 (Thu, 24 Apr 2008)
New Revision: 5660
Modified:
core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
Log:
Added test
Modified: core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java 2008-04-24 11:10:33 UTC (rev 5659)
+++ core/trunk/src/test/java/org/jboss/cache/misc/TestingUtil.java 2008-04-24 12:05:19 UTC (rev 5660)
@@ -20,6 +20,7 @@
import org.jboss.cache.invocation.InterceptorChain;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.CacheLoaderManager;
+import org.jgroups.JChannel;
import java.io.File;
import java.lang.reflect.Field;
@@ -559,4 +560,30 @@
{
return (CommandsFactory) extractField(cache, "commandsFactory");
}
+
+ public static String getJGroupsAttribute(Cache cache, String protocol, String attribute)
+ {
+ String s = ((JChannel) ((CacheSPI) cache).getRPCManager().getChannel()).getProperties();
+ String[] protocols = s.split(":");
+ String attribs = null;
+ for (String p : protocols)
+ {
+ boolean hasAttribs = p.contains("(");
+ String name = hasAttribs ? p.substring(0, p.indexOf('(')) : p;
+ attribs = hasAttribs ? p.substring(p.indexOf('(') + 1, p.length() - 1) : null;
+
+ if (name.equalsIgnoreCase(protocol)) break;
+ }
+
+ if (attribs != null)
+ {
+ String[] attrArray = attribs.split(";");
+ for (String a : attrArray)
+ {
+ String[] kvPairs = a.split("=");
+ if (kvPairs[0].equalsIgnoreCase(attribute)) return kvPairs[1];
+ }
+ }
+ return null;
+ }
}
Modified: core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java 2008-04-24 11:10:33 UTC (rev 5659)
+++ core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java 2008-04-24 12:05:19 UTC (rev 5660)
@@ -4,24 +4,24 @@
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
import org.jboss.cache.RPCManager;
-import org.jboss.cache.RPCManagerImpl;
import org.jboss.cache.commands.CacheCommand;
import org.jboss.cache.commands.remote.ReplicateCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
-import org.jboss.cache.config.Configuration;
+import static org.jboss.cache.config.Configuration.CacheMode.REPL_SYNC;
import org.jboss.cache.factories.ComponentRegistry;
+import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.misc.TestingUtil;
import org.jgroups.Address;
+import org.jgroups.Channel;
import org.jgroups.blocks.RspFilter;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import javax.transaction.TransactionManager;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
+import java.util.Map;
/**
* This is to test the scenario described in http://jira.jboss.org/jira/browse/JBCACHE-1270
@@ -40,186 +40,118 @@
@Test(groups = "functional")
public class PrepareCommitContentionTest
{
- CacheSPI<Object, Object> c1, c2;
+ CacheSPI<Object, Object> c1;
@BeforeMethod
public void setUp() throws CloneNotSupportedException
{
- c1 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object, Object>().createCache(false);
- c1.getConfiguration().setCacheMode(Configuration.CacheMode.REPL_SYNC);
- c1.getConfiguration().setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- c1.getConfiguration().setLockAcquisitionTimeout(5000);
- c2 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object, Object>().createCache(c1.getConfiguration().clone(), false);
+ c1 = (CacheSPI<Object, Object>) new DefaultCacheFactory<Object, Object>().createCache(UnitTestCacheConfigurationFactory.createConfiguration(REPL_SYNC));
}
@AfterMethod
public void tearDown()
{
- TestingUtil.killCaches(c1, c2);
+ TestingUtil.killCaches(c1);
}
- public void testWithSyncCommitPhase() throws Exception
+ public void testOOBFlag() throws Exception
{
- doTest(true, false);
- }
+ DelegatingRPCManager delegatingRPCManager = new DelegatingRPCManager();
+ ComponentRegistry cr = TestingUtil.extractComponentRegistry(c1);
+ RPCManager origRpcManager = cr.getComponent(RPCManager.class);
+ delegatingRPCManager.delegate = origRpcManager;
+ cr.registerComponent(delegatingRPCManager, RPCManager.class);
- public void testWithDefautCommitPhase() throws Exception
- {
- doTest(false, false);
+ c1.getTransactionManager().begin();
+ c1.put("/a", "k", "v");
+ c1.getTransactionManager().commit();
+
+ // now check what we have gathered:
+
+ assert delegatingRPCManager.log.get(CommitCommand.class) : "Commit commands should be sent using OOB!";
+ assert !delegatingRPCManager.log.get(PrepareCommand.class) : "Prepare commands should NOT be sent using OOB!";
}
- public void testControl() throws Exception
+ private static class DelegatingRPCManager implements RPCManager
{
- try
+ RPCManager delegate;
+ Map<Class<? extends CacheCommand>, Boolean> log = new HashMap<Class<? extends CacheCommand>, Boolean>();
+
+ public void disconnect()
{
- doTest(false, true);
- assert false : "Should fail if we don't use out of band messages for non-sync commits";
+ delegate.disconnect();
}
- catch (AssertionError expected)
+
+ public void stop()
{
- // should fail
+ delegate.stop();
}
- }
- private void doTest(final boolean syncCommit, boolean noOutOfBandMessages) throws Exception
- {
- c1.getConfiguration().setSyncCommitPhase(syncCommit);
- c2.getConfiguration().setSyncCommitPhase(syncCommit);
+ public void start()
+ {
+ delegate.start();
+ }
- final CountDownLatch mainThreadCommitLatch = new CountDownLatch(1);
- final CountDownLatch secondThreadPrepareLatch = new CountDownLatch(1);
- final Fqn fqn = Fqn.fromString("/a/b/c");
-
- DelayingRPCManager delayingRPCManager = new DelayingRPCManager(mainThreadCommitLatch, secondThreadPrepareLatch, syncCommit, noOutOfBandMessages);
- ComponentRegistry cr = TestingUtil.extractComponentRegistry(c1);
- cr.registerComponent(RPCManager.class.getName(), delayingRPCManager, RPCManager.class);
-
- c1.start();
- c2.start();
-
- TestingUtil.blockUntilViewsReceived(60000, c1, c2);
-
- TransactionManager tm = c1.getTransactionManager();
-
- Thread secondTransaction = new Thread("SecondThread")
+ void logCall(CacheCommand command, boolean oob)
{
- public void run()
+ if (command instanceof ReplicateCommand)
{
- // wait until thread1 finishes the prepare.
- try
- {
- secondThreadPrepareLatch.await();
- }
- catch (InterruptedException e)
- {
- // do nothing
- }
-
- try
- {
- // now replicate a put on the SAME node so there is lock contention
- TransactionManager tm = c1.getTransactionManager();
- if (syncCommit)
- mainThreadCommitLatch.countDown(); // we need to release the main thread commit latch first otherwise it will deadlock!
-
- tm.begin();
- c1.put(fqn, "k", "v2");
- tm.commit();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
+ CacheCommand cmd = ((ReplicateCommand) command).getSingleModification();
+ log.put(cmd.getClass(), oob);
}
- };
- secondTransaction.start();
+ }
+ public List<Object> callRemoteMethods(List<Address> recipients, CacheCommand cacheCommand, int mode, boolean excludeSelf, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception
+ {
+ logCall(cacheCommand, useOutOfBandMessage);
+ return delegate.callRemoteMethods(recipients, cacheCommand, mode, excludeSelf, timeout, responseFilter, useOutOfBandMessage);
+ }
- tm.begin();
- c1.put(fqn, "k", "v");
- tm.commit();
+ public List<Object> callRemoteMethods(List<Address> recipients, CacheCommand cacheCommand, int mode, boolean excludeSelf, long timeout, boolean useOutOfBandMessage) throws Exception
+ {
+ logCall(cacheCommand, useOutOfBandMessage);
+ return delegate.callRemoteMethods(recipients, cacheCommand, mode, excludeSelf, timeout, useOutOfBandMessage);
+ }
- secondTransaction.join();
+ public List<Object> callRemoteMethods(List<Address> recipients, CacheCommand command, boolean synchronous, boolean excludeSelf, int timeout, boolean useOutOfBandMessage) throws Exception
+ {
+ logCall(command, useOutOfBandMessage);
+ return delegate.callRemoteMethods(recipients, command, synchronous, excludeSelf, timeout, useOutOfBandMessage);
+ }
- // now assert that both transactions have succeeded
- assert c1.get(fqn, "k").equals("v2");
- assert c2.get(fqn, "k").equals("v2");
- }
+ public boolean isCoordinator()
+ {
+ return delegate.isCoordinator();
+ }
- public static class DelayingRPCManager extends RPCManagerImpl
- {
- CountDownLatch mainThreadLatch, secondThreadLatch;
- boolean syncCommit;
- boolean noOOBMessages = false;
+ public Address getCoordinator()
+ {
+ return delegate.getCoordinator();
+ }
- public DelayingRPCManager(CountDownLatch latch1, CountDownLatch latch2, boolean syncCommit, boolean noOOBMessages)
+ public Address getLocalAddress()
{
- mainThreadLatch = latch1;
- secondThreadLatch = latch2;
- this.syncCommit = syncCommit;
- this.noOOBMessages = noOOBMessages;
+ return delegate.getLocalAddress();
}
- @Override
- public List<Object> callRemoteMethods(final List<Address> recipients, final CacheCommand command, final int mode, final boolean excludeSelf, final long timeout, final RspFilter responseFilter, final boolean oob) throws Exception
+ public List<Address> getMembers()
{
- if (isPrepareMethod(command) && Thread.currentThread().getName().equals("SecondThread"))
- {
- if (!syncCommit) mainThreadLatch.countDown();
- }
- else if (isCommitMethod(command) && !Thread.currentThread().getName().equals("SecondThread"))
- {
- Thread th = new Thread()
- {
- public void run()
- {
- try
- {
- secondThreadLatch.countDown(); // let the secondTransaction start it's prepapre
- mainThreadLatch.await(); // and block arbitrarily until the secondTransaction informs us to proceed
- Thread.sleep(1000);
- DelayingRPCManager.super.callRemoteMethods(recipients, command, mode, excludeSelf, timeout, responseFilter, !noOOBMessages && oob);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- };
- th.start();
- if (syncCommit) th.join();
+ return delegate.getMembers();
+ }
- return Collections.emptyList();
- }
- return super.callRemoteMethods(recipients, command, mode, excludeSelf, timeout, responseFilter, !noOOBMessages && oob);
+ public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception
+ {
+ delegate.fetchPartialState(sources, sourceTarget, integrationTarget);
}
- private boolean isCommitMethod(CacheCommand call)
+ public void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception
{
- if (call instanceof CommitCommand) return true;
- if (call instanceof ReplicateCommand)
- {
- ReplicateCommand rCom = (ReplicateCommand) call;
- if (rCom.isSingleCommand())
- {
- return rCom.getSingleModification() instanceof CommitCommand;
- }
- }
- return false;
+ delegate.fetchPartialState(sources, subtree);
}
- private boolean isPrepareMethod(CacheCommand call)
+ public Channel getChannel()
{
- if (call instanceof PrepareCommand) return true;
- if (call instanceof ReplicateCommand)
- {
- ReplicateCommand rCom = (ReplicateCommand) call;
- if (rCom.isSingleCommand())
- {
- return rCom.getSingleModification() instanceof PrepareCommand;
- }
- }
- return false;
+ return delegate.getChannel();
}
}
}
More information about the jbosscache-commits
mailing list