[jboss-cvs] JBoss Messaging SVN: r8356 - in branches/Branch_1_4: integration/EAP4/etc and 12 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 20 22:45:55 EDT 2011


Author: gaohoward
Date: 2011-06-20 22:45:54 -0400 (Mon, 20 Jun 2011)
New Revision: 8356

Modified:
   branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
   branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
   branches/Branch_1_4/integration/EAP4/etc/xmdesc/ConnectionFactory-xmbean.xml
   branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
   branches/Branch_1_4/integration/EAP5/etc/xmdesc/ConnectionFactory-xmbean.xml
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/container/StateCreationAspect.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java
   branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
   branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java
   branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
   branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
Log:
JBMESSAGING-1876



Modified: branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_1_4/docs/userguide/en/modules/configuration.xml	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/docs/userguide/en/modules/configuration.xml	2011-06-21 02:45:54 UTC (rev 8356)
@@ -2609,6 +2609,14 @@
         <para>This attribute takes effect only if the MaxRetryChangeRate attribute is not zero.</para>
       </section>
 
+      <section id="conf.connectionfactory.attributes.minTimeoutProcessTime">
+        <title>MinTimeoutProcessTime</title>
+
+        <para>Minimum processing time allowed for (milliseconds) in a timeout receive. This is the minimum time for the internal processing after a message is arrived at the client
+        but before returning the message to the application. Default is 300. </para>
+
+      </section>
+
     <!-- End conf.connectionfactory.attributes -->
   </section>
 

Modified: branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml	2011-06-21 02:45:54 UTC (rev 8356)
@@ -136,6 +136,9 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
       <advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
+      <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
+   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
       <advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
@@ -283,4 +286,4 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->hasNextMessage())">
       <advice name="handleHasNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
    </bind>         
-</aop>
\ No newline at end of file
+</aop>

Modified: branches/Branch_1_4/integration/EAP4/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/ConnectionFactory-xmbean.xml	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/integration/EAP4/etc/xmdesc/ConnectionFactory-xmbean.xml	2011-06-21 02:45:54 UTC (rev 8356)
@@ -159,6 +159,12 @@
       <name>RetryChangeRateInterval</name>
       <type>long</type>
    </attribute>
+   
+   <attribute access="read-write" getMethod="getMinTimeoutProcessTime" setMethod="setMinTimeoutProcessTime">
+      <description>Minimum processing time allowed for (milliseconds) in a timeout receive. Default is 300.</description>
+      <name>MinTimeoutProcessTime</name>
+      <type>long</type>
+   </attribute>
 
    <!-- Managed operations -->
 

Modified: branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml	2011-06-21 02:45:54 UTC (rev 8356)
@@ -136,6 +136,9 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
       <advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
+   <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
+      <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
+   </bind>
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
       <advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
    </bind>
@@ -283,4 +286,4 @@
    <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->hasNextMessage())">
       <advice name="handleHasNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
    </bind>         
-</aop>
\ No newline at end of file
+</aop>

Modified: branches/Branch_1_4/integration/EAP5/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/xmdesc/ConnectionFactory-xmbean.xml	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/integration/EAP5/etc/xmdesc/ConnectionFactory-xmbean.xml	2011-06-21 02:45:54 UTC (rev 8356)
@@ -159,6 +159,12 @@
       <name>RetryChangeRateInterval</name>
       <type>long</type>
    </attribute>
+   
+   <attribute access="read-write" getMethod="getMinTimeoutProcessTime" setMethod="setMinTimeoutProcessTime">
+      <description>Minimum processing time allowed for (milliseconds) in a timeout receive. Default is 300.</description>
+      <name>MinTimeoutProcessTime</name>
+      <type>long</type>
+   </attribute>
 
    <!-- Managed operations -->
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -315,6 +315,9 @@
    private Object messageSource;
    private boolean isClustered = false;
 
+   //JBMESSAGING-1876
+   private long minTimeoutProcessTime;
+
    public int getBufferSize()
    {
       return buffer.size();
@@ -331,7 +334,8 @@
                          long maxRetryChangeRate,
                          long retryChangeRateInterval,
                          boolean isClustered,
-                         CallbackManager cbManager)
+                         CallbackManager cbManager,
+                         long minTimeoutProcessTime)
    {
       if (bufferSize < 1)
       {
@@ -359,6 +363,7 @@
          consumerLock = new Object();
       }
       messageSource = cbManager;
+      this.minTimeoutProcessTime = minTimeoutProcessTime;
    }
         
    // Public ---------------------------------------------------------------------------------------
