[jboss-user] [JBossCache] - Re: newbie question - ReplicationException

aditsu do-not-reply at jboss.com
Tue Jan 8 08:06:01 EST 2008


Hi, I decided to come back to this problem and find a better solution than the random sleeps and retries.
We tried using optimistic locking, but it doesn't seem to help at all.
Here's a test program, using 2 replicated cache instances and writing to the same node:


  | package CacheRepl;
  | 
  | import java.util.concurrent.atomic.AtomicInteger;
  | 
  | import javax.transaction.UserTransaction;
  | 
  | import org.apache.log4j.Logger;
  | import org.jboss.cache.Cache;
  | import org.jboss.cache.DefaultCacheFactory;
  | import org.jboss.cache.Fqn;
  | import org.jboss.cache.transaction.DummyTransactionManager;
  | import org.jboss.cache.transaction.DummyUserTransaction;
  | 
  | /**
  |  * Test driver for concurrent writes to a replicated cache.
  |  *
  |  * Each instance owns its own cache, created from "replSync-service.xml", and
  |  * repeatedly writes the same key of the same node inside a transaction,
  |  * counting how many of those transactions fail.
  |  */
  | public class CacheThread extends Thread {
  | 	private static final Logger LOG = Logger.getLogger("test");
  | 	/** Number of cache instances (one writer thread each) created by main. */
  | 	private static final int THREADS = 2;
  | 	/** Number of transactions each thread attempts. */
  | 	private static final int REPEAT = 20;
  | 	/** Pause before each transaction attempt, in milliseconds. */
  | 	private static final int SLEEP_TIME = 10;
  | 
  | 	protected int threadId;
  | 	protected final Cache<Object, Object> cache;
  | 
  | 	/**
  | 	 * Creates the cache used by this thread from "replSync-service.xml".
  | 	 *
  | 	 * @param threadId identifier appended to the value this thread writes and
  | 	 *                 printed in its failure summary
  | 	 */
  | 	public CacheThread(final int threadId) {
  | 		this.threadId = threadId;
  | 		// NOTE(review): the second argument is presumably the "start" flag of
  | 		// CacheFactory.createCache - confirm against the JBoss Cache javadoc.
  | 		cache = DefaultCacheFactory.getInstance().createCache("replSync-service.xml", true);
  | 	}
  | 
  | 	/**
  | 	 * Runs one transaction that puts "key" = "value" + threadId on node "node".
  | 	 *
  | 	 * @return true if the transaction committed; false if any step threw, in
  | 	 *         which case the failure is logged and a rollback is attempted
  | 	 */
  | 	private boolean doTransaction() {
  | 		final UserTransaction tx = new DummyUserTransaction(DummyTransactionManager.getInstance());
  | 		try {
  | 			tx.begin();
  | 			cache.put(new Fqn<Object>("node"), "key", "value" + threadId);
  | 			tx.commit();
  | 			return true;
  | 		} catch (Exception e) {
  | 			LOG.error("transaction failed", e);
  | 			try {
  | 				tx.rollback();
  | 			} catch (Exception e1) {
  | 				// Best effort only: the failure itself was already logged above.
  | 				LOG.warn("rollback failed", e1);
  | 			}
  | 			return false;
  | 		}
  | 	}
  | 
  | 	/**
  | 	 * Attempts REPEAT transactions, sleeping SLEEP_TIME ms before each one,
  | 	 * then prints the number of failed attempts. Stops early if interrupted.
  | 	 */
  | 	@Override
  | 	public void run() {
  | 		int failed = 0;
  | 		for (int i = 0; i < REPEAT; ++i) {
  | 			try {
  | 				Thread.sleep(SLEEP_TIME);
  | 			} catch (InterruptedException e) {
  | 				// Do not swallow the interrupt: restore the flag and stop, so a
  | 				// shutdown request is honoured instead of being ignored.
  | 				LOG.warn("interrupted after " + i + " attempts", e);
  | 				Thread.currentThread().interrupt();
  | 				break;
  | 			}
  | 			if (!doTransaction()) {
  | 				failed++;
  | 			}
  | 		}
  | 		System.out.println("Thread" + threadId + ": " + failed + " failures");
  | 	}
  | 
  | 	/** Stops this thread's cache. */
  | 	private void close() {
  | 		cache.stop();
  | 	}
  | 
  | 	/**
  | 	 * Creates THREADS cache threads, starts them, waits for all of them to
  | 	 * finish and stops their caches.
  | 	 *
  | 	 * @param args unused
  | 	 * @throws InterruptedException if the main thread is interrupted while
  | 	 *                              waiting for the workers
  | 	 */
  | 	public static void main(String[] args) throws InterruptedException {
  | 		final CacheThread[] threads = new CacheThread[THREADS];
  | 		try {
  | 			for (int t = 0; t < THREADS; t++) {
  | 				threads[t] = new CacheThread(t);
  | 			}
  | 			for (int t = 0; t < THREADS; t++) {
  | 				threads[t].start();
  | 			}
  | 			System.out.println("started");
  | 			for (int t = 0; t < THREADS; t++) {
  | 				threads[t].join();
  | 			}
  | 		} finally {
  | 			// Stop every cache that was created, even when construction of a
  | 			// later one failed or join() was interrupted.
  | 			for (final CacheThread thread : threads) {
  | 				if (thread == null) {
  | 					continue;
  | 				}
  | 				try {
  | 					thread.close();
  | 				} catch (RuntimeException e) {
  | 					// One cache failing to stop must not keep the others running.
  | 					LOG.warn("cache stop failed", e);
  | 				}
  | 			}
  | 		}
  | 	}
  | }
  | 

