[hornetq-commits] JBoss hornetq SVN: r9137 - in trunk: src/main/org/hornetq/core/server/impl and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Apr 16 12:46:09 EDT 2010


Author: ataylor
Date: 2010-04-16 12:46:08 -0400 (Fri, 16 Apr 2010)
New Revision: 9137

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/core/transaction/ResourceManager.java
   trunk/src/main/org/hornetq/core/transaction/Transaction.java
   trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
   trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
   trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
   trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
   trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
   trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
   trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-363 - fixed tx timeout and RA

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -369,7 +369,7 @@
       // if unsetting a previous handler may be in onMessage so wait for completion
       else if (handler == null && !noPreviousHandler)
       {
-         waitForOnMessageToComplete();
+         waitForOnMessageToComplete(true);
       }
    }
 
@@ -397,8 +397,13 @@
 
    public void stop() throws HornetQException
    {
-      waitForOnMessageToComplete();
+      stop(true);
+   }
 
+   public void stop(final boolean waitForOnMessage) throws HornetQException
+   {
+      waitForOnMessageToComplete(waitForOnMessage);
+
       synchronized (this)
       {
          if (stopped)
@@ -552,7 +557,7 @@
       currentLargeMessageBuffer.addPacket(chunk);
    }
 
-   public void clear() throws HornetQException
+   public void clear(boolean waitForOnMessage) throws HornetQException
    {
       synchronized (this)
       {
@@ -572,7 +577,7 @@
 
       // Need to send credits for the messages in the buffer
 
-      waitForOnMessageToComplete();
+      waitForOnMessageToComplete(waitForOnMessage);
    }
 
    public int getClientWindowSize()
@@ -723,14 +728,14 @@
       channel.send(new SessionConsumerFlowCreditMessage(id, credits));
    }
 
-   private void waitForOnMessageToComplete()
+   private void waitForOnMessageToComplete(boolean waitForOnMessage)
    {
       if (handler == null)
       {
          return;
       }
 
-      if (Thread.currentThread() == onMessageThread)
+      if (!waitForOnMessage || Thread.currentThread() == onMessageThread)
       {
          // If called from inside onMessage then return immediately - otherwise would block
          return;
@@ -855,7 +860,7 @@
          closing = true;
 
          // Now we wait for any current handler runners to run.
-         waitForOnMessageToComplete();
+         waitForOnMessageToComplete(true);
 
          if (currentLargeMessageBuffer != null)
          {

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -46,7 +46,7 @@
 
    void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException;
 
-   void clear() throws HornetQException;
+   void clear(boolean waitForOnMessage) throws HornetQException;
 
    void clearAtFailover();
 
@@ -62,6 +62,8 @@
 
    void stop() throws HornetQException;
 
+   void stop(boolean waitForOnMessage) throws HornetQException;
+
    void start();
    
    SessionQueueQueryResponseMessage getQueueInfo();

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -560,7 +560,7 @@
       // We need to make sure we don't get any inflight messages
       for (ClientConsumerInternal consumer : consumers.values())
       {
-         consumer.clear();
+         consumer.clear(true);
       }
 
       // Acks must be flushed here *after connection is stopped and all onmessages finished executing
@@ -639,19 +639,7 @@
 
    public void stop() throws HornetQException
    {
-      checkClosed();
-
-      if (started)
-      {
-         for (ClientConsumerInternal clientConsumerInternal : consumers.values())
-         {
-            clientConsumerInternal.stop();
-         }
-
-         channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
-
-         started = false;
-      }
+      stop(true);
    }
 
    public void addFailureListener(final SessionFailureListener listener)
@@ -1380,13 +1368,13 @@
 
          if (wasStarted)
          {
-            stop();
+            stop(false);
          }
 
          // We need to make sure we don't get any inflight messages
          for (ClientConsumerInternal consumer : consumers.values())
          {
-            consumer.clear();
+            consumer.clear(false);
          }
 
          flushAcks();
@@ -1712,6 +1700,23 @@
       }
    }
 
+   public void stop(final boolean waitForOnMessage) throws HornetQException
+   {
+      checkClosed();
+
+      if (started)
+      {
+         for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+         {
+            clientConsumerInternal.stop(waitForOnMessage);
+         }
+
+         channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
+
+         started = false;
+      }
+   }
+
    private static class BindingQueryImpl implements BindingQuery
    {
 

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -136,6 +136,8 @@
 
    private volatile SimpleString defaultAddress;
 
+   private volatile int timeoutSeconds;
+
    // Constructors ---------------------------------------------------------------------------------
 
    public ServerSessionImpl(final String name,
@@ -180,9 +182,11 @@
 
       this.securityStore = securityStore;
 
+      timeoutSeconds = resourceManager.getTimeoutSeconds();
+
       if (!xa)
       {
-         tx = new TransactionImpl(storageManager);
+         tx = new TransactionImpl(storageManager, timeoutSeconds);
       }
 
       this.xa = xa;
@@ -558,7 +562,7 @@
       }
       finally
       {
-         tx = new TransactionImpl(storageManager);
+         tx = new TransactionImpl(storageManager, timeoutSeconds);
       }
    }
 
@@ -568,12 +572,12 @@
       {
          // Might be null if XA
 
-         tx = new TransactionImpl(storageManager);
+         tx = new TransactionImpl(storageManager, timeoutSeconds);
       }
 
       doRollback(considerLastMessageAsDelivered, tx);
 
-      tx = new TransactionImpl(storageManager);
+      tx = new TransactionImpl(storageManager, timeoutSeconds);
    }
 
    public void xaCommit(final Xid xid, final boolean onePhase) throws Exception
@@ -809,7 +813,7 @@
       }
       else
       {
-         tx = new TransactionImpl(xid, storageManager, postOffice);
+         tx = new TransactionImpl(xid, storageManager, timeoutSeconds);
 
          boolean added = resourceManager.putTransaction(xid, tx);
 
@@ -898,7 +902,11 @@
 
    public void xaSetTimeout(final int timeout)
    {
-      resourceManager.setTimeoutSeconds(timeout);
+      timeoutSeconds = timeout;
+      if(tx != null)
+      {
+         tx.setTimeout(timeout);
+      }
    }
 
    public void start()

Modified: trunk/src/main/org/hornetq/core/transaction/ResourceManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/ResourceManager.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/transaction/ResourceManager.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -37,8 +37,6 @@
 
    int getTimeoutSeconds();
 
-   boolean setTimeoutSeconds(int timeoutSeconds);
-
    List<Xid> getPreparedTransactions();
 
    Map<Xid, Long> getPreparedTransactionsWithCreationTime();

Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -55,12 +55,16 @@
 
    void removeOperation(TransactionOperation sync);
 
+   boolean hasTimedOut(long currentTime, int defaultTimeout);
+
    void putProperty(int index, Object property);
 
    Object getProperty(int index);
 
    void setContainsPersistent();
 
+   void setTimeout(int timeout);
+
    static enum State
    {
       ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY

Modified: trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/transaction/impl/ResourceManagerImpl.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -49,8 +49,6 @@
 
    private final int defaultTimeoutSeconds;
 
-   private volatile int timeoutSeconds;
-
    private boolean started = false;
 
    private TxTimeoutHandler task;
@@ -64,7 +62,6 @@
                               final ScheduledExecutorService scheduledThreadPool)
    {
       this.defaultTimeoutSeconds = defaultTimeoutSeconds;
-      timeoutSeconds = defaultTimeoutSeconds;
       this.txTimeoutScanPeriod = txTimeoutScanPeriod;
       this.scheduledThreadPool = scheduledThreadPool;
    }
@@ -125,24 +122,9 @@
 
    public int getTimeoutSeconds()
    {
-      return timeoutSeconds;
+      return defaultTimeoutSeconds;
    }
 
-   public boolean setTimeoutSeconds(final int timeoutSeconds)
-   {
-      if (timeoutSeconds == 0)
-      {
-         // reset to default
-         this.timeoutSeconds = defaultTimeoutSeconds;
-      }
-      else
-      {
-         this.timeoutSeconds = timeoutSeconds;
-      }
-
-      return true;
-   }
-
    public List<Xid> getPreparedTransactions()
    {
       List<Xid> xids = new ArrayList<Xid>();
@@ -231,7 +213,7 @@
 
          for (Transaction tx : transactions.values())
          {
-            if (tx.getState() != Transaction.State.PREPARED && now > tx.getCreateTime() + timeoutSeconds * 1000)
+            if (tx.hasTimedOut(now, defaultTimeoutSeconds))
             {
                transactions.remove(tx.getXid());
                ResourceManagerImpl.log.warn("transaction with xid " + tx.getXid() + " timed out");

Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -59,6 +59,21 @@
 
    private volatile boolean containsPersistent;
 
+   private int timeoutSeconds = -1;
+
+   public TransactionImpl(final StorageManager storageManager, final int timeoutSeconds)
+   {
+      this.storageManager = storageManager;
+
+      xid = null;
+
+      id = storageManager.generateUniqueID();
+
+      createTime = System.currentTimeMillis();
+
+      this.timeoutSeconds = timeoutSeconds;
+   }
+
    public TransactionImpl(final StorageManager storageManager)
    {
       this.storageManager = storageManager;
@@ -70,7 +85,7 @@
       createTime = System.currentTimeMillis();
    }
 
-   public TransactionImpl(final Xid xid, final StorageManager storageManager, final PostOffice postOffice)
+   public TransactionImpl(final Xid xid, final StorageManager storageManager, final int timeoutSeconds)
    {
       this.storageManager = storageManager;
 
@@ -79,6 +94,8 @@
       id = storageManager.generateUniqueID();
 
       createTime = System.currentTimeMillis();
+
+      this.timeoutSeconds = timeoutSeconds;
    }
 
    public TransactionImpl(final long id, final Xid xid, final StorageManager storageManager)
@@ -100,6 +117,11 @@
       containsPersistent = true;
    }
 
+   public void setTimeout(final int timeout)
+   {
+      this.timeoutSeconds = timeout;
+   }
+
    public long getID()
    {
       return id;
@@ -110,6 +132,18 @@
       return createTime;
    }
 
+   public boolean hasTimedOut(final long currentTime,final int defaultTimeout)
+   {
+      if(timeoutSeconds == - 1)
+      {
+         return getState() != Transaction.State.PREPARED && currentTime > createTime + defaultTimeout * 1000;
+      }
+      else
+      {
+         return getState() != Transaction.State.PREPARED && currentTime > createTime + timeoutSeconds * 1000;
+      }
+   }
+
    public void prepare() throws Exception
    {
       synchronized (timeoutLock)

Modified: trunk/src/main/org/hornetq/ra/HornetQRAProperties.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRAProperties.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/ra/HornetQRAProperties.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -41,9 +41,6 @@
    /** The password */
    private String password;
 
-   /** Use XA */
-   private Boolean useXA;
-
    /** Use Local TX instead of XA */
    private Boolean localTx = false;
 
@@ -142,52 +139,12 @@
       this.localTx = localTx;
    }
 
-   /**
-    * Get the use XA flag
-    * @return The value
-    */
-   public Boolean getUseXA()
-   {
-      if (HornetQRAProperties.trace)
-      {
-         HornetQRAProperties.log.trace("getUseXA()");
-      }
 
-      return useXA;
-   }
-
-   /**
-    * Set the use XA flag
-    * @param xa The value
-    */
-   public void setUseXA(final Boolean xa)
-   {
-      if (HornetQRAProperties.trace)
-      {
-         HornetQRAProperties.log.trace("setUseXA(" + xa + ")");
-      }
-
-      useXA = xa;
-   }
-
-   /**
-    * Use XA for communication
-    * @return The value
-    */
-   public boolean isUseXA()
-   {
-      if (HornetQRAProperties.trace)
-      {
-         HornetQRAProperties.log.trace("isUseXA()");
-      }
-
-      return useXA != null && useXA;
-   }
    
    @Override
    public String toString()
    {
-      return "HornetQRAProperties[useXA=" + useXA + ", localTx=" + localTx +
+      return "HornetQRAProperties[localTx=" + localTx +
          ", userName=" + userName + ", password=" + password + "]";
    }
 }

Modified: trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/ra/HornetQResourceAdapter.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -1186,37 +1186,8 @@
       raProperties.setUseLocalTx(localTx);
    }
 
-   /**
-    * Get the use XA flag
-    *
-    * @return The value
-    */
-   public Boolean getUseXA()
-   {
-      if (HornetQResourceAdapter.trace)
-      {
-         HornetQResourceAdapter.log.trace("getUseXA()");
-      }
 
-      return raProperties.getUseXA();
-   }
-
    /**
-    * Set the use XA flag
-    *
-    * @param xa The value
-    */
-   public void setUseXA(final Boolean xa)
-   {
-      if (HornetQResourceAdapter.trace)
-      {
-         HornetQResourceAdapter.log.trace("setUseXA(" + xa + ")");
-      }
-
-      raProperties.setUseXA(xa);
-   }
-
-   /**
     * Indicates whether some other object is "equal to" this one.
     *
     * @param obj Object with which to compare
@@ -1287,7 +1258,8 @@
                                       final Integer dupsOkBatchSize,
                                       final Integer transactionBatchSize,
                                       final boolean deliveryTransacted,
-                                      final boolean useLocalTx) throws Exception
+                                      final boolean useLocalTx,
+                                      final Integer txTimeout) throws Exception
    {
 
       ClientSession result;

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -353,7 +353,8 @@
                                    ra.getDupsOKBatchSize(),
                                    ra.getTransactionBatchSize(),
                                    isDeliveryTransacted,
-                                   spec.isUseLocalTx());
+                                   spec.isUseLocalTx(),
+                                   spec.getTransactionTimeout());
 
          HornetQActivation.log.debug("Using queue connection " + result);
 

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivationSpec.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -92,6 +92,10 @@
 
    /* use local tx instead of XA*/
    private Boolean localTx;
+
+   private String transactionManagerLocatorClass = "org.hornetq.integration.jboss.tm.JBoss5TransactionManagerLocator";
+
+   private String transactionManagerLocatorMethod = "getTm";
    
    /**
     * Constructor
@@ -644,6 +648,27 @@
       this.localTx = localTx;
    }
 
+
+   public void setTransactionManagerLocatorClass(final String transactionManagerLocatorClass)
+   {
+      this.transactionManagerLocatorClass = transactionManagerLocatorClass;
+   }
+
+   public String getTransactionManagerLocatorClass()
+   {
+      return transactionManagerLocatorClass;
+   }
+
+   public String getTransactionManagerLocatorMethod()
+   {
+      return transactionManagerLocatorMethod;
+   }
+
+   public void setTransactionManagerLocatorMethod(final String transactionManagerLocatorMethod)
+   {
+      this.transactionManagerLocatorMethod = transactionManagerLocatorMethod;
+   }
+
    /**
     * Validate
     * @exception InvalidPropertyException Thrown if a validation exception occurs
@@ -755,6 +780,7 @@
    public void setReconnectInterval(long interval)
    {
    }
-   
-   
+
+
+
 }

Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -12,6 +12,7 @@
  */
 package org.hornetq.ra.inflow;
 
+import java.lang.reflect.Method;
 import java.util.UUID;
 
 import javax.jms.InvalidClientIDException;
@@ -19,6 +20,8 @@
 import javax.resource.ResourceException;
 import javax.resource.spi.endpoint.MessageEndpoint;
 import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
@@ -69,6 +72,8 @@
    
    private final int sessionNr;
 
+   private TransactionManager tm;
+
    public HornetQMessageHandler(final HornetQActivation activation, final ClientSession session, final int sessionNr)
    {
       this.activation = activation;
@@ -241,14 +246,27 @@
 
       HornetQMessage msg = HornetQMessage.createMessage(message, session);
       boolean beforeDelivery = false;
+
       try
       {
+         if(activation.getActivationSpec().getTransactionTimeout() > 0)
+         {
+            getTm().setTransactionTimeout(activation.getActivationSpec().getTransactionTimeout());
+         }
          endpoint.beforeDelivery(HornetQActivation.ONMESSAGE);
          beforeDelivery = true;
          msg.doBeforeReceive();
          ((MessageListener)endpoint).onMessage(msg);
          message.acknowledge();
-         endpoint.afterDelivery();
+         try
+         {
+            endpoint.afterDelivery();
+         }
+         catch (ResourceException e)
+         {
+            HornetQMessageHandler.log.warn("Unable to call after delivery", e);
+            return;
+         }
          if (useLocalTx)
          {
             session.commit();
@@ -266,7 +284,7 @@
             }
             catch (ResourceException e1)
             {
-               HornetQMessageHandler.log.warn("Unable to call after delivery");
+               HornetQMessageHandler.log.warn("Unable to call after delivery", e);
             }
          }
          if (useLocalTx || !activation.isDeliveryTransacted())
@@ -284,4 +302,33 @@
 
    }
 
+   private TransactionManager getTm()
+   {
+      if (tm == null)
+      {
+         try
+         {
+            ClassLoader loader = Thread.currentThread().getContextClassLoader();
+            Class aClass = loader.loadClass(activation.getActivationSpec().getTransactionManagerLocatorClass());
+            Object o = aClass.newInstance();
+            Method m = aClass.getMethod(activation.getActivationSpec().getTransactionManagerLocatorMethod());
+            tm = (TransactionManager)m.invoke(o);
+         }
+         catch (Exception e)
+         {
+            throw new IllegalStateException("unable to create TransactionManager from " + activation.getActivationSpec().getTransactionManagerLocatorClass() +
+                                                     "." +
+                                                     activation.getActivationSpec().getTransactionManagerLocatorMethod(),
+                                            e);
+         }
+
+         if (tm == null)
+         {
+            throw new IllegalStateException("Cannot locate a transaction manager");
+         }
+      }
+
+      return tm;
+   }
+
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -393,11 +393,19 @@
       clientProducer.send(m3);
       clientProducer.send(m4);
       clientSession.end(xid, XAResource.TMSUCCESS);
+
+      clientSession.commit(xid, true);
+
       clientSession.setTransactionTimeout(1);
+      clientSession.start(xid, XAResource.TMNOFLAGS);
       CountDownLatch latch = new CountDownLatch(1);
       messagingService.getResourceManager().getTransaction(xid).addOperation(new RollbackCompleteOperation(latch));
-      Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
-
+      clientProducer.send(m1);
+      clientProducer.send(m2);
+      clientProducer.send(m3);
+      clientProducer.send(m4);
+      clientSession.end(xid, XAResource.TMSUCCESS);
+      Assert.assertTrue(latch.await(2600, TimeUnit.MILLISECONDS));
       try
       {
          clientSession.commit(xid, true);
@@ -408,49 +416,15 @@
       }
       clientSession.start();
       ClientMessage m = clientConsumer.receiveImmediate();
-      Assert.assertNull(m);
-   }
-
-   public void testChangingTimeoutGetsPickedUpCommit() throws Exception
-   {
-      Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
-
-      ClientMessage m1 = createTextMessage("m1", clientSession);
-      ClientMessage m2 = createTextMessage("m2", clientSession);
-      ClientMessage m3 = createTextMessage("m3", clientSession);
-      ClientMessage m4 = createTextMessage("m4", clientSession);
-      clientSession.setTransactionTimeout(2);
-      clientSession.start(xid, XAResource.TMNOFLAGS);
-      clientProducer.send(m1);
-      clientProducer.send(m2);
-      clientProducer.send(m3);
-      clientProducer.send(m4);
-      clientSession.end(xid, XAResource.TMSUCCESS);
-      clientSession.setTransactionTimeout(10000);
-      CountDownLatch latch = new CountDownLatch(1);
-      messagingService.getResourceManager().getTransaction(xid).addOperation(new RollbackCompleteOperation(latch));
-      Assert.assertFalse(latch.await(2600, TimeUnit.MILLISECONDS));
-      clientSession.prepare(xid);
-      clientSession.commit(xid, false);
-      ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
-      ClientConsumer consumer = clientSession2.createConsumer(atestq);
-      clientSession2.start();
-      ClientMessage m = consumer.receive(500);
       Assert.assertNotNull(m);
-      Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
-      m = consumer.receive(500);
+      m = clientConsumer.receiveImmediate();
       Assert.assertNotNull(m);
-      m.acknowledge();
-      Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
-      m = consumer.receive(500);
-      m.acknowledge();
+      m = clientConsumer.receiveImmediate();
       Assert.assertNotNull(m);
-      Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
-      m = consumer.receive(500);
-      m.acknowledge();
+      m = clientConsumer.receiveImmediate();
       Assert.assertNotNull(m);
-      Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
-      clientSession2.close();
+      m = clientConsumer.receiveImmediate();
+      Assert.assertNull(m);
    }
 
    public void testMultipleTransactionsTimedOut() throws Exception
