[jboss-cvs] JBoss Messaging SVN: r8454 - in branches/Branch_1_4: integration/EAP4/etc and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Oct 25 22:45:35 EDT 2011


Author: gaohoward
Date: 2011-10-25 22:45:35 -0400 (Tue, 25 Oct 2011)
New Revision: 8454

Modified:
   branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
   branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
   branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
   branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java
   branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
Log:
JBMESSAGING-1904
Rolling back JBMESSAGING-1850 and JBMESSAGING-1876 (but retaining the MinTimeoutProcessTime attribute to keep the wire format backward compatible)



Modified: branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_1_4/docs/userguide/en/modules/configuration.xml	2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/docs/userguide/en/modules/configuration.xml	2011-10-26 02:45:35 UTC (rev 8454)
@@ -2609,24 +2609,6 @@
         <para>This attribute takes effect only if the MaxRetryChangeRate attribute is not zero.</para>
       </section>
 
-      <section id="conf.connectionfactory.attributes.minTimeoutProcessTime">
-        <title>MinTimeoutProcessTime</title>
-
-        <para>Minimum processing time allowed for (milliseconds) in a timeout receive. This is the minimum time for the internal processing after a message is arrived at the client
-        but before returning the message to the application. Default is 300. </para>
-
-	<para><note>
-
-        <para>When consumer is using AUTO_ACKNOWLEDGE session and time out for receive method expires in the moment when message is received at the client buffer but acknowledgement
-        is not confirmed by the server, then this message will be discarded and an exception is thrown. like</para>
-
-        <para>javax.jms.JMSException: Timed out before post message processing, discarding message delegator->JBossMessage[23357322686633840]ERSISTENT, deliveryId=0 at  org.jboss.jms.client.container.ClientConsumer.receive(ClientConsumer.java:596)</para>
-
-        <para>This can happen when time out for receive method is too low. Use CLIENT_ACKNOWLEDGE mode or Transactional mode to avoid such message loss.</para>
-        </note></para>
-
-      </section>
-
     <!-- End conf.connectionfactory.attributes -->
   </section>
 

Modified: branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml	2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml	2011-10-26 02:45:35 UTC (rev 8454)
@@ -136,9 +136,6 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
       <advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
-      <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
-   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
       <advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>

Modified: branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml	2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml	2011-10-26 02:45:35 UTC (rev 8454)
@@ -136,9 +136,6 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
       <advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
-   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
-      <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
-   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
       <advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-10-26 02:45:35 UTC (rev 8454)
@@ -24,13 +24,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.IllegalStateException;
@@ -312,7 +306,6 @@
    private int consumeCount;
    private boolean firstTime = true;
    private volatile Thread onMessageThread;
-   private ExecutorService pool = Executors.newCachedThreadPool();
    private long maxRetryChangeRate;
    private long retryChangeRateInterval;
    private boolean abortReceive;
@@ -328,9 +321,6 @@
    private Object messageSource;
    private boolean isClustered = false;
 
-   //JBMESSAGING-1876
-   private long minTimeoutProcessTime;
-
    private boolean isSucker = false;
 
    public int getBufferSize()
@@ -349,8 +339,7 @@
                          long maxRetryChangeRate,
                          long retryChangeRateInterval,
                          boolean isClustered,
-                         CallbackManager cbManager,
-                         long minTimeoutProcessTime)
+                         CallbackManager cbManager)
    {
       if (bufferSize < 1)
       {
@@ -378,7 +367,6 @@
          consumerLock = new Object();
       }
       messageSource = cbManager;
-      this.minTimeoutProcessTime = minTimeoutProcessTime;
    }
         
    // Public ---------------------------------------------------------------------------------------
@@ -517,8 +505,6 @@
          
          this.listener = null;
       }
-      
-      pool.shutdownNow();
                            
       if (trace) { log.trace(this + " closed"); }
    }
@@ -607,79 +593,15 @@
                if (!isConnectionConsumer && !ignore)
                {
                   final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck, m.getSource());
