[jboss-user] [JBoss Cache Users] - Current transation can not see modifications done by committ
veklov
do-not-reply at jboss.com
Wed Dec 2 08:51:28 EST 2009
Current transation can not see modifications done by committed transactions while isolation level is READ_COMMITTED.
Cache version is 3.2.1.GA. Can be replicated when cache mode is REPL_SYNC or INVALIDATION_SYNC (the other are not tested). Please see below test cases for details.
Note: Test cases use DummyTransactionManager and behavior of second test case for REPL_SYNC mode depends on whether calls are done in context of JTA transaction or not, so in this mode the issue can be caused by a bug in DummyTransactionManager.
INVALIDATION_SYNC:
import java.util.ArrayList;
| import java.util.Collections;
| import java.util.List;
| import java.util.concurrent.CyclicBarrier;
| import java.util.concurrent.TimeUnit;
| import javax.transaction.TransactionManager;
|
| import junit.framework.TestCase;
| import org.jboss.cache.Cache;
| import org.jboss.cache.CacheFactory;
| import org.jboss.cache.CacheSPI;
| import org.jboss.cache.DefaultCacheFactory;
| import org.jboss.cache.config.Configuration;
| import org.jboss.cache.lock.IsolationLevel;
|
| public class TestInvalidationSyncReadCommited extends TestCase {
|
| private static final String NODE_FQN = "/node";
| private static final String KEY = "key";
| private static final String VALUE1 = "value1";
| private static final String VALUE2 = "value2";
|
| private static Cache createCache() {
| final CacheFactory cf = new DefaultCacheFactory();
| final Configuration configuration = new Configuration();
| configuration.setCacheMode(Configuration.CacheMode.INVALIDATION_SYNC);
| configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
| configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
| configuration.setLockAcquisitionTimeout(10000L);
| return cf.createCache(configuration, true);
| }
|
| public void testPut() throws Exception {
| final Cache cache1 = createCache();
| final Cache cache2 = createCache();
| assertEquals("Members count", 2, cache1.getMembers().size());
| assertEquals("Members count", 2, cache2.getMembers().size());
|
| final CyclicBarrier barrier = new CyclicBarrier(2);
| final List exceptions = Collections.synchronizedList(new ArrayList());
|
| cache1.put(NODE_FQN, KEY, VALUE1);
|
| final Thread thread1 = new Thread() {
| public void run() {
| try {
| getTransactionManager(cache1).begin();
| assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));
| await(barrier);
| await(barrier);
| // assertEquals("For some reason it is in cache", VALUE1, cache1.get(NODE_FQN, KEY));
| assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));
| getTransactionManager(cache1).commit();
| assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));
| } catch (Throwable e) {
| exceptions.add(e);
| }
| }
| };
| final Thread thread2 = new Thread() {
| public void run() {
| try {
| await(barrier);
| getTransactionManager(cache2).begin();
| cache2.put(NODE_FQN, KEY, VALUE2);
| getTransactionManager(cache2).commit();
| await(barrier);
| } catch (Throwable e) {
| exceptions.add(e);
| }
| }
| };
|
| thread1.start();
| thread2.start();
| thread1.join();
| thread2.join();
|
| cache1.stop();
| cache2.stop();
|
| if (!exceptions.isEmpty()) {
| fail(exceptions.toString());
| }
| }
|
| public void testRemoveNode() throws Exception {
| final Cache cache1 = createCache();
| final Cache cache2 = createCache();
| assertEquals("Members count", 2, cache1.getMembers().size());
| assertEquals("Members count", 2, cache2.getMembers().size());
|
| final CyclicBarrier barrier = new CyclicBarrier(2);
| final List exceptions = Collections.synchronizedList(new ArrayList());
|
| cache1.put(NODE_FQN, KEY, VALUE1);
|
| final Thread thread1 = new Thread() {
| public void run() {
| try {
| getTransactionManager(cache1).begin();
| assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));
| await(barrier);
| await(barrier);
| // assertEquals("For some reason it is in cache", VALUE1, cache1.get(NODE_FQN, KEY));
| assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));
| getTransactionManager(cache1).commit();
| assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));
| } catch (Throwable e) {
| exceptions.add(e);
| }
| }
| };
| final Thread thread2 = new Thread() {
| public void run() {
| try {
| await(barrier);
| getTransactionManager(cache2).begin();
| cache2.removeNode(NODE_FQN);
| getTransactionManager(cache2).commit();
| await(barrier);
| } catch (Throwable e) {
| exceptions.add(e);
| }
| }
| };
|
| thread1.start();
| thread2.start();
| thread1.join();
| thread2.join();
|
| cache1.stop();
| cache2.stop();
|
| if (!exceptions.isEmpty()) {
| fail(exceptions.toString());
| }
| }
|
| private static TransactionManager getTransactionManager(final Cache cache) {
| return ((CacheSPI) cache).getTransactionManager();
| }
|
| private static void await(final CyclicBarrier barrier) throws Exception {
| barrier.await(20, TimeUnit.SECONDS);
| }
| }
|
REPL_SYNC (This case can be fixed by commenting out begin/commit calls in thread2, e.g. behavior depends on whether calls are done in context of JTA transaction or not. Is that an issue with DummyTransactionManager or with cache itself?):
import java.util.ArrayList;
| import java.util.Collections;
| import java.util.List;
| import java.util.concurrent.BrokenBarrierException;
| import java.util.concurrent.CyclicBarrier;
| import java.util.concurrent.TimeUnit;
| import java.util.concurrent.TimeoutException;
| import javax.transaction.TransactionManager;
|
| import junit.framework.TestCase;
| import org.jboss.cache.Cache;
| import org.jboss.cache.CacheFactory;
| import org.jboss.cache.CacheSPI;
| import org.jboss.cache.DefaultCacheFactory;
| import org.jboss.cache.config.Configuration;
| import org.jboss.cache.lock.IsolationLevel;
|
| public class TestReplSyncReadCommitted2 extends TestCase {
|
| private static final String NODE_FQN = "/node";
| private static final String KEY = "key";
| private static final String VALUE1 = "value1";
| private static final String VALUE2 = "value2";
|
| private static Cache createCache() {
| final CacheFactory cf = new DefaultCacheFactory();
| final Configuration configuration = new Configuration();
| configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
| configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
| configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
| configuration.setLockAcquisitionTimeout(10000L);
| return cf.createCache(configuration, true);
| }
|
| public void testRemoveNodeTwoCaches() throws InterruptedException {
| final Cache cache1 = createCache();
| final Cache cache2 = createCache();
| assertEquals("Members count", 2, cache1.getMembers().size());
| assertEquals("Members count", 2, cache2.getMembers().size());
|
| final CyclicBarrier barrier = new CyclicBarrier(2);
| final List exceptions = Collections.synchronizedList(new ArrayList());
|
| final Thread thread1 = new Thread() {
| public void run() {
| try {
| await(barrier);
| await(barrier);
| getTransactionManager(cache1).begin();
| assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));
| await(barrier);
| await(barrier);
| assertNull("Must be removed", cache1.get(NODE_FQN, KEY));
| getTransactionManager(cache1).commit();
| } catch (Throwable e) {
| exceptions.add(e);
| }
| }
| };
| final Thread thread2 = new Thread() {
| public void run() {
| try {
| await(barrier);
| getTransactionManager(cache2).begin();
| cache2.put(NODE_FQN, KEY, VALUE1);
| getTransactionManager(cache2).commit();
| await(barrier);
| await(barrier);
| getTransactionManager(cache2).begin();
| cache2.removeNode(NODE_FQN);
| getTransactionManager(cache2).commit();
| await(barrier);
| } catch (Throwable e) {
| exceptions.add(e);
| }
| }
| };
|
| thread1.start();
| thread2.start();
| thread1.join();
| thread2.join();
|
| cache1.stop();
| cache2.stop();
|
| if (!exceptions.isEmpty()) {
| fail(exceptions.toString());
| }
| }
|
| public void testPutTwoCaches() throws InterruptedException {
| final Cache cache1 = createCache();
| final Cache cache2 = createCache();
| assertEquals("Members count", 2, cache1.getMembers().size());
| assertEquals("Members count", 2, cache2.getMembers().size());
|
| final CyclicBarrier barrier = new CyclicBarrier(2);
| final List exceptions = Collections.synchronizedList(new ArrayList());
|
| final Thread thread1 = new Thread() {
| public void run() {
| try {
| await(barrier);
| await(barrier);
| getTransactionManager(cache1).begin();
| assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));
| await(barrier);
| await(barrier);
| assertEquals("Must be replicated", VALUE2, cache1.get(NODE_FQN, KEY));
| getTransactionManager(cache1).commit();
| } catch (Throwable e) {
| exceptions.add(e);
| }
| }
| };
| final Thread thread2 = new Thread() {
| public void run() {
| try {
| await(barrier);
| getTransactionManager(cache2).begin();
| cache2.put(NODE_FQN, KEY, VALUE1);
| getTransactionManager(cache2).commit();
| await(barrier);
| await(barrier);
| getTransactionManager(cache2).begin();
| cache2.put(NODE_FQN, KEY, VALUE2);
| getTransactionManager(cache2).commit();
| await(barrier);
| } catch (Throwable e) {
| exceptions.add(e);
| }
| }
| };
|
| thread1.start();
| thread2.start();
| thread1.join();
| thread2.join();
|
| cache1.stop();
| cache2.stop();
|
| if (!exceptions.isEmpty()) {
| fail(exceptions.toString());
| }
| }
|
| private static TransactionManager getTransactionManager(final Cache cache) {
| return ((CacheSPI) cache).getTransactionManager();
| }
|
| private static void await(final CyclicBarrier barrier)
| throws InterruptedException, BrokenBarrierException, TimeoutException {
| barrier.await(20, TimeUnit.SECONDS);
| }
| }
|
View the original post : http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4268540#4268540
Reply to the post : http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=4268540
More information about the jboss-user
mailing list