[jboss-cvs] JBoss Messaging SVN: r5280 - in trunk: src/main/org/jboss/messaging/core/config and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 5 12:24:25 EST 2008


Author: ataylor
Date: 2008-11-05 12:24:25 -0500 (Wed, 05 Nov 2008)
New Revision: 5280

Modified:
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/ResourceManager.java
   trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
   trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
   trunk/tests/config/ConfigurationTest-config.xml
   trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1302 - some changes to xa timeout functionality

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/config/jbm-configuration.xml	2008-11-05 17:24:25 UTC (rev 5280)
@@ -21,6 +21,11 @@
       <jmx-management-enabled>true</jmx-management-enabled>
     
       <connection-scan-period>10000</connection-scan-period>
+
+      <!--how long before timing a transaction out-->
+      <transaction-timeout>60000</transaction-timeout>
+      <!--how often to scan for timedout transactions-->
+      <transaction-timeout-scan-period>1000</transaction-timeout-scan-period>
       
       <!-- Example interceptors 
       <remoting-interceptors>

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2008-11-05 17:24:25 UTC (rev 5280)
@@ -145,5 +145,13 @@
 
    boolean isWildcardRoutingEnabled();
 
+   long getTransactionTimeout();
+
+   void setTransactionTimeout(long timeout);
+
    boolean isMessageCounterEnabled();
+
+   long getTransactionTimeoutScanPeriod();
+
+   void setTransactionTimeoutScanPeriod(long period);
 }

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2008-11-05 17:24:25 UTC (rev 5280)
@@ -77,12 +77,16 @@
 
    public static final boolean DEFAULT_MESSAGE_COUNTER_ENABLED = false;
 
+   public static final long DEFAULT_TRANSACTION_TIMEOUT = 60000;
+
+   public static final long DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD = 1000;
+
    // Attributes -----------------------------------------------------------------------------
 
    protected boolean clustered = DEFAULT_CLUSTERED;
 
    protected boolean backup = DEFAULT_BACKUP;
-      
+
    protected long queueActivationTimeout = DEFAULT_QUEUE_ACTIVATION_TIMEOUT;
 
    protected int scheduledThreadPoolMaxSize = DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE;
@@ -136,7 +140,11 @@
    protected boolean wildcardRoutingEnabled = DEFAULT_WILDCARD_ROUTING_ENABLED;
 
    protected boolean messageCounterEnabled = DEFAULT_MESSAGE_COUNTER_ENABLED;
