[jboss-user] [JBoss Cache: Core Edition] - Re: on child insert parent, child collection updated in DB b

dukehoops do-not-reply at jboss.com
Sat Mar 7 15:52:09 EST 2009

OK, I wired up DualNodeJtaTransactionManagerImpl. The test now passes if UserCount is below 4 or less, at 5 or above the test fails. However, the failure I am seeing with this test is NOT the same as with the test in our environment. It is this:

anonymous wrote : -------------------------------------------------------------------------------
  | Test set: org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest
  | -------------------------------------------------------------------------------
  | Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 182.113 sec <<< FAILURE!
  | testManyUsers(org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest)  Time elapsed: 165.831 sec  <<< FAILURE!
  | junit.framework.AssertionFailedError: Timed out waiting for user threads to finish. Their state at the time of forced shutdown: TEST CONFIG [userCount=5, iterationsPerUser=40, thinkTimeMillis=10]  STATE of UserRunners: org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest$UserRunner at ffc3eb[customerId=4 iterationsCompleted=30 completedAll=false causeOfFailure=org.hibernate.cache.CacheException: org.hibernate.cache.CacheException: org.jboss.cache.lock.TimeoutException: Unable to acquire lock on Fqn [/TS/test/org/hibernate/cache/UpdateTimestampsCache/Contacts] after [15000] milliseconds for requestor [Thread[UserRunnerThread-4,5,main]]! Lock held by [GlobalTransaction::903]
  | 	at org.hibernate.cache.jbc2.timestamp.TimestampsRegionImpl.put(TimestampsRegionImpl.java:130)
  | 	at org.hibernate.cache.UpdateTimestampsCache.preinvalidate(UpdateTimestampsCache.java:70)
  | 	at org.hibernate.engine.ActionQueue.execute(ActionQueue.java:275)
  | 	at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:263)
  | 	at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:167)
  | 	at org.hibernate.event.def.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:321)
  | 	at org.hibernate.event.def.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:50)
  | 	at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:1027)
  | 	at org.hibernate.impl.SessionImpl.managedFlush(SessionImpl.java:365)
  | 	at org.hibernate.transaction.CacheSynchronization.beforeCompletion(CacheSynchronization.java:88)
  | 	at org.hibernate.test.cache.jbc2.functional.util.DualNodeJtaTransactionImpl.commit(DualNodeJtaTransactionImpl.java:76)
  | 	at org.hibernate.test.cache.jbc2.functional.util.DualNodeJtaTransactionManagerImpl.commit(DualNodeJtaTransactionManagerImpl.java:123)
  | 	at org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest.commitTx(MVCCConcurrentWriteTest.java:255)
  | 	at org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest.addContact(MVCCConcurrentWriteTest.java:333)
  | 	at org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest.access$100(MVCCConcurrentWriteTest.java:29)
  | 	at org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest$UserRunner.run(MVCCConcurrentWriteTest.java:423)
  | 	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
  | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
  | 	at java.lang.Thread.run(Thread.java:619)
  | Caused by: org.hibernate.cache.CacheException: org.jboss.cache.lock.TimeoutException: Unable to acquire lock on Fqn [/TS/test/org/hibernate/cache/UpdateTimestampsCache/Contacts] after [15000] milliseconds for requestor [Thread[UserRunnerThread-4,5,main]]! Lock held by [GlobalTransaction::903]
  | 	at org.hibernate.cache.jbc2.util.CacheHelper.put(CacheHelper.java:214)
  | 	at org.hibernate.cache.jbc2.timestamp.TimestampsRegionImpl.put(TimestampsRegionImpl.java:128)
  | 	... 18 more] 
  | org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest$UserRunner at 18bbb61[customerId=6 iterationsCompleted=28 completedAll=false causeOfFailure=] 
  | org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest$UserRunner at 1cab18[customerId=2 iterationsCompleted=28 completedAll=false causeOfFailure=] 
  | org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest$UserRunner at 9cd8db[customerId=3 iterationsCompleted=30 completedAll=false causeOfFailure=] 

To repro:
-set userCount to 5 or more 
-run the test and wait patiently for up to 5 minutes (not sure why, as with < 5 User threads this test finished in 3 seconds).

The fact that this test isn't using a real JTA tx manager is a problem IMO - as the above exception seems to do more with TX cleanup (though I'd be happy to be proven wrong) . Even if the above bug is worked around and the test starts passing, it won't prove there isn't a problem since a crucial piece of the stack is replaced with a simpler impl.