@@ -464,6 +438,7 @@
       for (int i = 0; i < clientSessions.length; i++)
       {
          clientSessions[i] = sessionFactory.createSession(true, false, false);
+         clientSessions[i].setTransactionTimeout(i < 50?2:5000);
       }
 
       ClientProducer[] clientProducers = new ClientProducer[xids.length];
@@ -478,7 +453,6 @@
       {
          messages[i] = createTextMessage("m" + i, clientSession);
       }
-      clientSession.setTransactionTimeout(2);
       for (int i = 0; i < clientSessions.length; i++)
       {
          clientSessions[i].start(xids[i], XAResource.TMNOFLAGS);
@@ -491,13 +465,21 @@
       {
          clientSessions[i].end(xids[i], XAResource.TMSUCCESS);
       }
-      CountDownLatch latch = new CountDownLatch(1);
-      messagingService.getResourceManager()
-                      .getTransaction(xids[clientSessions.length - 1])
-                      .addOperation(new RollbackCompleteOperation(latch));
-      Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
-      for (int i = 0; i < clientSessions.length; i++)
+      CountDownLatch[] latches = new CountDownLatch[xids.length];
+      for (int i1 = 0; i1 < latches.length; i1++)
       {
+         latches[i1] = new CountDownLatch(1);
+         messagingService.getResourceManager()
+                      .getTransaction(xids[i1])
+                      .addOperation(new RollbackCompleteOperation(latches[i1]));
+      }
+      for (int i1 = 0;i1 < latches.length/2; i1++)
+      {
+         Assert.assertTrue(latches[i1].await(5, TimeUnit.SECONDS));
+      }
+
+      for (int i = 0; i < clientSessions.length/2; i++)
+      {
          try
          {
             clientSessions[i].commit(xids[i], true);
@@ -507,11 +489,20 @@
             Assert.assertTrue(e.errorCode == XAException.XAER_NOTA);
          }
       }
+      for (int i = 50; i < clientSessions.length; i++)
+      {
+         clientSessions[i].commit(xids[i], true);
+      }
       for (ClientSession session : clientSessions)
       {
          session.close();
       }
       clientSession.start();
+      for(int i = 0; i < clientSessions.length/2; i++)
+      {
+         ClientMessage m = clientConsumer.receiveImmediate();
+         Assert.assertNotNull(m);   
+      }
       ClientMessage m = clientConsumer.receiveImmediate();
       Assert.assertNull(m);
    }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -635,7 +635,7 @@
 
       }
 
-      public void clear() throws HornetQException
+      public void clear(boolean waitForOnMessage) throws HornetQException
       {
          // TODO Auto-generated method stub
 
@@ -725,6 +725,11 @@
 
       }
 
+      public void stop(boolean waitForOnMessage) throws HornetQException
+      {
+         //To change body of implemented methods use File | Settings | File Templates.
+      }
+
       public SessionQueueQueryResponseMessage getQueueInfo()
       {
          // TODO Auto-generated method stub

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-04-16 14:45:28 UTC (rev 9136)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java	2010-04-16 16:46:08 UTC (rev 9137)
@@ -134,6 +134,11 @@
 
       }
 
+      public boolean hasTimedOut(long currentTime, int defaultTimeout)
+      {
+         return false;  
+      }
+
       /* (non-Javadoc)
        * @see org.hornetq.core.transaction.Transaction#commit()
        */
@@ -282,6 +287,11 @@
 
       }
 
+      public void setTimeout(int timeout)
+      {
+         
+      }
+
    }
 
    class FakeMessage implements ServerMessage



More information about the hornetq-commits mailing list