-   
+
+   protected long transactionTimeout = DEFAULT_TRANSACTION_TIMEOUT;
+
+   protected long transactionTimeoutScanPeriod = DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD;
+
    public boolean isClustered()
    {
       return clustered;
@@ -156,12 +164,12 @@
    {
       this.backup = backup;
    }
-   
+
    public long getQueueActivationTimeout()
    {
       return queueActivationTimeout;
    }
-   
+
    public void setQueueActivationTimeout(long timeout)
    {
       this.queueActivationTimeout = timeout;
@@ -352,6 +360,26 @@
       return wildcardRoutingEnabled;
    }
 
+   public long getTransactionTimeout()
+   {
+      return transactionTimeout;
+   }
+
+   public void setTransactionTimeout(long timeout)
+   {
+      transactionTimeout = timeout;
+   }
+
+   public long getTransactionTimeoutScanPeriod()
+   {
+      return transactionTimeoutScanPeriod;
+   }
+
+   public void setTransactionTimeoutScanPeriod(long period)
+   {
+      transactionTimeoutScanPeriod = period;
+   }
+
    public boolean isSecurityEnabled()
    {
       return securityEnabled;
@@ -391,7 +419,7 @@
    {
       pagingMaxGlobalSize = maxGlobalSize;
    }
-   
+
    public boolean isMessageCounterEnabled()
    {
       return messageCounterEnabled;

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2008-11-05 17:24:25 UTC (rev 5280)
@@ -87,6 +87,10 @@
       securityInvalidationInterval = getLong(e, "security-invalidation-interval", securityInvalidationInterval);
       
       connectionScanPeriod = getLong(e, "connection-scan-period", connectionScanPeriod);
+
+      transactionTimeout = getLong(e, "transaction-timeout", transactionTimeout);
+
+      transactionTimeoutScanPeriod = getLong(e, "transaction-timeout-scan-period", transactionTimeoutScanPeriod);
             
       NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java	2008-11-05 17:24:25 UTC (rev 5280)
@@ -216,7 +216,7 @@
 
       storeFactory.setPagingManager(pagingManager);
 
-      resourceManager = new ResourceManagerImpl(0, scheduledExecutor, storageManager, postOffice, queueSettingsRepository);
+      resourceManager = new ResourceManagerImpl((int) configuration.getTransactionTimeout()/1000, configuration.getTransactionTimeoutScanPeriod(), storageManager, postOffice, queueSettingsRepository);
       postOffice = new PostOfficeImpl(storageManager,
                                       pagingManager,
                                       queueFactory,
@@ -237,6 +237,7 @@
                                                           this);
 
       postOffice.start();
+      resourceManager.start();
 
       TransportConfiguration backupConnector = configuration.getBackupConnectorConfiguration();
 
@@ -291,6 +292,8 @@
       }
 
       securityStore = null;
+      resourceManager.stop();
+      resourceManager = null;
       postOffice.stop();
       postOffice = null;
       securityRepository = null;

Modified: trunk/src/main/org/jboss/messaging/core/transaction/ResourceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/ResourceManager.java	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/transaction/ResourceManager.java	2008-11-05 17:24:25 UTC (rev 5280)
@@ -22,6 +22,8 @@
 
 package org.jboss.messaging.core.transaction;
 
+import org.jboss.messaging.core.server.MessagingComponent;
+
 import java.util.List;
 
 import javax.transaction.xa.Xid;
@@ -34,7 +36,7 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
-public interface ResourceManager
+public interface ResourceManager extends MessagingComponent
 {
    boolean putTransaction(Xid xid, Transaction tx);
    

Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java	2008-11-05 17:24:25 UTC (rev 5280)
@@ -76,6 +76,8 @@
 
    List<MessageReference> timeout() throws Exception;
 
+   long getCreateTime();
+
    static enum State
    {
       ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/ResourceManagerImpl.java	2008-11-05 17:24:25 UTC (rev 5280)
@@ -23,30 +23,29 @@
 package org.jboss.messaging.core.transaction.impl;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.TimerTask;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 
 import javax.transaction.xa.Xid;
 
+import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.server.MessageReference;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.MessagingComponent;
 import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.MessageReference;
-import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.settings.HierarchicalRepository;
-import org.jboss.messaging.core.settings.impl.QueueSettings;
-import org.jboss.messaging.core.transaction.ResourceManager;
-import org.jboss.messaging.core.transaction.Transaction;
 
 /**
  * A ResourceManagerImpl
@@ -55,7 +54,7 @@
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  */
-public class ResourceManagerImpl implements ResourceManager
+public class ResourceManagerImpl implements ResourceManager, MessagingComponent
 {
    private static final Logger log = Logger.getLogger(ResourceManagerImpl.class);
 
@@ -65,29 +64,73 @@
 
    private volatile int timeoutSeconds;
 
-   private final ScheduledExecutorService executorService;
-
-   private final Map<Xid, ScheduledFuture<Boolean>> scheduledTimeoutTxs = new HashMap<Xid, ScheduledFuture<Boolean>>();
-
    private final StorageManager storageManager;
 
    private final PostOffice postOffice;
 
    private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
-   
+
+   private boolean started = false;
+
+   private Timer timer;
+
+   private TimerTask task;
+
+   private final long txTimeoutScanPeriod;
+
    public ResourceManagerImpl(final int defaultTimeoutSeconds,
-                              final ScheduledExecutorService scheduledExecutor,
+                              final long txTimeoutScanPeriod,
                               final StorageManager storageManager,
                               final PostOffice postOffice,
                               final HierarchicalRepository<QueueSettings> queueSettingsRepository)
    {
       this.defaultTimeoutSeconds = defaultTimeoutSeconds;
-      this.executorService = scheduledExecutor;
+      this.timeoutSeconds = defaultTimeoutSeconds;
+      this.txTimeoutScanPeriod = txTimeoutScanPeriod;
       this.storageManager = storageManager;
       this.postOffice = postOffice;
       this.queueSettingsRepository = queueSettingsRepository;
    }
 
+   // MessagingComponent implementation
+
+   public void start() throws Exception
+   {
+      if (started)
+      {
+         return;
+      }
+      timer = new Timer(true);
+      task = new TxTimeoutHandler();
+      timer.schedule(task, txTimeoutScanPeriod, txTimeoutScanPeriod);
+      started = true;
+   }
+
+   public void stop() throws Exception
+   {
+      if (!started)
+      {
+         return;
+      }
+      if (timer != null)
+      {
+         task.cancel();
+
+         task = null;
+
+         timer.cancel();
+
+         timer = null;
+      }
+
+      started = false;
+   }
+
+   public boolean isStarted()
+   {
+      return started;
+   }
+
    // ResourceManager implementation ---------------------------------------------
 
    public Transaction getTransaction(final Xid xid)
@@ -97,24 +140,11 @@
 
    public boolean putTransaction(final Xid xid, final Transaction tx)
    {
-      boolean added = transactions.putIfAbsent(xid, tx) == null;
-      if (added && timeoutSeconds > 0)
-      {
-         ScheduledFuture<Boolean> future = executorService.schedule(new TxTimeoutHandler(tx),
-                                                                    timeoutSeconds,
-                                                                    TimeUnit.SECONDS);
-         scheduledTimeoutTxs.put(xid, future);
-      }
-      return added;
+      return transactions.putIfAbsent(xid, tx) == null;
    }
 
    public Transaction removeTransaction(final Xid xid)
    {
-      ScheduledFuture<Boolean> future = scheduledTimeoutTxs.get(xid);
-      if (future != null)
-      {
-         future.cancel(true);
-      }
       return transactions.remove(xid);
    }
 
@@ -127,7 +157,7 @@
    {
       if (timeoutSeconds == 0)
       {
-         // reset to default
+         //reset to default
          this.timeoutSeconds = defaultTimeoutSeconds;
       }
       else
@@ -141,7 +171,7 @@
    public List<Xid> getPreparedTransactions()
    {
       List<Xid> xids = new ArrayList<Xid>();
-      
+
       for (Xid xid : transactions.keySet())
       {
          if (transactions.get(xid).getState() == Transaction.State.PREPARED)
@@ -152,53 +182,63 @@
       return xids;
    }
 
-   private class TxTimeoutHandler implements Callable
+   class TxTimeoutHandler extends TimerTask
    {
-      final Transaction tx;
-
-      public TxTimeoutHandler(final Transaction tx)
+      public void run()
       {
-         this.tx = tx;
-      }
+         Set<Transaction> timedoutTransactions = new HashSet<Transaction>();
 
-      public Object call() throws Exception
-      {
-         transactions.remove(tx.getXid());
+         long now = System.currentTimeMillis();
 
-         log.warn("transaction with xid " + tx.getXid() + " timed out");
+         for (Transaction tx : transactions.values())
+         {
+            if (tx.getState() != Transaction.State.PREPARED && now > (tx.getCreateTime() + timeoutSeconds * 1000))
+            {
+               transactions.remove(tx.getXid());
+               log.warn("transaction with xid " + tx.getXid() + " timed out");
+               timedoutTransactions.add(tx);
+            }
+         }
 
-         List<MessageReference> rolledBack = tx.timeout();
-
-         Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
-
-         // TODO - this code is duplicated in ServerSessionImpl - combine
-         for (MessageReference ref : rolledBack)
+         for (Transaction failedTransaction : timedoutTransactions)
          {
-            if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+            try
             {
-               Queue queue = ref.getQueue();
+               List<MessageReference> rolledBack = failedTransaction.timeout();
+               Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
 
-               LinkedList<MessageReference> list = queueMap.get(queue);
+               for (MessageReference ref : rolledBack)
+               {
+                  if (ref.cancel(storageManager, postOffice, queueSettingsRepository))
+                  {
+                     Queue queue = ref.getQueue();
 
-               if (list == null)
+                     LinkedList<MessageReference> list = queueMap.get(queue);
+
+                     if (list == null)
+                     {
+                        list = new LinkedList<MessageReference>();
+
+                        queueMap.put(queue, list);
+                     }
+
+                     list.add(ref);
+                  }
+               }
+
+               for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap.entrySet())
                {
-                  list = new LinkedList<MessageReference>();
+                  LinkedList<MessageReference> refs = entry.getValue();
 
-                  queueMap.put(queue, list);
+                  entry.getKey().addListFirst(refs);
                }
-
-               list.add(ref);
             }
+            catch (Exception e)
+            {
+               log.error("failed to timeout transaction, xid:" + failedTransaction.getXid(), e);
+            }
          }
+      }
 
-         for (Map.Entry<Queue, LinkedList<MessageReference>> entry : queueMap.entrySet())
-         {
-            LinkedList<MessageReference> refs = entry.getValue();
-
-            entry.getKey().addListFirst(refs);
-         }
-
-         return null;
-      }
    }
 }

Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java	2008-11-05 17:24:25 UTC (rev 5280)
@@ -39,7 +39,7 @@
 
 /**
  * A TransactionImpl
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  */
@@ -77,6 +77,8 @@
 
    private final Object timeoutLock = new Object();
 
+   private final long createTime;
+
    public TransactionImpl(final StorageManager storageManager, final PostOffice postOffice)
    {
       this.storageManager = storageManager;
@@ -95,6 +97,8 @@
       this.xid = null;
 
       this.id = storageManager.generateUniqueID();
+
+      createTime = System.currentTimeMillis();
    }
 
    public TransactionImpl(final Xid xid, final StorageManager storageManager, final PostOffice postOffice)
@@ -115,6 +119,8 @@
       this.xid = xid;
 
       this.id = storageManager.generateUniqueID();
+
+      createTime = System.currentTimeMillis();
    }
 
    public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager, final PostOffice postOffice)
@@ -135,6 +141,8 @@
       {
          this.pagingManager = postOffice.getPagingManager();
       }
+
+      createTime = System.currentTimeMillis();
    }
 
    // Transaction implementation
@@ -197,15 +205,20 @@
       synchronized (timeoutLock)
       {
          // if we've already rolled back or committed we don't need to do anything
-         if (state == State.COMMITTED || state == State.ROLLBACK_ONLY)
+         if (state == State.COMMITTED || state == State.ROLLBACK_ONLY || state == State.PREPARED)
          {
             return Collections.emptyList();
          }
-         
+
          return doRollback();
       }
    }
 