Here's the updated source:

package org.hibernate.test.cache.jbc2.functional;
  | import java.util.HashSet;
  | import java.util.Random;
  | import java.util.Set;
  | import java.util.concurrent.ExecutorService;
  | import java.util.concurrent.Executors;
  | import java.util.concurrent.TimeUnit;
  | import javax.transaction.SystemException;
  | import junit.framework.Test;
  | import org.hibernate.FlushMode;
  | import org.hibernate.Session;
  | import org.hibernate.cfg.Configuration;
  | import org.hibernate.exception.ExceptionUtils;
  | import org.hibernate.junit.functional.FunctionalTestClassTestSuite;
  | import org.hibernate.stat.SecondLevelCacheStatistics;
  | import org.hibernate.test.cache.jbc2.functional.util.DualNodeConnectionProviderImpl;
  | import org.hibernate.test.cache.jbc2.functional.util.DualNodeJtaTransactionManagerImpl;
  | import org.hibernate.test.cache.jbc2.functional.util.DualNodeTestUtil;
  | import org.hibernate.test.cache.jbc2.functional.util.DualNodeTransactionManagerLookup;
  | import org.hibernate.transaction.CMTTransactionFactory;
  | import org.slf4j.Logger;
  | import org.slf4j.LoggerFactory;
  | /**
  |  *
  |  * @author nikita_tovstoles at mba.berkeley.edu
  |  */
  | public class MVCCConcurrentWriteTest extends MVCCJBossCacheTest {
  |     private static final Logger LOG = LoggerFactory.getLogger(MVCCConcurrentWriteTest.class);
  |     /**
  |      * when USER_COUNT==1, tests pass, when >4 tests fail
  |      */
  |     final private int USER_COUNT = 5;
  |     final private int ITERATION_COUNT = 40;
  |     final private int THINK_TIME_MILLIS = 10;
  |     final private long LAUNCH_INTERVAL_MILLIS = 10;
  |     final private Random random = new Random();
  |     /**
  |      * kill switch used to stop all users when one fails
  |      */
  |     private static boolean TERMINATE_ALL_USERS = false;
  |     /**
  |      * collection of IDs of all customers participating in this test
  |      */
  |     private Set<Integer> customerIDs = new HashSet<Integer>();
  |     public MVCCConcurrentWriteTest(String x) {
  |         super(x);
  |     }
  |     /**
  |      * test that DB can be queried
  |      * @throws java.lang.Exception
  |      */
  |     public void testPingDb() throws Exception {
  |         try {
  |             beginTx();
  |             getEnvironment().getSessionFactory().getCurrentSession().createQuery("from " + Customer.class.getName()).list();
  |             commitTx();
  |         } catch (Exception e) {
  |             rollbackTx();
  |             fail("failed to query DB; exception=" + e);
  |         }
  |     }
  |     @Override
  |     protected void prepareTest() throws Exception {
  |         super.prepareTest();
  |     }
  |     @Override
  |     protected void cleanupTest() throws Exception {
  |         try {
  |             super.cleanupTest();
  |         } finally {
  |             cleanup();
  |         //DualNodeJtaTransactionManagerImpl.cleanupTransactions();
  |         //DualNodeJtaTransactionManagerImpl.cleanupTransactionManagers();
  |         }
  |     }
  |     @Override
  |     public void configure(Configuration cfg) {
  |         super.configure(cfg);
  |         cfg.setProperty(DualNodeTestUtil.NODE_ID_PROP, DualNodeTestUtil.LOCAL);
  |     }
  |     @Override
  |     protected Class getConnectionProviderClass() {
  |         return DualNodeConnectionProviderImpl.class;
  |     }
  |     @Override
  |     protected Class getTransactionManagerLookupClass() {
  |         return DualNodeTransactionManagerLookup.class;
  |     }
  |     @Override
  |     protected Class getTransactionFactoryClass() {
  |         return CMTTransactionFactory.class;
  |     }
  |     @Override
  |     public void testEmptySecondLevelCacheEntry() throws Exception {
  |         //do nothing
  |     }
  |     @Override
  |     public void testQueryCacheInvalidation() {
  |         //do nothing
  |     }
  |     @Override
  |     public void testStaleWritesLeaveCacheConsistent() {
  |         //do nothing
  |     }
  |     public void testSingleUser() throws Exception {
  |         //setup
  |         Customer customer = createCustomer(0);
  |         final Integer customerId = customer.getId();
  |         getCustomerIDs().add(customerId);
  |         assertNull("contact exists despite not being added", getFirstContact(customerId));
  |         //check that cache was hit
  |         SecondLevelCacheStatistics customerSlcs = getEnvironment().getSessionFactory().getStatistics().getSecondLevelCacheStatistics(
  |                 getPrefixedRegionName(Customer.class.getName()));
  |         assertEquals(customerSlcs.getPutCount(), 1);
  |         assertEquals(customerSlcs.getElementCountInMemory(), 1);
  |         assertEquals(customerSlcs.getEntries().size(), 1);
  |         SecondLevelCacheStatistics contactsCollectionSlcs = getEnvironment().getSessionFactory().getStatistics().getSecondLevelCacheStatistics(
  |                 getPrefixedRegionName(Customer.class.getName() + ".contacts"));
  |         assertEquals(1, contactsCollectionSlcs.getPutCount());
  |         assertEquals(1, contactsCollectionSlcs.getElementCountInMemory());
  |         assertEquals(1, contactsCollectionSlcs.getEntries().size());
  |         final Contact contact = addContact(customerId);
  |         assertNotNull("contact returned by addContact is null", contact);
  |         assertEquals("Customer.contacts cache was not invalidated after addContact",
  |                 0, contactsCollectionSlcs.getElementCountInMemory());
  |         assertNotNull("Contact missing after successful add call", getFirstContact(customerId));
  |         //read everyone's contacts
  |         readEveryonesFirstContact();
  |         removeContact(customerId);
  |         assertNull("contact still exists after successful remove call", getFirstContact(customerId));
  |     }
  |     public void testManyUsers() throws Exception {
  |         //setup - create users
  |         for (int i = 0; i < USER_COUNT; i++) {
  |             Customer customer = createCustomer(0);
  |             getCustomerIDs().add(customer.getId());
  |         }
  |         assertEquals("failed to create enough Customers", USER_COUNT, getCustomerIDs().size());
  |         final ExecutorService pool = Executors.newFixedThreadPool(USER_COUNT);
  |         Set<UserRunner> runners = new HashSet<UserRunner>();
  |         for (Integer customerId : getCustomerIDs()) {
  |             UserRunner r = new UserRunner(customerId);
  |             runners.add(r);
  |             pool.execute(r);
  |             LOG.info("launched " + r);
  |             Thread.sleep(LAUNCH_INTERVAL_MILLIS); //rampup
  |         }
  |         assertEquals("not all user threads launched", USER_COUNT, runners.size());
  |         pool.shutdown();
  |         boolean finishedInTime = pool.awaitTermination(120, TimeUnit.SECONDS);
  |         if (!finishedInTime) { //timed out waiting for users to finish
  |             fail("Timed out waiting for user threads to finish. Their state at the time of forced shutdown: " + statusOfRunnersToString(runners));
  |         } else {
  |             //if here -> pool finished before timing out
  |             //check whether all runners suceeded
  |             boolean success = true;
  |             for (UserRunner r : runners) {
  |                 if (!r.isSuccess()) {
  |                     success = false;
  |                     break;
  |                 }
  |             }
  |             assertTrue("at least one UserRunner failed: " + statusOfRunnersToString(runners), success);
  |         }
  |     }
  |     public void cleanup() throws Exception {
  |         getCustomerIDs().clear();
  |         String deleteContactHQL = "delete from Contact";
  |         String deleteCustomerHQL = "delete from Customer";
  |         beginTx();
  |         try {
  |             //Session session = getSessions().getCurrentSession();
  |             Session session = getEnvironment().getSessionFactory().getCurrentSession();
  |             session.createQuery(deleteContactHQL).setFlushMode(FlushMode.AUTO).executeUpdate();
  |             session.createQuery(deleteCustomerHQL).setFlushMode(FlushMode.AUTO).executeUpdate();
  |             commitTx();
  |         } catch (Exception e) {
  |             rollbackTx();
  |             throw e;
  |         }
  |     }
  |     private Customer createCustomer(int nameSuffix) throws Exception {
  |         beginTx();
  |         try {
  |             Customer customer = new Customer();
  |             customer.setName("customer_" + nameSuffix);
  |             customer.setContacts(new HashSet<Contact>());
  |             getEnvironment().getSessionFactory().getCurrentSession().persist(customer);
  |             commitTx();
  |             return customer;
  |         } catch (Exception e) {
  |             rollbackTx();
  |             throw e;
  |         }
  |     }
  |     /**
  |      * delegate method since I'm trying to figure out which txManager to use
  |      * given that this test runs multiple threads (SimpleJtaTxMgrImpl isn't suited for that).
  |      *
  |      * What is needed is a thread-safe JTATransactionManager impl that can handle concurrent TXs
  |      * 
  |      * @throws java.lang.Exception
  |      */
  |     private void beginTx() throws Exception {
  |         DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).begin();
  |     }
  |     /**
  |      * @see #beginTx()
  |      * @throws java.lang.Exception
  |      */
  |     private void commitTx() throws Exception {
  |         DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).commit();
  |     }
  |     /**
  |      * @see #beginTx()
  |      * @throws java.lang.Exception
  |      */
  |     private void rollbackTx() throws Exception {
  |         DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).rollback();
  |     }
  |     /**
  |      * read first contact of every Customer participating in this test.
  |      * this forces concurrent cache writes of Customer.contacts Collection cache node
  |      * 
  |      * @return who cares
  |      * @throws java.lang.Exception
  |      */
  |     private void readEveryonesFirstContact() throws Exception {
  |         beginTx();
  |         try {
  |             for (Integer customerId : getCustomerIDs()) {
  |                 final Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
  |                 Set<Contact> contacts = customer.getContacts();
  |                 Contact firstContact = contacts.isEmpty() ? null : contacts.iterator().next();
  |             }
  |             commitTx();
  |         } catch (Exception e) {
  |             rollbackTx();
  |             throw e;
  |         }
  |     }
  |     /**
  |      * -load existing Customer
  |      * -get customer's contacts; return 1st one
  |      * 
  |      * @param customerId
  |      * @return first Contact or null if customer has none
  |      */
  |     private Contact getFirstContact(Integer customerId) throws Exception {
  |         assert customerId != null;
  |         beginTx();
  |         try {
  |             final Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
  |             Set<Contact> contacts = customer.getContacts();
  |             Contact firstContact = contacts.isEmpty() ? null : contacts.iterator().next();
  |             commitTx();
  |             return firstContact;
  |         } catch (Exception e) {
  |             rollbackTx();
  |             throw e;
  |         }
  |     }
  |     /**
  |      * -load existing Customer     
  |      * -create a new Contact and add to customer's contacts
  |      * 
  |      * @param customerId
  |      * @return added Contact
  |      */
  |     private Contact addContact(Integer customerId) throws Exception {
  |         assert customerId != null;
  |         beginTx();
  |         try {
  |             final Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
  |             Contact contact = new Contact();
  |             contact.setName("contact name");
  |             contact.setTlf("wtf is tlf?");
  |             contact.setCustomer(customer);
  |             customer.getContacts().add(contact);
  |             //assuming contact is persisted via cascade from customer
  |             commitTx();
  |             return contact;
  |         } catch (Exception e) {
  |             rollbackTx();
  |             throw e;
  |         }
  |     }
  |     /**
  |      * remove existing 'contact' from customer's list of contacts
  |      * 
  |      * @param contact contact to remove from customer's contacts
  |      * @param customerId
  |      * @throws IllegalStateException if customer does not own a contact
  |      */
  |     private void removeContact(Integer customerId) throws Exception {
  |         assert customerId != null;
  |         beginTx();
  |         try {
  |             Customer customer = (Customer) getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class, customerId);
  |             Set<Contact> contacts = customer.getContacts();
  |             if (contacts.size() != 1) {
  |                 throw new IllegalStateException("can't remove contact: customer id=" + customerId + " expected exactly 1 contact, " +
  |                         "actual count=" + contacts.size());
  |             }
  |             Contact contact = contacts.iterator().next();
  |             contacts.remove(contact);
  |             contact.setCustomer(null);
  |             //explicitly delete Contact because hbm has no 'DELETE_ORPHAN' cascade?
  |             //getEnvironment().getSessionFactory().getCurrentSession().delete(contact); //appears to not be needed
  |             //assuming contact is persisted via cascade from customer
  |             commitTx();
  |         } catch (Exception e) {
  |             rollbackTx();
  |             throw e;
  |         }
  |     }
  |     /**
  |      * @return the customerIDs
  |      */
  |     public Set<Integer> getCustomerIDs() {
  |         return customerIDs;
  |     }
  |     private String statusOfRunnersToString(Set<UserRunner> runners) {
  |         assert runners != null;
  |         StringBuilder sb = new StringBuilder("TEST CONFIG [userCount=" + USER_COUNT +
  |                 ", iterationsPerUser=" + ITERATION_COUNT +
  |                 ", thinkTimeMillis=" + THINK_TIME_MILLIS + "] " +
  |                 " STATE of UserRunners: ");
  |         for (UserRunner r : runners) {
  |             sb.append(r.toString() + System.getProperty("line.separator"));
  |         }
  |         return sb.toString();
  |     }
  |     class UserRunner implements Runnable {
  |         final private Integer customerId;
  |         private int completedIterations = 0;
  |         private Throwable causeOfFailure;
  |         public UserRunner(final Integer cId) {
  |             assert cId != null;
  |             this.customerId = cId;
  |         }
  |         private boolean contactExists() throws Exception {
  |             return getFirstContact(customerId) != null;
  |         }
  |         public void run() {
  |             //name this thread for easier log tracing
  |             Thread.currentThread().setName("UserRunnerThread-" + getCustomerId());
  |             try {
  |                 for (int i = 0; i < ITERATION_COUNT; i++) {
  |                     if (contactExists()) {
  |                         throw new IllegalStateException("contact already exists before add, customerId=" + customerId);
  |                     }
  |                     addContact(customerId);
  |                     if (!contactExists()) {
  |                         throw new IllegalStateException("contact missing after successful add, customerId=" + customerId);
  |                     }
  |                     thinkRandomTime();
  |                     //read everyone's contacts
  |                     readEveryonesFirstContact();
  |                     thinkRandomTime();
  |                     removeContact(customerId);
  |                     if (contactExists()) {
  |                         throw new IllegalStateException("contact still exists after successful remove call, customerId=" + customerId);
  |                     }
  |                     thinkRandomTime();
  |                     ++completedIterations;
  |                 }
  |             } catch (Throwable t) {
  |                 this.causeOfFailure = t;
  |                 TERMINATE_ALL_USERS = true;
  |                 //rollback current transaction if any
  |                 //really should not happen since above methods all follow begin-commit-rollback pattern
  |                 try {
  |                     if (DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).getTransaction() != null) {
  |                         DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).rollback();
  |                     }
  |                 } catch (SystemException ex) {
  |                     throw new RuntimeException("failed to rollback tx", ex);
  |                 }
  |             }
  |         }
  |         public boolean isSuccess() {
  |             return ITERATION_COUNT == getCompletedIterations();
  |         }
  |         public int getCompletedIterations() {
  |             return completedIterations;
  |         }
  |         public Throwable getCauseOfFailure() {
  |             return causeOfFailure;
  |         }
  |         public Integer getCustomerId() {
  |             return customerId;
  |         }
  |         @Override
  |         public String toString() {
  |             return super.toString() +
  |                     "[customerId=" + getCustomerId() +
  |                     " iterationsCompleted=" + getCompletedIterations() +
  |                     " completedAll=" + isSuccess() +
  |                     " causeOfFailure=" + (this.causeOfFailure != null ? ExceptionUtils.getStackTrace(causeOfFailure) : "") + "] ";
  |         }
  |     }
  |     /**
  |      * sleep between 0 and THINK_TIME_MILLIS.
  |      * @throws RuntimeException if sleep is interruped or TERMINATE_ALL_USERS flag was set to true in the meantime
  |      */
  |     private void thinkRandomTime() {
  |         try {
  |             Thread.sleep(random.nextInt(THINK_TIME_MILLIS));
  |         } catch (InterruptedException ex) {
  |             throw new RuntimeException("sleep interrupted", ex);
  |         }
  |         if (TERMINATE_ALL_USERS) {
  |             throw new RuntimeException("told to terminate (because a UserRunner had failed)");
  |         }
  |     }
  |     public static Test suite() {
  |         return new FunctionalTestClassTestSuite(MVCCConcurrentWriteTest.class);
  |     }
  | }

View the original post : http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4215956#4215956

Reply to the post : http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=4215956

More information about the jboss-user mailing list