@@ -627,15 +632,17 @@
 
                      long tmUsed = System.currentTimeMillis() - startTimestamp;
                      
-                     if (tmUsed >= timeout)
+                     long timeDelta = timeout - tmUsed;
+                     long timeLeft = timeDelta <= minTimeoutProcessTime ? minTimeoutProcessTime : timeDelta ; 
+                     
+                     if (trace)
                      {
-                        log.warn("Timed out before post message processing, discarding message " + m);
-                        throw new JMSException("Timed out before post message processing, discarding message " + m);
+                        log.trace("Time left: " + timeLeft + " timeout " + timeout + " tmUsed " + tmUsed);
                      }
-
+                     
                      try
                      {
-                        ignore = f.get(timeout - tmUsed, TimeUnit.MILLISECONDS);
+                        ignore = f.get(timeLeft, TimeUnit.MILLISECONDS);
                      }
                      catch (InterruptedException e)
                      {
@@ -652,8 +659,9 @@
                      }
                      catch (TimeoutException e)
                      {
-                        log.warn("Timed out waiting for post message processing, discarding message " + m);
-                        throw new JMSException("Timed out waiting for post message processing, discarding message " + m);
+                        log.warn("Timed out waiting for post message processing " + m + " within time " + timeLeft);
+                        ignore = false;
+                        sessionDelegate.processMessageTimeout();
                      }
                   }
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -120,7 +120,7 @@
                             prefetchSize, executor, maxDeliveries, consumerState.isShouldAck(),
                             redeliveryDelay, consumerState.getMaxRetryChangeRate(), 
                             consumerState.getRetryChangeRateInterval(),
-                            fcc != null, cm);
+                            fcc != null, cm, consumerState.getMinTimeoutProcessTime());
       
       sessionState.addCallbackHandler(messageHandler);
       

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -409,6 +409,59 @@
       }
    }
    
+   //JBMESSAGING-1876
+   public Object handleProcessMessageTimeout(Invocation invocation) throws Throwable
+   { 
+      MethodInvocation mi = (MethodInvocation)invocation;
+      SessionState state = getState(invocation);
+      
+      boolean result = true;
+      
+      synchronized (state)
+      {
+      
+         int ackMode = state.getAcknowledgeMode();
+                  
+         if (ackMode == Session.CLIENT_ACKNOWLEDGE)
+         {
+            if (trace)
+            {
+               log.trace("Don't do anything about CLIENT_ACK mode");
+            }
+         }
+         else if (ackMode == Session.AUTO_ACKNOWLEDGE)
+         {
+            if (trace)
+            {
+               log.trace("Don't do anything about AUTO_ACK mode");
+            }
+         }
+         else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+         {
+            if (trace)
+            {
+               log.trace("Don't do anything about DUPS_OK_ACK mode");
+            }
+         }
+         else
+         {             
+            Object txID = state.getCurrentTxId();
+      
+            if (txID != null)
+            {
+
+               if (trace) { log.trace("Marking tx " + txID + " to be rollback only"); }
+               
+               ConnectionState connState = (ConnectionState)state.getParent();
+               
+               connState.getResourceManager().markTxRollbackOnly(txID);
+            }        
+         }
+      }
+      
+      return null;
+   }
+   
    /**
     * Used for client acknowledge.
     */

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/StateCreationAspect.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -100,7 +100,8 @@
             new ConnectionState(serverID, connectionDelegate,
                                 remotingConnection, versionToUse, 
                                 connectionDelegate.isEnableOrderingGroup(), connectionDelegate.getDefaultOrderingGroupName(),
-                                connectionDelegate.getMaxRetryChangeRate(), connectionDelegate.getRetryChangeRateInterval());
+                                connectionDelegate.getMaxRetryChangeRate(), connectionDelegate.getRetryChangeRateInterval(),
+                                connectionDelegate.getMinTimeoutProcessTime());
 
          remotingConnection.getConnectionListener().setConnectionState(connectionState);
          remotingConnection.getConnectionListener().start();

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -90,6 +90,8 @@
    private long maxRetryChangeRate;
    
    private long retryChangeRateInterval;
