[jboss-cvs] JBoss Messaging SVN: r5266 - in trunk: src/main/org/jboss/messaging/core/transaction/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Nov 4 14:18:07 EST 2008
Author: ataylor
Date: 2008-11-04 14:18:07 -0500 (Tue, 04 Nov 2008)
New Revision: 5266
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1302 - added xa timeout functionality
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-04 19:02:27 UTC (rev 5265)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-04 19:18:07 UTC (rev 5266)
@@ -216,7 +216,7 @@
storeFactory.setPagingManager(pagingManager);
- resourceManager = new ResourceManagerImpl(0);
+ resourceManager = new ResourceManagerImpl(0, scheduledExecutor, storageManager, postOffice, queueSettingsRepository);
postOffice = new PostOfficeImpl(storageManager,
pagingManager,
queueFactory,
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java 2008-11-04 19:02:27 UTC (rev 5265)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java 2008-11-04 19:18:07 UTC (rev 5266)
@@ -18,44 +18,77 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.core.transaction.impl;
import java.util.ArrayList;
import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Callable;
import javax.transaction.xa.Xid;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
/**
- *
* A ResourceManagerImpl
- *
+ * <p/>
* TODO - implement timeouts
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
*/
public class ResourceManagerImpl implements ResourceManager
{
+ private static final Logger log = Logger.getLogger(ResourceManagerImpl.class);
+
private final ConcurrentMap<Xid, Transaction> transactions = new ConcurrentHashMap<Xid, Transaction>();
-
+
private final int defaultTimeoutSeconds;
-
+
private volatile int timeoutSeconds;
-
- public ResourceManagerImpl(final int defaultTimeoutSeconds)
- {
+
+ private final ScheduledExecutorService executorService;
+
+ private final Map<Xid, ScheduledFuture> scheduledTimeoutTxs = new HashMap<Xid, ScheduledFuture>();
+
+ private final StorageManager storageManager;
+
+ private final PostOffice postOffice;
+
+ private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+ public ResourceManagerImpl(final int defaultTimeoutSeconds,
+ final ScheduledExecutorService scheduledExecutor,
+ final StorageManager storageManager,
+ final PostOffice postOffice,
+ final HierarchicalRepository<QueueSettings> queueSettingsRepository)
+ {
this.defaultTimeoutSeconds = defaultTimeoutSeconds;
+ this.executorService = scheduledExecutor;
+ this.storageManager = storageManager;
+ this.postOffice = postOffice;
+ this.queueSettingsRepository = queueSettingsRepository;
}
-
+
// ResourceManager implementation ---------------------------------------------
-
+
public Transaction getTransaction(final Xid xid)
{
return transactions.get(xid);
@@ -63,19 +96,30 @@
public boolean putTransaction(final Xid xid, final Transaction tx)
{
- return transactions.putIfAbsent(xid, tx) == null;
+ boolean added = transactions.putIfAbsent(xid, tx) == null;
+ if (added && timeoutSeconds > 0)
+ {
+ ScheduledFuture<Boolean> future = executorService.schedule(new TxTimeoutHandler(tx), timeoutSeconds, TimeUnit.SECONDS);
+ scheduledTimeoutTxs.put(xid, future);
+ }
+ return added;
}
public Transaction removeTransaction(final Xid xid)
{
+ ScheduledFuture<Boolean> future = scheduledTimeoutTxs.get(xid);
+ if (future != null)
+ {
+ future.cancel(true);
+ }
return transactions.remove(xid);
}
-
+
public int getTimeoutSeconds()
{
return this.timeoutSeconds;
}
-
+
public boolean setTimeoutSeconds(final int timeoutSeconds)
{
if (timeoutSeconds == 0)
@@ -86,9 +130,9 @@
else
{
this.timeoutSeconds = timeoutSeconds;
- }
-
- return true;
+ }
+
+ return false;
}
public List<Xid> getPreparedTransactions()
@@ -96,11 +140,53 @@
List<Xid> xids = new ArrayList<Xid>();
for (Xid xid : transactions.keySet())
{
- if(transactions.get(xid).getState() == Transaction.State.PREPARED)
+ if (transactions.get(xid).getState() == Transaction.State.PREPARED)
{
xids.add(xid);
}
}
return xids;
}
+
+ class TxTimeoutHandler implements Callable
+ {
+ final Transaction tx;
+
+ public TxTimeoutHandler(Transaction tx) {this.tx = tx;}
+
+ public Object call() throws Exception
+ {
+ transactions.remove(tx.getXid());
+ log.warn("transaction with xid " + tx.getXid() + " timed out");
+ List<MessageReference> rolledBack = tx.timeout();
+ Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
+
+ for (MessageReference ref : rolledBack)
+ {
+ if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+ {
+ Queue queue = ref.getQueue();
+
+ LinkedList<MessageReference> list = queueMap.get(queue);
+
+ if (list == null)
+ {
+ list = new LinkedList<MessageReference>();
+
+ queueMap.put(queue, list);
+ }
+
+ list.add(ref);
+ }
+ }
+
+ for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap.entrySet())
+ {
+ LinkedList<MessageReference> refs = entry.getValue();
+
+ entry.getKey().addListFirst(refs);
+ }
+ return null;
+ }
+ }
}
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-11-04 19:02:27 UTC (rev 5265)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-11-04 19:18:07 UTC (rev 5266)
@@ -18,6 +18,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Collections;
import javax.transaction.xa.Xid;
@@ -74,6 +75,8 @@
private MessagingException messagingException;
+ private final Object timeoutLock = new Object();
+
public TransactionImpl(final StorageManager storageManager, final PostOffice postOffice)
{
this.storageManager = storageManager;
@@ -185,6 +188,20 @@
}
}
+ public List<MessageReference> timeout() throws Exception
+ {
+ //we need to synchronize with commit and rollback just in case they get called at the same time
+ synchronized (timeoutLock)
+ {
+ //if we've already rolled back or committed we don't need to do anything
+ if(state == State.COMMITTED || state == State.ROLLBACK_ONLY)
+ {
+ return Collections.emptyList();
+ }
+ return doRollback();
+ }
+ }
+
public void addAcknowledgement(final MessageReference acknowledgement) throws Exception
{
if (state != State.ACTIVE)
@@ -259,73 +276,76 @@
// throw new IllegalStateException("Can't commit, already inmethod " + inMethod);
// }
inMethod = 2;
- if (state == State.ROLLBACK_ONLY)
+ synchronized (timeoutLock)
{
- if (messagingException != null)
+ if (state == State.ROLLBACK_ONLY)
{
- throw messagingException;
+ if (messagingException != null)
+ {
+ throw messagingException;
+ }
+ else
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
+
}
+ if (xid != null)
+ {
+ if (state != State.PREPARED)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
+ }
else
{
- throw new IllegalStateException("Transaction is in invalid state " + state);
+ if (state != State.ACTIVE)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
}
- }
- if (xid != null)
- {
if (state != State.PREPARED)
{
- throw new IllegalStateException("Transaction is in invalid state " + state);
+ pageMessages();
}
- }
- else
- {
- if (state != State.ACTIVE)
+
+ if (containsPersistent || xid != null)
{
- throw new IllegalStateException("Transaction is in invalid state " + state);
+ storageManager.commit(id);
}
- }
- if (state != State.PREPARED)
- {
- pageMessages();
- }
+ for (MessageReference ref : refsToAdd)
+ {
+ Long scheduled = scheduledReferences.get(ref);
+ if(scheduled == null)
+ {
+ ref.getQueue().addLast(ref);
+ }
+ else
+ {
+ ref.setScheduledDeliveryTime(scheduled);
+ ref.getQueue().addLast(ref);
+ }
+ }
- if (containsPersistent || xid != null)
- {
- storageManager.commit(id);
- }
-
- for (MessageReference ref : refsToAdd)
- {
- Long scheduled = scheduledReferences.get(ref);
- if(scheduled == null)
+ // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
+ // transaction until all the messages were added to the queue
+ // or else we could deliver the messages out of order
+ if (pageTransaction != null)
{
- ref.getQueue().addLast(ref);
+ pageTransaction.complete();
}
- else
+
+ for (MessageReference reference : acknowledgements)
{
- ref.setScheduledDeliveryTime(scheduled);
- ref.getQueue().addLast(ref);
+ reference.getQueue().referenceAcknowledged(reference);
}
- }
- // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
- // transaction until all the messages were added to the queue
- // or else we could deliver the messages out of order
- if (pageTransaction != null)
- {
- pageTransaction.complete();
- }
+ clear();
- for (MessageReference reference : acknowledgements)
- {
- reference.getQueue().referenceAcknowledged(reference);
+ state = State.COMMITTED;
}
-
- clear();
-
- state = State.COMMITTED;
inMethod = -1;
}
@@ -336,21 +356,35 @@
// throw new IllegalStateException("Can't rollback, already inmethod " + inMethod);
// }
inMethod=1;
- if (xid != null)
+ LinkedList<MessageReference> toCancel;
+ synchronized (timeoutLock)
{
- if (state != State.PREPARED && state != State.ACTIVE)
+ if (xid != null)
{
- throw new IllegalStateException("Transaction is in invalid state " + state);
+ if (state != State.PREPARED && state != State.ACTIVE)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
}
- }
- else
- {
- if (state != State.ACTIVE && state != State.ROLLBACK_ONLY)
+ else
{
- throw new IllegalStateException("Transaction is in invalid state " + state);
+ if (state != State.ACTIVE && state != State.ROLLBACK_ONLY)
+ {
+ throw new IllegalStateException("Transaction is in invalid state " + state);
+ }
}
+
+ toCancel = doRollback();
+
+ state = State.ROLLEDBACK;
}
+ inMethod = -1;
+ return toCancel;
+ }
+
+ private LinkedList<MessageReference> doRollback() throws Exception
+ {
if (containsPersistent || xid != null)
{
storageManager.rollback(id);
@@ -362,7 +396,7 @@
}
LinkedList<MessageReference> toCancel = new LinkedList<MessageReference>();
-
+
for (MessageReference ref : acknowledgements)
{
// Queue queue = ref.getQueue();
@@ -370,7 +404,7 @@
// ServerMessage message = ref.getMessage();
// Putting back the size on pagingManager, and reverting the counters
-
+
//FIXME - why????
//Surely paging happens before routing, so cancellation shouldn't effect anything......
// if (message.incrementReference(message.isDurable() && queue.isDurable()) == 1)
@@ -378,14 +412,10 @@
// pagingManager.addSize(message);
// }
- toCancel.add(ref);
+ toCancel.add(ref);
}
-
+
clear();
-
- state = State.ROLLEDBACK;
-
- inMethod = -1;
return toCancel;
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java 2008-11-04 19:18:07 UTC (rev 5266)
@@ -0,0 +1,303 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.xa;
+
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.util.id.GUID;
+
+import javax.transaction.xa.Xid;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.XAException;
+import java.util.Map;
+import java.util.HashMap;
+import java.io.File;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class XaTimeoutTest extends UnitTestCase
+{
+ private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+ private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+ private Map<String, QueueSettings> queueSettings = new HashMap<String, QueueSettings>();
+
+ private MessagingService messagingService;
+ private ClientSession clientSession;
+ private ClientProducer clientProducer;
+ private ClientConsumer clientConsumer;
+ private ClientSessionFactory sessionFactory;
+ private ConfigurationImpl configuration;
+ private SimpleString atestq = new SimpleString("atestq");
+
+ protected void setUp() throws Exception
+ {
+ queueSettings.clear();
+ configuration = new ConfigurationImpl();
+ configuration.setSecurityEnabled(false);
+ TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ messagingService = MessagingServiceImpl.newNullStorageMessagingServer(configuration);
+ //start the server
+ messagingService.start();
+ //then we create a client as normal
+ sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+ clientSession = sessionFactory.createSession(true, false, false, false);
+ clientSession.createQueue(atestq, atestq, null, true, true);
+ clientProducer = clientSession.createProducer(atestq);
+ clientConsumer = clientSession.createConsumer(atestq);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (clientSession != null)
+ {
+ try
+ {
+ clientSession.close();
+ }
+ catch (MessagingException e1)
+ {
+ //
+ }
+ }
+ if (messagingService != null && messagingService.isStarted())
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Exception e1)
+ {
+ //
+ }
+ }
+ messagingService = null;
+ clientSession = null;
+ }
+
+ public void testSimpleTimeoutOnSendOnCommit() throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+ ClientMessage m1 = createTextMessage("m1");
+ ClientMessage m2 = createTextMessage("m2");
+ ClientMessage m3 = createTextMessage("m3");
+ ClientMessage m4 = createTextMessage("m4");
+ clientSession.setTransactionTimeout(1);
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientProducer.send(m1);
+ clientProducer.send(m2);
+ clientProducer.send(m3);
+ clientProducer.send(m4);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+ Thread.sleep(1100);
+ try
+ {
+ clientSession.commit(xid, true);
+ }
+ catch (XAException e)
+ {
+ assertTrue(e.errorCode == XAException.XAER_NOTA);
+ }
+ clientSession.start();
+ ClientMessage m = clientConsumer.receive(500);
+ assertNull(m);
+ }
+
+ public void testSimpleTimeoutOnReceive() throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+ ClientMessage m1 = createTextMessage("m1");
+ ClientMessage m2 = createTextMessage("m2");
+ ClientMessage m3 = createTextMessage("m3");
+ ClientMessage m4 = createTextMessage("m4");
+ ClientSession clientSession2 = sessionFactory.createSession(false, true, true, false);
+ ClientProducer clientProducer2 = clientSession2.createProducer(atestq);
+ clientProducer2.send(m1);
+ clientProducer2.send(m2);
+ clientProducer2.send(m3);
+ clientProducer2.send(m4);
+ clientSession2.close();
+ clientSession.setTransactionTimeout(2);
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.start();
+ ClientMessage m = clientConsumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m1");
+ m = clientConsumer.receive(500);
+ assertNotNull(m);
+ m.acknowledge();
+ assertEquals(m.getBody().getString(), "m2");
+ m = clientConsumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m3");
+ m = clientConsumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m4");
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+ Thread.sleep(2100);
+ try
+ {
+ clientSession.commit(xid, true);
+ }
+ catch (XAException e)
+ {
+ assertTrue(e.errorCode == XAException.XAER_NOTA);
+ }
+ clientSession.setTransactionTimeout(0);
+ clientConsumer.close();
+ clientSession2 = sessionFactory.createSession(false, true, true, false);
+ ClientConsumer consumer = clientSession2.createConsumer(atestq);
+ clientSession2.start();
+ m = consumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m1");
+ m = consumer.receive(500);
+ assertNotNull(m);
+ m.acknowledge();
+ assertEquals(m.getBody().getString(), "m2");
+ m = consumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m3");
+ m = consumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m4");
+ clientSession2.close();
+ }
+
+ public void testSimpleTimeoutOnSendAndReceive() throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+ ClientMessage m1 = createTextMessage("m1");
+ ClientMessage m2 = createTextMessage("m2");
+ ClientMessage m3 = createTextMessage("m3");
+ ClientMessage m4 = createTextMessage("m4");
+ ClientMessage m5 = createTextMessage("m5");
+ ClientMessage m6 = createTextMessage("m6");
+ ClientMessage m7 = createTextMessage("m7");
+ ClientMessage m8 = createTextMessage("m8");
+ ClientSession clientSession2 = sessionFactory.createSession(false, true, true, false);
+ ClientProducer clientProducer2 = clientSession2.createProducer(atestq);
+ clientProducer2.send(m1);
+ clientProducer2.send(m2);
+ clientProducer2.send(m3);
+ clientProducer2.send(m4);
+ clientSession2.close();
+ clientSession.setTransactionTimeout(2);
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientSession.start();
+ clientProducer.send(m5);
+ clientProducer.send(m5);
+ clientProducer.send(m5);
+ clientProducer.send(m5);
+ ClientMessage m = clientConsumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m1");
+ m = clientConsumer.receive(500);
+ assertNotNull(m);
+ m.acknowledge();
+ assertEquals(m.getBody().getString(), "m2");
+ m = clientConsumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m3");
+ m = clientConsumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m4");
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+ Thread.sleep(2100);
+ try
+ {
+ clientSession.commit(xid, true);
+ }
+ catch (XAException e)
+ {
+ assertTrue(e.errorCode == XAException.XAER_NOTA);
+ }
+ clientSession.setTransactionTimeout(0);
+ clientConsumer.close();
+ clientSession2 = sessionFactory.createSession(false, true, true, false);
+ ClientConsumer consumer = clientSession2.createConsumer(atestq);
+ clientSession2.start();
+ m = consumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m1");
+ m = consumer.receive(500);
+ assertNotNull(m);
+ m.acknowledge();
+ assertEquals(m.getBody().getString(), "m2");
+ m = consumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m3");
+ m = consumer.receive(500);
+ m.acknowledge();
+ assertNotNull(m);
+ assertEquals(m.getBody().getString(), "m4");
+ m = consumer.receive(500);
+ assertNull(m);
+ clientSession2.close();
+ }
+
+ private ClientMessage createTextMessage(String s)
+ {
+ return createTextMessage(s, true);
+ }
+
+ private ClientMessage createTextMessage(String s, boolean durable)
+ {
+ ClientMessage message = clientSession.createClientMessage(JBossTextMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString(s);
+ message.getBody().flip();
+ return message;
+ }
+}
More information about the jboss-cvs-commits
mailing list