[jboss-cvs] JBoss Messaging SVN: r5280 - in trunk: src/main/org/jboss/messaging/core/config and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Nov 5 12:24:25 EST 2008
Author: ataylor
Date: 2008-11-05 12:24:25 -0500 (Wed, 05 Nov 2008)
New Revision: 5280
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/ResourceManager.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/config/ConfigurationTest-config.xml
trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1302 - some changes to xa timeout functionality
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/config/jbm-configuration.xml 2008-11-05 17:24:25 UTC (rev 5280)
@@ -21,6 +21,11 @@
<jmx-management-enabled>true</jmx-management-enabled>
<connection-scan-period>10000</connection-scan-period>
+
+ <!--how long before timing a transaction out-->
+ <transaction-timeout>60000</transaction-timeout>
+ <!--how often to scan for timedout transactions-->
+ <transaction-timeout-scan-period>1000</transaction-timeout-scan-period>
<!-- Example interceptors
<remoting-interceptors>
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-11-05 17:24:25 UTC (rev 5280)
@@ -145,5 +145,13 @@
boolean isWildcardRoutingEnabled();
+ long getTransactionTimeout();
+
+ void setTransactionTimeout(long timeout);
+
boolean isMessageCounterEnabled();
+
+ long getTransactionTimeoutScanPeriod();
+
+ void setTransactionTimeoutScanPeriod(long period);
}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-05 17:24:25 UTC (rev 5280)
@@ -77,12 +77,16 @@
public static final boolean DEFAULT_MESSAGE_COUNTER_ENABLED = false;
+ public static final long DEFAULT_TRANSACTION_TIMEOUT = 60000;
+
+ public static final long DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD = 1000;
+
// Attributes -----------------------------------------------------------------------------
protected boolean clustered = DEFAULT_CLUSTERED;
protected boolean backup = DEFAULT_BACKUP;
-
+
protected long queueActivationTimeout = DEFAULT_QUEUE_ACTIVATION_TIMEOUT;
protected int scheduledThreadPoolMaxSize = DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
@@ -136,7 +140,11 @@
protected boolean wildcardRoutingEnabled = DEFAULT_WILDCARD_ROUTING_ENABLED;
protected boolean messageCounterEnabled = DEFAULT_MESSAGE_COUNTER_ENABLED;
-
+
+ protected long transactionTimeout = DEFAULT_TRANSACTION_TIMEOUT;
+
+ protected long transactionTimeoutScanPeriod = DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD;
+
public boolean isClustered()
{
return clustered;
@@ -156,12 +164,12 @@
{
this.backup = backup;
}
-
+
public long getQueueActivationTimeout()
{
return queueActivationTimeout;
}
-
+
public void setQueueActivationTimeout(long timeout)
{
this.queueActivationTimeout = timeout;
@@ -352,6 +360,26 @@
return wildcardRoutingEnabled;
}
+ public long getTransactionTimeout()
+ {
+ return transactionTimeout;
+ }
+
+ public void setTransactionTimeout(long timeout)
+ {
+ transactionTimeout = timeout;
+ }
+
+ public long getTransactionTimeoutScanPeriod()
+ {
+ return transactionTimeoutScanPeriod;
+ }
+
+ public void setTransactionTimeoutScanPeriod(long period)
+ {
+ transactionTimeoutScanPeriod = period;
+ }
+
public boolean isSecurityEnabled()
{
return securityEnabled;
@@ -391,7 +419,7 @@
{
pagingMaxGlobalSize = maxGlobalSize;
}
-
+
public boolean isMessageCounterEnabled()
{
return messageCounterEnabled;
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-11-05 17:24:25 UTC (rev 5280)
@@ -87,6 +87,10 @@
securityInvalidationInterval = getLong(e, "security-invalidation-interval", securityInvalidationInterval);
connectionScanPeriod = getLong(e, "connection-scan-period", connectionScanPeriod);
+
+ transactionTimeout = getLong(e, "transaction-timeout", transactionTimeout);
+
+ transactionTimeoutScanPeriod = getLong(e, "transaction-timeout-scan-period", transactionTimeoutScanPeriod);
NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-05 17:24:25 UTC (rev 5280)
@@ -216,7 +216,7 @@
storeFactory.setPagingManager(pagingManager);
- resourceManager = new ResourceManagerImpl(0, scheduledExecutor, storageManager, postOffice, queueSettingsRepository);
+ resourceManager = new ResourceManagerImpl((int) configuration.getTransactionTimeout()/1000, configuration.getTransactionTimeoutScanPeriod(), storageManager, postOffice, queueSettingsRepository);
postOffice = new PostOfficeImpl(storageManager,
pagingManager,
queueFactory,
@@ -237,6 +237,7 @@
this);
postOffice.start();
+ resourceManager.start();
TransportConfiguration backupConnector = configuration.getBackupConnectorConfiguration();
@@ -291,6 +292,8 @@
}
securityStore = null;
+ resourceManager.stop();
+ resourceManager = null;
postOffice.stop();
postOffice = null;
securityRepository = null;
Modified: trunk/src/main/org/jboss/messaging/core/transaction/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/ResourceManager.java 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/transaction/ResourceManager.java 2008-11-05 17:24:25 UTC (rev 5280)
@@ -22,6 +22,8 @@
package org.jboss.messaging.core.transaction;
+import org.jboss.messaging.core.server.MessagingComponent;
+
import java.util.List;
import javax.transaction.xa.Xid;
@@ -34,7 +36,7 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
-public interface ResourceManager
+public interface ResourceManager extends MessagingComponent
{
boolean putTransaction(Xid xid, Transaction tx);
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-11-05 17:24:25 UTC (rev 5280)
@@ -76,6 +76,8 @@
List<MessageReference> timeout() throws Exception;
+ long getCreateTime();
+
static enum State
{
ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java 2008-11-05 17:24:25 UTC (rev 5280)
@@ -23,30 +23,29 @@
package org.jboss.messaging.core.transaction.impl;
import java.util.ArrayList;
+import java.util.List;
import java.util.HashMap;
+import java.util.Map;
import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.TimerTask;
+import java.util.Set;
+import java.util.HashSet;
import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
import javax.transaction.xa.Xid;
+import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.MessagingComponent;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.transaction.ResourceManager;
-import org.jboss.messaging.core.transaction.Transaction;
/**
* A ResourceManagerImpl
@@ -55,7 +54,7 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*/
-public class ResourceManagerImpl implements ResourceManager
+public class ResourceManagerImpl implements ResourceManager, MessagingComponent
{
private static final Logger log = Logger.getLogger(ResourceManagerImpl.class);
@@ -65,29 +64,73 @@
private volatile int timeoutSeconds;
- private final ScheduledExecutorService executorService;
-
- private final Map<Xid, ScheduledFuture<Boolean>> scheduledTimeoutTxs = new HashMap<Xid, ScheduledFuture<Boolean>>();
-
private final StorageManager storageManager;
private final PostOffice postOffice;
private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-
+
+ private boolean started = false;
+
+ private Timer timer;
+
+ private TimerTask task;
+
+ private final long txTimeoutScanPeriod;
+
public ResourceManagerImpl(final int defaultTimeoutSeconds,
- final ScheduledExecutorService scheduledExecutor,
+ final long txTimeoutScanPeriod,
final StorageManager storageManager,
final PostOffice postOffice,
final HierarchicalRepository<QueueSettings> queueSettingsRepository)
{
this.defaultTimeoutSeconds = defaultTimeoutSeconds;
- this.executorService = scheduledExecutor;
+ this.timeoutSeconds = defaultTimeoutSeconds;
+ this.txTimeoutScanPeriod = txTimeoutScanPeriod;
this.storageManager = storageManager;
this.postOffice = postOffice;
this.queueSettingsRepository = queueSettingsRepository;
}
+ // MessagingComponent implementation
+
+ public void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+ timer = new Timer(true);
+ task = new TxTimeoutHandler();
+ timer.schedule(task, txTimeoutScanPeriod, txTimeoutScanPeriod);
+ started = true;
+ }
+
+ public void stop() throws Exception
+ {
+ if (!started)
+ {
+ return;
+ }
+ if (timer != null)
+ {
+ task.cancel();
+
+ task = null;
+
+ timer.cancel();
+
+ timer = null;
+ }
+
+ started = false;
+ }
+
+ public boolean isStarted()
+ {
+ return started;
+ }
+
// ResourceManager implementation ---------------------------------------------
public Transaction getTransaction(final Xid xid)
@@ -97,24 +140,11 @@
public boolean putTransaction(final Xid xid, final Transaction tx)
{
- boolean added = transactions.putIfAbsent(xid, tx) == null;
- if (added && timeoutSeconds > 0)
- {
- ScheduledFuture<Boolean> future = executorService.schedule(new TxTimeoutHandler(tx),
- timeoutSeconds,
- TimeUnit.SECONDS);
- scheduledTimeoutTxs.put(xid, future);
- }
- return added;
+ return transactions.putIfAbsent(xid, tx) == null;
}
public Transaction removeTransaction(final Xid xid)
{
- ScheduledFuture<Boolean> future = scheduledTimeoutTxs.get(xid);
- if (future != null)
- {
- future.cancel(true);
- }
return transactions.remove(xid);
}
@@ -127,7 +157,7 @@
{
if (timeoutSeconds == 0)
{
- // reset to default
+ //reset to default
this.timeoutSeconds = defaultTimeoutSeconds;
}
else
@@ -141,7 +171,7 @@
public List<Xid> getPreparedTransactions()
{
List<Xid> xids = new ArrayList<Xid>();
-
+
for (Xid xid : transactions.keySet())
{
if (transactions.get(xid).getState() == Transaction.State.PREPARED)
@@ -152,53 +182,63 @@
return xids;
}
- private class TxTimeoutHandler implements Callable
+ class TxTimeoutHandler extends TimerTask
{
- final Transaction tx;
-
- public TxTimeoutHandler(final Transaction tx)
+ public void run()
{
- this.tx = tx;
- }
+ Set<Transaction> timedoutTransactions = new HashSet<Transaction>();
- public Object call() throws Exception
- {
- transactions.remove(tx.getXid());
+ long now = System.currentTimeMillis();
- log.warn("transaction with xid " + tx.getXid() + " timed out");
+ for (Transaction tx : transactions.values())
+ {
+ if (tx.getState() != Transaction.State.PREPARED && now > (tx.getCreateTime() + timeoutSeconds * 1000))
+ {
+ transactions.remove(tx.getXid());
+ log.warn("transaction with xid " + tx.getXid() + " timed out");
+ timedoutTransactions.add(tx);
+ }
+ }
- List<MessageReference> rolledBack = tx.timeout();
-
- Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
-
- // TODO - this code is duplicated in ServerSessionImpl - combine
- for (MessageReference ref : rolledBack)
+ for (Transaction failedTransaction : timedoutTransactions)
{
- if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+ try
{
- Queue queue = ref.getQueue();
+ List<MessageReference> rolledBack = failedTransaction.timeout();
+ Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
- LinkedList<MessageReference> list = queueMap.get(queue);
+ for (MessageReference ref : rolledBack)
+ {
+ if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+ {
+ Queue queue = ref.getQueue();
- if (list == null)
+ LinkedList<MessageReference> list = queueMap.get(queue);
+
+ if (list == null)
+ {
+ list = new LinkedList<MessageReference>();
+
+ queueMap.put(queue, list);
+ }
+
+ list.add(ref);
+ }
+ }
+
+ for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap.entrySet())
{
- list = new LinkedList<MessageReference>();
+ LinkedList<MessageReference> refs = entry.getValue();
- queueMap.put(queue, list);
+ entry.getKey().addListFirst(refs);
}
-
- list.add(ref);
}
+ catch (Exception e)
+ {
+ log.error("failed to timeout transaction, xid:" + failedTransaction.getXid(), e);
+ }
}
+ }
- for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap.entrySet())
- {
- LinkedList<MessageReference> refs = entry.getValue();
-
- entry.getKey().addListFirst(refs);
- }
-
- return null;
- }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-11-05 17:24:25 UTC (rev 5280)
@@ -39,7 +39,7 @@
/**
* A TransactionImpl
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
*/
@@ -77,6 +77,8 @@
private final Object timeoutLock = new Object();
+ private final long createTime;
+
public TransactionImpl(final StorageManager storageManager, final PostOffice postOffice)
{
this.storageManager = storageManager;
@@ -95,6 +97,8 @@
this.xid = null;
this.id = storageManager.generateUniqueID();
+
+ createTime = System.currentTimeMillis();
}
public TransactionImpl(final Xid xid, final StorageManager storageManager, final PostOffice postOffice)
@@ -115,6 +119,8 @@
this.xid = xid;
this.id = storageManager.generateUniqueID();
+
+ createTime = System.currentTimeMillis();
}
public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final PostOffice postOffice)
@@ -135,6 +141,8 @@
{
this.pagingManager = postOffice.getPagingManager();
}
+
+ createTime = System.currentTimeMillis();
}
// Transaction implementation
@@ -197,15 +205,20 @@
synchronized (timeoutLock)
{
// if we've already rolled back or committed we don't need to do anything
- if (state == State.COMMITTED || state == State.ROLLBACK_ONLY)
+ if (state == State.COMMITTED || state == State.ROLLBACK_ONLY || state == State.PREPARED)
{
return Collections.emptyList();
}
-
+
return doRollback();
}
}
+ public long getCreateTime()
+ {
+ return createTime;
+ }
+
public void addAcknowledgement(final MessageReference acknowledgement) throws Exception
{
if (state != State.ACTIVE)
@@ -256,21 +269,24 @@
public void prepare() throws Exception
{
- if (state != State.ACTIVE)
+ synchronized (timeoutLock)
{
- throw new IllegalStateException("Transaction is in invalid state " + state);
- }
+ if (state != State.ACTIVE)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
- if (xid == null)
- {
- throw new IllegalStateException("Cannot prepare non XA transaction");
- }
+ if (xid == null)
+ {
+ throw new IllegalStateException("Cannot prepare non XA transaction");
+ }
- pageMessages();
+ pageMessages();
- storageManager.prepare(id, xid);
+ storageManager.prepare(id, xid);
- state = State.PREPARED;
+ state = State.PREPARED;
+ }
}
public void commit() throws Exception
@@ -559,10 +575,7 @@
scheduledReferences.put(ref, scheduledDeliveryTime);
if (ref.getQueue().isDurable())
{
- storageManager.storeMessageReferenceScheduledTransactional(id,
- ref.getQueue().getPersistenceID(),
- message.getMessageID(),
- scheduledDeliveryTime);
+ storageManager.storeMessageReferenceScheduledTransactional(id, ref.getQueue().getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
}
}
}
Modified: trunk/tests/config/ConfigurationTest-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-config.xml 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/tests/config/ConfigurationTest-config.xml 2008-11-05 17:24:25 UTC (rev 5280)
@@ -11,6 +11,8 @@
<call-timeout>7654</call-timeout>
<packet-confirmation-batch-size>543</packet-confirmation-batch-size>
<connection-scan-period>6543</connection-scan-period>
+ <transaction-timeout>98765</transaction-timeout>
+ <transaction-timeout-scan-period>56789</transaction-timeout-scan-period>
<remoting-interceptors>
<class-name>org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor1</class-name>
<class-name>org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor2</class-name>
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java 2008-11-05 17:24:25 UTC (rev 5280)
@@ -49,19 +49,26 @@
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
*/
-public class XaTimeoutTest extends UnitTestCase
+public class XaTimeoutTest extends UnitTestCase
{
private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+
private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
private Map<String, QueueSettings> queueSettings = new HashMap<String, QueueSettings>();
private MessagingService messagingService;
+
private ClientSession clientSession;
+
private ClientProducer clientProducer;
+
private ClientConsumer clientConsumer;
+
private ClientSessionFactory sessionFactory;
+
private ConfigurationImpl configuration;
+
private SimpleString atestq = new SimpleString("atestq");
protected void setUp() throws Exception
@@ -69,6 +76,7 @@
queueSettings.clear();
configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
+ configuration.setTransactionTimeoutScanPeriod(500);
TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
configuration.getAcceptorConfigurations().add(transportConfig);
messagingService = MessagingServiceImpl.newNullStorageMessagingServer(configuration);
@@ -125,8 +133,7 @@
clientProducer.send(m3);
clientProducer.send(m4);
clientSession.end(xid, XAResource.TMSUCCESS);
- clientSession.prepare(xid);
- Thread.sleep(1100);
+ Thread.sleep(1500);
try
{
clientSession.commit(xid, true);
@@ -175,8 +182,7 @@
assertNotNull(m);
assertEquals(m.getBody().getString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
- clientSession.prepare(xid);
- Thread.sleep(2100);
+ Thread.sleep(2600);
try
{
clientSession.commit(xid, true);
@@ -232,9 +238,9 @@
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.start();
clientProducer.send(m5);
- clientProducer.send(m5);
- clientProducer.send(m5);
- clientProducer.send(m5);
+ clientProducer.send(m6);
+ clientProducer.send(m7);
+ clientProducer.send(m8);
ClientMessage m = clientConsumer.receive(500);
m.acknowledge();
assertNotNull(m);
@@ -252,8 +258,7 @@
assertNotNull(m);
assertEquals(m.getBody().getString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
- clientSession.prepare(xid);
- Thread.sleep(2100);
+ Thread.sleep(2600);
try
{
clientSession.commit(xid, true);
@@ -287,7 +292,209 @@
assertNull(m);
clientSession2.close();
}
-
+
+ public void testPreparedTransactionNotTimedOut() throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+ ClientMessage m1 = createTextMessage("m1");
+ ClientMessage m2 = createTextMessage("m2");
+ ClientMessage m3 = createTextMessage("m3");
+ ClientMessage m4 = createTextMessage("m4");
+ ClientMessage m5 = createTextMessage("m5");
+ ClientMessage m6 = createTextMessage("m6");
+ ClientMessage m7 = createTextMessage("m7");
+ ClientMessage m8 = createTextMessage("m8");
+ ClientSession clientSession2 = sessionFactory.createSession(false, true, true, false);
+ ClientProducer clientProducer2 = clientSession2.createProducer(atestq);
+ clientProducer2.send(m1);
+ clientProducer2.send(m2);
+ clientProducer2.send(m3);
+ clientProducer2.send(m4);
+ clientSession2.close();
+ clientSession.setTransactionTimeout(2);
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.start();
+ clientProducer.send(m5);
+ clientProducer.send(m6);
+ clientProducer.send(m7);
+ clientProducer.send(m8);
+ ClientMessage m = clientConsumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m1");
+ m = clientConsumer.receive(500);
+ assertNotNull(m);
+ m.acknowledge();
+ assertEquals(m.getBody().getString(), "m2");
+ m = clientConsumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m3");
+ m = clientConsumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m4");
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+ Thread.sleep(2600);
+ clientSession.commit(xid, true);
+
+ clientSession.setTransactionTimeout(0);
+ clientConsumer.close();
+ clientSession2 = sessionFactory.createSession(false, true, true, false);
+ ClientConsumer consumer = clientSession2.createConsumer(atestq);
+ clientSession2.start();
+ m = consumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m5");
+ m = consumer.receive(500);
+ assertNotNull(m);
+ m.acknowledge();
+ assertEquals(m.getBody().getString(), "m6");
+ m = consumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m7");
+ m = consumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m8");
+ m = consumer.receive(500);
+ assertNull(m);
+ clientSession2.close();
+ }
+
+ public void testChangingTimeoutGetsPickedUp() throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+ ClientMessage m1 = createTextMessage("m1");
+ ClientMessage m2 = createTextMessage("m2");
+ ClientMessage m3 = createTextMessage("m3");
+ ClientMessage m4 = createTextMessage("m4");
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientProducer.send(m1);
+ clientProducer.send(m2);
+ clientProducer.send(m3);
+ clientProducer.send(m4);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.setTransactionTimeout(1);
+ Thread.sleep(1500);
+
+ try
+ {
+ clientSession.commit(xid, true);
+ }
+ catch (XAException e)
+ {
+ assertTrue(e.errorCode == XAException.XAER_NOTA);
+ }
+ clientSession.start();
+ ClientMessage m = clientConsumer.receive(500);
+ assertNull(m);
+ }
+
+ public void testChangingTimeoutGetsPickedUpCommit() throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+ ClientMessage m1 = createTextMessage("m1");
+ ClientMessage m2 = createTextMessage("m2");
+ ClientMessage m3 = createTextMessage("m3");
+ ClientMessage m4 = createTextMessage("m4");
+ clientSession.setTransactionTimeout(2);
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientProducer.send(m1);
+ clientProducer.send(m2);
+ clientProducer.send(m3);
+ clientProducer.send(m4);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.setTransactionTimeout(10000);
+ Thread.sleep(2600);
+ clientSession.prepare(xid);
+ clientSession.commit(xid, true);
+ ClientSession clientSession2 = sessionFactory.createSession(false, true, true, false);
+ ClientConsumer consumer = clientSession2.createConsumer(atestq);
+ clientSession2.start();
+ ClientMessage m = consumer.receive(500);
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m1");
+ m = consumer.receive(500);
+ assertNotNull(m);
+ m.acknowledge();
+ assertEquals(m.getBody().getString(), "m2");
+ m = consumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m3");
+ m = consumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m4");
+ clientSession2.close();
+ }
+
+ public void testMultipleTransactionsTimedOut() throws Exception
+ {
+ Xid[] xids = new XidImpl[100];
+ for (int i = 0; i < xids.length; i++)
+ {
+ xids[i] = new XidImpl(("xa" + i).getBytes(), 1, new GUID().toString().getBytes());
+ }
+ ClientSession[] clientSessions = new ClientSession[xids.length];
+ for (int i = 0; i < clientSessions.length; i++)
+ {
+ clientSessions[i] = sessionFactory.createSession(true, false, false, false);
+ }
+
+ ClientProducer[] clientProducers = new ClientProducer[xids.length];
+ for (int i = 0; i < clientProducers.length; i++)
+ {
+ clientProducers[i] = clientSessions[i].createProducer(atestq);
+ }
+
+ ClientMessage[] messages = new ClientMessage[xids.length];
+
+ for (int i = 0; i < messages.length; i++)
+ {
+ messages[i] = createTextMessage("m" + i);
+ }
+ clientSession.setTransactionTimeout(2);
+ for (int i = 0; i < clientSessions.length; i++)
+ {
+ clientSessions[i].start(xids[i], XAResource.TMNOFLAGS);
+ }
+ for (int i = 0; i < clientProducers.length; i++)
+ {
+ clientProducers[i].send(messages[i]);
+ }
+ for (int i = 0; i < clientSessions.length; i++)
+ {
+ clientSessions[i].end(xids[i], XAResource.TMSUCCESS);
+ }
+ Thread.sleep(2500);
+ for (int i = 0; i < clientSessions.length; i++)
+ {
+ try
+ {
+ clientSessions[i].commit(xids[i], true);
+ }
+ catch (XAException e)
+ {
+ assertTrue(e.errorCode == XAException.XAER_NOTA);
+ }
+ }
+ for (ClientSession session : clientSessions)
+ {
+ session.close();
+ }
+ clientSession.start();
+ ClientMessage m = clientConsumer.receive(500);
+ assertNull(m);
+ }
+
private ClientMessage createTextMessage(String s)
{
return createTextMessage(s, true);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java 2008-11-05 17:24:25 UTC (rev 5280)
@@ -71,6 +71,8 @@
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES, conf.getJournalMinFiles());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_AIO, conf.getJournalMaxAIO());
assertEquals(ConfigurationImpl.DEFAULT_WILDCARD_ROUTING_ENABLED, conf.isWildcardRoutingEnabled());
+ assertEquals(ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT, conf.getTransactionTimeout());
+ assertEquals(ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD, conf.getTransactionTimeoutScanPeriod());
}
public void testSetGetAttributes()
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2008-11-05 17:24:25 UTC (rev 5280)
@@ -56,6 +56,8 @@
assertEquals(100, conf.getJournalMinFiles());
assertEquals(56546, conf.getJournalMaxAIO());
assertEquals(true, conf.isWildcardRoutingEnabled());
+ assertEquals(98765, conf.getTransactionTimeout());
+ assertEquals(56789, conf.getTransactionTimeoutScanPeriod());
assertEquals(2, conf.getInterceptorClassNames().size());
assertTrue(conf.getInterceptorClassNames().contains("org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor1"));
More information about the jboss-cvs-commits
mailing list