and the cache configuration:


  | <server> <!-- Cache configuration posted with the CacheThread test; presumably the replSync-service.xml that the test loads (TODO confirm) -->
  |     <mbean code="org.jboss.cache.jmx.CacheJmxWrapper"
  |     	name="jboss.cache:service=TreeCache">
  | 
  |         <depends>jboss:service=Naming</depends>
  |         <depends>jboss:service=TransactionManager</depends>
  | 
  |       <attribute name="TransactionManagerLookupClass">org.jboss.cache.transaction.DummyTransactionManagerLookup
  |       </attribute> <!-- dummy lookup, to go with the DummyTransactionManager that the test program uses for its transactions -->
  |       <!--
  |       	    Node locking scheme:
  |             OPTIMISTIC
  |             PESSIMISTIC (default)
  |       -->
  |       <attribute name="NodeLockingScheme">OPTIMISTIC</attribute> <!-- the poster reports failures with both OPTIMISTIC and PESSIMISTIC in this setting -->
  |         <!--
  |             Isolation level : SERIALIZABLE
  |                               REPEATABLE_READ (default)
  |                               READ_COMMITTED
  |                               READ_UNCOMMITTED
  |                               NONE
  |         -->
  |         <attribute name="IsolationLevel">READ_COMMITTED</attribute> <!-- NOTE(review): may not take effect while NodeLockingScheme is OPTIMISTIC; confirm against the JBoss Cache configuration reference -->
  | 
  |         <attribute name="CacheMode">REPL_SYNC</attribute>
  | 
  |         <attribute name="ClusterName">Test cache</attribute>
  | 
  |         <attribute name="ClusterConfig">
  | 			<config>
  | 				<UDP bind_addr="10.0.0.226"
  | 				    mcast_addr="228.10.10.10"
  | 					mcast_port="45588"
  | 					tos="8"
  | 					ucast_recv_buf_size="20000000"
  | 					ucast_send_buf_size="640000"
  | 					mcast_recv_buf_size="25000000"
  | 					mcast_send_buf_size="640000"
  | 					loopback="false"
  | 					discard_incompatible_packets="true"
  | 					max_bundle_size="64000"
  | 					max_bundle_timeout="30"
  | 					use_incoming_packet_handler="true"
  | 					ip_ttl="2"
  | 					enable_bundling="false"
  | 					enable_diagnostics="true"
  | 					use_concurrent_stack="true"
  | 					thread_naming_pattern="pl"
  | 					thread_pool.enabled="true"
  | 					thread_pool.min_threads="1"
  | 					thread_pool.max_threads="25"
  | 					thread_pool.keep_alive_time="30000"
  | 					thread_pool.queue_enabled="true"
  | 					thread_pool.queue_max_size="10"
  | 					thread_pool.rejection_policy="Run"
  | 					oob_thread_pool.enabled="true"
  | 					oob_thread_pool.min_threads="1"
  | 					oob_thread_pool.max_threads="4"
  | 					oob_thread_pool.keep_alive_time="10000"
  | 					oob_thread_pool.queue_enabled="true"
  | 					oob_thread_pool.queue_max_size="10"
  | 					oob_thread_pool.rejection_policy="Run"/>
  | 				<PING timeout="2000" num_initial_members="3"/>
  | 				<MERGE2 max_interval="30000" min_interval="10000"/>
  | 				<FD_SOCK/>
  | 				<FD timeout="10000" max_tries="5" shun="true"/>
  | 				<VERIFY_SUSPECT timeout="1500"/>
  | 				<pbcast.NAKACK max_xmit_size="60000"
  | 					use_mcast_xmit="false" gc_lag="0"
  | 					retransmit_timeout="300,600,1200,2400,4800"
  | 					discard_delivered_msgs="true"/>
  | 				<UNICAST timeout="300,600,1200,2400,3600"/>
  | 				<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
  | 					max_bytes="400000"/>
  | 				<pbcast.GMS print_local_addr="true" join_timeout="5000"
  | 					join_retry_timeout="2000" shun="false"
  | 					view_bundling="true" view_ack_collection_timeout="5000"/>
  | 				<FRAG2 frag_size="60000"/>
  | 				<pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/>
  | 				<!-- <pbcast.STATE_TRANSFER/> -->
  | 				<pbcast.FLUSH timeout="0"/>
  | 			</config>
  | 		</attribute>
  | 
  |         <attribute name="FetchInMemoryState">true</attribute>
  |         <attribute name="StateRetrievalTimeout">15000</attribute>
  |         <attribute name="SyncReplTimeout">15000</attribute>
  |         <attribute name="LockAcquisitionTimeout">1000</attribute> <!-- the "could not be acquired after 1000 ms" in the reported TimeoutException messages appears to be this value -->
  |         
  |         <attribute name="UseRegionBasedMarshalling">true</attribute>
  |         <attribute name="TransactionTimeout">300</attribute>
  |     </mbean>
  | 
  | </server>
  | 

