[jboss-dev-forums] [JBoss Transactions Development] - Re: Asynchronicity and transaction context propagation

chtimi2 do-not-reply at jboss.com
Tue Sep 29 05:54:56 EDT 2009


Alright. The first step is to write a unit test that asserts checked transaction semantics.
1/ atomicitySync is the standard synchronous case, and passes
2/ atomicityAsync is the asynchronous case, and fails

I use JBoss Cache (core edition) as the transactional resource.

Now the goal is to make 2/ pass in steps:
	-same test, same results, but using JBossTS (verify correct JBTS integration)
	-set up JBossTS as explained in the previous post to make it pass

But first do you agree with my test?


Here is the test:
@RunWith(SpringJUnit4ClassRunner.class)
  | @ContextConfiguration ( locations={"/application-context_Jdbc_Atomikos.xml"} )
  | public class ComportementTransactionnelAsync implements ApplicationContextAware
  | {
  | 	private Cache cache;
  | 	@Resource ( name="tracksTable" )
  | 	protected TracksTable table;
  | 	protected static ApplicationContext applicationContext; 
  | 	
  | 	@Test
  | 	//@Ignore
  | 	public void atomicitySync () throws Exception
  | 	{
  | 		atomicity ( false );
  | 	}
  | 	
  | 	@Test
  | 	public void atomicityAsync () throws Exception
  | 	{
  | 		atomicity ( true );
  | 	}
  | 	
  | 	private void atomicity ( boolean async ) throws Exception
  | 	{
  | 		final String A_OK="A", B_OK="B", A_KO="C" , B_KO="D";
  | 		updateAB ( A_OK , B_OK , async , false );
  | 		assertEquals ( A_OK , table.getA() );
  | 		assertEquals ( B_OK , table.getB() );
  | 		
  | 		try
  | 		{
  | 			updateAB ( A_KO , B_KO , async , true );
  | 			fail ();
  | 		}
  | 		catch ( Exception e ) {}
  | 		finally
  | 		{
  | 			assertEquals ( B_OK , table.getB() );
  | 			assertEquals ( A_OK , table.getA() );
  | 		}
  | 	}
  | 	
  | 	private void updateAB(String a, String b, boolean async, boolean failOnB) throws InterruptedException 
  | 	{
  | 		table.updateAB ( a , b , async , failOnB );
  | 		if ( async )
  | 		{
  | 			/* Commented since getA can be blocked by updateAB since IsolationLevel=Serializable
  | 			 * assertEquals ( "" , table.getA() ); //Transaction should not be committed yet
  | 			assertEquals ( "" , table.getB() ); //Transaction should not be committed yet*/ 
  | 			
  | 			Thread.sleep(1500);	
  | 		}
  | 	}
  | 
  | 	@Before
  | 	public void before ()
  | 	{
  | 		cache = createCoreCache ();
  | 		cache.start();
  | 		table.setCache ( cache );
  | 		
  | 		table.resetAB ();
  | 		assertEquals ( "" , table.getA() );
  | 		assertEquals ( "" , table.getB() );
  | 	}
  | 	
  | 	@After
  | 	public void stop ()
  | 	{
  | 		table.unsetCache ( cache );
  | 		cache.stop();
  | 	}
  | 
  | 	private Cache createCoreCache() 
  | 	{
  | 		CacheFactory factory = new DefaultCacheFactory();
  | 		Cache cache = factory.createCache("resources/META-INF/replSync-service.xml", false);
  | 		
  | 		cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
  | 		
  | 		cache.create();
  | 		return cache;
  | 	}
  | 	
  | 	@Override
  | 	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 
  | 	{
  | 		TransactionManager tm = (TransactionManager) applicationContext.getBean ( "atomikosTransactionManager" );
  | 		AtomikosTransactionManagerLookup.setAtomikosTransactionManager ( tm );
  | 	}
  | }