+   public long getCreateTime()
+   {
+      return createTime;
+   }
+
    public void addAcknowledgement(final MessageReference acknowledgement) throws Exception
    {
       if (state != State.ACTIVE)
@@ -256,21 +269,24 @@
 
    public void prepare() throws Exception
    {
-      if (state != State.ACTIVE)
+      synchronized (timeoutLock)
       {
-         throw new IllegalStateException("Transaction is in invalid state " + state);
-      }
+         if (state != State.ACTIVE)
+         {
+            throw new IllegalStateException("Transaction is in invalid state " + state);
+         }
 
-      if (xid == null)
-      {
-         throw new IllegalStateException("Cannot prepare non XA transaction");
-      }
+         if (xid == null)
+         {
+            throw new IllegalStateException("Cannot prepare non XA transaction");
+         }
 
-      pageMessages();
+         pageMessages();
 
-      storageManager.prepare(id, xid);
+         storageManager.prepare(id, xid);
 
-      state = State.PREPARED;
+         state = State.PREPARED;
+      }
    }
 
    public void commit() throws Exception
@@ -559,10 +575,7 @@
                scheduledReferences.put(ref, scheduledDeliveryTime);
                if (ref.getQueue().isDurable())
                {
-                  storageManager.storeMessageReferenceScheduledTransactional(id,
-                                                                             ref.getQueue().getPersistenceID(),
-                                                                             message.getMessageID(),
-                                                                             scheduledDeliveryTime);
+                  storageManager.storeMessageReferenceScheduledTransactional(id, ref.getQueue().getPersistenceID(), message.getMessageID(), scheduledDeliveryTime);
                }
             }
          }