+   
+   private long minTimeoutProcessTime;
 
    // Static ---------------------------------------------------------------------------------------
 
@@ -396,6 +398,16 @@
    {
       this.retryChangeRateInterval = retryChangeRateInterval;
    }
+
+   public long getMinTimeoutProcessTime()
+   {
+      return minTimeoutProcessTime;
+   }
+
+   public void setMinTimeoutProcessTime(long minTimeoutProcessTime)
+   {
+      this.minTimeoutProcessTime = minTimeoutProcessTime;
+   }
    
    // Package Private ------------------------------------------------------------------------------
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -88,6 +88,8 @@
    
    private long retryChangeRateInterval;
    
+   private long minTimeoutProcessTime;
+   
    // Static ---------------------------------------------------------------------------------------
    
    /*
@@ -119,7 +121,7 @@
    public ClientConnectionFactoryDelegate(String uniqueName, String objectID, int serverID, String serverLocatorURI,
                                           Version serverVersion, boolean clientPing, boolean strictTck,
                                           boolean sendAcksAsync, boolean enableOrderingGroup, String defaultOrderingGroupName,
-                                          long maxRetryChangeRate, long retryChangeRateInterval)
+                                          long maxRetryChangeRate, long retryChangeRateInterval, long minTimeoutProcessTime)
    {
       super(objectID);
 
@@ -134,6 +136,7 @@
       this.setDefaultOrderingGroupName(defaultOrderingGroupName);
       this.maxRetryChangeRate = maxRetryChangeRate;
       this.retryChangeRateInterval = retryChangeRateInterval;
+      this.minTimeoutProcessTime = minTimeoutProcessTime;
    }
    
    public ClientConnectionFactoryDelegate()
@@ -220,6 +223,8 @@
          connectionDelegate.setMaxRetryChangeRate(this.maxRetryChangeRate);
          
          connectionDelegate.setRetryChangeRateInterval(this.retryChangeRateInterval);
+         
+         connectionDelegate.setMinTimeoutProcessTime(this.minTimeoutProcessTime);
       }
       else
       {
@@ -387,6 +392,8 @@
       maxRetryChangeRate = in.readLong();
       
       retryChangeRateInterval = in.readLong();
+      
+      minTimeoutProcessTime = in.readLong();
    }
 
    public void write(DataOutputStream out) throws Exception
@@ -408,6 +415,8 @@
       out.writeLong(this.maxRetryChangeRate);
       
       out.writeLong(this.retryChangeRateInterval);
+      
+      out.writeLong(this.minTimeoutProcessTime);
    }
 
    /**

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -549,6 +549,11 @@
       return "SessionDelegate[" + System.identityHashCode(this) + ", ID=" + id + "]";
    }
 
+   public void processMessageTimeout() throws JMSException
+   {
+      throw new IllegalStateException("This invocation should not be handled here!");
+   }
+
    // Protected ------------------------------------------------------------------------------------
 
    // Package Private ------------------------------------------------------------------------------

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -96,13 +96,15 @@
    private long maxRetryChangeRate;
    
    private long retryChangeRateInterval;
+   
+   private long minTimeoutProcessTime;
 
    // Constructors ---------------------------------------------------------------------------------
 
    public ConnectionState(int serverID, ConnectionDelegate delegate,
                           JMSRemotingConnection remotingConnection,
                           Version versionToUse, boolean enableOrderingGroup, String defaultOrderingGroupName,
-                          long maxRetryChangeRate, long retryChangeRateInterval)
+                          long maxRetryChangeRate, long retryChangeRateInterval, long minTimeoutProcessTime)
       throws Exception
    {
       super(null, (DelegateSupport)delegate);
@@ -128,6 +130,8 @@
       this.maxRetryChangeRate = maxRetryChangeRate;
       
       this.retryChangeRateInterval = retryChangeRateInterval;
+      
+      this.minTimeoutProcessTime = minTimeoutProcessTime;
    }
 
    // HierarchicalState implementation -------------------------------------------------------------
@@ -338,6 +342,11 @@
       
    }
 
+   public long getMinTimeoutProcessTime()
+   {
+      return minTimeoutProcessTime;
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -69,6 +69,8 @@
    private long maxRetryChangeRate;
    private long retryChangeRateInterval;
    
+   private long minTimeoutProcessTime;
+   
    // Constructors ---------------------------------------------------------------------------------
 
    public ConsumerState(SessionState parent, ConsumerDelegate delegate, JBossDestination dest,
@@ -88,6 +90,7 @@
       this.redeliveryDelay = redeliveryDelay;
       this.maxRetryChangeRate = parent.getMaxRetryChangeRate();
       this.retryChangeRateInterval = parent.getRetryChangeRateInterval();
+      this.minTimeoutProcessTime = parent.getMinTimeoutProcessTime();
     
       //We don't store deliveries if this a non durable subscriber
       
@@ -241,6 +244,11 @@
       clientConsumer.abortReceive();
    }
 
+   public long getMinTimeoutProcessTime()
+   {
+      return this.minTimeoutProcessTime;
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -646,5 +646,10 @@
       }
    }
 
+   public long getMinTimeoutProcessTime()
+   {
+      return parent.getMinTimeoutProcessTime();
+   }
+
 }
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -95,4 +95,6 @@
    ProducerDelegate createProducerDelegate(JBossDestination destination) throws JMSException;
 
    void acknowledgeAll() throws JMSException;
+
+   void processMessageTimeout() throws JMSException;
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/ConnectionFactoryManager.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/ConnectionFactoryManager.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -55,7 +55,8 @@
                                  boolean enableOrderingGroup,
                                  String defaultOrderingGroupName,
                                  long maxRetryChangeRate,
-                                 long retryChangeRateInterval) throws Exception;
+                                 long retryChangeRateInterval,
+                                 long minTimeoutProcessTime) throws Exception;
 
    void unregisterConnectionFactory(String uniqueName, boolean supportsFailover, boolean supportsLoadBalancing) throws Exception;
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -92,6 +92,8 @@
    private long maxRetryChangeRate = 0;
    
    private long retryChangeRateInterval = 5000;
+   
+   private long minTimeoutProcessTime = 300;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -213,7 +215,7 @@
                                       defaultTempQueueFullSize, defaultTempQueuePageSize,                                      
                                       defaultTempQueueDownCacheSize, dupsOKBatchSize, supportsFailover, supportsLoadBalancing,
                                       loadBalancingFactory, strictTck, sendAcksAsync, enableOrderingGroup, defaultOrderingGroupName,
-                                      maxRetryChangeRate, retryChangeRateInterval);               
+                                      maxRetryChangeRate, retryChangeRateInterval, minTimeoutProcessTime);               
          
          String info = "Connector " + locator.getProtocol() + "://" +
             locator.getHost() + ":" + locator.getPort();
@@ -514,6 +516,16 @@
       this.retryChangeRateInterval = retryChangeRateInterval;
    }
    
+   public long getMinTimeoutProcessTime()
+   {
+      return minTimeoutProcessTime;
+   }
+   
+   public void setMinTimeoutProcessTime(long minTimeoutProcessTime)
+   {
+      this.minTimeoutProcessTime = minTimeoutProcessTime;
+   }
+   
    // JMX managed operations -----------------------------------------------------------------------
 
    // Public ---------------------------------------------------------------------------------------

Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -133,7 +133,8 @@
                                                       boolean enableOrderingGroup,
                                                       String defaultOrderingGroupName,
                                                       long maxRetryChangeRate,
-                                                      long retryChangeRateInterval)
+                                                      long retryChangeRateInterval,
+                                                      long minTimeoutProcessTime)
       throws Exception
    {
       log.debug(this + " registering connection factory '" + uniqueName + "', bindings: " + jndiBindings);
@@ -189,7 +190,7 @@
          new ClientConnectionFactoryDelegate(uniqueName, id, serverPeer.getServerPeerID(),
                                              locatorURI, version, clientPing, useStrict,
                                              sendAcksAsync, enableOrderingGroup, defaultOrderingGroupName,
-                                             maxRetryChangeRate, retryChangeRateInterval);
+                                             maxRetryChangeRate, retryChangeRateInterval, minTimeoutProcessTime);
 
       log.debug(this + " created local delegate " + localDelegate);
 

Modified: branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -65,6 +65,8 @@
    private boolean clientSide;
    
    private boolean recovered;
+   
+   private boolean rollbackOnly = false;
 
    // Static --------------------------------------------------------
 
@@ -502,4 +504,14 @@
 
    }
 
+   public void markRollbackOnly()
+   {
+      this.rollbackOnly = true;
+   }
+
+   public boolean isRollbackOnly()
+   {
+      return this.rollbackOnly;
+   }
+
 }

Modified: branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -197,6 +197,13 @@
       {
          throw new IllegalStateException("Cannot find transaction " + xid);
       }
+      
+      //if tx is mark rollback, roll back immediately
+      if (tx.isRollbackOnly())
+      {
+         this.rollbackLocal(xid, tx);
+         throw new MessagingTransactionRolledBackException("Rolled back " + tx + " as it is marked rollback only!");
+      }
                   
       TransactionRequest request =
          new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
@@ -239,6 +246,16 @@
          throw new IllegalStateException("Cannot find transaction with xid:" + xid);         
       }
       
+      this.rollbackLocal(xid, ts);
+   }
+   
+   private void rollbackLocal(Object xid, ClientTransaction ts) throws JMSException
+   {
+      if (ts == null)
+      {      
+         throw new IllegalStateException("Cannot find transaction with xid:" + xid);         
+      }
+      
       // don't need messages for rollback
       // We don't clear the acks since we need to redeliver locally
       ts.clearMessages();
@@ -329,7 +346,13 @@
       if (state == null)
       { 
          throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
-      } 
+      }
+
+      if (state.isRollbackOnly())
+      {
+         this.rollback(xid, state, connection);
+         throw new MessagingXAException(XAException.XA_RBROLLBACK, "Transaction marked rollback only, xid: " + xid);
+      }
       
       TransactionRequest request =
          new TransactionRequest(TransactionRequest.TWO_PHASE_PREPARE_REQUEST, xid, state);
@@ -350,6 +373,13 @@
       ClientTransaction tx = removeTxInternal(xid);
       
       if (trace) { log.trace("got tx: " + tx + " state " + tx.getState()); }
+      
+      //roll back for onePhase only. for 2pc, rollback only is processed in prepare
+      if ((tx != null) && tx.isRollbackOnly() && onePhase)
+      {
+         this.rollback(xid, tx, connection);
+         throw new MessagingXAException(XAException.XA_RBROLLBACK, "Transaction marked rollback only, xid: " + xid);
+      }
           
       if (onePhase)
       {
@@ -407,6 +437,16 @@
          throw new java.lang.IllegalStateException("Cannot find xid to remove " + xid);
       }
       
+      this.rollback(xid, tx, connection);
+   }
+   
+   private void rollback(Xid xid, ClientTransaction tx, ConnectionDelegate connection) throws XAException
+   {
+      if (tx == null)
+      {
+         throw new java.lang.IllegalStateException("Cannot find xid to remove " + xid);
+      }
+      
       //It's possible we don't actually have the prepared tx here locally - this
       //may happen if we have recovered from failure and the transaction manager
       //is calling rollback on the transaction as part of the recovery process.
@@ -414,12 +454,9 @@
       TransactionRequest request = null;
       
       //don't need the messages
-      if (tx != null)
-      {
-         tx.clearMessages();
-      }
+      tx.clearMessages();
              
-      if ((tx == null) || tx.getState() == ClientTransaction.TX_PREPARED)
+      if (tx.getState() == ClientTransaction.TX_PREPARED)
       {
          //2PC rollback
          
@@ -432,13 +469,9 @@
       else
       {
          //For one phase rollback there is nothing to do on the server 
-         
-         if (tx == null)
-         {     
-            throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
-         }
+         if (trace) { log.trace("It is a one phase roll back, tx: " + tx); }
       }
-                  
+
       //we redeliver the messages
       //locally to their original consumers if they are still open or cancel them to the server
       //if the original consumers have closed
@@ -447,20 +480,14 @@
       
       try
       {
-         if (tx != null)
-         {
-            redeliverMessages(tx);
-            
-            tx.setState(ClientTransaction.TX_ROLLEDBACK);  
-         }
-         
+         redeliverMessages(tx);
+         tx.setState(ClientTransaction.TX_ROLLEDBACK);  
       }
       catch (JMSException e)
       {
          log.error("Failed to redeliver", e);
       }                               
-   }
-  
+   }  
    
    Xid joinTx(Xid xid) throws XAException
    {
@@ -717,6 +744,21 @@
          }
       }
    }
+
+   /**
+    * @param txID
+    */
+   public void markTxRollbackOnly(Object txID) throws JMSException
+   {
+      ClientTransaction tx = getTxInternal(txID);
+      
+      if (tx == null)
+      {
+         throw new JMSException("There is no transaction with id " + txID);
+      }
+      
+      tx.markRollbackOnly();
+   }
    
    // Inner Classes --------------------------------------------------------------------------------
   

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -969,6 +969,109 @@
       }
    }
 