And here is the service implementation:
@Service("tracksTable")
  | @Transactional (propagation=Propagation.REQUIRED, isolation=Isolation.SERIALIZABLE, readOnly=false, timeout=10000)
  | public class TracksTableImpl implements TracksTable
  | {
  | 	//--------------JBC-----------------
  | 	
  | 	private Cache coreCache;
  | 	
  | 	@Override
  | 	public void updateAB ( String a , String b , boolean async , boolean failOnUpdateB ) 
  | 	{
  | 		setA ( a );
  | 		Thread t = new Thread ( new SleepAndSetB ( b , failOnUpdateB ) );
  | 		if ( async ) { t.start(); } else t.run();
  | 	}
  | 	
  | 	private class SleepAndSetB implements Runnable
  | 	{
  | 		private final String b;
  | 		private final boolean failOnUpdateB;
  | 		SleepAndSetB ( String b, boolean failOnUpdateB ) { this.b = b; this.failOnUpdateB = failOnUpdateB; }
  | 		@Override public void run() 
  | 		{ 
  | 			ObjectUtils.sleep(500);  
  | 			if ( failOnUpdateB ) throw new TrackException ();
  | 			setB ( b ); 
  | 		}
  | 	}
  | 
  | 	@Override
  | 	public String getA() 
  | 	{
  | 		Node rootNode = coreCache.getRoot();
  | 		return (String)rootNode.get ( "a" );
  | 	}
  | 
  | 	@Override
  | 	public String getB() 
  | 	{
  | 		Node rootNode = coreCache.getRoot();
  | 		return (String)rootNode.get ( "b" );
  | 	}
  | 	
  | 	private void setA ( String a ) 
  | 	{
  | 		System.out.println ( "setA [ a=" + a + " ]" );
  | 		Node rootNode = coreCache.getRoot();
  | 		rootNode.put ( "a" , a );		
  | 	}
  | 	
  | 	private void setB ( String b ) 
  | 	{
  | 		System.out.println ( "setB [ b=" + b + " ]" );
  | 		Node rootNode = coreCache.getRoot();
  | 		rootNode.put ( "b" , b );		
  | 	}
  | 	
  | 	@Override
  | 	public void resetAB() 
  | 	{
  | 		setA ( "" );
  | 		setB ( "" );
  | 	}
  | 	
  | 	@Override
  | 	public void setCache ( Cache cache ) 
  | 	{
  | 		coreCache = cache;
  | 	}
  | 	
  | 	@Override
  | 	public void unsetCache ( Cache cache ) 
  | 	{
  | 		coreCache = null;
  | 	}
  | }
  | 

