[hornetq-commits] JBoss hornetq SVN: r9137 - in trunk: src/main/org/hornetq/core/server/impl and 7 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Apr 16 12:46:09 EDT 2010
Author: ataylor
Date: 2010-04-16 12:46:08 -0400 (Fri, 16 Apr 2010)
New Revision: 9137
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/transaction/ResourceManager.java
trunk/src/main/org/hornetq/core/transaction/Transaction.java
trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-363 - fixed tx timeout and RA
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -369,7 +369,7 @@
// if unsetting a previous handler may be in onMessage so wait for completion
else if (handler == null && !noPreviousHandler)
{
- waitForOnMessageToComplete();
+ waitForOnMessageToComplete(true);
}
}
@@ -397,8 +397,13 @@
public void stop() throws HornetQException
{
- waitForOnMessageToComplete();
+ stop(true);
+ }
+ public void stop(final boolean waitForOnMessage) throws HornetQException
+ {
+ waitForOnMessageToComplete(waitForOnMessage);
+
synchronized (this)
{
if (stopped)
@@ -552,7 +557,7 @@
currentLargeMessageBuffer.addPacket(chunk);
}
- public void clear() throws HornetQException
+ public void clear(boolean waitForOnMessage) throws HornetQException
{
synchronized (this)
{
@@ -572,7 +577,7 @@
// Need to send credits for the messages in the buffer
- waitForOnMessageToComplete();
+ waitForOnMessageToComplete(waitForOnMessage);
}
public int getClientWindowSize()
@@ -723,14 +728,14 @@
channel.send(new SessionConsumerFlowCreditMessage(id, credits));
}
- private void waitForOnMessageToComplete()
+ private void waitForOnMessageToComplete(boolean waitForOnMessage)
{
if (handler == null)
{
return;
}
- if (Thread.currentThread() == onMessageThread)
+ if (!waitForOnMessage || Thread.currentThread() == onMessageThread)
{
// If called from inside onMessage then return immediately - otherwise would block
return;
@@ -855,7 +860,7 @@
closing = true;
// Now we wait for any current handler runners to run.
- waitForOnMessageToComplete();
+ waitForOnMessageToComplete(true);
if (currentLargeMessageBuffer != null)
{
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -46,7 +46,7 @@
void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException;
- void clear() throws HornetQException;
+ void clear(boolean waitForOnMessage) throws HornetQException;
void clearAtFailover();
@@ -62,6 +62,8 @@
void stop() throws HornetQException;
+ void stop(boolean waitForOnMessage) throws HornetQException;
+
void start();
SessionQueueQueryResponseMessage getQueueInfo();
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -560,7 +560,7 @@
// We need to make sure we don't get any inflight messages
for (ClientConsumerInternal consumer : consumers.values())
{
- consumer.clear();
+ consumer.clear(true);
}
// Acks must be flushed here *after connection is stopped and all onmessages finished executing
@@ -639,19 +639,7 @@
public void stop() throws HornetQException
{
- checkClosed();
-
- if (started)
- {
- for (ClientConsumerInternal clientConsumerInternal : consumers.values())
- {
- clientConsumerInternal.stop();
- }
-
- channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
-
- started = false;
- }
+ stop(true);
}
public void addFailureListener(final SessionFailureListener listener)
@@ -1380,13 +1368,13 @@
if (wasStarted)
{
- stop();
+ stop(false);
}
// We need to make sure we don't get any inflight messages
for (ClientConsumerInternal consumer : consumers.values())
{
- consumer.clear();
+ consumer.clear(false);
}
flushAcks();
@@ -1712,6 +1700,23 @@
}
}
+ public void stop(final boolean waitForOnMessage) throws HornetQException
+ {
+ checkClosed();
+
+ if (started)
+ {
+ for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ {
+ clientConsumerInternal.stop(waitForOnMessage);
+ }
+
+ channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
+
+ started = false;
+ }
+ }
+
private static class BindingQueryImpl implements BindingQuery
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -136,6 +136,8 @@
private volatile SimpleString defaultAddress;
+ private volatile int timeoutSeconds;
+
// Constructors ---------------------------------------------------------------------------------
public ServerSessionImpl(final String name,
@@ -180,9 +182,11 @@
this.securityStore = securityStore;
+ timeoutSeconds = resourceManager.getTimeoutSeconds();
+
if (!xa)
{
- tx = new TransactionImpl(storageManager);
+ tx = new TransactionImpl(storageManager, timeoutSeconds);
}
this.xa = xa;
@@ -558,7 +562,7 @@
}
finally
{
- tx = new TransactionImpl(storageManager);
+ tx = new TransactionImpl(storageManager, timeoutSeconds);
}
}
@@ -568,12 +572,12 @@
{
// Might be null if XA
- tx = new TransactionImpl(storageManager);
+ tx = new TransactionImpl(storageManager, timeoutSeconds);
}
doRollback(considerLastMessageAsDelivered, tx);
- tx = new TransactionImpl(storageManager);
+ tx = new TransactionImpl(storageManager, timeoutSeconds);
}
public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
@@ -809,7 +813,7 @@
}
else
{
- tx = new TransactionImpl(xid, storageManager, postOffice);
+ tx = new TransactionImpl(xid, storageManager, timeoutSeconds);
boolean added = resourceManager.putTransaction(xid, tx);
@@ -898,7 +902,11 @@
public void xaSetTimeout(final int timeout)
{
- resourceManager.setTimeoutSeconds(timeout);
+ timeoutSeconds = timeout;
+ if(tx != null)
+ {
+ tx.setTimeout(timeout);
+ }
}
public void start()
Modified: trunk/src/main/org/hornetq/core/transaction/ResourceManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/ResourceManager.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/transaction/ResourceManager.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -37,8 +37,6 @@
int getTimeoutSeconds();
- boolean setTimeoutSeconds(int timeoutSeconds);
-
List<Xid> getPreparedTransactions();
Map<Xid, Long> getPreparedTransactionsWithCreationTime();
Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -55,12 +55,16 @@
void removeOperation(TransactionOperation sync);
+ boolean hasTimedOut(long currentTime, int defaultTimeout);
+
void putProperty(int index, Object property);
Object getProperty(int index);
void setContainsPersistent();
+ void setTimeout(int timeout);
+
static enum State
{
ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
Modified: trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -49,8 +49,6 @@
private final int defaultTimeoutSeconds;
- private volatile int timeoutSeconds;
-
private boolean started = false;
private TxTimeoutHandler task;
@@ -64,7 +62,6 @@
final ScheduledExecutorService scheduledThreadPool)
{
this.defaultTimeoutSeconds = defaultTimeoutSeconds;
- timeoutSeconds = defaultTimeoutSeconds;
this.txTimeoutScanPeriod = txTimeoutScanPeriod;
this.scheduledThreadPool = scheduledThreadPool;
}
@@ -125,24 +122,9 @@
public int getTimeoutSeconds()
{
- return timeoutSeconds;
+ return defaultTimeoutSeconds;
}
- public boolean setTimeoutSeconds(final int timeoutSeconds)
- {
- if (timeoutSeconds == 0)
- {
- // reset to default
- this.timeoutSeconds = defaultTimeoutSeconds;
- }
- else
- {
- this.timeoutSeconds = timeoutSeconds;
- }
-
- return true;
- }
-
public List<Xid> getPreparedTransactions()
{
List<Xid> xids = new ArrayList<Xid>();
@@ -231,7 +213,7 @@
for (Transaction tx : transactions.values())
{
- if (tx.getState() != Transaction.State.PREPARED && now > tx.getCreateTime() + timeoutSeconds * 1000)
+ if (tx.hasTimedOut(now, defaultTimeoutSeconds))
{
transactions.remove(tx.getXid());
ResourceManagerImpl.log.warn("transaction with xid " + tx.getXid() + " timed out");
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -59,6 +59,21 @@
private volatile boolean containsPersistent;
+ private int timeoutSeconds = -1;
+
+ public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds)
+ {
+ this.storageManager = storageManager;
+
+ xid = null;
+
+ id = storageManager.generateUniqueID();
+
+ createTime = System.currentTimeMillis();
+
+ this.timeoutSeconds = timeoutSeconds;
+ }
+
public TransactionImpl(final StorageManager storageManager)
{
this.storageManager = storageManager;
@@ -70,7 +85,7 @@
createTime = System.currentTimeMillis();
}
- public TransactionImpl(final Xid xid, final StorageManager storageManager, final PostOffice postOffice)
+ public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds)
{
this.storageManager = storageManager;
@@ -79,6 +94,8 @@
id = storageManager.generateUniqueID();
createTime = System.currentTimeMillis();
+
+ this.timeoutSeconds = timeoutSeconds;
}
public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager)
@@ -100,6 +117,11 @@
containsPersistent = true;
}
+ public void setTimeout(final int timeout)
+ {
+ this.timeoutSeconds = timeout;
+ }
+
public long getID()
{
return id;
@@ -110,6 +132,18 @@
return createTime;
}
+ public boolean hasTimedOut(final long currentTime,final int defaultTimeout)
+ {
+ if(timeoutSeconds == - 1)
+ {
+ return getState() != Transaction.State.PREPARED && currentTime > createTime + defaultTimeout * 1000;
+ }
+ else
+ {
+ return getState() != Transaction.State.PREPARED && currentTime > createTime + timeoutSeconds * 1000;
+ }
+ }
+
public void prepare() throws Exception
{
synchronized (timeoutLock)
Modified: trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAProperties.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/ra/HornetQRAProperties.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -41,9 +41,6 @@
/** The password */
private String password;
- /** Use XA */
- private Boolean useXA;
-
/** Use Local TX instead of XA */
private Boolean localTx = false;
@@ -142,52 +139,12 @@
this.localTx = localTx;
}
- /**
- * Get the use XA flag
- * @return The value
- */
- public Boolean getUseXA()
- {
- if (HornetQRAProperties.trace)
- {
- HornetQRAProperties.log.trace("getUseXA()");
- }
- return useXA;
- }
-
- /**
- * Set the use XA flag
- * @param xa The value
- */
- public void setUseXA(final Boolean xa)
- {
- if (HornetQRAProperties.trace)
- {
- HornetQRAProperties.log.trace("setUseXA(" + xa + ")");
- }
-
- useXA = xa;
- }
-
- /**
- * Use XA for communication
- * @return The value
- */
- public boolean isUseXA()
- {
- if (HornetQRAProperties.trace)
- {
- HornetQRAProperties.log.trace("isUseXA()");
- }
-
- return useXA != null && useXA;
- }
@Override
public String toString()
{
- return "HornetQRAProperties[useXA=" + useXA + ", localTx=" + localTx +
+ return "HornetQRAProperties[localTx=" + localTx +
", userName=" + userName + ", password=" + password + "]";
}
}
Modified: trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -1186,37 +1186,8 @@
raProperties.setUseLocalTx(localTx);
}
- /**
- * Get the use XA flag
- *
- * @return The value
- */
- public Boolean getUseXA()
- {
- if (HornetQResourceAdapter.trace)
- {
- HornetQResourceAdapter.log.trace("getUseXA()");
- }
- return raProperties.getUseXA();
- }
-
/**
- * Set the use XA flag
- *
- * @param xa The value
- */
- public void setUseXA(final Boolean xa)
- {
- if (HornetQResourceAdapter.trace)
- {
- HornetQResourceAdapter.log.trace("setUseXA(" + xa + ")");
- }
-
- raProperties.setUseXA(xa);
- }
-
- /**
* Indicates whether some other object is "equal to" this one.
*
* @param obj Object with which to compare
@@ -1287,7 +1258,8 @@
final Integer dupsOkBatchSize,
final Integer transactionBatchSize,
final boolean deliveryTransacted,
- final boolean useLocalTx) throws Exception
+ final boolean useLocalTx,
+ final Integer txTimeout) throws Exception
{
ClientSession result;
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -353,7 +353,8 @@
ra.getDupsOKBatchSize(),
ra.getTransactionBatchSize(),
isDeliveryTransacted,
- spec.isUseLocalTx());
+ spec.isUseLocalTx(),
+ spec.getTransactionTimeout());
HornetQActivation.log.debug("Using queue connection " + result);
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -92,6 +92,10 @@
/* use local tx instead of XA*/
private Boolean localTx;
+
+ private String transactionManagerLocatorClass = "org.hornetq.integration.jboss.tm.JBoss5TransactionManagerLocator";
+
+ private String transactionManagerLocatorMethod = "getTm";
/**
* Constructor
@@ -644,6 +648,27 @@
this.localTx = localTx;
}
+
+ public void setTransactionManagerLocatorClass(final String transactionManagerLocatorClass)
+ {
+ this.transactionManagerLocatorClass = transactionManagerLocatorClass;
+ }
+
+ public String getTransactionManagerLocatorClass()
+ {
+ return transactionManagerLocatorClass;
+ }
+
+ public String getTransactionManagerLocatorMethod()
+ {
+ return transactionManagerLocatorMethod;
+ }
+
+ public void setTransactionManagerLocatorMethod(final String transactionManagerLocatorMethod)
+ {
+ this.transactionManagerLocatorMethod = transactionManagerLocatorMethod;
+ }
+
/**
* Validate
* @exception InvalidPropertyException Thrown if a validation exception occurs
@@ -755,6 +780,7 @@
public void setReconnectInterval(long interval)
{
}
-
-
+
+
+
}
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -12,6 +12,7 @@
*/
package org.hornetq.ra.inflow;
+import java.lang.reflect.Method;
import java.util.UUID;
import javax.jms.InvalidClientIDException;
@@ -19,6 +20,8 @@
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
@@ -69,6 +72,8 @@
private final int sessionNr;
+ private TransactionManager tm;
+
public HornetQMessageHandler(final HornetQActivation activation, final ClientSession session, final int sessionNr)
{
this.activation = activation;
@@ -241,14 +246,27 @@
HornetQMessage msg = HornetQMessage.createMessage(message, session);
boolean beforeDelivery = false;
+
try
{
+ if(activation.getActivationSpec().getTransactionTimeout() > 0)
+ {
+ getTm().setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout());
+ }
endpoint.beforeDelivery(HornetQActivation.ONMESSAGE);
beforeDelivery = true;
msg.doBeforeReceive();
((MessageListener)endpoint).onMessage(msg);
message.acknowledge();
- endpoint.afterDelivery();
+ try
+ {
+ endpoint.afterDelivery();
+ }
+ catch (ResourceException e)
+ {
+ HornetQMessageHandler.log.warn("Unable to call after delivery", e);
+ return;
+ }
if (useLocalTx)
{
session.commit();
@@ -266,7 +284,7 @@
}
catch (ResourceException e1)
{
- HornetQMessageHandler.log.warn("Unable to call after delivery");
+ HornetQMessageHandler.log.warn("Unable to call after delivery", e);
}
}
if (useLocalTx || !activation.isDeliveryTransacted())
@@ -284,4 +302,33 @@
}
+ private TransactionManager getTm()
+ {
+ if (tm == null)
+ {
+ try
+ {
+ ClassLoader loader = Thread.currentThread().getContextClassLoader();
+ Class aClass = loader.loadClass(activation.getActivationSpec().getTransactionManagerLocatorClass());
+ Object o = aClass.newInstance();
+ Method m = aClass.getMethod(activation.getActivationSpec().getTransactionManagerLocatorMethod());
+ tm = (TransactionManager)m.invoke(o);
+ }
+ catch (Exception e)
+ {
+ throw new IllegalStateException("unable to create TransactionManager from " + activation.getActivationSpec().getTransactionManagerLocatorClass() +
+ "." +
+ activation.getActivationSpec().getTransactionManagerLocatorMethod(),
+ e);
+ }
+
+ if (tm == null)
+ {
+ throw new IllegalStateException("Cannot locate a transaction manager");
+ }
+ }
+
+ return tm;
+ }
+
}
Modified: trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -393,11 +393,19 @@
clientProducer.send(m3);
clientProducer.send(m4);
clientSession.end(xid, XAResource.TMSUCCESS);
+
+ clientSession.commit(xid, true);
+
clientSession.setTransactionTimeout(1);
+ clientSession.start(xid, XAResource.TMNOFLAGS);
CountDownLatch latch = new CountDownLatch(1);
messagingService.getResourceManager().getTransaction(xid).addOperation(new RollbackCompleteOperation(latch));
- Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
-
+ clientProducer.send(m1);
+ clientProducer.send(m2);
+ clientProducer.send(m3);
+ clientProducer.send(m4);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ Assert.assertTrue(latch.await(2600, TimeUnit.MILLISECONDS));
try
{
clientSession.commit(xid, true);
@@ -408,49 +416,15 @@
}
clientSession.start();
ClientMessage m = clientConsumer.receiveImmediate();
- Assert.assertNull(m);
- }
-
- public void testChangingTimeoutGetsPickedUpCommit() throws Exception
- {
- Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
-
- ClientMessage m1 = createTextMessage("m1", clientSession);
- ClientMessage m2 = createTextMessage("m2", clientSession);
- ClientMessage m3 = createTextMessage("m3", clientSession);
- ClientMessage m4 = createTextMessage("m4", clientSession);
- clientSession.setTransactionTimeout(2);
- clientSession.start(xid, XAResource.TMNOFLAGS);
- clientProducer.send(m1);
- clientProducer.send(m2);
- clientProducer.send(m3);
- clientProducer.send(m4);
- clientSession.end(xid, XAResource.TMSUCCESS);
- clientSession.setTransactionTimeout(10000);
- CountDownLatch latch = new CountDownLatch(1);
- messagingService.getResourceManager().getTransaction(xid).addOperation(new RollbackCompleteOperation(latch));
- Assert.assertFalse(latch.await(2600, TimeUnit.MILLISECONDS));
- clientSession.prepare(xid);
- clientSession.commit(xid, false);
- ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
- ClientConsumer consumer = clientSession2.createConsumer(atestq);
- clientSession2.start();
- ClientMessage m = consumer.receive(500);
Assert.assertNotNull(m);
- Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
- m = consumer.receive(500);
+ m = clientConsumer.receiveImmediate();
Assert.assertNotNull(m);
- m.acknowledge();
- Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
- m = consumer.receive(500);
- m.acknowledge();
+ m = clientConsumer.receiveImmediate();
Assert.assertNotNull(m);
- Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
- m = consumer.receive(500);
- m.acknowledge();
+ m = clientConsumer.receiveImmediate();
Assert.assertNotNull(m);
- Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
- clientSession2.close();
+ m = clientConsumer.receiveImmediate();
+ Assert.assertNull(m);
}
public void testMultipleTransactionsTimedOut() throws Exception
@@ -464,6 +438,7 @@
for (int i = 0; i < clientSessions.length; i++)
{
clientSessions[i] = sessionFactory.createSession(true, false, false);
+ clientSessions[i].setTransactionTimeout(i < 50?2:5000);
}
ClientProducer[] clientProducers = new ClientProducer[xids.length];
@@ -478,7 +453,6 @@
{
messages[i] = createTextMessage("m" + i, clientSession);
}
- clientSession.setTransactionTimeout(2);
for (int i = 0; i < clientSessions.length; i++)
{
clientSessions[i].start(xids[i], XAResource.TMNOFLAGS);
@@ -491,13 +465,21 @@
{
clientSessions[i].end(xids[i], XAResource.TMSUCCESS);
}
- CountDownLatch latch = new CountDownLatch(1);
- messagingService.getResourceManager()
- .getTransaction(xids[clientSessions.length - 1])
- .addOperation(new RollbackCompleteOperation(latch));
- Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
- for (int i = 0; i < clientSessions.length; i++)
+ CountDownLatch[] latches = new CountDownLatch[xids.length];
+ for (int i1 = 0; i1 < latches.length; i1++)
{
+ latches[i1] = new CountDownLatch(1);
+ messagingService.getResourceManager()
+ .getTransaction(xids[i1])
+ .addOperation(new RollbackCompleteOperation(latches[i1]));
+ }
+ for (int i1 = 0;i1 < latches.length/2; i1++)
+ {
+ Assert.assertTrue(latches[i1].await(5, TimeUnit.SECONDS));
+ }
+
+ for (int i = 0; i < clientSessions.length/2; i++)
+ {
try
{
clientSessions[i].commit(xids[i], true);
@@ -507,11 +489,20 @@
Assert.assertTrue(e.errorCode == XAException.XAER_NOTA);
}
}
+ for (int i = 50; i < clientSessions.length; i++)
+ {
+ clientSessions[i].commit(xids[i], true);
+ }
for (ClientSession session : clientSessions)
{
session.close();
}
clientSession.start();
+ for(int i = 0; i < clientSessions.length/2; i++)
+ {
+ ClientMessage m = clientConsumer.receiveImmediate();
+ Assert.assertNotNull(m);
+ }
ClientMessage m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -635,7 +635,7 @@
}
- public void clear() throws HornetQException
+ public void clear(boolean waitForOnMessage) throws HornetQException
{
// TODO Auto-generated method stub
@@ -725,6 +725,11 @@
}
+ public void stop(boolean waitForOnMessage) throws HornetQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public SessionQueueQueryResponseMessage getQueueInfo()
{
// TODO Auto-generated method stub
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-04-16 16:46:08 UTC (rev 9137)
@@ -134,6 +134,11 @@
}
+ public boolean hasTimedOut(long currentTime, int defaultTimeout)
+ {
+ return false;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.transaction.Transaction#commit()
*/
@@ -282,6 +287,11 @@
}
+ public void setTimeout(int timeout)
+ {
+
+ }
+
}
class FakeMessage implements ServerMessage
More information about the hornetq-commits
mailing list