+   //https://issues.jboss.org/browse/JBMESSAGING-1876
+   public void testMinTimeoutProcessTimeConfigSettings() throws Exception
+   {
+      Connection c = null;
+
+      try
+      {
+         String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + "       name=\"jboss.messaging.connectionfactory:service=TestTimeoutProcessConfigSettingsFactory\"\n"
+                              + "       xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+                              + "       <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+                              + "       <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+                              + "       <attribute name=\"JNDIBindings\">\n"
+                              + "          <bindings>\n"
+                              + "            <binding>/TestTimeoutProcessConfigSettingsFactory</binding>\n"
+                              + "          </bindings>\n"
+                              + "       </attribute>\n"
+                              + "       <attribute name=\"MinTimeoutProcessTime\">2345</attribute>\n"
+                              + " </mbean>";
+
+         ObjectName on = ServerManagement.deploy(mbeanConfig);
+         ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+         ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+         ConnectionFactory cf = (ConnectionFactory)ic.lookup("/TestTimeoutProcessConfigSettingsFactory");
+         c = cf.createConnection();
+         
+         ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+         
+         ConnectionState state1 = (ConnectionState)del1.getState();
+         
+         long minTime = state1.getMinTimeoutProcessTime();
+         
+         assertEquals(2345, minTime);
+      }
+      finally
+      {
+         try
+         {
+            if (c != null)
+            {
+               log.info("Closing connection");
+               c.close();
+               log.info("Closed connection");
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.toString(), e);
+         }
+      }
+   }
+
+   //https://issues.jboss.org/browse/JBMESSAGING-1876
+   public void testMinTimeoutProcessTimeDefaults() throws Exception
+   {
+      Connection c = null;
+
+      try
+      {
+         String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + "       name=\"jboss.messaging.connectionfactory:service=TestTimeoutProcessConfigDefaultSettingsFactory\"\n"
+                              + "       xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+                              + "       <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+                              + "       <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+                              + "       <attribute name=\"JNDIBindings\">\n"
+                              + "          <bindings>\n"
+                              + "            <binding>/TestTimeoutProcessConfigDefaultSettingsFactory</binding>\n"
+                              + "          </bindings>\n"
+                              + "       </attribute>\n"
+                              + " </mbean>";
+
+         ObjectName on = ServerManagement.deploy(mbeanConfig);
+         ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+         ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+         ConnectionFactory cf = (ConnectionFactory)ic.lookup("/TestTimeoutProcessConfigDefaultSettingsFactory");
+         c = cf.createConnection();
+         
+         ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+         
+         ConnectionState state1 = (ConnectionState)del1.getState();
+         
+         long minTime = state1.getMinTimeoutProcessTime();
+         
+         assertEquals(300, minTime);
+      }
+      finally
+      {
+         try
+         {
+            if (c != null)
+            {
+               log.info("Closing connection");
+               c.close();
+               log.info("Closed connection");
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.toString(), e);
+         }
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java	2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java	2011-06-21 02:45:54 UTC (rev 8356)
@@ -397,6 +397,169 @@
       }
    }
 