With both pessimistic and optimistic locking, there are failures every time.
Here are some example exceptions:

- optimistic locking:

  | org.jboss.cache.lock.TimeoutException: failure acquiring lock: fqn=/, caller=GlobalTransaction:<10.0.0.226:32932>:1, lock=write owner=GlobalTransaction:<10.0.0.226:32931>:2 (org.jboss.cache.lock.LockStrategyReadCommitted@1fd6bea)
  | 	at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:528)
  | 	at org.jboss.cache.interceptors.OptimisticLockingInterceptor.lockNodes(OptimisticLockingInterceptor.java:119)
  | 	at org.jboss.cache.interceptors.OptimisticLockingInterceptor.invoke(OptimisticLockingInterceptor.java:52)
  | 	at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  | 	at org.jboss.cache.interceptors.OptimisticReplicationInterceptor.invoke(OptimisticReplicationInterceptor.java:73)
  | 	at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  | 	at org.jboss.cache.interceptors.NotificationInterceptor.invoke(NotificationInterceptor.java:32)
  | 	at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  | 	at org.jboss.cache.interceptors.TxInterceptor.handleOptimisticPrepare(TxInterceptor.java:374)
  | 	at org.jboss.cache.interceptors.TxInterceptor.handleRemotePrepare(TxInterceptor.java:250)
  | 	at org.jboss.cache.interceptors.TxInterceptor.invoke(TxInterceptor.java:100)
  | 	at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  | 	at org.jboss.cache.interceptors.CacheMgmtInterceptor.invoke(CacheMgmtInterceptor.java:123)
  | 	at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  | 	at org.jboss.cache.interceptors.InvocationContextInterceptor.invoke(InvocationContextInterceptor.java:62)
  | 	at org.jboss.cache.CacheImpl.invokeMethod(CacheImpl.java:3939)
  | 	at org.jboss.cache.CacheImpl._replicate(CacheImpl.java:2853)
  | 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  | 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  | 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  | 	at java.lang.reflect.Method.invoke(Method.java:585)
  | 	at org.jgroups.blocks.MethodCall.invoke(MethodCall.java:330)
  | 	at org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher.handle(InactiveRegionAwareRpcDispatcher.java:77)
  | 	at org.jgroups.blocks.RequestCorrelator.handleRequest(RequestCorrelator.java:624)
  | 	at org.jgroups.blocks.RequestCorrelator.receiveMessage(RequestCorrelator.java:533)
  | 	at org.jgroups.blocks.RequestCorrelator.receive(RequestCorrelator.java:365)
  | 	at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.up(MessageDispatcher.java:736)
  | 	at org.jgroups.JChannel.up(JChannel.java:1063)
  | 	at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:325)
  | 	at org.jgroups.protocols.pbcast.FLUSH.up(FLUSH.java:406)
  | 	at org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER.up(STREAMING_STATE_TRANSFER.java:255)
  | 	at org.jgroups.protocols.FRAG2.up(FRAG2.java:197)
  | 	at org.jgroups.protocols.pbcast.GMS.up(GMS.java:722)
  | 	at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:234)
  | 	at org.jgroups.protocols.UNICAST.up(UNICAST.java:263)
  | 	at org.jgroups.protocols.pbcast.NAKACK.handleMessage(NAKACK.java:720)
  | 	at org.jgroups.protocols.pbcast.NAKACK.up(NAKACK.java:546)
  | 	at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:167)
  | 	at org.jgroups.protocols.FD.up(FD.java:322)
  | 	at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:298)
  | 	at org.jgroups.protocols.MERGE2.up(MERGE2.java:145)
  | 	at org.jgroups.protocols.Discovery.up(Discovery.java:220)
  | 	at org.jgroups.protocols.TP$IncomingPacket.handleMyMessage(TP.java:1486)
  | 	at org.jgroups.protocols.TP$IncomingPacket.run(TP.java:1440)
  | 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
  | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
  | 	at java.lang.Thread.run(Thread.java:595)
  | Caused by: org.jboss.cache.lock.TimeoutException: write lock for / could not be acquired after 1000 ms. Locks: Read lock owners: []
  | Write lock owner: GlobalTransaction:<10.0.0.226:32931>:2
  |  (caller=GlobalTransaction:<10.0.0.226:32932>:1, lock info: write owner=GlobalTransaction:<10.0.0.226:32931>:2 (org.jboss.cache.lock.LockStrategyReadCommitted@1fd6bea))
  | 	at org.jboss.cache.lock.IdentityLock.acquireWriteLock0(IdentityLock.java:244)
  | 	at org.jboss.cache.lock.IdentityLock.acquireWriteLock(IdentityLock.java:167)
  | 	at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:497)
  | 	... 46 more
  | 
  | javax.transaction.RollbackException: outcome is false status: 1
  | 	at org.jboss.cache.transaction.DummyTransaction.commit(DummyTransaction.java:75)
  | 	at org.jboss.cache.transaction.DummyBaseTransactionManager.commit(DummyBaseTransactionManager.java:78)
  | 	at org.jboss.cache.transaction.DummyUserTransaction.commit(DummyUserTransaction.java:77)
  | 	at CacheRepl.CacheThread.doTransaction(CacheThread.java:31)
  | 	at CacheRepl.CacheThread.run(CacheThread.java:53)
  | 

