[jboss-cvs] JBoss Messaging SVN: r3930 - branches/Branch_Stable/src/main/org/jboss/jms/client/container.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Mar 25 11:58:18 EDT 2008
Author: timfox
Date: 2008-03-25 11:58:18 -0400 (Tue, 25 Mar 2008)
New Revision: 3930
Modified:
branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java
Log:
http://jira.jboss.org/jira/browse/JBMESSAGING-1253
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java 2008-03-25 15:46:12 UTC (rev 3929)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/container/SessionAspect.java 2008-03-25 15:58:18 UTC (rev 3930)
@@ -227,71 +227,75 @@
MethodInvocation mi = (MethodInvocation)invocation;
SessionState state = getState(invocation);
- int ackMode = state.getAcknowledgeMode();
+ synchronized (state)
+ {
- Object[] args = mi.getArguments();
- DeliveryInfo info = (DeliveryInfo)args[0];
-
- if (ackMode == Session.CLIENT_ACKNOWLEDGE)
- {
- // We collect acknowledgments in the list
+ int ackMode = state.getAcknowledgeMode();
- if (trace) { log.trace(this + " added to CLIENT_ACKNOWLEDGE list delivery " + info); }
+ Object[] args = mi.getArguments();
+ DeliveryInfo info = (DeliveryInfo)args[0];
- // Sanity check
- if (info.getConnectionConsumerSession() != null)
+ if (ackMode == Session.CLIENT_ACKNOWLEDGE)
{
- throw new IllegalStateException(
- "CLIENT_ACKNOWLEDGE cannot be used with a connection consumer");
+ // We collect acknowledgments in the list
+
+ if (trace) { log.trace(this + " added to CLIENT_ACKNOWLEDGE list delivery " + info); }
+
+ // Sanity check
+ if (info.getConnectionConsumerSession() != null)
+ {
+ throw new IllegalStateException(
+ "CLIENT_ACKNOWLEDGE cannot be used with a connection consumer");
+ }
+
+ state.getClientAckList().add(info);
}
-
- state.getClientAckList().add(info);
- }
- // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
- // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
- else if (ackMode == Session.AUTO_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
- {
- // We collect the single acknowledgement in the state.
-
- if (trace) { log.trace(this + " added " + info + " to session state"); }
-
- state.setAutoAckInfo(info);
- }
- else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
- {
- if (trace) { log.trace(this + " added to DUPS_OK_ACKNOWLEDGE list delivery " + info); }
-
- state.getClientAckList().add(info);
-
- //Also set here - this would be used for recovery in a message listener
- state.setAutoAckInfo(info);
- }
- else
- {
- Object txID = state.getCurrentTxId();
-
- if (txID != null)
+ // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
+ // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
+ else if (ackMode == Session.AUTO_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
{
- // the session is non-XA and transacted, or XA and enrolled in a global transaction. An
- // XA session that has not been enrolled in a global transaction behaves as a
- // transacted session.
+ // We collect the single acknowledgement in the state.
+
+ if (trace) { log.trace(this + " added " + info + " to session state"); }
- ConnectionState connState = (ConnectionState)state.getParent();
-
- if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
-
- // If the ack is for a delivery that came through via a connection consumer then we use
- // the connectionConsumer session as the session id, otherwise we use this sessions'
- // session ID
+ state.setAutoAckInfo(info);
+ }
+ else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ {
+ if (trace) { log.trace(this + " added to DUPS_OK_ACKNOWLEDGE list delivery " + info); }
- ClientSessionDelegate connectionConsumerDelegate =
- (ClientSessionDelegate)info.getConnectionConsumerSession();
+ state.getClientAckList().add(info);
- String sessionId = connectionConsumerDelegate != null ?
- connectionConsumerDelegate.getID() : state.getSessionID();
-
- connState.getResourceManager().addAck(txID, sessionId, info);
- }
+ //Also set here - this would be used for recovery in a message listener
+ state.setAutoAckInfo(info);
+ }
+ else
+ {
+ Object txID = state.getCurrentTxId();
+
+ if (txID != null)
+ {
+ // the session is non-XA and transacted, or XA and enrolled in a global transaction. An
+ // XA session that has not been enrolled in a global transaction behaves as a
+ // transacted session.
+
+ ConnectionState connState = (ConnectionState)state.getParent();
+
+ if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
+
+ // If the ack is for a delivery that came through via a connection consumer then we use
+ // the connectionConsumer session as the session id, otherwise we use this sessions'
+ // session ID
+
+ ClientSessionDelegate connectionConsumerDelegate =
+ (ClientSessionDelegate)info.getConnectionConsumerSession();
+
+ String sessionId = connectionConsumerDelegate != null ?
+ connectionConsumerDelegate.getID() : state.getSessionID();
+
+ connState.getResourceManager().addAck(txID, sessionId, info);
+ }
+ }
}
return null;
@@ -302,85 +306,89 @@
MethodInvocation mi = (MethodInvocation)invocation;
SessionState state = getState(invocation);
- int ackMode = state.getAcknowledgeMode();
-
- SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
-
- boolean res = true;
-
- // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
- // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
- if (ackMode == Session.AUTO_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
+ synchronized (state)
{
- // It is possible that session.recover() is called inside a message listener onMessage
- // method - i.e. between the invocations of preDeliver and postDeliver. In this case we
- // don't want to acknowledge the last delivered messages - since it will be redelivered.
- if (!state.isRecoverCalled())
- {
- DeliveryInfo delivery = state.getAutoAckInfo();
-
- if (delivery == null)
- {
- throw new IllegalStateException("Cannot find delivery to AUTO_ACKNOWLEDGE");
- }
-
- if (trace) { log.trace(this + " auto acknowledging delivery " + delivery); }
-
- // We clear the state in a finally so then we don't get a knock on
- // exception on the next ack since we haven't cleared the state. See
- // http://jira.jboss.org/jira/browse/JBMESSAGING-852
-
- //This is ok since the message is acked after delivery, then the client
- //could get duplicates anyway
-
- try
- {
- res = ackDelivery(sd, delivery);
- }
- finally
- {
- state.setAutoAckInfo(null);
- }
- }
- else
- {
- if (trace) { log.trace(this + " recover called, so NOT acknowledging"); }
-
- state.setRecoverCalled(false);
- }
- }
- else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
- {
- List acks = state.getClientAckList();
- if (!state.isRecoverCalled())
+ int ackMode = state.getAcknowledgeMode();
+
+ SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
+
+ boolean res = true;
+
+ // if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
+ // However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
+ if (ackMode == Session.AUTO_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
{
- if (acks.size() >= state.getDupsOKBatchSize())
+ // It is possible that session.recover() is called inside a message listener onMessage
+ // method - i.e. between the invocations of preDeliver and postDeliver. In this case we
+ // don't want to acknowledge the last delivered messages - since it will be redelivered.
+ if (!state.isRecoverCalled())
{
- // We clear the state in a finally
+ DeliveryInfo delivery = state.getAutoAckInfo();
+
+ if (delivery == null)
+ {
+ throw new IllegalStateException("Cannot find delivery to AUTO_ACKNOWLEDGE");
+ }
+
+ if (trace) { log.trace(this + " auto acknowledging delivery " + delivery); }
+
+ // We clear the state in a finally so then we don't get a knock on
+ // exception on the next ack since we haven't cleared the state. See
// http://jira.jboss.org/jira/browse/JBMESSAGING-852
-
+
+ //This is ok since the message is acked after delivery, then the client
+ //could get duplicates anyway
+
try
{
- acknowledgeDeliveries(sd, acks);
+ res = ackDelivery(sd, delivery);
}
finally
- {
- acks.clear();
- state.setAutoAckInfo(null);
+ {
+ state.setAutoAckInfo(null);
}
- }
+ }
+ else
+ {
+ if (trace) { log.trace(this + " recover called, so NOT acknowledging"); }
+
+ state.setRecoverCalled(false);
+ }
}
- else
+ else if (ackMode == Session.DUPS_OK_ACKNOWLEDGE)
{
- if (trace) { log.trace(this + " recover called, so NOT acknowledging"); }
-
- state.setRecoverCalled(false);
+ List acks = state.getClientAckList();
+
+ if (!state.isRecoverCalled())
+ {
+ if (acks.size() >= state.getDupsOKBatchSize())
+ {
+ // We clear the state in a finally
+ // http://jira.jboss.org/jira/browse/JBMESSAGING-852
+
+ try
+ {
+ acknowledgeDeliveries(sd, acks);
+ }
+ finally
+ {
+ acks.clear();
+ state.setAutoAckInfo(null);
+ }
+ }
+ }
+ else
+ {
+ if (trace) { log.trace(this + " recover called, so NOT acknowledging"); }
+
+ state.setRecoverCalled(false);
+ }
+ state.setAutoAckInfo(null);
}
- state.setAutoAckInfo(null);
+
+ return Boolean.valueOf(res);
}
-
- return Boolean.valueOf(res);
}
/**
@@ -390,18 +398,22 @@
{
MethodInvocation mi = (MethodInvocation)invocation;
SessionState state = getState(invocation);
- SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-
- if (!state.getClientAckList().isEmpty())
- {
- //CLIENT_ACKNOWLEDGE can't be used with a MDB so it is safe to always acknowledge all
- //on this session (rather than the connection consumer session)
- acknowledgeDeliveries(del, state.getClientAckList());
- state.getClientAckList().clear();
- }
-
- return null;
+ synchronized (state)
+ {
+ SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+
+ if (!state.getClientAckList().isEmpty())
+ {
+ //CLIENT_ACKNOWLEDGE can't be used with a MDB so it is safe to always acknowledge all
+ //on this session (rather than the connection consumer session)
+ acknowledgeDeliveries(del, state.getClientAckList());
+
+ state.getClientAckList().clear();
+ }
+
+ return null;
+ }
}
/*
@@ -415,50 +427,54 @@
SessionState state = getState(invocation);
- if (state.isTransacted() && !isXAAndConsideredNonTransacted(state))
+ synchronized (state)
{
- throw new IllegalStateException("Cannot recover a transacted session");
- }
-
- if (trace) { log.trace("recovering the session"); }
-
- //Call redeliver
- SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-
- int ackMode = state.getAcknowledgeMode();
-
- if (ackMode == Session.CLIENT_ACKNOWLEDGE)
- {
- List dels = state.getClientAckList();
- state.setClientAckList(new ArrayList());
+ if (state.isTransacted() && !isXAAndConsideredNonTransacted(state))
+ {
+ throw new IllegalStateException("Cannot recover a transacted session");
+ }
- del.redeliver(dels);
-
- state.setRecoverCalled(true);
- }
- else if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
- {
- DeliveryInfo info = state.getAutoAckInfo();
+ if (trace) { log.trace("recovering the session"); }
+
+ //Call redeliver
+ SessionDelegate del = (SessionDelegate)mi.getTargetObject();
- //Don't recover if it's already to cancel
+ int ackMode = state.getAcknowledgeMode();
- if (info != null)
+ if (ackMode == Session.CLIENT_ACKNOWLEDGE)
{
- List redels = new ArrayList();
+ List dels = state.getClientAckList();
- redels.add(info);
+ state.setClientAckList(new ArrayList());
- del.redeliver(redels);
-
- state.setAutoAckInfo(null);
-
+ del.redeliver(dels);
+
state.setRecoverCalled(true);
}
- }
-
-
- return null;
+ else if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
+ {
+ DeliveryInfo info = state.getAutoAckInfo();
+
+ //Don't recover if it's already to cancel
+
+ if (info != null)
+ {
+ List redels = new ArrayList();
+
+ redels.add(info);
+
+ del.redeliver(redels);
+
+ state.setAutoAckInfo(null);
+
+ state.setRecoverCalled(true);
+ }
+ }
+
+
+ return null;
+ }
}
/**
@@ -538,65 +554,71 @@
public Object handleCommit(Invocation invocation) throws Throwable
{
SessionState state = getState(invocation);
-
- if (!state.isTransacted())
- {
- throw new IllegalStateException("Cannot commit a non-transacted session");
+
+ synchronized (state)
+ {
+ if (!state.isTransacted())
+ {
+ throw new IllegalStateException("Cannot commit a non-transacted session");
+ }
+
+ if (state.isXA())
+ {
+ throw new TransactionInProgressException("Cannot call commit on an XA session");
+ }
+
+ ConnectionState connState = (ConnectionState)state.getParent();
+ ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
+
+ try
+ {
+ connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);
+ }
+ finally
+ {
+ //Start new local tx
+ Object xid = connState.getResourceManager().createLocalTx();
+
+ state.setCurrentTxId(xid);
+ }
+
+ //TODO on commit we don't want to ACK any messages that have exceeded the max delivery count OR
+
+ return null;
}
-
- if (state.isXA())
- {
- throw new TransactionInProgressException("Cannot call commit on an XA session");
- }
-
- ConnectionState connState = (ConnectionState)state.getParent();
- ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-
- try
- {
- connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);
- }
- finally
- {
- //Start new local tx
- Object xid = connState.getResourceManager().createLocalTx();
-
- state.setCurrentTxId(xid);
- }
-
- //TODO on commit we don't want to ACK any messages that have exceeded the max delivery count OR
-
- return null;
}
public Object handleRollback(Invocation invocation) throws Throwable
{
SessionState state = getState(invocation);
-
- if (!state.isTransacted())
- {
- throw new IllegalStateException("Cannot rollback a non-transacted session");
- }
-
- if (state.isXA())
- {
- throw new TransactionInProgressException("Cannot call rollback on an XA session");
- }
- ConnectionState connState = (ConnectionState)state.getParent();
- ResourceManager rm = connState.getResourceManager();
- try
+ synchronized (state)
{
- rm.rollbackLocal((LocalTx)state.getCurrentTxId());
+ if (!state.isTransacted())
+ {
+ throw new IllegalStateException("Cannot rollback a non-transacted session");
+ }
+
+ if (state.isXA())
+ {
+ throw new TransactionInProgressException("Cannot call rollback on an XA session");
+ }
+
+ ConnectionState connState = (ConnectionState)state.getParent();
+ ResourceManager rm = connState.getResourceManager();
+ try
+ {
+ rm.rollbackLocal((LocalTx)state.getCurrentTxId());
+ }
+ finally
+ {
+ // start new local tx
+ Object xid = rm.createLocalTx();
+ state.setCurrentTxId(xid);
+ }
+
+ return null;
}
- finally
- {
- // start new local tx
- Object xid = rm.createLocalTx();
- state.setCurrentTxId(xid);
- }
-
- return null;
}
public Object handleSend(Invocation invocation) throws Throwable
More information about the jboss-cvs-commits mailing list