[jboss-cvs] JBoss Messaging SVN: r8335 - in branches/JBMESSAGING_1876: integration/EAP5/etc and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 14 03:58:22 EDT 2011


Author: gaohoward
Date: 2011-06-14 03:58:21 -0400 (Tue, 14 Jun 2011)
New Revision: 8335

Modified:
   branches/JBMESSAGING_1876/integration/EAP4/etc/aop-messaging-client.xml
   branches/JBMESSAGING_1876/integration/EAP5/etc/aop-messaging-client.xml
   branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/JBMESSAGING_1876/src/main/org/jboss/jms/delegate/SessionDelegate.java
   branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ClientTransaction.java
   branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ResourceManager.java
Log:
first fix



Modified: branches/JBMESSAGING_1876/integration/EAP4/etc/aop-messaging-client.xml
===================================================================
--- branches/JBMESSAGING_1876/integration/EAP4/etc/aop-messaging-client.xml	2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/integration/EAP4/etc/aop-messaging-client.xml	2011-06-14 07:58:21 UTC (rev 8335)
@@ -136,6 +136,9 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
       <advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
+      <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
+   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
       <advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
@@ -283,4 +286,4 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->hasNextMessage())">
       <advice name="handleHasNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
    </bind>         
-</aop>
\ No newline at end of file
+</aop>

Modified: branches/JBMESSAGING_1876/integration/EAP5/etc/aop-messaging-client.xml
===================================================================
--- branches/JBMESSAGING_1876/integration/EAP5/etc/aop-messaging-client.xml	2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/integration/EAP5/etc/aop-messaging-client.xml	2011-06-14 07:58:21 UTC (rev 8335)
@@ -136,6 +136,9 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
       <advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
+      <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
+   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
       <advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
@@ -283,4 +286,4 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->hasNextMessage())">
       <advice name="handleHasNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
    </bind>         
-</aop>
\ No newline at end of file
+</aop>

Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-06-14 07:58:21 UTC (rev 8335)
@@ -315,6 +315,9 @@
    private Object messageSource;
    private boolean isClustered = false;
 
+   //JBMESSAGING-1876
+   private long minTimeoutProcessTime = 50;
+
    public int getBufferSize()
    {
       return buffer.size();
@@ -627,15 +630,16 @@
 
                      long tmUsed = System.currentTimeMillis() - startTimestamp;
                      
-                     if (tmUsed >= timeout)
+                     long timeLeft = (timeout - tmUsed) <= 0 ? minTimeoutProcessTime : tmUsed + minTimeoutProcessTime;
+                     
+                     if (trace)
                      {
-                        log.warn("Timed out before post message processing, discarding message " + m);
-                        throw new JMSException("Timed out before post message processing, discarding message " + m);
+                        log.trace("Time left: " + timeLeft + " timeout " + timeout + " tmUsed " + tmUsed);
                      }
-
+                     
                      try
                      {
-                        ignore = f.get(timeout - tmUsed, TimeUnit.MILLISECONDS);
+                        ignore = f.get(timeLeft, TimeUnit.MILLISECONDS);
                      }
                      catch (InterruptedException e)
                      {
@@ -652,8 +656,9 @@
                      }
                      catch (TimeoutException e)
                      {
-                        log.warn("Timed out waiting for post message processing, discarding message " + m);
-                        throw new JMSException("Timed out waiting for post message processing, discarding message " + m);
+                        log.warn("Timed out waiting for post message processing " + m);
+                        ignore = false;
+                        sessionDelegate.processMessageTimeout();
                      }
                   }
 

Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-06-14 07:58:21 UTC (rev 8335)
@@ -409,6 +409,62 @@
       }
    }
    
