[Jboss-cvs] JBoss Messaging SVN: r1317 - in branches/Branch_1_0/src/main/org/jboss/jms: client/container client/state tx
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Sep 19 00:12:52 EDT 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-09-19 00:12:49 -0400 (Tue, 19 Sep 2006)
New Revision: 1317
Modified:
branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java
branches/Branch_1_0/src/main/org/jboss/jms/tx/MessagingXAResource.java
Log:
fix for http://jira.jboss.org/jira/browse/JBMESSAGING-410, http://jira.jboss.org/jira/browse/JBMESSAGING-520
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-09-19 04:08:23 UTC (rev 1316)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-09-19 04:12:49 UTC (rev 1317)
@@ -44,6 +44,7 @@
* This aspect is PER_VM
*
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.com>Ovidiu Feodorov</a>
*
* $Id$
*/
@@ -79,24 +80,25 @@
public Object handlePreDeliver(Invocation invocation) throws Throwable
{
MethodInvocation mi = (MethodInvocation)invocation;
-
SessionState state = getState(invocation);
int ackMode = state.getAcknowledgeMode();
- if (ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ if (ackMode == Session.CLIENT_ACKNOWLEDGE ||
+ ackMode == Session.AUTO_ACKNOWLEDGE ||
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
+ state.getCurrentTxId() == null)
{
+ // We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK, and
+ // also for XA sessions not enrolled in a global transaction.
+
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
- //We store the ack in a list for later acknowledgement or recovery
+ // We store the ack in a list for later acknowledgement or recovery
Object[] args = mi.getArguments();
-
MessageProxy mp = (MessageProxy)args[0];
-
int consumerID = ((Integer)args[1]).intValue();
-
AckInfo info = new AckInfo(mp, consumerID);
state.getToAck().add(info);
@@ -110,9 +112,7 @@
public Object handleAcknowledgeAll(Invocation invocation) throws Throwable
{
MethodInvocation mi = (MethodInvocation)invocation;
-
SessionState state = getState(invocation);
-
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
if (!state.getToAck().isEmpty())
@@ -128,42 +128,34 @@
public Object handlePostDeliver(Invocation invocation) throws Throwable
{
MethodInvocation mi = (MethodInvocation)invocation;
-
SessionState state = getState(invocation);
int ackMode = state.getAcknowledgeMode();
- if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ if (ackMode == Session.AUTO_ACKNOWLEDGE ||
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
+ ackMode != Session.CLIENT_ACKNOWLEDGE && state.getCurrentTxId() == null)
{
- SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-
- // We acknowledge immediately
-
+ // We acknowledge immediately on a non-transacted session that does not want to
+ // CLIENT_ACKNOWLEDGE, or an XA session not enrolled in a global transaction.
+
+ SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
+
if (!state.isRecoverCalled())
{
- //We don't acknowledge the message if recover() was called
-
- //Object[] args = mi.getArguments();
-
- //MessageProxy proxy = (MessageProxy)args[0];
-
- //int consumerID = ((Integer)args[1]).intValue();
+ if (trace) { log.trace("acknowledging NON-transactionally"); }
- //AckInfo ack = new AckInfo(proxy, consumerID);
-
List acks = state.getToAck();
-
AckInfo ack = (AckInfo)acks.get(0);
-
- del.acknowledge(ack);
-
- state.getToAck().clear();
+ sd.acknowledge(ack);
+ state.getToAck().clear();
}
else
{
+ if (trace) { log.trace("recover called, so NOT acknowledging"); }
+
state.setRecoverCalled(false);
}
- if (trace) { log.trace("ack mode is " + Util.acknowledgmentModeToString(ackMode)+ ", acknowledged on " + del); }
}
return null;
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-09-19 04:08:23 UTC (rev 1316)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-09-19 04:12:49 UTC (rev 1317)
@@ -37,6 +37,7 @@
import org.jboss.jms.tx.LocalTx;
import org.jboss.jms.tx.TxState;
import org.jboss.jms.tx.ResourceManager;
+import org.jboss.logging.Logger;
/**
* This aspect handles transaction related logic
@@ -44,6 +45,7 @@
* This aspect is PER_VM.
*
* @author <a href="mailto:tim.fox at jboss.com>Tim Fox</a>
+ * @author <a href="mailto:ovidiu at jboss.com>Ovidiu Feodorov</a>
*
* $Id$
*/
@@ -51,8 +53,12 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(TransactionAspect.class);
+
// Attributes ----------------------------------------------------
+ private boolean trace = log.isTraceEnabled();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -151,62 +157,50 @@
public Object handleSend(Invocation invocation) throws Throwable
{
- SessionState sessionState = (SessionState)getState(invocation);
+ SessionState state = (SessionState)getState(invocation);
+ Object txID = state.getCurrentTxId();
- if (sessionState.isTransacted())
+ if (txID != null)
{
- // Session is transacted - so we add message to tx instead of sending now
+ // the session is non-XA and transacted, or XA and enrolled in a global transaction, so
+ // we add the message to a transaction instead of sending it now. An XA session that has
+ // not been enrolled in a global transaction behaves as a non-transacted session.
- Object txID = sessionState.getCurrentTxId();
-
- if (txID == null)
- {
- throw new IllegalStateException("Attempt to send message in tx, but txId is null, XA?" + sessionState.isXA());
- }
-
- ConnectionState connState = (ConnectionState)sessionState.getParent();
-
+ ConnectionState connState = (ConnectionState)state.getParent();
MethodInvocation mi = (MethodInvocation)invocation;
-
Message m = (Message)mi.getArguments()[0];
+ if (trace) { log.trace("sending message " + m + " transactionally, queueing on resource manager"); }
+
connState.getResourceManager().addMessage(txID, m);
// ... and we don't invoke any further interceptors in the stack
return null;
}
- else
- {
- return invocation.invokeNext();
- }
+
+ if (trace) { log.trace("sending message NON-transactionally"); }
+
+ return invocation.invokeNext();
}
public Object handlePreDeliver(Invocation invocation) throws Throwable
{
SessionState state = (SessionState)getState(invocation);
+ Object txID = state.getCurrentTxId();
- if (state.isTransacted())
+ if (txID != null)
{
- MethodInvocation mi = (MethodInvocation)invocation;
+ // the session is non-XA and transacted, or XA and enrolled in a global transaction. An
+ // XA session that has not been enrolled in a global transaction behaves as a
+ // non-transacted session.
+ MethodInvocation mi = (MethodInvocation)invocation;
MessageProxy proxy = (MessageProxy)mi.getArguments()[0];
-
- //long messageID = proxy.getMessage().getMessageID();
-
int consumerID = ((Integer)mi.getArguments()[1]).intValue();
-
AckInfo info = new AckInfo(proxy, consumerID);
-
- Object txID = state.getCurrentTxId();
-
- if (txID == null)
- {
- throw new IllegalStateException("Attempt to send message in tx, but txId is null, XA?" + state.isXA());
- }
-
ConnectionState connState = (ConnectionState)state.getParent();
- //Add the acknowledgement to the transaction
+ if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
connState.getResourceManager().addAck(txID, info);
}
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java 2006-09-19 04:08:23 UTC (rev 1316)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/state/SessionState.java 2006-09-19 04:12:49 UTC (rev 1317)
@@ -73,25 +73,27 @@
children = new HashSet();
this.acknowledgeMode = ackMode;
this.transacted = transacted;
- this.xa = xa;
+ this.xa = xa;
+
if (xa)
{
// Create an XA resource
xaResource = new MessagingXAResource(parent.getResourceManager(), this);
}
- if (transacted)
+
+ // If session is transacted and XA, the currentTxId will be updated when the XAResource will
+ // be enrolled with a global transaction.
+
+ if (transacted & !xa)
{
// Create a local tx
currentTxId = parent.getResourceManager().createLocalTx();
}
+
executor = new QueuedExecutor(new LinkedQueue());
- if (ackMode == Session.CLIENT_ACKNOWLEDGE || ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE)
- {
- toAck = new ArrayList();
- }
-
+ toAck = new ArrayList();
+
// TODO could optimise this to use the same map of callbackmanagers (which holds refs
// to callbackhandlers) in the connection, instead of maintaining another map
callbackHandlers = new HashMap();
Modified: branches/Branch_1_0/src/main/org/jboss/jms/tx/MessagingXAResource.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/tx/MessagingXAResource.java 2006-09-19 04:08:23 UTC (rev 1316)
+++ branches/Branch_1_0/src/main/org/jboss/jms/tx/MessagingXAResource.java 2006-09-19 04:12:49 UTC (rev 1317)
@@ -114,6 +114,11 @@
if (trace) { log.trace(this + " committing " + xid + (onePhase ? " (one phase)" : " (two phase)")); }
rm.commit(xid, onePhase, connection);
+
+ // leave the session in a 'clean' state, the currentTxId will be set when the XAResource will
+ // be enrolled with a new transaction.
+
+ setCurrentTransactionId(null);
}
public void end(Xid xid, int flags) throws XAException
@@ -162,8 +167,8 @@
{
if (trace) { log.trace(this + " rolling back " + xid); }
- //TODO Hmmm on rollback should we also stop and start the consumers to remove any transient messages
- //Like we do on local session rollback??
+ // TODO Hmmm on rollback should we also stop and start the consumers to remove any transient
+ // messages, like we do on local session rollback??
rm.rollback(xid, connection);
}
@@ -190,10 +195,11 @@
case TMNOFLAGS :
if (convertTx)
{
- //If I commit/rollback the tx, then there is a short period of time between the AS (or whoever)
- //calling commit on the tx and calling start to enrolling the session in a new tx.
- //If the session has any listeners then in that period, messages can be received asychronously
- //but we want them to be received in the context of a tx, so we convert.
+ // If I commit/rollback the tx, then there is a short period of time between the
+ // AS (or whoever) calling commit on the tx and calling start to enrolling the
+ // session in a new tx. If the session has any listeners then in that period,
+ // messages can be received asychronously but we want them to be received in the
+ // context of a tx, so we convert.
setCurrentTransactionId(rm.convertTx((LocalTx)sessionState.getCurrentTxId(), xid));
}
else
@@ -237,11 +243,6 @@
private void setCurrentTransactionId(final Xid xid)
{
- if (xid == null)
- {
- throw new NullPointerException("null xid");
- }
-
if (trace) { log.trace(this + " setting current xid to " + xid + ", previous " + sessionState.getCurrentTxId()); }
sessionState.setCurrentTxId(xid);
More information about the jboss-cvs-commits
mailing list