+                  sessionDelegate.preDeliver(info);
+
+                  // If post deliver didn't succeed and acknowledgement mode is auto_ack
+                  // That means the ref wasn't acked since it couldn't be found.
+                  // In order to maintain at most once semantics we must therefore not return
+                  // the message
                   
-                  if (timeout <= 0 || sessionDelegate.getTransacted())
-                  {
-                     ignore = ! sessionDelegate.preDeliver(info);
+                  ignore = ! sessionDelegate.postDeliver();
 
-                     // If post deliver didn't succeed and acknowledgement mode is auto_ack
-                     // That means the ref wasn't acked since it couldn't be found.
-                     // In order to maintain at most once semantics we must therefore not return
-                     // the message
-                     
-                     if (!ignore)
-                     {
-                        ignore = !sessionDelegate.postDeliver();
-                     }
-                  }
-                  else
-                  {
-                     //JBMESSAGING-1850
-                     Callable<Boolean> afterReceive = new Callable<Boolean>()
-                     {
-                        public Boolean call() throws Exception
-                        {
-                           if (! sessionDelegate.preDeliver(info))
-                           {
-                              return true;
-                           }
-
-                           // If post deliver didn't succeed and acknowledgement mode is auto_ack
-                           // That means the ref wasn't acked since it couldn't be found.
-                           // In order to maintain at most once semantics we must therefore not return
-                           // the message
-
-                           return !sessionDelegate.postDeliver();
-                        }
-                     };
-
-                     java.util.concurrent.Future<Boolean> f = pool.submit(afterReceive);
-
-                     long tmUsed = System.currentTimeMillis() - startTimestamp;
-                     
-                     long timeDelta = timeout - tmUsed;
-                     long timeLeft = timeDelta <= minTimeoutProcessTime ? minTimeoutProcessTime : timeDelta ; 
-                     
-                     if (trace)
-                     {
-                        log.trace("Time left: " + timeLeft + " timeout " + timeout + " tmUsed " + tmUsed);
-                     }
-                     
-                     try
-                     {
-                        ignore = f.get(timeLeft, TimeUnit.MILLISECONDS);
-                     }
-                     catch (InterruptedException e)
-                     {
-                        log.warn("Interrupted during getting future result.", e);
-                     }
-                     catch (ExecutionException e)
-                     {
-                        log.warn("received application exception.", e.getCause());
-                        Throwable t = e.getCause();
-                        if (t instanceof JMSException)
-                        {
-                           throw (JMSException)t;
-                        }
-                     }
-                     catch (TimeoutException e)
-                     {
-                        log.warn("Timed out waiting for post message processing " + m + " within time " + timeLeft);
-                        ignore = false;
-                        sessionDelegate.processMessageTimeout();
-                     }
-                  }
-
                   if (trace)
                   {
                      log.trace("Post deliver returned " + !ignore);

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2011-10-26 02:45:35 UTC (rev 8454)
@@ -120,7 +120,7 @@
                             prefetchSize, executor, maxDeliveries, consumerState.isShouldAck(),
                             redeliveryDelay, consumerState.getMaxRetryChangeRate(), 
                             consumerState.getRetryChangeRateInterval(),
-                            fcc != null, cm, consumerState.getMinTimeoutProcessTime());
+                            fcc != null, cm);
       
       sessionState.addCallbackHandler(messageHandler);
       

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-10-26 02:45:35 UTC (rev 8454)
@@ -416,52 +416,6 @@
       }
    }
    