+   //JBMESSAGING-1876
+   public Object handleProcessMessageTimeout(Invocation invocation) throws Throwable
+   { 
+      MethodInvocation mi = (MethodInvocation)invocation;
+      SessionState state = getState(invocation);
+      
+      boolean result = true;
+      
+      synchronized (state)
+      {
+      
+         int ackMode = state.getAcknowledgeMode();
+         
+         Object[] args = mi.getArguments();
+         DeliveryInfo info = (DeliveryInfo)args[0];
+         
+         if (ackMode == Session.CLIENT_ACKNOWLEDGE)
+         {
+            if (trace)
+            {
+               log.trace("Don't do anything about CLIENT_ACK mode");
+            }
+         }
+         else if (ackMode == Session.AUTO_ACKNOWLEDGE)
+         {
+            if (trace)
+            {
+               log.trace("Don't do anything about AUTO_ACK mode");
+            }
+         }
+         else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+         {
+            if (trace)
+            {
+               log.trace("Don't do anything about DUPS_OK_ACK mode");
+            }
+         }
+         else
+         {             
+            Object txID = state.getCurrentTxId();
+      
+            if (txID != null)
+            {
+
+               if (trace) { log.trace("Marking tx " + txID + " to be rollback only"); }
+               
+               ConnectionState connState = (ConnectionState)state.getParent();
+               
+               connState.getResourceManager().markTxRollbackOnly(txID);
+            }        
+         }
+      }
+      
+      return null;
+   }
+   
    /**
     * Used for client acknowledge.
     */

Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-06-14 07:58:21 UTC (rev 8335)
@@ -549,6 +549,11 @@
       return "SessionDelegate[" + System.identityHashCode(this) + ", ID=" + id + "]";
    }
 
+   public void processMessageTimeout() throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
    // Protected ------------------------------------------------------------------------------------
 
    // Package Private ------------------------------------------------------------------------------

Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-06-14 07:58:21 UTC (rev 8335)
@@ -95,4 +95,6 @@
    ProducerDelegate createProducerDelegate(JBossDestination destination) throws JMSException;
 
    void acknowledgeAll() throws JMSException;
+
+   void processMessageTimeout() throws JMSException;
 }

Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ClientTransaction.java	2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ClientTransaction.java	2011-06-14 07:58:21 UTC (rev 8335)
@@ -65,6 +65,8 @@
    private boolean clientSide;
    
    private boolean recovered;
+   
+   private boolean rollbackOnly = false;
 
    // Static --------------------------------------------------------
 
@@ -502,4 +504,14 @@
 
    }
 
+   public void markRollbackOnly()
+   {
+      this.rollbackOnly = true;
+   }
+
+   public boolean isRollbackOnly()
+   {
+      return this.rollbackOnly;
+   }
+
 }

Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ResourceManager.java	2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ResourceManager.java	2011-06-14 07:58:21 UTC (rev 8335)
@@ -197,6 +197,13 @@
       {
          throw new IllegalStateException("Cannot find transaction " + xid);
       }
+      
+      //if tx is mark rollback, roll back immediately
+      if (tx.isRollbackOnly())
+      {
+         this.rollbackLocal(xid);
+         throw new MessagingTransactionRolledBackException("Rolled back " + tx + " as it is marked rollback only!");
+      }
                   
       TransactionRequest request =
          new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
@@ -350,6 +357,13 @@
       ClientTransaction tx = removeTxInternal(xid);
       
       if (trace) { log.trace("got tx: " + tx + " state " + tx.getState()); }
+      
+      
+      if ((tx != null) && tx.isRollbackOnly())
+      {
+         this.rollback(xid, connection);
+         throw new MessagingXAException(XAException.XA_RBROLLBACK, "Transaction marked rollback only, xid: " + xid);
+      }
           
       if (onePhase)
       {
@@ -717,6 +731,21 @@
          }
       }
    }
+
+   /**
+    * @param txID
+    */
+   public void markTxRollbackOnly(Object txID) throws JMSException
+   {
+      ClientTransaction tx = getTxInternal(txID);
+      
+      if (tx == null)
+      {
+         throw new JMSException("There is no transaction with id " + txID);
+      }
+      
+      tx.markRollbackOnly();
+   }
    
    // Inner Classes --------------------------------------------------------------------------------
   



More information about the jboss-cvs-commits mailing list