Alright. The first step is to write a unit test that asserts checked transaction
semantics.
1/ atomicitySync is the standard synchronous case, and passes
2/ atomicityAsync is the asynchronous case, and fails
I use JBoss Cache (core edition) as the transactional resource.
Now the goal is to make 2/ pass in two steps:
- same test, same results, but using JBossTS instead of Atomikos (to verify that JBossTS is integrated correctly)
- set up JBossTS as explained in the previous post to make it pass
But first, do you agree with my test?
Here is the test:
@RunWith(SpringJUnit4ClassRunner.class)
| @ContextConfiguration ( locations={"/application-context_Jdbc_Atomikos.xml"}
)
| public class ComportementTransactionnelAsync implements ApplicationContextAware
| {
| private Cache cache;
| @Resource ( name="tracksTable" )
| protected TracksTable table;
| protected static ApplicationContext applicationContext;
|
| @Test
| //@Ignore
| public void atomicitySync () throws Exception
| {
| atomicity ( false );
| }
|
| @Test
| public void atomicityAsync () throws Exception
| {
| atomicity ( true );
| }
|
| private void atomicity ( boolean async ) throws Exception
| {
| final String A_OK="A", B_OK="B", A_KO="C" ,
B_KO="D";
| updateAB ( A_OK , B_OK , async , false );
| assertEquals ( A_OK , table.getA() );
| assertEquals ( B_OK , table.getB() );
|
| try
| {
| updateAB ( A_KO , B_KO , async , true );
| fail ();
| }
| catch ( Exception e ) {}
| finally
| {
| assertEquals ( B_OK , table.getB() );
| assertEquals ( A_OK , table.getA() );
| }
| }
|
| private void updateAB(String a, String b, boolean async, boolean failOnB) throws
InterruptedException
| {
| table.updateAB ( a , b , async , failOnB );
| if ( async )
| {
| /* Commented since getA can be blocked by updateAB since
IsolationLevel=Serializable
| * assertEquals ( "" , table.getA() ); //Transaction should not be
committed yet
| assertEquals ( "" , table.getB() ); //Transaction should not be committed
yet*/
|
| Thread.sleep(1500);
| }
| }
|
| @Before
| public void before ()
| {
| cache = createCoreCache ();
| cache.start();
| table.setCache ( cache );
|
| table.resetAB ();
| assertEquals ( "" , table.getA() );
| assertEquals ( "" , table.getB() );
| }
|
| @After
| public void stop ()
| {
| table.unsetCache ( cache );
| cache.stop();
| }
|
| private Cache createCoreCache()
| {
| CacheFactory factory = new DefaultCacheFactory();
| Cache cache =
factory.createCache("resources/META-INF/replSync-service.xml", false);
|
| cache.getInvocationContext().getOptionOverrides().setForceSynchronous(true);
|
| cache.create();
| return cache;
| }
|
| @Override
| public void setApplicationContext(ApplicationContext applicationContext) throws
BeansException
| {
| TransactionManager tm = (TransactionManager) applicationContext.getBean (
"atomikosTransactionManager" );
| AtomikosTransactionManagerLookup.setAtomikosTransactionManager ( tm );
| }
| }
And here is the service implementation:
@Service("tracksTable")
| @Transactional (propagation=Propagation.REQUIRED, isolation=Isolation.SERIALIZABLE,
readOnly=false, timeout=10000)
| public class TracksTableImpl implements TracksTable
| {
| //--------------JBC-----------------
|
| private Cache coreCache;
|
| @Override
| public void updateAB ( String a , String b , boolean async , boolean failOnUpdateB )
| {
| setA ( a );
| Thread t = new Thread ( new SleepAndSetB ( b , failOnUpdateB ) );
| if ( async ) { t.start(); } else t.run();
| }
|
| private class SleepAndSetB implements Runnable
| {
| private final String b;
| private final boolean failOnUpdateB;
| SleepAndSetB ( String b, boolean failOnUpdateB ) { this.b = b; this.failOnUpdateB =
failOnUpdateB; }
| @Override public void run()
| {
| ObjectUtils.sleep(500);
| if ( failOnUpdateB ) throw new TrackException ();
| setB ( b );
| }
| }
|
| @Override
| public String getA()
| {
| Node rootNode = coreCache.getRoot();
| return (String)rootNode.get ( "a" );
| }
|
| @Override
| public String getB()
| {
| Node rootNode = coreCache.getRoot();
| return (String)rootNode.get ( "b" );
| }
|
| private void setA ( String a )
| {
| System.out.println ( "setA [ a=" + a + " ]" );
| Node rootNode = coreCache.getRoot();
| rootNode.put ( "a" , a );
| }
|
| private void setB ( String b )
| {
| System.out.println ( "setB [ b=" + b + " ]" );
| Node rootNode = coreCache.getRoot();
| rootNode.put ( "b" , b );
| }
|
| @Override
| public void resetAB()
| {
| setA ( "" );
| setB ( "" );
| }
|
| @Override
| public void setCache ( Cache cache )
| {
| coreCache = cache;
| }
|
| @Override
| public void unsetCache ( Cache cache )
| {
| coreCache = null;
| }
| }
|
JBossCache conf (replSync-service.xml):
<?xml version="1.0" encoding="UTF-8"?>
| <server>
| <!-- JBoss Cache configuration loaded by the test through
|      factory.createCache("resources/META-INF/replSync-service.xml", false). -->
| <mbean code="org.jboss.cache.jmx.CacheJmxWrapper"
name="jboss.cache:service=TreeCache">
|
| <!-- Transactions: the transaction manager is obtained through
|      AtomikosTransactionManagerLookup, which the test fills in from the Spring context.
|      Locking is pessimistic with SERIALIZABLE isolation. -->
| <attribute
name="TransactionManagerLookupClass">hellotrackworld.impl.srv.AtomikosTransactionManagerLookup</attribute>
| <attribute
name="NodeLockingScheme">PESSIMISTIC</attribute>
| <attribute name="IsolationLevel">SERIALIZABLE</attribute>
| <!-- NOTE(review): CacheMode is LOCAL, so the replication queue settings and the
|      ClusterConfig (JGroups stack) below are presumably unused - confirm. -->
| <attribute name="CacheMode">LOCAL</attribute>
| <attribute name="UseReplQueue">false</attribute>
| <attribute name="ReplQueueInterval">0</attribute>
| <attribute name="ReplQueueMaxElements">0</attribute>
| <attribute
name="ClusterName">JBossCache-Cluster</attribute>
|
| <!-- JGroups protocol stack (UDP multicast transport). -->
| <attribute name="ClusterConfig">
| <config>
| <UDP mcast_addr="228.10.10.10"
| mcast_port="45588"
| tos="8"
| ucast_recv_buf_size="20000000"
| ucast_send_buf_size="640000"
| mcast_recv_buf_size="25000000"
| mcast_send_buf_size="640000"
| loopback="false"
| discard_incompatible_packets="true"
| max_bundle_size="64000"
| max_bundle_timeout="30"
| use_incoming_packet_handler="true"
| ip_ttl="2"
| enable_bundling="false"
| enable_diagnostics="true"
|
| use_concurrent_stack="true"
|
| thread_naming_pattern="pl"
|
| thread_pool.enabled="true"
| thread_pool.min_threads="1"
| thread_pool.max_threads="25"
| thread_pool.keep_alive_time="30000"
| thread_pool.queue_enabled="true"
| thread_pool.queue_max_size="10"
| thread_pool.rejection_policy="Run"
|
| oob_thread_pool.enabled="true"
| oob_thread_pool.min_threads="1"
| oob_thread_pool.max_threads="4"
| oob_thread_pool.keep_alive_time="10000"
| oob_thread_pool.queue_enabled="true"
| oob_thread_pool.queue_max_size="10"
| oob_thread_pool.rejection_policy="Run"/>
|
| <PING timeout="2000" num_initial_members="3"/>
| <MERGE2 max_interval="30000"
min_interval="10000"/>
| <FD_SOCK/>
| <FD timeout="10000" max_tries="5"
shun="true"/>
| <VERIFY_SUSPECT timeout="1500"/>
| <pbcast.NAKACK max_xmit_size="60000"
| use_mcast_xmit="false" gc_lag="0"
| retransmit_timeout="300,600,1200,2400,4800"
| discard_delivered_msgs="true"/>
| <UNICAST timeout="300,600,1200,2400,3600"/>
| <pbcast.STABLE stability_delay="1000"
desired_avg_gossip="50000"
| max_bytes="400000"/>
| <pbcast.GMS print_local_addr="true"
join_timeout="5000"
| join_retry_timeout="2000" shun="false"
| view_bundling="true"
view_ack_collection_timeout="5000"/>
| <FRAG2 frag_size="60000"/>
| <pbcast.STREAMING_STATE_TRANSFER
use_reading_thread="true"/>
| <!-- <pbcast.STATE_TRANSFER/> -->
| <pbcast.FLUSH timeout="0"/>
| </config>
| </attribute>
|
|
| <!-- State transfer, replication and lock timeouts.
|      NOTE(review): values are presumably milliseconds - confirm against the JBoss Cache documentation. -->
| <attribute name="FetchInMemoryState">true</attribute>
| <attribute name="StateRetrievalTimeout">15000</attribute>
| <attribute name="SyncReplTimeout">15000</attribute>
| <attribute
name="LockAcquisitionTimeout">10000</attribute>
| <attribute
name="UseRegionBasedMarshalling">true</attribute>
| </mbean>
| </server>
|
Spring conf (application-context.xml):
<?xml version="1.0" encoding="UTF-8"?>
| <beans
xmlns="http://www.springframework.org/schema/beans"
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
xmlns:context="http://www.springframework.org/schema/context"
|
xmlns:tx="http://www.springframework.org/schema/tx"
|
xmlns:aop="http://www.springframework.org/schema/aop"
| xsi:schemaLocation="
|
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
|
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd
|
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
|
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">
|
| <!--GENERAL CONFIGURATION vvvvv-->
| <!-- Scans the hellotrackworld package for annotated components (e.g. the
|      @Service("tracksTable") bean) and enables annotation-based injection. -->
| <context:component-scan base-package="hellotrackworld"/>
| <context:annotation-config/>
| <!--GENERAL CONFIGURATION ^^^^^-->
|
|
| <!--JTA SETUP vvvvv-->
| <!-- Atomikos JTA transaction manager; the test fetches this bean by id and
|      hands it to AtomikosTransactionManagerLookup. -->
| <bean id="atomikosTransactionManager"
class="com.atomikos.icatch.jta.UserTransactionManager"
init-method="init" destroy-method="close">
| <property name="forceShutdown" value="true"/>
| </bean>
| <!-- NOTE(review): transactionTimeout is presumably expressed in seconds - confirm. -->
| <bean id="atomikosUserTransaction"
class="com.atomikos.icatch.jta.UserTransactionImp">
| <property name="transactionTimeout" value="300"/>
| </bean>
|
| <!-- Spring JTA adapter over the two Atomikos beans. allowCustomIsolationLevels
|      is enabled; the service declares isolation=SERIALIZABLE on @Transactional. -->
| <bean id="jtaTransactionManager"
class="org.springframework.transaction.jta.JtaTransactionManager">
| <property name="transactionManager"
ref="atomikosTransactionManager" />
| <property name="userTransaction"
ref="atomikosUserTransaction" />
| <property name="allowCustomIsolationLevels" value="true"
/>
| </bean>
|
| <!-- Makes @Transactional annotations use jtaTransactionManager. -->
| <tx:annotation-driven transaction-manager="jtaTransactionManager"/>
| <!--JTA SETUP ^^^^^-->
|
| </beans>
|
|
View the original post :
http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4257591#...
Reply to the post :
http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&a...