JBossCache conf (replSync-service.xml):
<?xml version="1.0" encoding="UTF-8"?>
  | <server>
  |    <mbean code="org.jboss.cache.jmx.CacheJmxWrapper" name="jboss.cache:service=TreeCache">
  | 
  |       <attribute name="TransactionManagerLookupClass">hellotrackworld.impl.srv.AtomikosTransactionManagerLookup</attribute>
  |       <attribute name="NodeLockingScheme">PESSIMISTIC</attribute>
  |       <attribute name="IsolationLevel">SERIALIZABLE</attribute>
  |       <attribute name="CacheMode">LOCAL</attribute>
  |       <attribute name="UseReplQueue">false</attribute>
  |       <attribute name="ReplQueueInterval">0</attribute>
  |       <attribute name="ReplQueueMaxElements">0</attribute>
  |       <attribute name="ClusterName">JBossCache-Cluster</attribute>
  | 
  |       <attribute name="ClusterConfig">
  |          <config>
  |             <UDP mcast_addr="228.10.10.10"
  |                  mcast_port="45588"
  |                  tos="8"
  |                  ucast_recv_buf_size="20000000"
  |                  ucast_send_buf_size="640000"
  |                  mcast_recv_buf_size="25000000"
  |                  mcast_send_buf_size="640000"
  |                  loopback="false"
  |                  discard_incompatible_packets="true"
  |                  max_bundle_size="64000"
  |                  max_bundle_timeout="30"
  |                  use_incoming_packet_handler="true"
  |                  ip_ttl="2"
  |                  enable_bundling="false"
  |                  enable_diagnostics="true"
  | 
  |                  use_concurrent_stack="true"
  | 
  |                  thread_naming_pattern="pl"
  | 
  |                  thread_pool.enabled="true"
  |                  thread_pool.min_threads="1"
  |                  thread_pool.max_threads="25"
  |                  thread_pool.keep_alive_time="30000"
  |                  thread_pool.queue_enabled="true"
  |                  thread_pool.queue_max_size="10"
  |                  thread_pool.rejection_policy="Run"
  | 
  |                  oob_thread_pool.enabled="true"
  |                  oob_thread_pool.min_threads="1"
  |                  oob_thread_pool.max_threads="4"
  |                  oob_thread_pool.keep_alive_time="10000"
  |                  oob_thread_pool.queue_enabled="true"
  |                  oob_thread_pool.queue_max_size="10"
  |                  oob_thread_pool.rejection_policy="Run"/>
  | 
  |             <PING timeout="2000" num_initial_members="3"/>
  |             <MERGE2 max_interval="30000" min_interval="10000"/>
  |             <FD_SOCK/>
  |             <FD timeout="10000" max_tries="5" shun="true"/>
  |             <VERIFY_SUSPECT timeout="1500"/>
  |             <pbcast.NAKACK max_xmit_size="60000"
  |                            use_mcast_xmit="false" gc_lag="0"
  |                            retransmit_timeout="300,600,1200,2400,4800"
  |                            discard_delivered_msgs="true"/>
  |             <UNICAST timeout="300,600,1200,2400,3600"/>
  |             <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
  |                            max_bytes="400000"/>
  |             <pbcast.GMS print_local_addr="true" join_timeout="5000"
  |                         join_retry_timeout="2000" shun="false"
  |                         view_bundling="true" view_ack_collection_timeout="5000"/>
  |             <FRAG2 frag_size="60000"/>
  |             <pbcast.STREAMING_STATE_TRANSFER use_reading_thread="true"/>
  |             <!-- <pbcast.STATE_TRANSFER/> -->
  |             <pbcast.FLUSH timeout="0"/>
  |          </config>
  |       </attribute>
  | 
  | 
  |       <attribute name="FetchInMemoryState">true</attribute>
  |       <attribute name="StateRetrievalTimeout">15000</attribute>
  |       <attribute name="SyncReplTimeout">15000</attribute>
  |       <attribute name="LockAcquisitionTimeout">10000</attribute>
  |       <attribute name="UseRegionBasedMarshalling">true</attribute>
  |    </mbean>
  | </server>
  | 

Spring conf (application-context.xml):
<?xml version="1.0" encoding="UTF-8"?>
  | <beans xmlns="http://www.springframework.org/schema/beans"
  |        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  |        xmlns:context="http://www.springframework.org/schema/context"
  |        xmlns:tx="http://www.springframework.org/schema/tx"
  |        xmlns:aop="http://www.springframework.org/schema/aop"
  |        xsi:schemaLocation="
  | http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
  | http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
  | http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
  | http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">
  |     
  |     				<!--CONF GENERALE vvvvv-->
  |     <context:component-scan base-package="hellotrackworld"/>
  |     <context:annotation-config/>
  |     				<!--CONF GENERALE ^^^^^-->
  |     
  | 	
  | 					<!--PARAMETRAGE JTA vvvvv-->
  |   	<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
  | 		<property name="forceShutdown" value="true"/>
  | 	</bean>
  | 	<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
  |     	<property name="transactionTimeout" value="300"/>
  | 	</bean>
  |   	
  |   	<bean id="jtaTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
  |     	<property name="transactionManager" ref="atomikosTransactionManager" />
  |     	<property name="userTransaction" ref="atomikosUserTransaction" />
  |     	<property name="allowCustomIsolationLevels" value="true" />
  |   	</bean>
  | 	
  | 	<tx:annotation-driven transaction-manager="jtaTransactionManager"/>
  | 					<!--PARAMETRAGE JTA ^^^^^-->
  | 
  | </beans>
  | 
  | 


View the original post : http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4257591#4257591

Reply to the post : http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=4257591



More information about the jboss-dev-forums mailing list