[jboss-cvs] JBoss Messaging SVN: r5266 - in trunk: src/main/org/jboss/messaging/core/transaction/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Nov 4 14:18:07 EST 2008


Author: ataylor
Date: 2008-11-04 14:18:07 -0500 (Tue, 04 Nov 2008)
New Revision: 5266

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1302 - added xa timeout functionality

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-04 19:02:27 UTC (rev 5265)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-04 19:18:07 UTC (rev 5266)
@@ -216,7 +216,7 @@
 
       storeFactory.setPagingManager(pagingManager);
 
-      resourceManager = new ResourceManagerImpl(0);
+      resourceManager = new ResourceManagerImpl(0, scheduledExecutor, storageManager, postOffice, queueSettingsRepository);
       postOffice = new PostOfficeImpl(storageManager,
                                       pagingManager,
                                       queueFactory,

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java	2008-11-04 19:02:27 UTC (rev 5265)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java	2008-11-04 19:18:07 UTC (rev 5266)
@@ -18,44 +18,77 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.core.transaction.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.LinkedList;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Callable;
 
 import javax.transaction.xa.Xid;
 
 import org.jboss.messaging.core.transaction.ResourceManager;
 import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.persistence.StorageManager;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.settings.HierarchicalRepository;
 
 /**
- * 
  * A ResourceManagerImpl
- * 
+ * <p/>
  * TODO - implement timeouts
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
  */
 public class ResourceManagerImpl implements ResourceManager
 {
+   private static final Logger log = Logger.getLogger(ResourceManagerImpl.class);
+
    private final ConcurrentMap<Xid, Transaction> transactions = new ConcurrentHashMap<Xid, Transaction>();
-   
+
    private final int defaultTimeoutSeconds;
-   
+
    private volatile int timeoutSeconds;
-   
-   public ResourceManagerImpl(final int defaultTimeoutSeconds)
-   {      
+
+   private final ScheduledExecutorService executorService;
+
+   private final Map<Xid, ScheduledFuture> scheduledTimeoutTxs = new HashMap<Xid, ScheduledFuture>();
+
+   private final StorageManager storageManager;
+
+   private final PostOffice postOffice;
+
+   private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
+
+   public ResourceManagerImpl(final int defaultTimeoutSeconds,
+                              final ScheduledExecutorService scheduledExecutor,
+                              final StorageManager storageManager,
+                              final PostOffice postOffice,
+                              final HierarchicalRepository<QueueSettings> queueSettingsRepository)
+   {
       this.defaultTimeoutSeconds = defaultTimeoutSeconds;
+      this.executorService = scheduledExecutor;
+      this.storageManager = storageManager;
+      this.postOffice = postOffice;
+      this.queueSettingsRepository = queueSettingsRepository;
    }
-   
+
    // ResourceManager implementation ---------------------------------------------
-   
+
    public Transaction getTransaction(final Xid xid)
    {
       return transactions.get(xid);
@@ -63,19 +96,30 @@
 
    public boolean putTransaction(final Xid xid, final Transaction tx)
    {
-      return transactions.putIfAbsent(xid, tx) == null;
+      boolean added = transactions.putIfAbsent(xid, tx) == null;
+      if (added && timeoutSeconds > 0)
+      {
+         ScheduledFuture<Boolean> future = executorService.schedule(new TxTimeoutHandler(tx), timeoutSeconds, TimeUnit.SECONDS);
+         scheduledTimeoutTxs.put(xid, future);
+      }
+      return added;
    }
 
    public Transaction removeTransaction(final Xid xid)
    {
+      ScheduledFuture<Boolean> future = scheduledTimeoutTxs.get(xid);
+      if (future != null)
+      {
+         future.cancel(true);
+      }
       return transactions.remove(xid);
    }
-   
+
    public int getTimeoutSeconds()
    {
       return this.timeoutSeconds;
    }
-   
+
    public boolean setTimeoutSeconds(final int timeoutSeconds)
    {
       if (timeoutSeconds == 0)
@@ -86,9 +130,9 @@
       else
       {
          this.timeoutSeconds = timeoutSeconds;
-      }      
-      
-      return true;
+      }
+
+      return false;
    }
 
    public List<Xid> getPreparedTransactions()
@@ -96,11 +140,53 @@
       List<Xid> xids = new ArrayList<Xid>();
       for (Xid xid : transactions.keySet())
       {
-         if(transactions.get(xid).getState() == Transaction.State.PREPARED)
+         if (transactions.get(xid).getState() == Transaction.State.PREPARED)
          {
             xids.add(xid);
          }
       }
       return xids;
    }
+
+   class TxTimeoutHandler implements Callable
+   {
+      final Transaction tx;
+
+      public TxTimeoutHandler(Transaction tx) {this.tx = tx;}
+
+      public Object call() throws Exception
+      {
+         transactions.remove(tx.getXid());
+         log.warn("transaction with xid " + tx.getXid() + " timed out");
+         List<MessageReference> rolledBack = tx.timeout();
+         Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
+
+         for (MessageReference ref : rolledBack)
+         {
+            if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+            {
+               Queue queue = ref.getQueue();
+
+               LinkedList<MessageReference> list = queueMap.get(queue);
+
+               if (list == null)
+               {
+                  list = new LinkedList<MessageReference>();
+
+                  queueMap.put(queue, list);
+               }
+
+               list.add(ref);
+            }
+         }
+
+         for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap.entrySet())
+         {
+            LinkedList<MessageReference> refs = entry.getValue();
+
+            entry.getKey().addListFirst(refs);
+         }
+         return null;
+      }
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-11-04 19:02:27 UTC (rev 5265)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-11-04 19:18:07 UTC (rev 5266)
@@ -18,6 +18,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Collections;
 
 import javax.transaction.xa.Xid;
 
@@ -74,6 +75,8 @@
 
    private MessagingException messagingException;
 
+   private final Object timeoutLock = new Object();
+
    public TransactionImpl(final StorageManager storageManager, final PostOffice postOffice)
    {
       this.storageManager = storageManager;
@@ -185,6 +188,20 @@
       }
    }
 
+   public List<MessageReference> timeout() throws Exception
+   {
+      //we need to synchronize with commit and rollback just in case they get called at the same time
+      synchronized (timeoutLock)
+      {
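+         //if we've already rolled back or committed we don't need to do anything (NOTE(review): the check below tests ROLLBACK_ONLY, whereas rollback() sets ROLLEDBACK - confirm which state is intended)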
+         if(state == State.COMMITTED || state == State.ROLLBACK_ONLY)
+         {
+            return Collections.emptyList();
+         }
+         return doRollback();
+      }
+   }
+
    public void addAcknowledgement(final MessageReference acknowledgement) throws Exception
    {
       if (state != State.ACTIVE)
@@ -259,73 +276,76 @@
 //         throw new IllegalStateException("Can't commit, already inmethod " + inMethod);
 //      }
       inMethod = 2;
-      if (state == State.ROLLBACK_ONLY)
+      synchronized (timeoutLock)
       {
-         if (messagingException != null)
+         if (state == State.ROLLBACK_ONLY)
          {
-            throw messagingException;
+            if (messagingException != null)
+            {
+               throw messagingException;
+            }
+            else
+            {
+               throw new IllegalStateException("Transaction is in invalid state " + state);
+            }
+
          }
+         if (xid != null)
+         {
+            if (state != State.PREPARED)
+            {
+               throw new IllegalStateException("Transaction is in invalid state " + state);
+            }
+         }
          else
          {
-            throw new IllegalStateException("Transaction is in invalid state " + state);
+            if (state != State.ACTIVE)
+            {
+               throw new IllegalStateException("Transaction is in invalid state " + state);
+            }
          }
 
-      }
-      if (xid != null)
-      {
          if (state != State.PREPARED)
          {
-            throw new IllegalStateException("Transaction is in invalid state " + state);
+            pageMessages();
          }
-      }
-      else
-      {
-         if (state != State.ACTIVE)
+
+         if (containsPersistent || xid != null)
          {
-            throw new IllegalStateException("Transaction is in invalid state " + state);
+            storageManager.commit(id);
          }
-      }
 
-      if (state != State.PREPARED)
-      {
-         pageMessages();
-      }
+         for (MessageReference ref : refsToAdd)
+         {
+            Long scheduled = scheduledReferences.get(ref);
+            if(scheduled == null)
+            {
+               ref.getQueue().addLast(ref);
+            }
+            else
+            {
+               ref.setScheduledDeliveryTime(scheduled);
+               ref.getQueue().addLast(ref);
+            }
+         }
 
-      if (containsPersistent || xid != null)
-      {
-         storageManager.commit(id);
-      }
-
-      for (MessageReference ref : refsToAdd)
-      {
-         Long scheduled = scheduledReferences.get(ref);
-         if(scheduled == null)
+         // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
+         // transaction until all the messages were added to the queue
+         // or else we could deliver the messages out of order
+         if (pageTransaction != null)
          {
-            ref.getQueue().addLast(ref);
+            pageTransaction.complete();
          }
-         else
+
+         for (MessageReference reference : acknowledgements)
          {
-            ref.setScheduledDeliveryTime(scheduled);
-            ref.getQueue().addLast(ref);
+            reference.getQueue().referenceAcknowledged(reference);
          }
-      }
 
-      // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
-      // transaction until all the messages were added to the queue
-      // or else we could deliver the messages out of order
-      if (pageTransaction != null)
-      {
-         pageTransaction.complete();
-      }
+         clear();
 
-      for (MessageReference reference : acknowledgements)
-      {
-         reference.getQueue().referenceAcknowledged(reference);
+         state = State.COMMITTED;
       }
-
-      clear();
-
-      state = State.COMMITTED;   
       inMethod = -1;
    }
 
@@ -336,21 +356,35 @@
 //         throw new IllegalStateException("Can't rollback, already inmethod " + inMethod);
 //      }
       inMethod=1;
-      if (xid != null)
+      LinkedList<MessageReference> toCancel;
+      synchronized (timeoutLock)
       {
-         if (state != State.PREPARED && state != State.ACTIVE)
+         if (xid != null)
          {
-            throw new IllegalStateException("Transaction is in invalid state " + state);
+            if (state != State.PREPARED && state != State.ACTIVE)
+            {
+               throw new IllegalStateException("Transaction is in invalid state " + state);
+            }
          }
-      }
-      else
-      {
-         if (state != State.ACTIVE && state != State.ROLLBACK_ONLY)
+         else
          {
-            throw new IllegalStateException("Transaction is in invalid state " + state);
+            if (state != State.ACTIVE && state != State.ROLLBACK_ONLY)
+            {
+               throw new IllegalStateException("Transaction is in invalid state " + state);
+            }
          }
+
+         toCancel = doRollback();
+
+         state = State.ROLLEDBACK;
       }
 
+      inMethod = -1;
+      return toCancel;
+   }
+
+   private LinkedList<MessageReference> doRollback() throws Exception
+   {
       if (containsPersistent || xid != null)
       {
          storageManager.rollback(id);
@@ -362,7 +396,7 @@
       }
 
       LinkedList<MessageReference> toCancel = new LinkedList<MessageReference>();
-      
+
       for (MessageReference ref : acknowledgements)
       {
 //         Queue queue = ref.getQueue();
@@ -370,7 +404,7 @@
 //         ServerMessage message = ref.getMessage();
 
          // Putting back the size on pagingManager, and reverting the counters
-         
+
          //FIXME - why????
          //Surely paging happens before routing, so cancellation shouldn't effect anything......
 //         if (message.incrementReference(message.isDurable() && queue.isDurable()) == 1)
@@ -378,14 +412,10 @@
 //            pagingManager.addSize(message);
 //         }
 
-         toCancel.add(ref);         
+         toCancel.add(ref);
       }
-      
+
       clear();
-
-      state = State.ROLLEDBACK;
-      
-      inMethod = -1;
       return toCancel;
    }
 

Added: trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java	2008-11-04 19:18:07 UTC (rev 5266)
@@ -0,0 +1,303 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.xa;
+
+import org.jboss.messaging.core.settings.impl.QueueSettings;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.util.SimpleString;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.util.id.GUID;
+
+import javax.transaction.xa.Xid;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.XAException;
+import java.util.Map;
+import java.util.HashMap;
+import java.io.File;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class XaTimeoutTest  extends UnitTestCase
+{
+   private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+   private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+   private Map<String, QueueSettings> queueSettings = new HashMap<String, QueueSettings>();
+
+   private MessagingService messagingService;
+   private ClientSession clientSession;
+   private ClientProducer clientProducer;
+   private ClientConsumer clientConsumer;
+   private ClientSessionFactory sessionFactory;
+   private ConfigurationImpl configuration;
+   private SimpleString atestq = new SimpleString("atestq");
+
+   protected void setUp() throws Exception
+   {
+      queueSettings.clear();
+      configuration = new ConfigurationImpl();
+      configuration.setSecurityEnabled(false);
+      TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
+      configuration.getAcceptorConfigurations().add(transportConfig);
+      messagingService = MessagingServiceImpl.newNullStorageMessagingServer(configuration);
+      //start the server
+      messagingService.start();
+      //then we create a client as normal
+      sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(CONNECTOR_FACTORY));
+      clientSession = sessionFactory.createSession(true, false, false, false);
+      clientSession.createQueue(atestq, atestq, null, true, true);
+      clientProducer = clientSession.createProducer(atestq);
+      clientConsumer = clientSession.createConsumer(atestq);
+   }
+
+   protected void tearDown() throws Exception
+   {
+      if (clientSession != null)
+      {
+         try
+         {
+            clientSession.close();
+         }
+         catch (MessagingException e1)
+         {
+            //
+         }
+      }
+      if (messagingService != null && messagingService.isStarted())
+      {
+         try
+         {
+            messagingService.stop();
+         }
+         catch (Exception e1)
+         {
+            //
+         }
+      }
+      messagingService = null;
+      clientSession = null;
+   }
+
+   public void testSimpleTimeoutOnSendOnCommit() throws Exception
+   {
+      Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+      ClientMessage m1 = createTextMessage("m1");
+      ClientMessage m2 = createTextMessage("m2");
+      ClientMessage m3 = createTextMessage("m3");
+      ClientMessage m4 = createTextMessage("m4");
+      clientSession.setTransactionTimeout(1);
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientProducer.send(m1);
+      clientProducer.send(m2);
+      clientProducer.send(m3);
+      clientProducer.send(m4);
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+      Thread.sleep(1100);
+      try
+      {
+         clientSession.commit(xid, true);
+      }
+      catch (XAException e)
+      {
+         assertTrue(e.errorCode == XAException.XAER_NOTA);
+      }
+      clientSession.start();
+      ClientMessage m = clientConsumer.receive(500);
+      assertNull(m);
+   }
+
+   public void testSimpleTimeoutOnReceive() throws Exception
+   {
+      Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+      ClientMessage m1 = createTextMessage("m1");
+      ClientMessage m2 = createTextMessage("m2");
+      ClientMessage m3 = createTextMessage("m3");
+      ClientMessage m4 = createTextMessage("m4");
+      ClientSession clientSession2 = sessionFactory.createSession(false, true, true, false);
+      ClientProducer clientProducer2 = clientSession2.createProducer(atestq);
+      clientProducer2.send(m1);
+      clientProducer2.send(m2);
+      clientProducer2.send(m3);
+      clientProducer2.send(m4);
+      clientSession2.close();
+      clientSession.setTransactionTimeout(2);
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientSession.start();
+      ClientMessage m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m1");
+      m = clientConsumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().getString(), "m2");
+      m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m3");
+      m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m4");
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+      Thread.sleep(2100);
+      try
+      {
+         clientSession.commit(xid, true);
+      }
+      catch (XAException e)
+      {
+         assertTrue(e.errorCode == XAException.XAER_NOTA);
+      }
+      clientSession.setTransactionTimeout(0);
+      clientConsumer.close();
+      clientSession2 = sessionFactory.createSession(false, true, true, false);
+      ClientConsumer consumer = clientSession2.createConsumer(atestq);
+      clientSession2.start();
+      m = consumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m1");
+      m = consumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().getString(), "m2");
+      m = consumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m3");
+      m = consumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m4");
+      clientSession2.close();
+   }
+
+   public void testSimpleTimeoutOnSendAndReceive() throws Exception
+   {
+      Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+      ClientMessage m1 = createTextMessage("m1");
+      ClientMessage m2 = createTextMessage("m2");
+      ClientMessage m3 = createTextMessage("m3");
+      ClientMessage m4 = createTextMessage("m4");
+      ClientMessage m5 = createTextMessage("m5");
+      ClientMessage m6 = createTextMessage("m6");
+      ClientMessage m7 = createTextMessage("m7");
+      ClientMessage m8 = createTextMessage("m8");
+      ClientSession clientSession2 = sessionFactory.createSession(false, true, true, false);
+      ClientProducer clientProducer2 = clientSession2.createProducer(atestq);
+      clientProducer2.send(m1);
+      clientProducer2.send(m2);
+      clientProducer2.send(m3);
+      clientProducer2.send(m4);
+      clientSession2.close();
+      clientSession.setTransactionTimeout(2);
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientSession.start();
+      clientProducer.send(m5);
+      clientProducer.send(m5);
+      clientProducer.send(m5);
+      clientProducer.send(m5);
+      ClientMessage m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m1");
+      m = clientConsumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().getString(), "m2");
+      m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m3");
+      m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m4");
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+      Thread.sleep(2100);
+      try
+      {
+         clientSession.commit(xid, true);
+      }
+      catch (XAException e)
+      {
+         assertTrue(e.errorCode == XAException.XAER_NOTA);
+      }
+      clientSession.setTransactionTimeout(0);
+      clientConsumer.close();
+      clientSession2 = sessionFactory.createSession(false, true, true, false);
+      ClientConsumer consumer = clientSession2.createConsumer(atestq);
+      clientSession2.start();
+      m = consumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m1");
+      m = consumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().getString(), "m2");
+      m = consumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m3");
+      m = consumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m4");
+      m = consumer.receive(500);
+      assertNull(m);
+      clientSession2.close();
+   }
+   
+   private ClientMessage createTextMessage(String s)
+   {
+      return createTextMessage(s, true);
+   }
+
+   private ClientMessage createTextMessage(String s, boolean durable)
+   {
+      ClientMessage message = clientSession.createClientMessage(JBossTextMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
+      message.getBody().putString(s);
+      message.getBody().flip();
+      return message;
+   }
+}




More information about the jboss-cvs-commits mailing list