OK, I wired up DualNodeJtaTransactionManagerImpl. The test now passes when userCount is
4 or less; at 5 or above it fails. However, the failure I am seeing with this
test is NOT the same as the one we see in our environment. It is this:
anonymous wrote :
-------------------------------------------------------------------------------
| Test set: org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest
| -------------------------------------------------------------------------------
| Tests run: 6, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 182.113 sec
<<< FAILURE!
| testManyUsers(org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest) Time
elapsed: 165.831 sec <<< FAILURE!
| junit.framework.AssertionFailedError: Timed out waiting for user threads to finish.
Their state at the time of forced shutdown: TEST CONFIG [userCount=5,
iterationsPerUser=40, thinkTimeMillis=10] STATE of UserRunners:
org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest$UserRunner@ffc3eb[customerId=4
iterationsCompleted=30 completedAll=false
causeOfFailure=org.hibernate.cache.CacheException: org.hibernate.cache.CacheException:
org.jboss.cache.lock.TimeoutException: Unable to acquire lock on Fqn
[/TS/test/org/hibernate/cache/UpdateTimestampsCache/Contacts] after [15000] milliseconds
for requestor [Thread[UserRunnerThread-4,5,main]]! Lock held by [GlobalTransaction::903]
| at
org.hibernate.cache.jbc2.timestamp.TimestampsRegionImpl.put(TimestampsRegionImpl.java:130)
| at
org.hibernate.cache.UpdateTimestampsCache.preinvalidate(UpdateTimestampsCache.java:70)
| at org.hibernate.engine.ActionQueue.execute(ActionQueue.java:275)
| at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:263)
| at org.hibernate.engine.ActionQueue.executeActions(ActionQueue.java:167)
| at
org.hibernate.event.def.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:321)
| at
org.hibernate.event.def.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:50)
| at org.hibernate.impl.SessionImpl.flush(SessionImpl.java:1027)
| at org.hibernate.impl.SessionImpl.managedFlush(SessionImpl.java:365)
| at
org.hibernate.transaction.CacheSynchronization.beforeCompletion(CacheSynchronization.java:88)
| at
org.hibernate.test.cache.jbc2.functional.util.DualNodeJtaTransactionImpl.commit(DualNodeJtaTransactionImpl.java:76)
| at
org.hibernate.test.cache.jbc2.functional.util.DualNodeJtaTransactionManagerImpl.commit(DualNodeJtaTransactionManagerImpl.java:123)
| at
org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest.commitTx(MVCCConcurrentWriteTest.java:255)
| at
org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest.addContact(MVCCConcurrentWriteTest.java:333)
| at
org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest.access$100(MVCCConcurrentWriteTest.java:29)
| at
org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest$UserRunner.run(MVCCConcurrentWriteTest.java:423)
| at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
| at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
| at java.lang.Thread.run(Thread.java:619)
| Caused by: org.hibernate.cache.CacheException: org.jboss.cache.lock.TimeoutException:
Unable to acquire lock on Fqn
[/TS/test/org/hibernate/cache/UpdateTimestampsCache/Contacts] after [15000] milliseconds
for requestor [Thread[UserRunnerThread-4,5,main]]! Lock held by [GlobalTransaction::903]
| at org.hibernate.cache.jbc2.util.CacheHelper.put(CacheHelper.java:214)
| at
org.hibernate.cache.jbc2.timestamp.TimestampsRegionImpl.put(TimestampsRegionImpl.java:128)
| ... 18 more]
|
org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest$UserRunner@18bbb61[customerId=6
iterationsCompleted=28 completedAll=false causeOfFailure=]
|
org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest$UserRunner@1cab18[customerId=2
iterationsCompleted=28 completedAll=false causeOfFailure=]
|
org.hibernate.test.cache.jbc2.functional.MVCCConcurrentWriteTest$UserRunner@9cd8db[customerId=3
iterationsCompleted=30 completedAll=false causeOfFailure=]
|
|
To repro:
-set userCount to 5 or more
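-run the test and wait patiently for up to 5 minutes (not sure why it takes that long; my
guess is the 15-second lock timeouts plus the 120-second awaitTermination in
testManyUsers. With < 5 user threads this test finishes in 3 seconds).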
The fact that this test isn't using a real JTA tx manager is a problem IMO, as the
above exception seems to have more to do with TX cleanup (though I'd be happy to be proven
wrong). Even if the above bug is worked around and the test starts passing, it won't
prove there isn't a problem, since a crucial piece of the stack is replaced with a
simpler impl.
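To illustrate what I mean by TX cleanup, here is a minimal sketch that uses a plain java.util.concurrent ReentrantLock as a stand-in for the cache node lock. It is NOT JBoss Cache code, and the names (LockTimeoutSketch, secondWriterGetsLock) are made up for illustration. It assumes the failure mode is a lock owner that never releases: a second writer then waits for the full timeout and gives up, which has the same shape as the TimeoutException above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class LockTimeoutSketch {

    /**
     * A holder thread takes the lock first. Returns true if the calling thread
     * can then acquire the same lock within timeoutMillis.
     */
    static boolean secondWriterGetsLock(boolean holderReleases, long timeoutMillis) {
        final ReentrantLock nodeLock = new ReentrantLock(); // stand-in for a per-node cache lock
        final CountDownLatch holderHasLock = new CountDownLatch(1);
        final CountDownLatch testDone = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            nodeLock.lock();
            try {
                holderHasLock.countDown();
                if (!holderReleases) {
                    testDone.await(); // simulate an owner that is never cleaned up
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                nodeLock.unlock();
            }
        });
        holder.start();

        try {
            holderHasLock.await();
            boolean acquired = nodeLock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                nodeLock.unlock();
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the lock", e);
        } finally {
            // let the holder finish so the sketch never leaks a thread
            testDone.countDown();
            try {
                holder.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("holder releases: acquired=" + secondWriterGetsLock(true, 500));
        System.out.println("holder never releases: acquired=" + secondWriterGetsLock(false, 100));
    }
}
```

If the holder releases promptly, the second writer gets the lock well within the timeout. If it never releases, every waiting writer burns the whole timeout before failing, which would be consistent with a run that takes minutes instead of seconds. Whether that is what actually happens inside the cache here is still only my guess.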
Here's the updated source:
package org.hibernate.test.cache.jbc2.functional;
|
| import java.util.HashSet;
| import java.util.Random;
| import java.util.Set;
| import java.util.concurrent.ExecutorService;
| import java.util.concurrent.Executors;
| import java.util.concurrent.TimeUnit;
| import javax.transaction.SystemException;
| import junit.framework.Test;
| import org.hibernate.FlushMode;
| import org.hibernate.Session;
| import org.hibernate.cfg.Configuration;
| import org.hibernate.exception.ExceptionUtils;
| import org.hibernate.junit.functional.FunctionalTestClassTestSuite;
| import org.hibernate.stat.SecondLevelCacheStatistics;
| import org.hibernate.test.cache.jbc2.functional.util.DualNodeConnectionProviderImpl;
| import
org.hibernate.test.cache.jbc2.functional.util.DualNodeJtaTransactionManagerImpl;
| import org.hibernate.test.cache.jbc2.functional.util.DualNodeTestUtil;
| import
org.hibernate.test.cache.jbc2.functional.util.DualNodeTransactionManagerLookup;
| import org.hibernate.transaction.CMTTransactionFactory;
| import org.slf4j.Logger;
| import org.slf4j.LoggerFactory;
|
| /**
| *
| * @author nikita_tovstoles(a)mba.berkeley.edu
| */
| public class MVCCConcurrentWriteTest extends MVCCJBossCacheTest {
|
| private static final Logger LOG =
LoggerFactory.getLogger(MVCCConcurrentWriteTest.class);
| /**
| * tests pass when USER_COUNT <= 4 and fail when USER_COUNT >= 5
| */
| final private int USER_COUNT = 5;
| final private int ITERATION_COUNT = 40;
| final private int THINK_TIME_MILLIS = 10;
| final private long LAUNCH_INTERVAL_MILLIS = 10;
| final private Random random = new Random();
| /**
| * kill switch used to stop all users when one fails;
| * volatile so that a write by one user thread is visible to the others
| */
| private static volatile boolean TERMINATE_ALL_USERS = false;
| /**
| * collection of IDs of all customers participating in this test
| */
| private Set<Integer> customerIDs = new HashSet<Integer>();
|
| public MVCCConcurrentWriteTest(String x) {
| super(x);
| }
|
| /**
| * test that DB can be queried
| * @throws java.lang.Exception
| */
| public void testPingDb() throws Exception {
| try {
| beginTx();
|
getEnvironment().getSessionFactory().getCurrentSession().createQuery("from " +
Customer.class.getName()).list();
| commitTx();
| } catch (Exception e) {
| rollbackTx();
| fail("failed to query DB; exception=" + e);
| }
| }
|
| @Override
| protected void prepareTest() throws Exception {
| super.prepareTest();
| }
|
| @Override
| protected void cleanupTest() throws Exception {
| try {
| super.cleanupTest();
|
| } finally {
| cleanup();
| //DualNodeJtaTransactionManagerImpl.cleanupTransactions();
| //DualNodeJtaTransactionManagerImpl.cleanupTransactionManagers();
| }
| }
|
| @Override
| public void configure(Configuration cfg) {
| super.configure(cfg);
| cfg.setProperty(DualNodeTestUtil.NODE_ID_PROP, DualNodeTestUtil.LOCAL);
| }
|
| @Override
| protected Class getConnectionProviderClass() {
| return DualNodeConnectionProviderImpl.class;
| }
|
| @Override
| protected Class getTransactionManagerLookupClass() {
| return DualNodeTransactionManagerLookup.class;
| }
|
| @Override
| protected Class getTransactionFactoryClass() {
| return CMTTransactionFactory.class;
| }
|
| @Override
| public void testEmptySecondLevelCacheEntry() throws Exception {
| //do nothing
| }
|
| @Override
| public void testQueryCacheInvalidation() {
| //do nothing
| }
|
| @Override
| public void testStaleWritesLeaveCacheConsistent() {
| //do nothing
| }
|
| public void testSingleUser() throws Exception {
| //setup
| Customer customer = createCustomer(0);
| final Integer customerId = customer.getId();
| getCustomerIDs().add(customerId);
|
| assertNull("contact exists despite not being added",
getFirstContact(customerId));
|
| //check that cache was hit
| SecondLevelCacheStatistics customerSlcs =
getEnvironment().getSessionFactory().getStatistics().getSecondLevelCacheStatistics(
| getPrefixedRegionName(Customer.class.getName()));
| assertEquals(1, customerSlcs.getPutCount());
| assertEquals(1, customerSlcs.getElementCountInMemory());
| assertEquals(1, customerSlcs.getEntries().size());
|
| SecondLevelCacheStatistics contactsCollectionSlcs =
getEnvironment().getSessionFactory().getStatistics().getSecondLevelCacheStatistics(
| getPrefixedRegionName(Customer.class.getName() +
".contacts"));
| assertEquals(1, contactsCollectionSlcs.getPutCount());
| assertEquals(1, contactsCollectionSlcs.getElementCountInMemory());
| assertEquals(1, contactsCollectionSlcs.getEntries().size());
|
| final Contact contact = addContact(customerId);
| assertNotNull("contact returned by addContact is null", contact);
| assertEquals("Customer.contacts cache was not invalidated after
addContact",
| 0, contactsCollectionSlcs.getElementCountInMemory());
|
| assertNotNull("Contact missing after successful add call",
getFirstContact(customerId));
|
|
| //read everyone's contacts
| readEveryonesFirstContact();
|
| removeContact(customerId);
| assertNull("contact still exists after successful remove call",
getFirstContact(customerId));
|
| }
|
| public void testManyUsers() throws Exception {
|
| //setup - create users
| for (int i = 0; i < USER_COUNT; i++) {
| Customer customer = createCustomer(i);
| getCustomerIDs().add(customer.getId());
| }
|
| assertEquals("failed to create enough Customers", USER_COUNT,
getCustomerIDs().size());
|
| final ExecutorService pool = Executors.newFixedThreadPool(USER_COUNT);
|
| Set<UserRunner> runners = new HashSet<UserRunner>();
| for (Integer customerId : getCustomerIDs()) {
| UserRunner r = new UserRunner(customerId);
| runners.add(r);
| pool.execute(r);
| LOG.info("launched " + r);
| Thread.sleep(LAUNCH_INTERVAL_MILLIS); //rampup
| }
|
| assertEquals("not all user threads launched", USER_COUNT,
runners.size());
|
| pool.shutdown();
| boolean finishedInTime = pool.awaitTermination(120, TimeUnit.SECONDS);
|
| if (!finishedInTime) { //timed out waiting for users to finish
| fail("Timed out waiting for user threads to finish. Their state at
the time of forced shutdown: " + statusOfRunnersToString(runners));
| } else {
| //if here -> pool finished before timing out
| //check whether all runners succeeded
| boolean success = true;
| for (UserRunner r : runners) {
| if (!r.isSuccess()) {
| success = false;
| break;
| }
| }
| assertTrue("at least one UserRunner failed: " +
statusOfRunnersToString(runners), success);
| }
| }
|
| public void cleanup() throws Exception {
|
| getCustomerIDs().clear();
|
| String deleteContactHQL = "delete from Contact";
| String deleteCustomerHQL = "delete from Customer";
|
| beginTx();
| try {
|
| //Session session = getSessions().getCurrentSession();
| Session session =
getEnvironment().getSessionFactory().getCurrentSession();
|
session.createQuery(deleteContactHQL).setFlushMode(FlushMode.AUTO).executeUpdate();
|
session.createQuery(deleteCustomerHQL).setFlushMode(FlushMode.AUTO).executeUpdate();
| commitTx();
| } catch (Exception e) {
| rollbackTx();
| throw e;
| }
|
| }
|
| private Customer createCustomer(int nameSuffix) throws Exception {
| beginTx();
| try {
| Customer customer = new Customer();
| customer.setName("customer_" + nameSuffix);
| customer.setContacts(new HashSet<Contact>());
|
|
getEnvironment().getSessionFactory().getCurrentSession().persist(customer);
| commitTx();
| return customer;
| } catch (Exception e) {
| rollbackTx();
| throw e;
| }
| }
|
| /**
| * delegate method since I'm trying to figure out which txManager to use
| * given that this test runs multiple threads (SimpleJtaTxMgrImpl isn't suited
for that).
| *
| * What is needed is a thread-safe JTATransactionManager impl that can handle
concurrent TXs
| *
| * @throws java.lang.Exception
| */
| private void beginTx() throws Exception {
|
DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).begin();
| }
|
| /**
| * @see #beginTx()
| * @throws java.lang.Exception
| */
| private void commitTx() throws Exception {
|
DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).commit();
| }
|
| /**
| * @see #beginTx()
| * @throws java.lang.Exception
| */
| private void rollbackTx() throws Exception {
|
DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).rollback();
| }
|
| /**
| * read first contact of every Customer participating in this test.
| * this forces concurrent cache writes of Customer.contacts Collection cache node
| *
| * @throws java.lang.Exception
| */
| private void readEveryonesFirstContact() throws Exception {
| beginTx();
| try {
| for (Integer customerId : getCustomerIDs()) {
| final Customer customer = (Customer)
getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class,
customerId);
| Set<Contact> contacts = customer.getContacts();
| Contact firstContact = contacts.isEmpty() ? null :
contacts.iterator().next();
| }
| commitTx();
| } catch (Exception e) {
| rollbackTx();
| throw e;
| }
| }
|
| /**
| * -load existing Customer
| * -get customer's contacts; return 1st one
| *
| * @param customerId
| * @return first Contact or null if customer has none
| */
| private Contact getFirstContact(Integer customerId) throws Exception {
| assert customerId != null;
|
| beginTx();
| try {
| final Customer customer = (Customer)
getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class,
customerId);
| Set<Contact> contacts = customer.getContacts();
| Contact firstContact = contacts.isEmpty() ? null :
contacts.iterator().next();
| commitTx();
| return firstContact;
| } catch (Exception e) {
| rollbackTx();
| throw e;
| }
| }
|
| /**
| * -load existing Customer
| * -create a new Contact and add to customer's contacts
| *
| * @param customerId
| * @return added Contact
| */
| private Contact addContact(Integer customerId) throws Exception {
| assert customerId != null;
|
| beginTx();
| try {
| final Customer customer = (Customer)
getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class,
customerId);
|
| Contact contact = new Contact();
| contact.setName("contact name");
| contact.setTlf("wtf is tlf?");
|
| contact.setCustomer(customer);
| customer.getContacts().add(contact);
|
| //assuming contact is persisted via cascade from customer
| commitTx();
|
| return contact;
| } catch (Exception e) {
| rollbackTx();
| throw e;
| }
| }
|
| /**
| * remove existing 'contact' from customer's list of contacts
| *
| * @param customerId id of the customer whose single contact is removed
| * @throws IllegalStateException if customer does not own a contact
| */
| private void removeContact(Integer customerId) throws Exception {
| assert customerId != null;
|
| beginTx();
| try {
| Customer customer = (Customer)
getEnvironment().getSessionFactory().getCurrentSession().load(Customer.class,
customerId);
| Set<Contact> contacts = customer.getContacts();
| if (contacts.size() != 1) {
| throw new IllegalStateException("can't remove contact:
customer id=" + customerId + " expected exactly 1 contact, " +
| "actual count=" + contacts.size());
| }
|
| Contact contact = contacts.iterator().next();
| contacts.remove(contact);
| contact.setCustomer(null);
|
| //explicitly delete Contact because hbm has no 'DELETE_ORPHAN'
cascade?
|
//getEnvironment().getSessionFactory().getCurrentSession().delete(contact); //appears to
not be needed
|
| //assuming contact is persisted via cascade from customer
| commitTx();
| } catch (Exception e) {
| rollbackTx();
| throw e;
| }
| }
|
| /**
| * @return the customerIDs
| */
| public Set<Integer> getCustomerIDs() {
| return customerIDs;
| }
|
| private String statusOfRunnersToString(Set<UserRunner> runners) {
| assert runners != null;
|
| StringBuilder sb = new StringBuilder("TEST CONFIG [userCount=" +
USER_COUNT +
| ", iterationsPerUser=" + ITERATION_COUNT +
| ", thinkTimeMillis=" + THINK_TIME_MILLIS + "] " +
| " STATE of UserRunners: ");
|
| for (UserRunner r : runners) {
| sb.append(r.toString() + System.getProperty("line.separator"));
| }
| return sb.toString();
| }
|
| class UserRunner implements Runnable {
|
| final private Integer customerId;
| private volatile int completedIterations = 0; //read by the main thread in toString()
| private volatile Throwable causeOfFailure;
|
| public UserRunner(final Integer cId) {
| assert cId != null;
| this.customerId = cId;
| }
|
| private boolean contactExists() throws Exception {
| return getFirstContact(customerId) != null;
| }
|
| public void run() {
|
| //name this thread for easier log tracing
| Thread.currentThread().setName("UserRunnerThread-" +
getCustomerId());
| try {
| for (int i = 0; i < ITERATION_COUNT; i++) {
|
| if (contactExists()) {
| throw new IllegalStateException("contact already exists
before add, customerId=" + customerId);
| }
|
| addContact(customerId);
|
| if (!contactExists()) {
| throw new IllegalStateException("contact missing after
successful add, customerId=" + customerId);
| }
|
| thinkRandomTime();
|
| //read everyone's contacts
| readEveryonesFirstContact();
|
| thinkRandomTime();
|
| removeContact(customerId);
|
| if (contactExists()) {
| throw new IllegalStateException("contact still exists
after successful remove call, customerId=" + customerId);
| }
|
| thinkRandomTime();
|
| ++completedIterations;
| }
|
| } catch (Throwable t) {
|
| this.causeOfFailure = t;
| TERMINATE_ALL_USERS = true;
|
| //rollback current transaction if any
| //really should not happen since above methods all follow
begin-commit-rollback pattern
| try {
| if
(DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).getTransaction() !=
null) {
|
DualNodeJtaTransactionManagerImpl.getInstance(DualNodeTestUtil.LOCAL).rollback();
| }
| } catch (SystemException ex) {
| throw new RuntimeException("failed to rollback tx",
ex);
| }
| }
| }
|
| public boolean isSuccess() {
| return ITERATION_COUNT == getCompletedIterations();
| }
|
| public int getCompletedIterations() {
| return completedIterations;
| }
|
| public Throwable getCauseOfFailure() {
| return causeOfFailure;
| }
|
| public Integer getCustomerId() {
| return customerId;
| }
|
| @Override
| public String toString() {
| return super.toString() +
| "[customerId=" + getCustomerId() +
| " iterationsCompleted=" + getCompletedIterations() +
| " completedAll=" + isSuccess() +
| " causeOfFailure=" + (this.causeOfFailure != null ?
ExceptionUtils.getStackTrace(causeOfFailure) : "") + "] ";
| }
| }
|
| /**
| * sleep between 0 and THINK_TIME_MILLIS.
| * @throws RuntimeException if sleep is interrupted or TERMINATE_ALL_USERS flag was
set to true in the meantime
| */
| private void thinkRandomTime() {
| try {
| Thread.sleep(random.nextInt(THINK_TIME_MILLIS));
| } catch (InterruptedException ex) {
| Thread.currentThread().interrupt(); //restore the interrupt flag for callers
| throw new RuntimeException("sleep interrupted", ex);
| }
|
| if (TERMINATE_ALL_USERS) {
| throw new RuntimeException("told to terminate (because a UserRunner
had failed)");
| }
| }
|
| public static Test suite() {
| return new FunctionalTestClassTestSuite(MVCCConcurrentWriteTest.class);
| }
| }
|
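FWIW, the UserRunner bookkeeping in the listing above (kill switch, per-user iteration counts, failure causes) could probably be simplified with Futures. Below is a minimal sketch using only the JDK; it does not touch Hibernate or JBoss Cache. RunnerHarnessSketch, UserAction and runUsers are hypothetical names I made up for illustration, and the action passed in stands in for the add/read/remove cycle of one iteration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

class RunnerHarnessSketch {

    /** One iteration of one user's work; stands in for add/read/remove contact. */
    interface UserAction {
        void runOnce(int userId, int iteration) throws Exception;
    }

    /** Runs all users concurrently and returns one message per user that did not finish cleanly. */
    static List<String> runUsers(int userCount, int iterations, long timeoutSeconds, UserAction action) {
        final AtomicBoolean terminateAll = new AtomicBoolean(false); // thread-safe kill switch
        ExecutorService pool = Executors.newFixedThreadPool(userCount);
        List<Future<Integer>> futures = new ArrayList<>();

        for (int u = 0; u < userCount; u++) {
            final int userId = u;
            futures.add(pool.submit(() -> {
                int completed = 0;
                try {
                    while (completed < iterations && !terminateAll.get()) {
                        action.runOnce(userId, completed);
                        completed++;
                    }
                    return completed;
                } catch (Exception e) {
                    terminateAll.set(true); // tell the other users to stop
                    throw e;                // surfaces later as ExecutionException
                }
            }));
        }
        pool.shutdown();

        List<String> problems = new ArrayList<>();
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
        for (int u = 0; u < futures.size(); u++) {
            try {
                long remaining = Math.max(0, deadline - System.nanoTime());
                int completed = futures.get(u).get(remaining, TimeUnit.NANOSECONDS);
                if (completed < iterations) {
                    problems.add("user " + u + " stopped early after " + completed + " iterations");
                }
            } catch (ExecutionException e) {
                problems.add("user " + u + " failed: " + e.getCause());
            } catch (TimeoutException e) {
                problems.add("user " + u + " timed out");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                problems.add("interrupted while waiting for user " + u);
            }
        }
        pool.shutdownNow(); // interrupt anything still running after the deadline
        return problems;
    }
}
```

A test would then assert that the returned list is empty, e.g. `assertTrue(problems.toString(), problems.isEmpty())`. The point of the design is that Future.get() gives the main thread a safe view of each user's result or exception, so no shared mutable fields need to be read while the workers are still running.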
View the original post :
http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4215956#...