[jboss-cvs] JBoss Messaging SVN: r8356 - in branches/Branch_1_4: integration/EAP4/etc and 12 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 20 22:45:55 EDT 2011
Author: gaohoward
Date: 2011-06-20 22:45:54 -0400 (Mon, 20 Jun 2011)
New Revision: 8356
Modified:
branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
branches/Branch_1_4/integration/EAP4/etc/xmdesc/ConnectionFactory-xmbean.xml
branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
branches/Branch_1_4/integration/EAP5/etc/xmdesc/ConnectionFactory-xmbean.xml
branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_4/src/main/org/jboss/jms/client/container/StateCreationAspect.java
branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java
branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java
branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
branches/Branch_1_4/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java
branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
Log:
JBMESSAGING-1876
Modified: branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_1_4/docs/userguide/en/modules/configuration.xml 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/docs/userguide/en/modules/configuration.xml 2011-06-21 02:45:54 UTC (rev 8356)
@@ -2609,6 +2609,14 @@
<para>This attribute takes effect only if the MaxRetryChangeRate attribute is not zero.</para>
</section>
+ <section id="conf.connectionfactory.attributes.minTimeoutProcessTime">
+ <title>MinTimeoutProcessTime</title>
+
+ <para>Minimum processing time (in milliseconds) allowed in a receive call with a timeout. This is the minimum time allowed for the internal processing after a message has arrived at the client
+ but before the message is returned to the application. Default is 300.</para>
+
+ </section>
+
<!-- End conf.connectionfactory.attributes -->
</section>
Modified: branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml 2011-06-21 02:45:54 UTC (rev 8356)
@@ -136,6 +136,9 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
<advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
+ <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
<advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
@@ -283,4 +286,4 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->hasNextMessage())">
<advice name="handleHasNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
</bind>
-</aop>
\ No newline at end of file
+</aop>
Modified: branches/Branch_1_4/integration/EAP4/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/xmdesc/ConnectionFactory-xmbean.xml 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/integration/EAP4/etc/xmdesc/ConnectionFactory-xmbean.xml 2011-06-21 02:45:54 UTC (rev 8356)
@@ -159,6 +159,12 @@
<name>RetryChangeRateInterval</name>
<type>long</type>
</attribute>
+
+ <attribute access="read-write" getMethod="getMinTimeoutProcessTime" setMethod="setMinTimeoutProcessTime">
+ <description>Minimum processing time (in milliseconds) allowed in a receive call with a timeout. Default is 300.</description>
+ <name>MinTimeoutProcessTime</name>
+ <type>long</type>
+ </attribute>
<!-- Managed operations -->
Modified: branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml 2011-06-21 02:45:54 UTC (rev 8356)
@@ -136,6 +136,9 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
<advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
+ <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
<advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
@@ -283,4 +286,4 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->hasNextMessage())">
<advice name="handleHasNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
</bind>
-</aop>
\ No newline at end of file
+</aop>
Modified: branches/Branch_1_4/integration/EAP5/etc/xmdesc/ConnectionFactory-xmbean.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/xmdesc/ConnectionFactory-xmbean.xml 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/integration/EAP5/etc/xmdesc/ConnectionFactory-xmbean.xml 2011-06-21 02:45:54 UTC (rev 8356)
@@ -159,6 +159,12 @@
<name>RetryChangeRateInterval</name>
<type>long</type>
</attribute>
+
+ <attribute access="read-write" getMethod="getMinTimeoutProcessTime" setMethod="setMinTimeoutProcessTime">
+ <description>Minimum processing time (in milliseconds) allowed in a receive call with a timeout. Default is 300.</description>
+ <name>MinTimeoutProcessTime</name>
+ <type>long</type>
+ </attribute>
<!-- Managed operations -->
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -315,6 +315,9 @@
private Object messageSource;
private boolean isClustered = false;
+ //JBMESSAGING-1876
+ private long minTimeoutProcessTime;
+
public int getBufferSize()
{
return buffer.size();
@@ -331,7 +334,8 @@
long maxRetryChangeRate,
long retryChangeRateInterval,
boolean isClustered,
- CallbackManager cbManager)
+ CallbackManager cbManager,
+ long minTimeoutProcessTime)
{
if (bufferSize < 1)
{
@@ -359,6 +363,7 @@
consumerLock = new Object();
}
messageSource = cbManager;
+ this.minTimeoutProcessTime = minTimeoutProcessTime;
}
// Public ---------------------------------------------------------------------------------------
@@ -627,15 +632,17 @@
long tmUsed = System.currentTimeMillis() - startTimestamp;
- if (tmUsed >= timeout)
+ long timeDelta = timeout - tmUsed;
+ long timeLeft = timeDelta <= minTimeoutProcessTime ? minTimeoutProcessTime : timeDelta ;
+
+ if (trace)
{
- log.warn("Timed out before post message processing, discarding message " + m);
- throw new JMSException("Timed out before post message processing, discarding message " + m);
+ log.trace("Time left: " + timeLeft + " timeout " + timeout + " tmUsed " + tmUsed);
}
-
+
try
{
- ignore = f.get(timeout - tmUsed, TimeUnit.MILLISECONDS);
+ ignore = f.get(timeLeft, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
@@ -652,8 +659,9 @@
}
catch (TimeoutException e)
{
- log.warn("Timed out waiting for post message processing, discarding message " + m);
- throw new JMSException("Timed out waiting for post message processing, discarding message " + m);
+ log.warn("Timed out waiting for post message processing " + m + " within time " + timeLeft);
+ ignore = false;
+ sessionDelegate.processMessageTimeout();
}
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -120,7 +120,7 @@
prefetchSize, executor, maxDeliveries, consumerState.isShouldAck(),
redeliveryDelay, consumerState.getMaxRetryChangeRate(),
consumerState.getRetryChangeRateInterval(),
- fcc != null, cm);
+ fcc != null, cm, consumerState.getMinTimeoutProcessTime());
sessionState.addCallbackHandler(messageHandler);
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -409,6 +409,59 @@
}
}
+ //JBMESSAGING-1876
+ public Object handleProcessMessageTimeout(Invocation invocation) throws Throwable
+ {
+ MethodInvocation mi = (MethodInvocation)invocation;
+ SessionState state = getState(invocation);
+
+ boolean result = true;
+
+ synchronized (state)
+ {
+
+ int ackMode = state.getAcknowledgeMode();
+
+ if (ackMode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ if (trace)
+ {
+ log.trace("Don't do anything about CLIENT_ACK mode");
+ }
+ }
+ else if (ackMode == Session.AUTO_ACKNOWLEDGE)
+ {
+ if (trace)
+ {
+ log.trace("Don't do anything about AUTO_ACK mode");
+ }
+ }
+ else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ {
+ if (trace)
+ {
+ log.trace("Don't do anything about DUPS_OK_ACK mode");
+ }
+ }
+ else
+ {
+ Object txID = state.getCurrentTxId();
+
+ if (txID != null)
+ {
+
+ if (trace) { log.trace("Marking tx " + txID + " to be rollback only"); }
+
+ ConnectionState connState = (ConnectionState)state.getParent();
+
+ connState.getResourceManager().markTxRollbackOnly(txID);
+ }
+ }
+ }
+
+ return null;
+ }
+
/**
* Used for client acknowledge.
*/
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/StateCreationAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/StateCreationAspect.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -100,7 +100,8 @@
new ConnectionState(serverID, connectionDelegate,
remotingConnection, versionToUse,
connectionDelegate.isEnableOrderingGroup(), connectionDelegate.getDefaultOrderingGroupName(),
- connectionDelegate.getMaxRetryChangeRate(), connectionDelegate.getRetryChangeRateInterval());
+ connectionDelegate.getMaxRetryChangeRate(), connectionDelegate.getRetryChangeRateInterval(),
+ connectionDelegate.getMinTimeoutProcessTime());
remotingConnection.getConnectionListener().setConnectionState(connectionState);
remotingConnection.getConnectionListener().start();
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionDelegate.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -90,6 +90,8 @@
private long maxRetryChangeRate;
private long retryChangeRateInterval;
+
+ private long minTimeoutProcessTime;
// Static ---------------------------------------------------------------------------------------
@@ -396,6 +398,16 @@
{
this.retryChangeRateInterval = retryChangeRateInterval;
}
+
+ public long getMinTimeoutProcessTime()
+ {
+ return minTimeoutProcessTime;
+ }
+
+ public void setMinTimeoutProcessTime(long minTimeoutProcessTime)
+ {
+ this.minTimeoutProcessTime = minTimeoutProcessTime;
+ }
// Package Private ------------------------------------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientConnectionFactoryDelegate.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -88,6 +88,8 @@
private long retryChangeRateInterval;
+ private long minTimeoutProcessTime;
+
// Static ---------------------------------------------------------------------------------------
/*
@@ -119,7 +121,7 @@
public ClientConnectionFactoryDelegate(String uniqueName, String objectID, int serverID, String serverLocatorURI,
Version serverVersion, boolean clientPing, boolean strictTck,
boolean sendAcksAsync, boolean enableOrderingGroup, String defaultOrderingGroupName,
- long maxRetryChangeRate, long retryChangeRateInterval)
+ long maxRetryChangeRate, long retryChangeRateInterval, long minTimeoutProcessTime)
{
super(objectID);
@@ -134,6 +136,7 @@
this.setDefaultOrderingGroupName(defaultOrderingGroupName);
this.maxRetryChangeRate = maxRetryChangeRate;
this.retryChangeRateInterval = retryChangeRateInterval;
+ this.minTimeoutProcessTime = minTimeoutProcessTime;
}
public ClientConnectionFactoryDelegate()
@@ -220,6 +223,8 @@
connectionDelegate.setMaxRetryChangeRate(this.maxRetryChangeRate);
connectionDelegate.setRetryChangeRateInterval(this.retryChangeRateInterval);
+
+ connectionDelegate.setMinTimeoutProcessTime(this.minTimeoutProcessTime);
}
else
{
@@ -387,6 +392,8 @@
maxRetryChangeRate = in.readLong();
retryChangeRateInterval = in.readLong();
+
+ minTimeoutProcessTime = in.readLong();
}
public void write(DataOutputStream out) throws Exception
@@ -408,6 +415,8 @@
out.writeLong(this.maxRetryChangeRate);
out.writeLong(this.retryChangeRateInterval);
+
+ out.writeLong(this.minTimeoutProcessTime);
}
/**
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -549,6 +549,11 @@
return "SessionDelegate[" + System.identityHashCode(this) + ", ID=" + id + "]";
}
+ public void processMessageTimeout() throws JMSException
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
// Protected ------------------------------------------------------------------------------------
// Package Private ------------------------------------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConnectionState.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -96,13 +96,15 @@
private long maxRetryChangeRate;
private long retryChangeRateInterval;
+
+ private long minTimeoutProcessTime;
// Constructors ---------------------------------------------------------------------------------
public ConnectionState(int serverID, ConnectionDelegate delegate,
JMSRemotingConnection remotingConnection,
Version versionToUse, boolean enableOrderingGroup, String defaultOrderingGroupName,
- long maxRetryChangeRate, long retryChangeRateInterval)
+ long maxRetryChangeRate, long retryChangeRateInterval, long minTimeoutProcessTime)
throws Exception
{
super(null, (DelegateSupport)delegate);
@@ -128,6 +130,8 @@
this.maxRetryChangeRate = maxRetryChangeRate;
this.retryChangeRateInterval = retryChangeRateInterval;
+
+ this.minTimeoutProcessTime = minTimeoutProcessTime;
}
// HierarchicalState implementation -------------------------------------------------------------
@@ -338,6 +342,11 @@
}
+ public long getMinTimeoutProcessTime()
+ {
+ return minTimeoutProcessTime;
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/ConsumerState.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -69,6 +69,8 @@
private long maxRetryChangeRate;
private long retryChangeRateInterval;
+ private long minTimeoutProcessTime;
+
// Constructors ---------------------------------------------------------------------------------
public ConsumerState(SessionState parent, ConsumerDelegate delegate, JBossDestination dest,
@@ -88,6 +90,7 @@
this.redeliveryDelay = redeliveryDelay;
this.maxRetryChangeRate = parent.getMaxRetryChangeRate();
this.retryChangeRateInterval = parent.getRetryChangeRateInterval();
+ this.minTimeoutProcessTime = parent.getMinTimeoutProcessTime();
//We don't store deliveries if this a non durable subscriber
@@ -241,6 +244,11 @@
clientConsumer.abortReceive();
}
+ public long getMinTimeoutProcessTime()
+ {
+ return this.minTimeoutProcessTime;
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/state/SessionState.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -646,5 +646,10 @@
}
}
+ public long getMinTimeoutProcessTime()
+ {
+ return parent.getMinTimeoutProcessTime();
+ }
+
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -95,4 +95,6 @@
ProducerDelegate createProducerDelegate(JBossDestination destination) throws JMSException;
void acknowledgeAll() throws JMSException;
+
+ void processMessageTimeout() throws JMSException;
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/ConnectionFactoryManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/ConnectionFactoryManager.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/ConnectionFactoryManager.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -55,7 +55,8 @@
boolean enableOrderingGroup,
String defaultOrderingGroupName,
long maxRetryChangeRate,
- long retryChangeRateInterval) throws Exception;
+ long retryChangeRateInterval,
+ long minTimeoutProcessTime) throws Exception;
void unregisterConnectionFactory(String uniqueName, boolean supportsFailover, boolean supportsLoadBalancing) throws Exception;
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactory.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -92,6 +92,8 @@
private long maxRetryChangeRate = 0;
private long retryChangeRateInterval = 5000;
+
+ private long minTimeoutProcessTime = 300;
// Constructors ---------------------------------------------------------------------------------
@@ -213,7 +215,7 @@
defaultTempQueueFullSize, defaultTempQueuePageSize,
defaultTempQueueDownCacheSize, dupsOKBatchSize, supportsFailover, supportsLoadBalancing,
loadBalancingFactory, strictTck, sendAcksAsync, enableOrderingGroup, defaultOrderingGroupName,
- maxRetryChangeRate, retryChangeRateInterval);
+ maxRetryChangeRate, retryChangeRateInterval, minTimeoutProcessTime);
String info = "Connector " + locator.getProtocol() + "://" +
locator.getHost() + ":" + locator.getPort();
@@ -514,6 +516,16 @@
this.retryChangeRateInterval = retryChangeRateInterval;
}
+ public long getMinTimeoutProcessTime()
+ {
+ return minTimeoutProcessTime;
+ }
+
+ public void setMinTimeoutProcessTime(long minTimeoutProcessTime)
+ {
+ this.minTimeoutProcessTime = minTimeoutProcessTime;
+ }
+
// JMX managed operations -----------------------------------------------------------------------
// Public ---------------------------------------------------------------------------------------
Modified: branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -133,7 +133,8 @@
boolean enableOrderingGroup,
String defaultOrderingGroupName,
long maxRetryChangeRate,
- long retryChangeRateInterval)
+ long retryChangeRateInterval,
+ long minTimeoutProcessTime)
throws Exception
{
log.debug(this + " registering connection factory '" + uniqueName + "', bindings: " + jndiBindings);
@@ -189,7 +190,7 @@
new ClientConnectionFactoryDelegate(uniqueName, id, serverPeer.getServerPeerID(),
locatorURI, version, clientPing, useStrict,
sendAcksAsync, enableOrderingGroup, defaultOrderingGroupName,
- maxRetryChangeRate, retryChangeRateInterval);
+ maxRetryChangeRate, retryChangeRateInterval, minTimeoutProcessTime);
log.debug(this + " created local delegate " + localDelegate);
Modified: branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -65,6 +65,8 @@
private boolean clientSide;
private boolean recovered;
+
+ private boolean rollbackOnly = false;
// Static --------------------------------------------------------
@@ -502,4 +504,14 @@
}
+ public void markRollbackOnly()
+ {
+ this.rollbackOnly = true;
+ }
+
+ public boolean isRollbackOnly()
+ {
+ return this.rollbackOnly;
+ }
+
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -197,6 +197,13 @@
{
throw new IllegalStateException("Cannot find transaction " + xid);
}
+
+ //if the tx is marked rollback-only, roll it back immediately
+ if (tx.isRollbackOnly())
+ {
+ this.rollbackLocal(xid, tx);
+ throw new MessagingTransactionRolledBackException("Rolled back " + tx + " as it is marked rollback only!");
+ }
TransactionRequest request =
new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
@@ -239,6 +246,16 @@
throw new IllegalStateException("Cannot find transaction with xid:" + xid);
}
+ this.rollbackLocal(xid, ts);
+ }
+
+ private void rollbackLocal(Object xid, ClientTransaction ts) throws JMSException
+ {
+ if (ts == null)
+ {
+ throw new IllegalStateException("Cannot find transaction with xid:" + xid);
+ }
+
// don't need messages for rollback
// We don't clear the acks since we need to redeliver locally
ts.clearMessages();
@@ -329,7 +346,13 @@
if (state == null)
{
throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
- }
+ }
+
+ if (state.isRollbackOnly())
+ {
+ this.rollback(xid, state, connection);
+ throw new MessagingXAException(XAException.XA_RBROLLBACK, "Transaction marked rollback only, xid: " + xid);
+ }
TransactionRequest request =
new TransactionRequest(TransactionRequest.TWO_PHASE_PREPARE_REQUEST, xid, state);
@@ -350,6 +373,13 @@
ClientTransaction tx = removeTxInternal(xid);
if (trace) { log.trace("got tx: " + tx + " state " + tx.getState()); }
+
+ //roll back here for one-phase commit only; for 2PC, the rollback-only flag is handled in prepare
+ if ((tx != null) && tx.isRollbackOnly() && onePhase)
+ {
+ this.rollback(xid, tx, connection);
+ throw new MessagingXAException(XAException.XA_RBROLLBACK, "Transaction marked rollback only, xid: " + xid);
+ }
if (onePhase)
{
@@ -407,6 +437,16 @@
throw new java.lang.IllegalStateException("Cannot find xid to remove " + xid);
}
+ this.rollback(xid, tx, connection);
+ }
+
+ private void rollback(Xid xid, ClientTransaction tx, ConnectionDelegate connection) throws XAException
+ {
+ if (tx == null)
+ {
+ throw new java.lang.IllegalStateException("Cannot find xid to remove " + xid);
+ }
+
//It's possible we don't actually have the prepared tx here locally - this
//may happen if we have recovered from failure and the transaction manager
//is calling rollback on the transaction as part of the recovery process.
@@ -414,12 +454,9 @@
TransactionRequest request = null;
//don't need the messages
- if (tx != null)
- {
- tx.clearMessages();
- }
+ tx.clearMessages();
- if ((tx == null) || tx.getState() == ClientTransaction.TX_PREPARED)
+ if (tx.getState() == ClientTransaction.TX_PREPARED)
{
//2PC rollback
@@ -432,13 +469,9 @@
else
{
//For one phase rollback there is nothing to do on the server
-
- if (tx == null)
- {
- throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
- }
+ if (trace) { log.trace("It is a one phase roll back, tx: " + tx); }
}
-
+
//we redeliver the messages
//locally to their original consumers if they are still open or cancel them to the server
//if the original consumers have closed
@@ -447,20 +480,14 @@
try
{
- if (tx != null)
- {
- redeliverMessages(tx);
-
- tx.setState(ClientTransaction.TX_ROLLEDBACK);
- }
-
+ redeliverMessages(tx);
+ tx.setState(ClientTransaction.TX_ROLLEDBACK);
}
catch (JMSException e)
{
log.error("Failed to redeliver", e);
}
- }
-
+ }
Xid joinTx(Xid xid) throws XAException
{
@@ -717,6 +744,21 @@
}
}
}
+
+ /**
+ * @param txID
+ */
+ public void markTxRollbackOnly(Object txID) throws JMSException
+ {
+ ClientTransaction tx = getTxInternal(txID);
+
+ if (tx == null)
+ {
+ throw new JMSException("There is no transaction with id " + txID);
+ }
+
+ tx.markRollbackOnly();
+ }
// Inner Classes --------------------------------------------------------------------------------
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/ConnectionFactoryTest.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -969,6 +969,109 @@
}
}
+ //https://issues.jboss.org/browse/JBMESSAGING-1876
+ public void testMinTimeoutProcessTimeConfigSettings() throws Exception
+ {
+ Connection c = null;
+
+ try
+ {
+ String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + " name=\"jboss.messaging.connectionfactory:service=TestTimeoutProcessConfigSettingsFactory\"\n"
+ + " xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+ + " <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+ + " <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+ + " <attribute name=\"JNDIBindings\">\n"
+ + " <bindings>\n"
+ + " <binding>/TestTimeoutProcessConfigSettingsFactory</binding>\n"
+ + " </bindings>\n"
+ + " </attribute>\n"
+ + " <attribute name=\"MinTimeoutProcessTime\">2345</attribute>\n"
+ + " </mbean>";
+
+ ObjectName on = ServerManagement.deploy(mbeanConfig);
+ ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+ ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/TestTimeoutProcessConfigSettingsFactory");
+ c = cf.createConnection();
+
+ ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+
+ ConnectionState state1 = (ConnectionState)del1.getState();
+
+ long minTime = state1.getMinTimeoutProcessTime();
+
+ assertEquals(2345, minTime);
+ }
+ finally
+ {
+ try
+ {
+ if (c != null)
+ {
+ log.info("Closing connection");
+ c.close();
+ log.info("Closed connection");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.toString(), e);
+ }
+ }
+ }
+
+ //https://issues.jboss.org/browse/JBMESSAGING-1876
+ public void testMinTimeoutProcessTimeDefaults() throws Exception
+ {
+ Connection c = null;
+
+ try
+ {
+ String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + " name=\"jboss.messaging.connectionfactory:service=TestTimeoutProcessConfigDefaultSettingsFactory\"\n"
+ + " xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+ + " <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+ + " <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+ + " <attribute name=\"JNDIBindings\">\n"
+ + " <bindings>\n"
+ + " <binding>/TestTimeoutProcessConfigDefaultSettingsFactory</binding>\n"
+ + " </bindings>\n"
+ + " </attribute>\n"
+ + " </mbean>";
+
+ ObjectName on = ServerManagement.deploy(mbeanConfig);
+ ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+ ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+
+ ConnectionFactory cf = (ConnectionFactory)ic.lookup("/TestTimeoutProcessConfigDefaultSettingsFactory");
+ c = cf.createConnection();
+
+ ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+
+ ConnectionState state1 = (ConnectionState)del1.getState();
+
+ long minTime = state1.getMinTimeoutProcessTime();
+
+ assertEquals(300, minTime);
+ }
+ finally
+ {
+ try
+ {
+ if (c != null)
+ {
+ log.info("Closing connection");
+ c.close();
+ log.info("Closed connection");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.toString(), e);
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java
===================================================================
--- branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java 2011-06-20 12:58:54 UTC (rev 8355)
+++ branches/Branch_1_4/tests/src/org/jboss/test/messaging/jms/clustering/ClusteredConnectionFactoryTest.java 2011-06-21 02:45:54 UTC (rev 8356)
@@ -397,6 +397,169 @@
}
}
+ // JBMESSAGING-1876 - deploys a ConnectionFactory MBean with SupportsFailover=true, SupportsLoadBalancing=true and MinTimeoutProcessTime=100, then asserts that the ConnectionState behind a new connection reports 100.
+ public void testMinTimeoutProcessTimeSettings() throws Exception
+ {
+ Connection c = null;
+ // NOTE(review): the service name below says "TestChangeRateConfigSettingsFactory" while the JNDI binding says "...MinTimeoutProcessTime..."; both are reused by testMinTimeoutProcessTimeSettings2 - confirm no clash.
+ try
+ {
+ String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + " name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigSettingsFactory\"\n"
+ + " xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+ + " <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+ + " <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+ + " <attribute name=\"JNDIBindings\">\n"
+ + " <bindings>\n"
+ + " <binding>/ClusteredTestMinTimeoutProcessTimeConfigSettingsFactory</binding>\n"
+ + " </bindings>\n"
+ + " </attribute>\n"
+ + " <attribute name=\"SupportsFailover\">true</attribute>"
+ + " <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+ + " <attribute name=\"MinTimeoutProcessTime\">100</attribute>\n"
+ + " </mbean>";
+ // Deploy the MBean described above, then invoke its "create" and "start" operations.
+ ObjectName on = ServerManagement.deploy(mbeanConfig);
+ ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+ ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+ // Look the factory up through ic[0] (presumably the naming context of the first cluster node - confirm) and open a connection.
+ ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestMinTimeoutProcessTimeConfigSettingsFactory");
+ c = cf.createConnection();
+ // Reach the ConnectionState through the connection's delegate.
+ ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+
+ ConnectionState state1 = (ConnectionState)del1.getState();
+
+ long minTime = state1.getMinTimeoutProcessTime();
+ // The value must match the MinTimeoutProcessTime attribute configured in the descriptor.
+ assertEquals(100, minTime);
+ }
+ finally // NOTE(review): only the connection is closed; the MBean "on" is not stopped or undeployed in this method - confirm cleanup happens elsewhere.
+ {
+ try
+ {
+ if (c != null)
+ {
+ log.info("Closing connection");
+ c.close();
+ log.info("Closed connection");
+ }
+ }
+ catch (Exception e) // a failure while closing is logged and not rethrown
+ {
+ log.warn(e.toString(), e);
+ }
+ }
+ }
+
+ // https://issues.jboss.org/browse/JBMESSAGING-1876 - same check as testMinTimeoutProcessTimeSettings but with SupportsFailover=false: MinTimeoutProcessTime=100 is configured and the ConnectionState behind a new connection must report 100.
+ public void testMinTimeoutProcessTimeSettings2() throws Exception
+ {
+ Connection c = null;
+ // NOTE(review): the service name and JNDI binding below are identical to those in testMinTimeoutProcessTimeSettings; MaxRetryChangeRate and RetryChangeRateInterval are set but never asserted here - confirm intent.
+ try
+ {
+ String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + " name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigSettingsFactory\"\n"
+ + " xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+ + " <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+ + " <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+ + " <attribute name=\"JNDIBindings\">\n"
+ + " <bindings>\n"
+ + " <binding>/ClusteredTestMinTimeoutProcessTimeConfigSettingsFactory</binding>\n"
+ + " </bindings>\n"
+ + " </attribute>\n"
+ + " <attribute name=\"SupportsFailover\">false</attribute>"
+ + " <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+ + " <attribute name=\"MinTimeoutProcessTime\">100</attribute>\n"
+ + " <attribute name=\"MaxRetryChangeRate\">10</attribute>\n"
+ + " <attribute name=\"RetryChangeRateInterval\">2345</attribute>\n"
+ + " </mbean>";
+ // Deploy the MBean described above, then invoke its "create" and "start" operations.
+ ObjectName on = ServerManagement.deploy(mbeanConfig);
+ ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+ ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+ // Look the factory up through ic[0] (presumably the naming context of the first cluster node - confirm) and open a connection.
+ ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestMinTimeoutProcessTimeConfigSettingsFactory");
+ c = cf.createConnection();
+ // Reach the ConnectionState through the connection's delegate.
+ ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+
+ ConnectionState state1 = (ConnectionState)del1.getState();
+
+ long minTime = state1.getMinTimeoutProcessTime();
+ // The value must match the MinTimeoutProcessTime attribute configured in the descriptor.
+ assertEquals(100, minTime);
+ }
+ finally // NOTE(review): only the connection is closed; the MBean "on" is not stopped or undeployed in this method - confirm cleanup happens elsewhere.
+ {
+ try
+ {
+ if (c != null)
+ {
+ log.info("Closing connection");
+ c.close();
+ log.info("Closed connection");
+ }
+ }
+ catch (Exception e) // a failure while closing is logged and not rethrown
+ {
+ log.warn(e.toString(), e);
+ }
+ }
+ }
+
+ // https://issues.jboss.org/browse/JBMESSAGING-1876 - deploys a ConnectionFactory MBean with SupportsFailover=true and SupportsLoadBalancing=true but no MinTimeoutProcessTime attribute, then asserts that the ConnectionState behind a new connection reports 300.
+ public void testMinTimeoutProcessTimeDefaults() throws Exception
+ {
+ Connection c = null;
+ // NOTE(review): the service name and JNDI binding below refer to "ChangeRate" although this test checks MinTimeoutProcessTime - confirm they do not collide with another test's factory.
+ try
+ {
+ String mbeanConfig = "<mbean code=\"org.jboss.jms.server.connectionfactory.ConnectionFactory\"\n" + " name=\"jboss.messaging.connectionfactory:service=TestChangeRateConfigDefaultsFactory\"\n"
+ + " xmbean-dd=\"xmdesc/ConnectionFactory-xmbean.xml\">\n"
+ + " <depends optional-attribute-name=\"ServerPeer\">jboss.messaging:service=ServerPeer</depends>\n"
+ + " <depends optional-attribute-name=\"Connector\">jboss.messaging:service=Connector,transport=bisocket</depends>\n"
+ + " <attribute name=\"JNDIBindings\">\n"
+ + " <bindings>\n"
+ + " <binding>/ClusteredTestChangeRateConfigDefaultsFactory</binding>\n"
+ + " </bindings>\n"
+ + " </attribute>\n"
+ + " <attribute name=\"SupportsFailover\">true</attribute>"
+ + " <attribute name=\"SupportsLoadBalancing\">true</attribute>"
+ + " </mbean>";
+ // Deploy the MBean described above, then invoke its "create" and "start" operations.
+ ObjectName on = ServerManagement.deploy(mbeanConfig);
+ ServerManagement.invoke(on, "create", new Object[0], new String[0]);
+ ServerManagement.invoke(on, "start", new Object[0], new String[0]);
+ // Look the factory up through ic[0] (presumably the naming context of the first cluster node - confirm) and open a connection.
+ ConnectionFactory cf = (ConnectionFactory)ic[0].lookup("/ClusteredTestChangeRateConfigDefaultsFactory");
+ c = cf.createConnection();
+ // Reach the ConnectionState through the connection's delegate.
+ ClientConnectionDelegate del1 = (ClientConnectionDelegate)((JBossConnection)c).getDelegate();
+
+ ConnectionState state1 = (ConnectionState)del1.getState();
+
+ long minTime = state1.getMinTimeoutProcessTime();
+ // The descriptor configures no MinTimeoutProcessTime; 300 is expected (presumably the built-in default - confirm).
+ assertEquals(300, minTime);
+ }
+ finally // NOTE(review): only the connection is closed; the MBean "on" is not stopped or undeployed in this method - confirm cleanup happens elsewhere.
+ {
+ try
+ {
+ if (c != null)
+ {
+ log.info("Closing connection");
+ c.close();
+ log.info("Closed connection");
+ }
+ }
+ catch (Exception e) // a failure while closing is logged and not rethrown
+ {
+ log.warn(e.toString(), e);
+ }
+ }
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
More information about the jboss-cvs-commits
mailing list