[jboss-cvs] JBoss Messaging SVN: r8454 - in branches/Branch_1_4: integration/EAP4/etc and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Oct 25 22:45:35 EDT 2011
Author: gaohoward
Date: 2011-10-25 22:45:35 -0400 (Tue, 25 Oct 2011)
New Revision: 8454
Modified:
branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java
branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
Log:
JBMESSAGING-1904
Rolling back JBMESSAGING-1850 and JBMESSAGING-1876 (but retaining the MinTimeoutProcessTime attribute to keep the wire format backward compatible)
Modified: branches/Branch_1_4/docs/userguide/en/modules/configuration.xml
===================================================================
--- branches/Branch_1_4/docs/userguide/en/modules/configuration.xml 2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/docs/userguide/en/modules/configuration.xml 2011-10-26 02:45:35 UTC (rev 8454)
@@ -2609,24 +2609,6 @@
<para>This attribute takes effect only if the MaxRetryChangeRate attribute is not zero.</para>
</section>
- <section id="conf.connectionfactory.attributes.minTimeoutProcessTime">
- <title>MinTimeoutProcessTime</title>
-
- <para>Minimum processing time allowed for (milliseconds) in a timeout receive. This is the minimum time for the internal processing after a message is arrived at the client
- but before returning the message to the application. Default is 300. </para>
-
- <para><note>
-
- <para>When consumer is using AUTO_ACKNOWLEDGE session and time out for receive method expires in the moment when message is received at the client buffer but acknowledgement
- is not confirmed by the server, then this message will be discarded and an exception is thrown. like</para>
-
- <para>javax.jms.JMSException: Timed out before post message processing, discarding message delegator->JBossMessage[23357322686633840]ERSISTENT, deliveryId=0 at org.jboss.jms.client.container.ClientConsumer.receive(ClientConsumer.java:596)</para>
-
- <para>This can happen when time out for receive method is too low. Use CLIENT_ACKNOWLEDGE mode or Transactional mode to avoid such message loss.</para>
- </note></para>
-
- </section>
-
<!-- End conf.connectionfactory.attributes -->
</section>
Modified: branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml 2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/integration/EAP4/etc/aop-messaging-client.xml 2011-10-26 02:45:35 UTC (rev 8454)
@@ -136,9 +136,6 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
<advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
- <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
- </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
<advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
Modified: branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml
===================================================================
--- branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml 2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/integration/EAP5/etc/aop-messaging-client.xml 2011-10-26 02:45:35 UTC (rev 8454)
@@ -136,9 +136,6 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
<advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
- <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
- <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
- </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
<advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-10-26 02:45:35 UTC (rev 8454)
@@ -24,13 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.IllegalStateException;
@@ -312,7 +306,6 @@
private int consumeCount;
private boolean firstTime = true;
private volatile Thread onMessageThread;
- private ExecutorService pool = Executors.newCachedThreadPool();
private long maxRetryChangeRate;
private long retryChangeRateInterval;
private boolean abortReceive;
@@ -328,9 +321,6 @@
private Object messageSource;
private boolean isClustered = false;
- //JBMESSAGING-1876
- private long minTimeoutProcessTime;
-
private boolean isSucker = false;
public int getBufferSize()
@@ -349,8 +339,7 @@
long maxRetryChangeRate,
long retryChangeRateInterval,
boolean isClustered,
- CallbackManager cbManager,
- long minTimeoutProcessTime)
+ CallbackManager cbManager)
{
if (bufferSize < 1)
{
@@ -378,7 +367,6 @@
consumerLock = new Object();
}
messageSource = cbManager;
- this.minTimeoutProcessTime = minTimeoutProcessTime;
}
// Public ---------------------------------------------------------------------------------------
@@ -517,8 +505,6 @@
this.listener = null;
}
-
- pool.shutdownNow();
if (trace) { log.trace(this + " closed"); }
}
@@ -607,79 +593,15 @@
if (!isConnectionConsumer && !ignore)
{
final DeliveryInfo info = new DeliveryInfo(m, consumerID, queueName, null, shouldAck, m.getSource());
+ sessionDelegate.preDeliver(info);
+
+ // If post deliver didn't succeed and acknowledgement mode is auto_ack
+ // That means the ref wasn't acked since it couldn't be found.
+ // In order to maintain at most once semantics we must therefore not return
+ // the message
- if (timeout <= 0 || sessionDelegate.getTransacted())
- {
- ignore = ! sessionDelegate.preDeliver(info);
+ ignore = ! sessionDelegate.postDeliver();
- // If post deliver didn't succeed and acknowledgement mode is auto_ack
- // That means the ref wasn't acked since it couldn't be found.
- // In order to maintain at most once semantics we must therefore not return
- // the message
-
- if (!ignore)
- {
- ignore = !sessionDelegate.postDeliver();
- }
- }
- else
- {
- //JBMESSAGING-1850
- Callable<Boolean> afterReceive = new Callable<Boolean>()
- {
- public Boolean call() throws Exception
- {
- if (! sessionDelegate.preDeliver(info))
- {
- return true;
- }
-
- // If post deliver didn't succeed and acknowledgement mode is auto_ack
- // That means the ref wasn't acked since it couldn't be found.
- // In order to maintain at most once semantics we must therefore not return
- // the message
-
- return !sessionDelegate.postDeliver();
- }
- };
-
- java.util.concurrent.Future<Boolean> f = pool.submit(afterReceive);
-
- long tmUsed = System.currentTimeMillis() - startTimestamp;
-
- long timeDelta = timeout - tmUsed;
- long timeLeft = timeDelta <= minTimeoutProcessTime ? minTimeoutProcessTime : timeDelta ;
-
- if (trace)
- {
- log.trace("Time left: " + timeLeft + " timeout " + timeout + " tmUsed " + tmUsed);
- }
-
- try
- {
- ignore = f.get(timeLeft, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- log.warn("Interrupted during getting future result.", e);
- }
- catch (ExecutionException e)
- {
- log.warn("received application exception.", e.getCause());
- Throwable t = e.getCause();
- if (t instanceof JMSException)
- {
- throw (JMSException)t;
- }
- }
- catch (TimeoutException e)
- {
- log.warn("Timed out waiting for post message processing " + m + " within time " + timeLeft);
- ignore = false;
- sessionDelegate.processMessageTimeout();
- }
- }
-
if (trace)
{
log.trace("Post deliver returned " + !ignore);
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2011-10-26 02:45:35 UTC (rev 8454)
@@ -120,7 +120,7 @@
prefetchSize, executor, maxDeliveries, consumerState.isShouldAck(),
redeliveryDelay, consumerState.getMaxRetryChangeRate(),
consumerState.getRetryChangeRateInterval(),
- fcc != null, cm, consumerState.getMinTimeoutProcessTime());
+ fcc != null, cm);
sessionState.addCallbackHandler(messageHandler);
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-10-26 02:45:35 UTC (rev 8454)
@@ -416,52 +416,6 @@
}
}
- //JBMESSAGING-1876
- public Object handleProcessMessageTimeout(Invocation invocation) throws Throwable
- {
- MethodInvocation mi = (MethodInvocation)invocation;
- SessionState state = getState(invocation);
-
- int ackMode = state.getAcknowledgeMode();
-
- if (ackMode == Session.CLIENT_ACKNOWLEDGE)
- {
- if (trace)
- {
- log.trace("Don't do anything about CLIENT_ACK mode");
- }
- }
- else if (ackMode == Session.AUTO_ACKNOWLEDGE)
- {
- if (trace)
- {
- log.trace("Don't do anything about AUTO_ACK mode");
- }
- }
- else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
- {
- if (trace)
- {
- log.trace("Don't do anything about DUPS_OK_ACK mode");
- }
- }
- else
- {
- Object txID = state.getCurrentTxId();
-
- if (txID != null)
- {
- if (trace) { log.trace("Marking tx " + txID + " to be rollback only"); }
-
- ConnectionState connState = (ConnectionState)state.getParent();
-
- connState.getResourceManager().markTxRollbackOnly(txID);
- }
- }
-
- return null;
- }
-
/**
* Used for client acknowledge.
*/
Modified: branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-10-26 02:45:35 UTC (rev 8454)
@@ -549,11 +549,6 @@
return "SessionDelegate[" + System.identityHashCode(this) + ", ID=" + id + "]";
}
- public void processMessageTimeout() throws JMSException
- {
- throw new IllegalStateException("This invocation should not be handled here!");
- }
-
public void messageChanged(long messageID)
{
throw new IllegalStateException("This invocation should not be handled here!");
Modified: branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-10-26 02:45:35 UTC (rev 8454)
@@ -96,7 +96,5 @@
void acknowledgeAll() throws JMSException;
- void processMessageTimeout() throws JMSException;
-
void messageChanged(long messageID);
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java 2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/tx/ClientTransaction.java 2011-10-26 02:45:35 UTC (rev 8454)
@@ -65,8 +65,6 @@
private boolean clientSide;
private boolean recovered;
-
- private volatile boolean rollbackOnly = false;
/* Not sent over the wire, this is for differentiating between incompatible versions */
protected boolean supportsRecovered ;
@@ -521,15 +519,4 @@
}
}
-
- public void markRollbackOnly()
- {
- this.rollbackOnly = true;
- }
-
- public boolean isRollbackOnly()
- {
- return this.rollbackOnly;
- }
-
}
Modified: branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java 2011-10-20 12:44:23 UTC (rev 8453)
+++ branches/Branch_1_4/src/main/org/jboss/jms/tx/ResourceManager.java 2011-10-26 02:45:35 UTC (rev 8454)
@@ -198,13 +198,6 @@
{
throw new IllegalStateException("Cannot find transaction " + xid);
}
-
- //if tx is mark rollback, roll back immediately
- if (tx.isRollbackOnly())
- {
- this.rollbackLocal(xid, tx);
- throw new MessagingTransactionRolledBackException("Rolled back " + tx + " as it is marked rollback only!");
- }
TransactionRequest request =
new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
@@ -359,12 +352,6 @@
{
throw new MessagingXAException(XAException.XAER_NOTA, "Cannot find transaction with xid:" + xid);
}
-
- if (state.isRollbackOnly())
- {
- this.rollback(xid, state, connection);
- throw new MessagingXAException(XAException.XA_RBROLLBACK, "Transaction marked rollback only, xid: " + xid);
- }
TransactionRequest request =
new TransactionRequest(TransactionRequest.TWO_PHASE_PREPARE_REQUEST, xid, state);
@@ -388,13 +375,6 @@
{
log.trace("got tx: " + tx + " state " + tx.getState());
}
-
- //roll back for onePhase only. for 2pc, rollback only is processed in prepare
- if ((tx != null) && tx.isRollbackOnly() && onePhase)
- {
- this.rollback(xid, tx, connection);
- throw new MessagingXAException(XAException.XA_RBROLLBACK, "Transaction marked rollback only, xid: " + xid);
- }
if (onePhase)
{
@@ -781,21 +761,6 @@
}
}
}
-
- /**
- * @param txID
- */
- public void markTxRollbackOnly(Object txID) throws JMSException
- {
- ClientTransaction tx = getTxInternal(txID);
-
- if (tx == null)
- {
- throw new JMSException("There is no transaction with id " + txID);
- }
-
- tx.markRollbackOnly();
- }
// Inner Classes --------------------------------------------------------------------------------
}
More information about the jboss-cvs-commits
mailing list