[jboss-cvs] JBoss Messaging SVN: r8335 - in branches/JBMESSAGING_1876: integration/EAP5/etc and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 14 03:58:22 EDT 2011
Author: gaohoward
Date: 2011-06-14 03:58:21 -0400 (Tue, 14 Jun 2011)
New Revision: 8335
Modified:
branches/JBMESSAGING_1876/integration/EAP4/etc/aop-messaging-client.xml
branches/JBMESSAGING_1876/integration/EAP5/etc/aop-messaging-client.xml
branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/JBMESSAGING_1876/src/main/org/jboss/jms/delegate/SessionDelegate.java
branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ClientTransaction.java
branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ResourceManager.java
Log:
first fix
Modified: branches/JBMESSAGING_1876/integration/EAP4/etc/aop-messaging-client.xml
===================================================================
--- branches/JBMESSAGING_1876/integration/EAP4/etc/aop-messaging-client.xml 2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/integration/EAP4/etc/aop-messaging-client.xml 2011-06-14 07:58:21 UTC (rev 8335)
@@ -136,6 +136,9 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
<advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
+ <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
<advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
@@ -283,4 +286,4 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->hasNextMessage())">
<advice name="handleHasNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
</bind>
-</aop>
\ No newline at end of file
+</aop>
Modified: branches/JBMESSAGING_1876/integration/EAP5/etc/aop-messaging-client.xml
===================================================================
--- branches/JBMESSAGING_1876/integration/EAP5/etc/aop-messaging-client.xml 2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/integration/EAP5/etc/aop-messaging-client.xml 2011-06-14 07:58:21 UTC (rev 8335)
@@ -136,6 +136,9 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->postDeliver(..))">
<advice name="handlePostDeliver" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
+ <bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->processMessageTimeout(..))">
+ <advice name="handleProcessMessageTimeout" aspect="org.jboss.jms.client.container.SessionAspect"/>
+ </bind>
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientSessionDelegate->acknowledgeAll(..))">
<advice name="handleAcknowledgeAll" aspect="org.jboss.jms.client.container.SessionAspect"/>
</bind>
@@ -283,4 +286,4 @@
<bind pointcut="execution(* org.jboss.jms.client.delegate.ClientBrowserDelegate->hasNextMessage())">
<advice name="handleHasNextMessage" aspect="org.jboss.jms.client.container.BrowserAspect"/>
</bind>
-</aop>
\ No newline at end of file
+</aop>
Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/ClientConsumer.java 2011-06-14 07:58:21 UTC (rev 8335)
@@ -315,6 +315,9 @@
private Object messageSource;
private boolean isClustered = false;
+ //JBMESSAGING-1876
+ private long minTimeoutProcessTime = 50;
+
public int getBufferSize()
{
return buffer.size();
@@ -627,15 +630,16 @@
long tmUsed = System.currentTimeMillis() - startTimestamp;
- if (tmUsed >= timeout)
+ long timeLeft = (timeout - tmUsed) <= 0 ? minTimeoutProcessTime : tmUsed + minTimeoutProcessTime;
+
+ if (trace)
{
- log.warn("Timed out before post message processing, discarding message " + m);
- throw new JMSException("Timed out before post message processing, discarding message " + m);
+ log.trace("Time left: " + timeLeft + " timeout " + timeout + " tmUsed " + tmUsed);
}
-
+
try
{
- ignore = f.get(timeout - tmUsed, TimeUnit.MILLISECONDS);
+ ignore = f.get(timeLeft, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
@@ -652,8 +656,9 @@
}
catch (TimeoutException e)
{
- log.warn("Timed out waiting for post message processing, discarding message " + m);
- throw new JMSException("Timed out waiting for post message processing, discarding message " + m);
+ log.warn("Timed out waiting for post message processing " + m);
+ ignore = false;
+ sessionDelegate.processMessageTimeout();
}
}
Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/container/SessionAspect.java 2011-06-14 07:58:21 UTC (rev 8335)
@@ -409,6 +409,62 @@
}
}
+ //JBMESSAGING-1876
+ public Object handleProcessMessageTimeout(Invocation invocation) throws Throwable
+ {
+ MethodInvocation mi = (MethodInvocation)invocation;
+ SessionState state = getState(invocation);
+
+ boolean result = true;
+
+ synchronized (state)
+ {
+
+ int ackMode = state.getAcknowledgeMode();
+
+ Object[] args = mi.getArguments();
+ DeliveryInfo info = (DeliveryInfo)args[0];
+
+ if (ackMode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ if (trace)
+ {
+ log.trace("Don't do anything about CLIENT_ACK mode");
+ }
+ }
+ else if (ackMode == Session.AUTO_ACKNOWLEDGE)
+ {
+ if (trace)
+ {
+ log.trace("Don't do anything about AUTO_ACK mode");
+ }
+ }
+ else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ {
+ if (trace)
+ {
+ log.trace("Don't do anything about DUPS_OK_ACK mode");
+ }
+ }
+ else
+ {
+ Object txID = state.getCurrentTxId();
+
+ if (txID != null)
+ {
+
+ if (trace) { log.trace("Marking tx " + txID + " to be rollback only"); }
+
+ ConnectionState connState = (ConnectionState)state.getParent();
+
+ connState.getResourceManager().markTxRollbackOnly(txID);
+ }
+ }
+ }
+
+ return null;
+ }
+
/**
* Used for client acknowledge.
*/
Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2011-06-14 07:58:21 UTC (rev 8335)
@@ -549,6 +549,11 @@
return "SessionDelegate[" + System.identityHashCode(this) + ", ID=" + id + "]";
}
+ public void processMessageTimeout() throws JMSException
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
// Protected ------------------------------------------------------------------------------------
// Package Private ------------------------------------------------------------------------------
Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/delegate/SessionDelegate.java 2011-06-14 07:58:21 UTC (rev 8335)
@@ -95,4 +95,6 @@
ProducerDelegate createProducerDelegate(JBossDestination destination) throws JMSException;
void acknowledgeAll() throws JMSException;
+
+ void processMessageTimeout() throws JMSException;
}
Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ClientTransaction.java 2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ClientTransaction.java 2011-06-14 07:58:21 UTC (rev 8335)
@@ -65,6 +65,8 @@
private boolean clientSide;
private boolean recovered;
+
+ private boolean rollbackOnly = false;
// Static --------------------------------------------------------
@@ -502,4 +504,14 @@
}
+ public void markRollbackOnly()
+ {
+ this.rollbackOnly = true;
+ }
+
+ public boolean isRollbackOnly()
+ {
+ return this.rollbackOnly;
+ }
+
}
Modified: branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ResourceManager.java 2011-06-14 02:24:48 UTC (rev 8334)
+++ branches/JBMESSAGING_1876/src/main/org/jboss/jms/tx/ResourceManager.java 2011-06-14 07:58:21 UTC (rev 8335)
@@ -197,6 +197,13 @@
{
throw new IllegalStateException("Cannot find transaction " + xid);
}
+
+ //if tx is mark rollback, roll back immediately
+ if (tx.isRollbackOnly())
+ {
+ this.rollbackLocal(xid);
+ throw new MessagingTransactionRolledBackException("Rolled back " + tx + " as it is marked rollback only!");
+ }
TransactionRequest request =
new TransactionRequest(TransactionRequest.ONE_PHASE_COMMIT_REQUEST, null, tx);
@@ -350,6 +357,13 @@
ClientTransaction tx = removeTxInternal(xid);
if (trace) { log.trace("got tx: " + tx + " state " + tx.getState()); }
+
+
+ if ((tx != null) && tx.isRollbackOnly())
+ {
+ this.rollback(xid, connection);
+ throw new MessagingXAException(XAException.XA_RBROLLBACK, "Transaction marked rollback only, xid: " + xid);
+ }
if (onePhase)
{
@@ -717,6 +731,21 @@
}
}
}
+
+ /**
+ * @param txID
+ */
+ public void markTxRollbackOnly(Object txID) throws JMSException
+ {
+ ClientTransaction tx = getTxInternal(txID);
+
+ if (tx == null)
+ {
+ throw new JMSException("There is no transaction with id " + txID);
+ }
+
+ tx.markRollbackOnly();
+ }
// Inner Classes --------------------------------------------------------------------------------
More information about the jboss-cvs-commits
mailing list