[jboss-user] [JBoss Cache Users] - Current transaction cannot see modifications done by committed transactions

veklov do-not-reply at jboss.com
Wed Dec 2 08:51:28 EST 2009


The current transaction cannot see modifications made by committed transactions even though the isolation level is READ_COMMITTED.

The cache version is 3.2.1.GA. The problem can be reproduced when the cache mode is REPL_SYNC or INVALIDATION_SYNC (the other modes were not tested). Please see the test cases below for details.

Note: The test cases use DummyTransactionManager. The behavior of the second test case (REPL_SYNC mode) depends on whether the calls are made in the context of a JTA transaction or not, so in this mode the issue may be caused by a bug in DummyTransactionManager.

INVALIDATION_SYNC:
import java.util.ArrayList;
  | import java.util.Collections;
  | import java.util.List;
  | import java.util.concurrent.CyclicBarrier;
  | import java.util.concurrent.TimeUnit;
  | import javax.transaction.TransactionManager;
  | 
  | import junit.framework.TestCase;
  | import org.jboss.cache.Cache;
  | import org.jboss.cache.CacheFactory;
  | import org.jboss.cache.CacheSPI;
  | import org.jboss.cache.DefaultCacheFactory;
  | import org.jboss.cache.config.Configuration;
  | import org.jboss.cache.lock.IsolationLevel;
  | 
  | public class TestInvalidationSyncReadCommited extends TestCase {
  | 
  |     private static final String NODE_FQN = "/node";
  |     private static final String KEY = "key";
  |     private static final String VALUE1 = "value1";
  |     private static final String VALUE2 = "value2";
  | 
  |     private static Cache createCache() {
  |         final CacheFactory cf = new DefaultCacheFactory();
  |         final Configuration configuration = new Configuration();
  |         configuration.setCacheMode(Configuration.CacheMode.INVALIDATION_SYNC);
  |         configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
  |         configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
  |         configuration.setLockAcquisitionTimeout(10000L);
  |         return cf.createCache(configuration, true);
  |     }
  | 
  |     public void testPut() throws Exception {
  |         final Cache cache1 = createCache();
  |         final Cache cache2 = createCache();
  |         assertEquals("Members count", 2, cache1.getMembers().size());
  |         assertEquals("Members count", 2, cache2.getMembers().size());
  | 
  |         final CyclicBarrier barrier = new CyclicBarrier(2);
  |         final List exceptions = Collections.synchronizedList(new ArrayList());
  | 
  |         cache1.put(NODE_FQN, KEY, VALUE1);
  | 
  |         final Thread thread1 = new Thread() {
  |             public void run() {
  |                 try {
  |                     getTransactionManager(cache1).begin();
  |                     assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));
  |                     await(barrier);
  |                     await(barrier);
  | //                    assertEquals("For some reason it is in cache", VALUE1, cache1.get(NODE_FQN, KEY));
  |                     assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));
  |                     getTransactionManager(cache1).commit();
  |                     assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));
  |                 } catch (Throwable e) {
  |                     exceptions.add(e);
  |                 }
  |             }
  |         };
  |         final Thread thread2 = new Thread() {
  |             public void run() {
  |                 try {
  |                     await(barrier);
  |                     getTransactionManager(cache2).begin();
  |                     cache2.put(NODE_FQN, KEY, VALUE2);
  |                     getTransactionManager(cache2).commit();
  |                     await(barrier);
  |                 } catch (Throwable e) {
  |                     exceptions.add(e);
  |                 }
  |             }
  |         };
  | 
  |         thread1.start();
  |         thread2.start();
  |         thread1.join();
  |         thread2.join();
  | 
  |         cache1.stop();
  |         cache2.stop();
  | 
  |         if (!exceptions.isEmpty()) {
  |             fail(exceptions.toString());
  |         }
  |     }
  | 
  |     public void testRemoveNode() throws Exception {
  |         final Cache cache1 = createCache();
  |         final Cache cache2 = createCache();
  |         assertEquals("Members count", 2, cache1.getMembers().size());
  |         assertEquals("Members count", 2, cache2.getMembers().size());
  | 
  |         final CyclicBarrier barrier = new CyclicBarrier(2);
  |         final List exceptions = Collections.synchronizedList(new ArrayList());
  | 
  |         cache1.put(NODE_FQN, KEY, VALUE1);
  | 
  |         final Thread thread1 = new Thread() {
  |             public void run() {
  |                 try {
  |                     getTransactionManager(cache1).begin();
  |                     assertEquals("Must be in cache", VALUE1, cache1.get(NODE_FQN, KEY));
  |                     await(barrier);
  |                     await(barrier);
  | //                    assertEquals("For some reason it is in cache", VALUE1, cache1.get(NODE_FQN, KEY));
  |                     assertNull("Must be invalidated before commit", cache1.get(NODE_FQN, KEY));
  |                     getTransactionManager(cache1).commit();
  |                     assertNull("Must be invalidated", cache1.get(NODE_FQN, KEY));
  |                 } catch (Throwable e) {
  |                     exceptions.add(e);
  |                 }
  |             }
  |         };
  |         final Thread thread2 = new Thread() {
  |             public void run() {
  |                 try {
  |                     await(barrier);
  |                     getTransactionManager(cache2).begin();
  |                     cache2.removeNode(NODE_FQN);
  |                     getTransactionManager(cache2).commit();
  |                     await(barrier);
  |                 } catch (Throwable e) {
  |                     exceptions.add(e);
  |                 }
  |             }
  |         };
  | 
  |         thread1.start();
  |         thread2.start();
  |         thread1.join();
  |         thread2.join();
  | 
  |         cache1.stop();
  |         cache2.stop();
  | 
  |         if (!exceptions.isEmpty()) {
  |             fail(exceptions.toString());
  |         }
  |     }
  | 
  |     private static TransactionManager getTransactionManager(final Cache cache) {
  |         return ((CacheSPI) cache).getTransactionManager();
  |     }
  | 
  |     private static void await(final CyclicBarrier barrier) throws Exception {
  |         barrier.await(20, TimeUnit.SECONDS);
  |     }
  | }
  | 