-   //JBMESSAGING-1876
-   public Object handleProcessMessageTimeout(Invocation invocation) throws Throwable
-   { 
-      MethodInvocation mi = (MethodInvocation)invocation;
-      SessionState state = getState(invocation);
-
-      int ackMode = state.getAcknowledgeMode();
-                  
-      if (ackMode == Session.CLIENT_ACKNOWLEDGE)
-      {
-         if (trace)
-         {
-            log.trace("Don't do anything about CLIENT_ACK mode");
-         }
-      }
-      else if (ackMode == Session.AUTO_ACKNOWLEDGE)
-      {
-         if (trace)
-         {
-            log.trace("Don't do anything about AUTO_ACK mode");
-         }
-      }
-      else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
-      {
-         if (trace)
-         {
-            log.trace("Don't do anything about DUPS_OK_ACK mode");
-         }
-      }
-      else
-      {             
-         Object txID = state.getCurrentTxId();
-   
-         if (txID != null)
-         {
-            if (trace) { log.trace("Marking tx " + txID + " to be rollback only"); }
-            
-            ConnectionState connState = (ConnectionState)state.getParent();
-            
-            connState.getResourceManager().markTxRollbackOnly(txID);
-         }        
-      }
-      
-      return null;
-   }
-   
    /**
     * Used for client acknowledge.
     */

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-10-26 02:45:35 UTC (rev 8454)
@@ -549,11 +549,6 @@
       return "SessionDelegate[" + System.identityHashCode(this) + ", ID=" + id + "]";
    }
 
-   public void processMessageTimeout() throws JMSException
-   {
-      throw new IllegalStateException("This invocation should not be handled here!");
-   }
-
    public void messageChanged(long messageID)
    {
       throw new IllegalStateException("This invocation should not be handled here!");

Modified: branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-10-26 02:45:35 UTC (rev 8454)
@@ -96,7 +96,5 @@
 
    void acknowledgeAll() throws JMSException;
 
-   void processMessageTimeout() throws JMSException;
-
    void messageChanged(long messageID);
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java	2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java	2011-10-26 02:45:35 UTC (rev 8454)
@@ -65,8 +65,6 @@
    private boolean clientSide;
    
    private boolean recovered;
-   
-   private volatile boolean rollbackOnly = false;
 
    /* Not sent over the wire, this is for differentiating between incompatible versions */
    protected boolean supportsRecovered ;
@@ -521,15 +519,4 @@
       }
 
    }
-
-   public void markRollbackOnly()
-   {
-      this.rollbackOnly = true;
-   }
-
-   public boolean isRollbackOnly()
-   {
-      return this.rollbackOnly;
-   }
-
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java	2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java	2011-10-26 02:45:35 UTC (rev 8454)
@@ -198,13 +198,6 @@
       {
          throw new IllegalStateException("Cannot find transaction " + xid);
       }
-      
-      //if tx is mark rollback, roll back immediately
-      if (tx.isRollbackOnly())
-      {
-         this.rollbackLocal(xid, tx);
-         throw new MessagingTransactionRolledBackException("Rolled back " + tx + " as it is marked rollback only!");
-      }
                   
       TransactionRequest request =
          new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
@@ -359,12 +352,6 @@
       { 
          throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
       }
-
-      if (state.isRollbackOnly())
-      {
-         this.rollback(xid, state, connection);
-         throw new MessagingXAException(XAException.XA_RBROLLBACK, "Transaction marked rollback only, xid: " + xid);
-      }
       
       TransactionRequest request =
          new TransactionRequest(TransactionRequest.TWO_PHASE_PREPARE_REQUEST, xid, state);
@@ -388,13 +375,6 @@
       {
          log.trace("got tx: " + tx + " state " + tx.getState()); 
       }
-      
-      //roll back for onePhase only. for 2pc, rollback only is processed in prepare
-      if ((tx != null) && tx.isRollbackOnly() && onePhase)
-      {
-         this.rollback(xid, tx, connection);
-         throw new MessagingXAException(XAException.XA_RBROLLBACK, "Transaction marked rollback only, xid: " + xid);
-      }
           
       if (onePhase)
       {
@@ -781,21 +761,6 @@
          }
       }
    }
-
-   /**
-    * @param txID
-    */
-   public void markTxRollbackOnly(Object txID) throws JMSException
-   {
-      ClientTransaction tx = getTxInternal(txID);
-      
-      if (tx == null)
-      {
-         throw new JMSException("There is no transaction with id " + txID);
-      }
-      
-      tx.markRollbackOnly();
-   }
    
    // Inner Classes --------------------------------------------------------------------------------
 }



More information about the jboss-cvs-commits mailing list