+   //JBMESSAGING-1876
+   public void testMinTimeoutProcessTimeSettings() throws Exception
+   {
+      Connection c = null;
+
+      try
+      {
+         String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + "       name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigSettingsFactory\"\n"
+                              + "       xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+                              + "       <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+                              + "       <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+                              + "       <attribute name=\"JNDIBindings\">\n"
+                              + "          <bindings>\n"
+                              + "            <binding>/ClusteredTestMinTimeoutProcessTimeConfigSettingsFactory</binding>\n"
+                              + "          </bindings>\n"
+                              + "       </attribute>\n"
+                              + "       <attribute name=\"SupportsFailover\">true</attribute>"
+                              + "       <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+                              + "       <attribute name=\"MinTimeoutProcessTime\">100</attribute>\n"
+                              + " </mbean>";
+
+         ObjectName on = ServerManagement.deploy(mbeanConfig);
+         ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+         ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+         ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestMinTimeoutProcessTimeConfigSettingsFactory");
+         c = cf.createConnection();
+         
+         ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+         
+         ConnectionState state1 = (ConnectionState)del1.getState();
+         
+         long minTime = state1.getMinTimeoutProcessTime();
+         
+         assertEquals(100, minTime);
+      }
+      finally
+      {
+         try
+         {
+            if (c != null)
+            {
+               log.info("Closing connection");
+               c.close();
+               log.info("Closed connection");
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.toString(), e);
+         }
+      }
+   }
+
+   //https://issues.jboss.org/browse/JBMESSAGING-1876
+   public void testMinTimeoutProcessTimeSettings2() throws Exception
+   {
+      Connection c = null;
+
+      try
+      {
+         String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + "       name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigSettingsFactory\"\n"
+                              + "       xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+                              + "       <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+                              + "       <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+                              + "       <attribute name=\"JNDIBindings\">\n"
+                              + "          <bindings>\n"
+                              + "            <binding>/ClusteredTestMinTimeoutProcessTimeConfigSettingsFactory</binding>\n"
+                              + "          </bindings>\n"
+                              + "       </attribute>\n"
+                              + "       <attribute name=\"SupportsFailover\">false</attribute>"
+                              + "       <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+                              + "       <attribute name=\"MinTimeoutProcessTime\">100</attribute>\n"
+                              + "       <attribute name=\"MaxRetryChangeRate\">10</attribute>\n"
+                              + "       <attribute name=\"RetryChangeRateInterval\">2345</attribute>\n"
+                              + " </mbean>";
+
+         ObjectName on = ServerManagement.deploy(mbeanConfig);
+         ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+         ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+         ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestMinTimeoutProcessTimeConfigSettingsFactory");
+         c = cf.createConnection();
+         
+         ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+         
+         ConnectionState state1 = (ConnectionState)del1.getState();
+         
+         long minTime = state1.getMinTimeoutProcessTime();
+         
+         assertEquals(100, minTime);
+      }
+      finally
+      {
+         try
+         {
+            if (c != null)
+            {
+               log.info("Closing connection");
+               c.close();
+               log.info("Closed connection");
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.toString(), e);
+         }
+      }
+   }
+
+   //https://issues.jboss.org/browse/JBMESSAGING-1876
+   public void testMinTimeoutProcessTimeDefaults() throws Exception
+   {
+      Connection c = null;
+
+      try
+      {
+         String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + "       name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigDefaultsFactory\"\n"
+                              + "       xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+                              + "       <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+                              + "       <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+                              + "       <attribute name=\"JNDIBindings\">\n"
+                              + "          <bindings>\n"
+                              + "            <binding>/ClusteredTestChangeRateConfigDefaultsFactory</binding>\n"
+                              + "          </bindings>\n"
+                              + "       </attribute>\n"
+                              + "       <attribute name=\"SupportsFailover\">true</attribute>"
+                              + "       <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+                              + " </mbean>";
+
+         ObjectName on = ServerManagement.deploy(mbeanConfig);
+         ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+         ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+         ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestChangeRateConfigDefaultsFactory");
+         c = cf.createConnection();
+         
+         ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+         
+         ConnectionState state1 = (ConnectionState)del1.getState();
+         
+         long minTime = state1.getMinTimeoutProcessTime();
+         
+         assertEquals(300, minTime);
+      }
+      finally
+      {
+         try
+         {
+            if (c != null)
+            {
+               log.info("Closing connection");
+               c.close();
+               log.info("Closed connection");
+            }
+         }
+         catch (Exception e)
+         {
+            log.warn(e.toString(), e);
+         }
+      }
+   }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------



More information about the jboss-cvs-commits mailing list