REPL_SYNC (This case can be fixed by commenting out the begin/commit calls in thread2, i.e. the behavior depends on whether the calls are made in the context of a JTA transaction or not. Is that an issue with DummyTransactionManager or with the cache itself?):
import java.util.ArrayList;
  | import java.util.Collections;
  | import java.util.List;
  | import java.util.concurrent.BrokenBarrierException;
  | import java.util.concurrent.CyclicBarrier;
  | import java.util.concurrent.TimeUnit;
  | import java.util.concurrent.TimeoutException;
  | import javax.transaction.TransactionManager;
  | 
  | import junit.framework.TestCase;
  | import org.jboss.cache.Cache;
  | import org.jboss.cache.CacheFactory;
  | import org.jboss.cache.CacheSPI;
  | import org.jboss.cache.DefaultCacheFactory;
  | import org.jboss.cache.config.Configuration;
  | import org.jboss.cache.lock.IsolationLevel;
  | 
  | public class TestReplSyncReadCommitted2 extends TestCase {
  | 
  |     private static final String NODE_FQN = "/node";
  |     private static final String KEY = "key";
  |     private static final String VALUE1 = "value1";
  |     private static final String VALUE2 = "value2";
  | 
  |     private static Cache createCache() {
  |         final CacheFactory cf = new DefaultCacheFactory();
  |         final Configuration configuration = new Configuration();
  |         configuration.setCacheMode(Configuration.CacheMode.REPL_SYNC);
  |         configuration.setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
  |         configuration.setIsolationLevel(IsolationLevel.READ_COMMITTED);
  |         configuration.setLockAcquisitionTimeout(10000L);
  |         return cf.createCache(configuration, true);
  |     }
  | 
  |     public void testRemoveNodeTwoCaches() throws InterruptedException {
  |         final Cache cache1 = createCache();
  |         final Cache cache2 = createCache();
  |         assertEquals("Members count", 2, cache1.getMembers().size());
  |         assertEquals("Members count", 2, cache2.getMembers().size());
  | 
  |         final CyclicBarrier barrier = new CyclicBarrier(2);
  |         final List exceptions = Collections.synchronizedList(new ArrayList());
  | 
  |         final Thread thread1 = new Thread() {
  |             public void run() {
  |                 try {
  |                     await(barrier);
  |                     await(barrier);
  |                     getTransactionManager(cache1).begin();
  |                     assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));
  |                     await(barrier);
  |                     await(barrier);
  |                     assertNull("Must be removed", cache1.get(NODE_FQN, KEY));
  |                     getTransactionManager(cache1).commit();
  |                 } catch (Throwable e) {
  |                     exceptions.add(e);
  |                 }
  |             }
  |         };
  |         final Thread thread2 = new Thread() {
  |             public void run() {
  |                 try {
  |                     await(barrier);
  |                     getTransactionManager(cache2).begin();
  |                     cache2.put(NODE_FQN, KEY, VALUE1);
  |                     getTransactionManager(cache2).commit();
  |                     await(barrier);
  |                     await(barrier);
  |                     getTransactionManager(cache2).begin();
  |                     cache2.removeNode(NODE_FQN);
  |                     getTransactionManager(cache2).commit();
  |                     await(barrier);
  |                 } catch (Throwable e) {
  |                     exceptions.add(e);
  |                 }
  |             }
  |         };
  | 
  |         thread1.start();
  |         thread2.start();
  |         thread1.join();
  |         thread2.join();
  | 
  |         cache1.stop();
  |         cache2.stop();
  | 
  |         if (!exceptions.isEmpty()) {
  |             fail(exceptions.toString());
  |         }
  |     }
  | 
  |     public void testPutTwoCaches() throws InterruptedException {
  |         final Cache cache1 = createCache();
  |         final Cache cache2 = createCache();
  |         assertEquals("Members count", 2, cache1.getMembers().size());
  |         assertEquals("Members count", 2, cache2.getMembers().size());
  | 
  |         final CyclicBarrier barrier = new CyclicBarrier(2);
  |         final List exceptions = Collections.synchronizedList(new ArrayList());
  | 
  |         final Thread thread1 = new Thread() {
  |             public void run() {
  |                 try {
  |                     await(barrier);
  |                     await(barrier);
  |                     getTransactionManager(cache1).begin();
  |                     assertEquals("Must be replicated", VALUE1, cache1.get(NODE_FQN, KEY));
  |                     await(barrier);
  |                     await(barrier);
  |                     assertEquals("Must be replicated", VALUE2, cache1.get(NODE_FQN, KEY));
  |                     getTransactionManager(cache1).commit();
  |                 } catch (Throwable e) {
  |                     exceptions.add(e);
  |                 }
  |             }
  |         };
  |         final Thread thread2 = new Thread() {
  |             public void run() {
  |                 try {
  |                     await(barrier);
  |                     getTransactionManager(cache2).begin();
  |                     cache2.put(NODE_FQN, KEY, VALUE1);
  |                     getTransactionManager(cache2).commit();
  |                     await(barrier);
  |                     await(barrier);
  |                     getTransactionManager(cache2).begin();
  |                     cache2.put(NODE_FQN, KEY, VALUE2);
  |                     getTransactionManager(cache2).commit();
  |                     await(barrier);
  |                 } catch (Throwable e) {
  |                     exceptions.add(e);
  |                 }
  |             }
  |         };
  | 
  |         thread1.start();
  |         thread2.start();
  |         thread1.join();
  |         thread2.join();
  | 
  |         cache1.stop();
  |         cache2.stop();
  | 
  |         if (!exceptions.isEmpty()) {
  |             fail(exceptions.toString());
  |         }
  |     }
  | 
  |     private static TransactionManager getTransactionManager(final Cache cache) {
  |         return ((CacheSPI) cache).getTransactionManager();
  |     }
  | 
  |     private static void await(final CyclicBarrier barrier)
  |             throws InterruptedException, BrokenBarrierException, TimeoutException {
  |         barrier.await(20, TimeUnit.SECONDS);
  |     }
  | }
  | 

View the original post : http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4268540#4268540

Reply to the post : http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=4268540



More information about the jboss-user mailing list