- pessimistic locking:

  | org.jboss.cache.lock.TimeoutException: failure acquiring lock: fqn=/node, caller=GlobalTransaction:<10.0.0.226:32933>:2, lock=write owner=GlobalTransaction:<10.0.0.226:32934>:1 (org.jboss.cache.lock.LockStrategyReadCommitted@1977b9b)
  | 	at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:528)
  | 	at org.jboss.cache.interceptors.PessimisticLockInterceptor$LockManager.acquire(PessimisticLockInterceptor.java:579)
  | 	at org.jboss.cache.interceptors.PessimisticLockInterceptor.acquireNodeLock(PessimisticLockInterceptor.java:393)
  | 	at org.jboss.cache.interceptors.PessimisticLockInterceptor.lock(PessimisticLockInterceptor.java:329)
  | 	at org.jboss.cache.interceptors.PessimisticLockInterceptor.invoke(PessimisticLockInterceptor.java:187)
  | 	at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  | 	at org.jboss.cache.interceptors.ReplicationInterceptor.invoke(ReplicationInterceptor.java:34)
  | 	at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  | 	at org.jboss.cache.interceptors.NotificationInterceptor.invoke(NotificationInterceptor.java:32)
  | 	at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  | 	at org.jboss.cache.interceptors.TxInterceptor.replayModifications(TxInterceptor.java:511)
  | 	at org.jboss.cache.interceptors.TxInterceptor.handlePessimisticPrepare(TxInterceptor.java:394)
  | 	at org.jboss.cache.interceptors.TxInterceptor.handleRemotePrepare(TxInterceptor.java:254)
  | 	at org.jboss.cache.interceptors.TxInterceptor.invoke(TxInterceptor.java:100)
  | 	at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  | 	at org.jboss.cache.interceptors.CacheMgmtInterceptor.invoke(CacheMgmtInterceptor.java:123)
  | 	at org.jboss.cache.interceptors.Interceptor.invoke(Interceptor.java:76)
  | 	at org.jboss.cache.interceptors.InvocationContextInterceptor.invoke(InvocationContextInterceptor.java:62)
  | 	at org.jboss.cache.CacheImpl.invokeMethod(CacheImpl.java:3939)
  | 	at org.jboss.cache.CacheImpl._replicate(CacheImpl.java:2853)
  | 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  | 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
  | 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
  | 	at java.lang.reflect.Method.invoke(Method.java:585)
  | 	at org.jgroups.blocks.MethodCall.invoke(MethodCall.java:330)
  | 	at org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher.handle(InactiveRegionAwareRpcDispatcher.java:77)
  | 	at org.jgroups.blocks.RequestCorrelator.handleRequest(RequestCorrelator.java:624)
  | 	at org.jgroups.blocks.RequestCorrelator.receiveMessage(RequestCorrelator.java:533)
  | 	at org.jgroups.blocks.RequestCorrelator.receive(RequestCorrelator.java:365)
  | 	at org.jgroups.blocks.MessageDispatcher$ProtocolAdapter.up(MessageDispatcher.java:736)
  | 	at org.jgroups.JChannel.up(JChannel.java:1063)
  | 	at org.jgroups.stack.ProtocolStack.up(ProtocolStack.java:325)
  | 	at org.jgroups.protocols.pbcast.FLUSH.up(FLUSH.java:406)
  | 	at org.jgroups.protocols.pbcast.STREAMING_STATE_TRANSFER.up(STREAMING_STATE_TRANSFER.java:255)
  | 	at org.jgroups.protocols.FRAG2.up(FRAG2.java:197)
  | 	at org.jgroups.protocols.pbcast.GMS.up(GMS.java:722)
  | 	at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:234)
  | 	at org.jgroups.protocols.UNICAST.up(UNICAST.java:263)
  | 	at org.jgroups.protocols.pbcast.NAKACK.handleMessage(NAKACK.java:720)
  | 	at org.jgroups.protocols.pbcast.NAKACK.up(NAKACK.java:546)
  | 	at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:167)
  | 	at org.jgroups.protocols.FD.up(FD.java:322)
  | 	at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:298)
  | 	at org.jgroups.protocols.MERGE2.up(MERGE2.java:145)
  | 	at org.jgroups.protocols.Discovery.up(Discovery.java:220)
  | 	at org.jgroups.protocols.TP$IncomingPacket.handleMyMessage(TP.java:1486)
  | 	at org.jgroups.protocols.TP$IncomingPacket.run(TP.java:1440)
  | 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:650)
  | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:675)
  | 	at java.lang.Thread.run(Thread.java:595)
  | Caused by: org.jboss.cache.lock.TimeoutException: write lock for /node could not be acquired after 1000 ms. Locks: Read lock owners: []
  | Write lock owner: GlobalTransaction:<10.0.0.226:32934>:1
  |  (caller=GlobalTransaction:<10.0.0.226:32933>:2, lock info: write owner=GlobalTransaction:<10.0.0.226:32934>:1 (org.jboss.cache.lock.LockStrategyReadCommitted@1977b9b))
  | 	at org.jboss.cache.lock.IdentityLock.acquireWriteLock0(IdentityLock.java:244)
  | 	at org.jboss.cache.lock.IdentityLock.acquireWriteLock(IdentityLock.java:167)
  | 	at org.jboss.cache.lock.IdentityLock.acquire(IdentityLock.java:497)
  | 	... 49 more
  | 
  | javax.transaction.RollbackException: outcome is false status: 1
  | 	at org.jboss.cache.transaction.DummyTransaction.commit(DummyTransaction.java:75)
  | 	at org.jboss.cache.transaction.DummyBaseTransactionManager.commit(DummyBaseTransactionManager.java:78)
  | 	at org.jboss.cache.transaction.DummyUserTransaction.commit(DummyUserTransaction.java:77)
  | 	at CacheRepl.CacheThread.doTransaction(CacheThread.java:31)
  | 	at CacheRepl.CacheThread.run(CacheThread.java:53)
  | 

I don't care about the configuration details, but this test program should NEVER fail under any circumstances, short of a nuclear attack or something like that.
One cache should always acquire the lock first, write to the node, and finish its transaction; then the other cache can do the same.
It should behave just like two threads setting the same AtomicInteger, which never throws an exception.
Is there any solution to this?

Thanks
Adrian

View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4117891#4117891

Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4117891



More information about the jboss-user mailing list