Modified: trunk/tests/config/ConfigurationTest-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-config.xml	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/tests/config/ConfigurationTest-config.xml	2008-11-05 17:24:25 UTC (rev 5280)
@@ -11,6 +11,8 @@
       <call-timeout>7654</call-timeout>    
       <packet-confirmation-batch-size>543</packet-confirmation-batch-size>
       <connection-scan-period>6543</connection-scan-period>
+      <transaction-timeout>98765</transaction-timeout>
+      <transaction-timeout-scan-period>56789</transaction-timeout-scan-period>
       <remoting-interceptors>
          <class-name>org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor1</class-name>
          <class-name>org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor2</class-name>

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/XaTimeoutTest.java	2008-11-05 17:24:25 UTC (rev 5280)
@@ -49,19 +49,26 @@
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  */
-public class XaTimeoutTest  extends UnitTestCase
+public class XaTimeoutTest extends UnitTestCase
 {
    private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
+
    private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
 
    private Map<String, QueueSettings> queueSettings = new HashMap<String, QueueSettings>();
 
    private MessagingService messagingService;
+   
    private ClientSession clientSession;
+
    private ClientProducer clientProducer;
+
    private ClientConsumer clientConsumer;
+
    private ClientSessionFactory sessionFactory;
+
    private ConfigurationImpl configuration;
+
    private SimpleString atestq = new SimpleString("atestq");
 
    protected void setUp() throws Exception
@@ -69,6 +76,7 @@
       queueSettings.clear();
       configuration = new ConfigurationImpl();
       configuration.setSecurityEnabled(false);
+      configuration.setTransactionTimeoutScanPeriod(500);
       TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
       configuration.getAcceptorConfigurations().add(transportConfig);
       messagingService = MessagingServiceImpl.newNullStorageMessagingServer(configuration);
@@ -125,8 +133,7 @@
       clientProducer.send(m3);
       clientProducer.send(m4);
       clientSession.end(xid, XAResource.TMSUCCESS);
-      clientSession.prepare(xid);
-      Thread.sleep(1100);
+      Thread.sleep(1500);
       try
       {
          clientSession.commit(xid, true);
@@ -175,8 +182,7 @@
       assertNotNull(m);
       assertEquals(m.getBody().getString(), "m4");
       clientSession.end(xid, XAResource.TMSUCCESS);
-      clientSession.prepare(xid);
-      Thread.sleep(2100);
+      Thread.sleep(2600);
       try
       {
          clientSession.commit(xid, true);
@@ -232,9 +238,9 @@
       clientSession.start(xid, XAResource.TMNOFLAGS);
       clientSession.start();
       clientProducer.send(m5);
-      clientProducer.send(m5);
-      clientProducer.send(m5);
-      clientProducer.send(m5);
+      clientProducer.send(m6);
+      clientProducer.send(m7);
+      clientProducer.send(m8);
       ClientMessage m = clientConsumer.receive(500);
       m.acknowledge();
       assertNotNull(m);
@@ -252,8 +258,7 @@
       assertNotNull(m);
       assertEquals(m.getBody().getString(), "m4");
       clientSession.end(xid, XAResource.TMSUCCESS);
-      clientSession.prepare(xid);
-      Thread.sleep(2100);
+      Thread.sleep(2600);
       try
       {
          clientSession.commit(xid, true);
@@ -287,7 +292,209 @@
       assertNull(m);
       clientSession2.close();
    }
-   
+
+   public void testPreparedTransactionNotTimedOut() throws Exception
+   {
+      Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+      ClientMessage m1 = createTextMessage("m1");
+      ClientMessage m2 = createTextMessage("m2");
+      ClientMessage m3 = createTextMessage("m3");
+      ClientMessage m4 = createTextMessage("m4");
+      ClientMessage m5 = createTextMessage("m5");
+      ClientMessage m6 = createTextMessage("m6");
+      ClientMessage m7 = createTextMessage("m7");
+      ClientMessage m8 = createTextMessage("m8");
+      ClientSession clientSession2 = sessionFactory.createSession(false, true, true, false);
+      ClientProducer clientProducer2 = clientSession2.createProducer(atestq);
+      clientProducer2.send(m1);
+      clientProducer2.send(m2);
+      clientProducer2.send(m3);
+      clientProducer2.send(m4);
+      clientSession2.close();
+      clientSession.setTransactionTimeout(2);
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientSession.start();
+      clientProducer.send(m5);
+      clientProducer.send(m6);
+      clientProducer.send(m7);
+      clientProducer.send(m8);
+      ClientMessage m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m1");
+      m = clientConsumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().getString(), "m2");
+      m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m3");
+      m = clientConsumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m4");
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.prepare(xid);
+      Thread.sleep(2600);
+      clientSession.commit(xid, true);
+
+      clientSession.setTransactionTimeout(0);
+      clientConsumer.close();
+      clientSession2 = sessionFactory.createSession(false, true, true, false);
+      ClientConsumer consumer = clientSession2.createConsumer(atestq);
+      clientSession2.start();
+      m = consumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m5");
+      m = consumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().getString(), "m6");
+      m = consumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m7");
+      m = consumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m8");
+      m = consumer.receive(500);
+      assertNull(m);
+      clientSession2.close();
+   }
+
+   public void testChangingTimeoutGetsPickedUp() throws Exception
+   {
+      Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+      ClientMessage m1 = createTextMessage("m1");
+      ClientMessage m2 = createTextMessage("m2");
+      ClientMessage m3 = createTextMessage("m3");
+      ClientMessage m4 = createTextMessage("m4");
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientProducer.send(m1);
+      clientProducer.send(m2);
+      clientProducer.send(m3);
+      clientProducer.send(m4);
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.setTransactionTimeout(1);
+      Thread.sleep(1500);
+
+      try
+      {
+         clientSession.commit(xid, true);
+      }
+      catch (XAException e)
+      {
+         assertTrue(e.errorCode == XAException.XAER_NOTA);
+      }
+      clientSession.start();
+      ClientMessage m = clientConsumer.receive(500);
+      assertNull(m);
+   }
+
+   public void testChangingTimeoutGetsPickedUpCommit() throws Exception
+   {
+      Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+      ClientMessage m1 = createTextMessage("m1");
+      ClientMessage m2 = createTextMessage("m2");
+      ClientMessage m3 = createTextMessage("m3");
+      ClientMessage m4 = createTextMessage("m4");
+      clientSession.setTransactionTimeout(2);
+      clientSession.start(xid, XAResource.TMNOFLAGS);
+      clientProducer.send(m1);
+      clientProducer.send(m2);
+      clientProducer.send(m3);
+      clientProducer.send(m4);
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      clientSession.setTransactionTimeout(10000);
+      Thread.sleep(2600);
+      clientSession.prepare(xid);
+      clientSession.commit(xid, true);
+      ClientSession clientSession2 = sessionFactory.createSession(false, true, true, false);
+      ClientConsumer consumer = clientSession2.createConsumer(atestq);
+      clientSession2.start();
+      ClientMessage m = consumer.receive(500);
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m1");
+      m = consumer.receive(500);
+      assertNotNull(m);
+      m.acknowledge();
+      assertEquals(m.getBody().getString(), "m2");
+      m = consumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m3");
+      m = consumer.receive(500);
+      m.acknowledge();
+      assertNotNull(m);
+      assertEquals(m.getBody().getString(), "m4");
+      clientSession2.close();
+   }
+
+   public void testMultipleTransactionsTimedOut() throws Exception
+   {
+      Xid[] xids = new XidImpl[100];
+      for (int i = 0; i < xids.length; i++)
+      {
+         xids[i] = new XidImpl(("xa" + i).getBytes(), 1, new GUID().toString().getBytes());
+      }
+      ClientSession[] clientSessions = new ClientSession[xids.length];
+      for (int i = 0; i < clientSessions.length; i++)
+      {
+         clientSessions[i] = sessionFactory.createSession(true, false, false, false);
+      }
+
+      ClientProducer[] clientProducers = new ClientProducer[xids.length];
+      for (int i = 0; i < clientProducers.length; i++)
+      {
+         clientProducers[i] = clientSessions[i].createProducer(atestq);
+      }
+
+      ClientMessage[] messages = new ClientMessage[xids.length];
+
+      for (int i = 0; i < messages.length; i++)
+      {
+         messages[i] = createTextMessage("m" + i);
+      }
+      clientSession.setTransactionTimeout(2);
+      for (int i = 0; i < clientSessions.length; i++)
+      {
+         clientSessions[i].start(xids[i], XAResource.TMNOFLAGS);
+      }
+      for (int i = 0; i < clientProducers.length; i++)
+      {
+         clientProducers[i].send(messages[i]);
+      }
+      for (int i = 0; i < clientSessions.length; i++)
+      {
+         clientSessions[i].end(xids[i], XAResource.TMSUCCESS);
+      }
+      Thread.sleep(2500);
+      for (int i = 0; i < clientSessions.length; i++)
+      {
+         try
+         {
+            clientSessions[i].commit(xids[i], true);
+         }
+         catch (XAException e)
+         {
+            assertTrue(e.errorCode == XAException.XAER_NOTA);
+         }
+      }
+      for (ClientSession session : clientSessions)
+      {
+         session.close();
+      }
+      clientSession.start();
+      ClientMessage m = clientConsumer.receive(500);
+      assertNull(m);
+   }
+
    private ClientMessage createTextMessage(String s)
    {
       return createTextMessage(s, true);

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java	2008-11-05 17:24:25 UTC (rev 5280)
@@ -71,6 +71,8 @@
       assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES, conf.getJournalMinFiles());      
       assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_AIO, conf.getJournalMaxAIO());
       assertEquals(ConfigurationImpl.DEFAULT_WILDCARD_ROUTING_ENABLED, conf.isWildcardRoutingEnabled());
+      assertEquals(ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT, conf.getTransactionTimeout());
+      assertEquals(ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT_SCAN_PERIOD, conf.getTransactionTimeoutScanPeriod());
    }
    
    public void testSetGetAttributes()

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2008-11-05 16:52:19 UTC (rev 5279)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2008-11-05 17:24:25 UTC (rev 5280)
@@ -56,6 +56,8 @@
       assertEquals(100, conf.getJournalMinFiles());      
       assertEquals(56546, conf.getJournalMaxAIO());
       assertEquals(true, conf.isWildcardRoutingEnabled());
+      assertEquals(98765, conf.getTransactionTimeout());
+      assertEquals(56789, conf.getTransactionTimeoutScanPeriod());
       
       assertEquals(2, conf.getInterceptorClassNames().size());
       assertTrue(conf.getInterceptorClassNames().contains("org.jboss.messaging.tests.unit.core.config.impl.TestInterceptor1"));




More information about the jboss-cvs-commits mailing list