[jboss-cvs] JBoss Messaging SVN: r1802 - in trunk: src/main/org/jboss/jms/client src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/delegate src/main/org/jboss/jms/message src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/server/endpoint/advised src/main/org/jboss/jms/server/remoting src/main/org/jboss/jms/tx tests tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Dec 15 16:28:38 EST 2006
Author: timfox
Date: 2006-12-15 16:28:18 -0500 (Fri, 15 Dec 2006)
New Revision: 1802
Added:
trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java
Removed:
trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java
Modified:
trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
trunk/src/main/org/jboss/jms/client/JBossSession.java
trunk/src/main/org/jboss/jms/client/container/AsfAspect.java
trunk/src/main/org/jboss/jms/client/container/HAAspect.java
trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
trunk/src/main/org/jboss/jms/message/MessageProxy.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
trunk/tests/build.xml
trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
Fixes for connection consumer
Modified: trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/JBossConnectionConsumer.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -304,7 +304,7 @@
for (int i = 0; i < mesList.size(); i++)
{
MessageProxy m = (MessageProxy)mesList.get(i);
- session.addAsfMessage(m, consumerID, channelID, maxDeliveries);
+ session.addAsfMessage(m, consumerID, channelID, maxDeliveries, sess);
if (trace) { log.trace("added " + m + " to session"); }
}
Modified: trunk/src/main/org/jboss/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossSession.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/JBossSession.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -502,9 +502,10 @@
* This method is used by the JBossConnectionConsumer to load up the session
* with messages to be processed by the session's run() method
*/
- void addAsfMessage(MessageProxy m, int consumerID, long channelID, int maxDeliveries)
+ void addAsfMessage(MessageProxy m, int consumerID, long channelID, int maxDeliveries,
+ SessionDelegate connectionConsumerSession)
{
- delegate.addAsfMessage(m, consumerID, channelID, maxDeliveries);
+ delegate.addAsfMessage(m, consumerID, channelID, maxDeliveries, connectionConsumerSession);
}
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/client/container/AsfAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/AsfAspect.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/container/AsfAspect.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -34,7 +34,6 @@
import org.jboss.jms.client.remoting.MessageCallbackHandler;
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.delegate.ConnectionDelegate;
-import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.message.MessageProxy;
@@ -131,6 +130,7 @@
int theConsumerID = ((Integer)mi.getArguments()[1]).intValue();
long channelID = ((Long)mi.getArguments()[2]).longValue();
int maxDeliveries = ((Integer)mi.getArguments()[3]).intValue();
+ SessionDelegate connectionConsumerDelegate = ((SessionDelegate)mi.getArguments()[4]);
if (m == null)
{
@@ -142,6 +142,7 @@
holder.consumerID = theConsumerID;
holder.channelID = channelID;
holder.maxDeliveries = maxDeliveries;
+ holder.connectionConsumerDelegate = connectionConsumerDelegate;
msgs.add(holder);
@@ -154,6 +155,7 @@
MethodInvocation mi = (MethodInvocation)invocation;
+ //This is the delegate for the session from the pool
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
int ackMode = getSessionState(invocation).getAcknowledgeMode();
@@ -166,7 +168,8 @@
MessageCallbackHandler.callOnMessage(del, sessionListener, holder.consumerID,
holder.channelID, false,
- holder.msg, ackMode, holder.maxDeliveries);
+ holder.msg, ackMode, holder.maxDeliveries,
+ holder.connectionConsumerDelegate);
}
return null;
@@ -191,5 +194,6 @@
private int consumerID;
private long channelID;
private int maxDeliveries;
+ private SessionDelegate connectionConsumerDelegate;
}
}
Modified: trunk/src/main/org/jboss/jms/client/container/HAAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/container/HAAspect.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -23,12 +23,14 @@
package org.jboss.jms.client.container;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
+import javax.jms.Session;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
@@ -352,9 +354,7 @@
for(Iterator i = failedState.getChildren().iterator(); i.hasNext(); )
{
SessionState failedSessionState = (SessionState)i.next();
-
- if (trace) { log.trace("Failed session state has " + failedSessionState.getToAck().size() + " deliveries"); }
-
+
int oldSessionId = failedSessionState.getSessionId();
ClientSessionDelegate failedSessionDelegate =
@@ -367,7 +367,7 @@
SessionState newSessionState = (SessionState)newSessionDelegate.getState();
- if (trace) { log.trace("New session state has " + newSessionState.getToAck().size() + " deliveries"); }
+ if (trace) { log.trace("New session state has " + newSessionState.getClientAckList().size() + " deliveries"); }
oldNewSessionStateMap.put(new Integer(oldSessionId), failedSessionState);
@@ -418,33 +418,58 @@
SessionState state = (SessionState)iter.next();
List ackInfos = null;
-
- if (!state.isTransacted() && !state.isXA())
+
+ if (!state.isTransacted() ||
+ (state.isXA() && state.getCurrentTxId() == null))
{
- //Now we remove any unacked np messages - this is because we don't want to ack them
+ //Non transacted session or an XA session with no transaction set (it falls back to auto_ack)
+
+ if (trace) { log.trace("Session is not transacted (or XA with no tx set), retrieving deliveries from session state"); }
+
+
+ //we remove any unacked np messages - this is because we don't want to ack them
//since the server won't know about them and will barf
- Iterator iter2 = state.getToAck().iterator();
-
- if (trace) { log.trace("Removing any np deliveries"); }
-
- while (iter2.hasNext())
+ if (state.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
- DeliveryInfo info = (DeliveryInfo)iter2.next();
-
- if (!info.getMessageProxy().getMessage().isReliable())
+ Iterator iter2 = state.getClientAckList().iterator();
+
+ if (trace) { log.trace("Removing any np deliveries"); }
+
+ while (iter2.hasNext())
{
- iter2.remove();
-
- if (trace) { log.trace("Removed np delivery: " + info.getDeliveryId()); }
+ DeliveryInfo info = (DeliveryInfo)iter2.next();
+
+ if (!info.getMessageProxy().getMessage().isReliable())
+ {
+ iter2.remove();
+
+ if (trace) { log.trace("Removed np delivery: " + info.getDeliveryId()); }
+ }
}
+
+ ackInfos = state.getClientAckList();
}
+ else
+ {
+ DeliveryInfo autoAck = state.getAutoAckInfo();
+ if (autoAck != null)
+ {
+ if (!autoAck.getMessageProxy().getMessage().isReliable())
+ {
+ //unreliable
+ state.setAutoAckInfo(null);
+ ackInfos = Collections.EMPTY_LIST;
+ }
+ else
+ {
+ //reliable
+ ackInfos = new ArrayList();
+ ackInfos.add(autoAck);
+ }
+ }
+ }
- if (trace) { log.trace("Session is not transacted, retrieving deliveries from session state"); }
-
- //Get the ack infos from the list in the session state
- ackInfos = state.getToAck();
-
if (trace) { log.trace("Retrieved " + ackInfos.size() + " deliveries"); }
}
else
Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -36,7 +36,7 @@
import org.jboss.jms.client.state.SessionState;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.message.MessageProxy;
-import org.jboss.jms.server.endpoint.Cancel;
+import org.jboss.jms.server.endpoint.DefaultCancel;
import org.jboss.jms.server.endpoint.DefaultAck;
import org.jboss.jms.server.endpoint.DeliveryInfo;
import org.jboss.logging.Logger;
@@ -68,6 +68,30 @@
// Public --------------------------------------------------------
+ private void ackDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
+ {
+ SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
+
+ //If the delivery was obtained via a connection consumer we need to ack via that
+ //otherwise we just use this session
+
+ SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
+
+ sessionToUse.acknowledgeDelivery(delivery);
+ }
+
+ private void cancelDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
+ {
+ SessionDelegate connectionConsumerSession = delivery.getConnectionConsumerSession();
+
+ //If the delivery was obtained via a connection consumer we need to cancel via that
+ //otherwise we just use this session
+
+ SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
+
+ sessionToUse.cancelDelivery(new DefaultCancel(delivery.getDeliveryId(), delivery.getMessageProxy().getDeliveryCount()));
+ }
+
public Object handleClosing(Invocation invocation) throws Throwable
{
MethodInvocation mi = (MethodInvocation)invocation;
@@ -75,42 +99,51 @@
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
int ackMode = state.getAcknowledgeMode();
-
- // select eligible acknowledgments
- List acks = new ArrayList();
- List cancels = new ArrayList();
- for(Iterator i = state.getToAck().iterator(); i.hasNext(); )
+
+ //We need to either ack (for auto_ack) or cancel (for client_ack)
+ //any deliveries - this is because the message listener might have closed
+ //before onMessage had finished executing
+
+ if (ackMode == Session.AUTO_ACKNOWLEDGE ||
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
+ (state.isXA() && state.getCurrentTxId() == null))
{
- DeliveryInfo ack = (DeliveryInfo)i.next();
- if (ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+ //Acknowledge any outstanding auto ack
+
+ DeliveryInfo remainingAutoAck = state.getAutoAckInfo();
+
+ if (remainingAutoAck != null)
{
- acks.add(new DefaultAck(ack.getMessageProxy().getDeliveryId()));
+ if (trace) { log.trace(this + " handleClosing(). Found remaining auto ack. Will ack it " + remainingAutoAck.getDeliveryId()); }
+
+ ackDelivery(del, remainingAutoAck);
+
+ if (trace) { log.trace(this + " acked it"); }
+
+ state.setAutoAckInfo(null);
}
- else
+ }
+ else if (ackMode == Session.CLIENT_ACKNOWLEDGE)
+ {
+ // Cancel any outstanding deliveries
+ // We cancel any client ack or transactional, we do this explicitly so we can pass the updated
+ // delivery count information from client to server. We could just do this on the server but
+ // we would lose delivery count info.
+
+ //CLIENT_ACKNOWLEDGE cannot be used with MDBs so is always safe to cancel on this session
+
+ List cancels = new ArrayList();
+
+ for(Iterator i = state.getClientAckList().iterator(); i.hasNext(); )
{
- Cancel cancel = new Cancel(ack.getMessageProxy().getDeliveryId(), ack.getMessageProxy().getDeliveryCount());
+ DeliveryInfo ack = (DeliveryInfo)i.next();
+ DefaultCancel cancel = new DefaultCancel(ack.getMessageProxy().getDeliveryId(), ack.getMessageProxy().getDeliveryCount());
cancels.add(cancel);
}
- i.remove();
+
+ state.getClientAckList().clear();
}
- // On closing we acknowlege any AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE, since the session
- // might have closed before the onMessage had finished executing.
- // We cancel any client ack or transactional, we do this explicitly so we can pass the updated
- // delivery count information from client to server. We could just do this on the server but
- // we would lose delivery count info.
-
- if (!acks.isEmpty())
- {
- del.acknowledgeBatch(acks);
- }
- if (!cancels.isEmpty())
- {
- log.info("Calling canceldeliveries: " + cancels.size());
- del.cancelDeliveries(cancels);
- }
-
return invocation.invokeNext();
}
@@ -134,28 +167,37 @@
int ackMode = state.getAcknowledgeMode();
- if (ackMode == Session.CLIENT_ACKNOWLEDGE ||
- ackMode == Session.AUTO_ACKNOWLEDGE ||
- ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
- state.getCurrentTxId() == null)
+ Object[] args = mi.getArguments();
+ DeliveryInfo info = (DeliveryInfo)args[0];
+
+ if (ackMode == Session.CLIENT_ACKNOWLEDGE)
{
- // We collect acknowledgments (and not transact them) for CLIENT, AUTO and DUPS_OK, and
- // also for XA sessions not enlisted in a global transaction.
-
- // We store the ack in a list for later acknowledgement or recovery
-
- Object[] args = mi.getArguments();
- DeliveryInfo info = (DeliveryInfo)args[0];
-
- state.getToAck().add(info);
+ // We collect acknowledgments in the list
- if (trace)
- {
- SessionDelegate del = (SessionDelegate)mi.getTargetObject();
- log.trace("ack mode is " + Util.acknowledgmentModeToString(ackMode)+ ", acknowledged on " + del);
- }
+ if (trace) { log.trace(this + " delivery id: " + info.getDeliveryId() + " added to client ack list"); }
+
+ state.getClientAckList().add(info);
+
+ //We can return immediately
+ return null;
}
-
+ else if (ackMode == Session.AUTO_ACKNOWLEDGE ||
+ ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
+ (state.isXA() && state.getCurrentTxId() == null))
+ {
+ //We collect the single acknowledgement in the state.
+ //Currently DUPS_OK is treated the same as AUTO_ACKNOWLEDGE
+ //Also XA sessions not enlisted in a global tx are treated as AUTO_ACKNOWLEDGE
+
+ if (trace) { log.trace(this + " delivery id: " + info.getDeliveryId() + " added to client ack member"); }
+
+ state.setAutoAckInfo(info);
+
+ //We can return immediately
+ return null;
+ }
+
+ //Transactional - need to carry on down the stack
return invocation.invokeNext();
}
@@ -166,11 +208,13 @@
SessionState state = getState(invocation);
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
- if (!state.getToAck().isEmpty())
- {
- del.acknowledgeBatch(state.getToAck());
+ if (!state.getClientAckList().isEmpty())
+ {
+ //CLIENT_ACKNOWLEDGE can't be used with a MDB so it is safe to always acknowledge all
+ //on this session (rather than the connection consumer session)
+ del.acknowledgeDeliveries(state.getClientAckList());
- state.getToAck().clear();
+ state.getClientAckList().clear();
}
return null;
@@ -192,42 +236,35 @@
if (ackMode == Session.AUTO_ACKNOWLEDGE ||
ackMode == Session.DUPS_OK_ACKNOWLEDGE ||
- (ackMode != Session.CLIENT_ACKNOWLEDGE && state.getCurrentTxId() == null))
+ (state.isXA() && state.getCurrentTxId() == null))
{
- // We acknowledge immediately on a non-transacted session that does not want to
- // CLIENT_ACKNOWLEDGE, or an XA session not enrolled in a global transaction.
-
+ //We auto acknowledge
+ //Currently DUPS_OK is treated the same as AUTO_ACKNOWLEDGE
+ //Also XA sessions not enlisted in a global tx are treated as AUTO_ACKNOWLEDGE
+
SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
if (!state.isRecoverCalled())
{
- if (trace) { log.trace("acknowledging NON-transactionally"); }
-
- List acks = state.getToAck();
+ DeliveryInfo deliveryInfo = state.getAutoAckInfo();
- // Sanity check
- if (acks.size() != 1)
+ if (deliveryInfo == null)
{
- throw new IllegalStateException("Should only be one entry in list. " +
- "There are " + acks.size());
+ throw new IllegalStateException("Cannot find delivery to auto ack");
}
-
- DeliveryInfo ack = (DeliveryInfo)acks.get(0);
-
+
+ if (trace) { log.trace(this + " auto acking delivery " + deliveryInfo.getDeliveryId()); }
+
if (cancel)
{
- List cancels = new ArrayList();
- Cancel c = new Cancel(ack.getMessageProxy().getDeliveryId(), ack.getMessageProxy().getDeliveryCount());
- cancels.add(c);
- sd.cancelDeliveries(cancels);
+ cancelDelivery(sd, deliveryInfo);
}
else
{
- sd.acknowledge(ack);
+ ackDelivery(sd, deliveryInfo);
}
- //TODO we can optimise this for the auto_ack case (i.e. not store in list and have to clear each time)
- state.getToAck().clear();
+ state.setAutoAckInfo(null);
}
else
{
@@ -251,9 +288,7 @@
SessionState state = getState(invocation);
- int ackMode = state.getAcknowledgeMode();
-
- if (ackMode == Session.SESSION_TRANSACTED)
+ if (state.isTransacted())
{
throw new IllegalStateException("Cannot recover a transacted session");
}
@@ -263,9 +298,27 @@
//Call redeliver
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
- del.redeliver(state.getToAck());
+ if (state.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
+ {
+ del.redeliver(state.getClientAckList());
+
+ state.getClientAckList().clear();
+ }
+ else
+ {
+ DeliveryInfo info = state.getAutoAckInfo();
+
+ if (info != null)
+ {
+ List redels = new ArrayList();
- state.getToAck().clear();
+ redels.add(info);
+
+ del.redeliver(redels);
+
+ state.setAutoAckInfo(null);
+ }
+ }
state.setRecoverCalled(true);
@@ -297,9 +350,7 @@
* was called.
*/
public Object handleRedeliver(Invocation invocation) throws Throwable
- {
- if (trace) { log.trace("redeliver called"); }
-
+ {
MethodInvocation mi = (MethodInvocation)invocation;
SessionState state = getState(invocation);
@@ -307,8 +358,11 @@
// JMSRedelivered to true.
List toRedeliver = (List)mi.getArguments()[0];
- LinkedList toCancel = new LinkedList();
+ if (trace) { log.trace(this + " handleRedeliver() called: " + toRedeliver); }
+
+ SessionDelegate del = (SessionDelegate)mi.getTargetObject();
+
// Need to be recovered in reverse order.
for (int i = toRedeliver.size() - 1; i >= 0; i--)
{
@@ -319,25 +373,17 @@
if (handler == null)
{
- // This is ok. The original consumer has closed, this message wil get cancelled back
- // to the channel.
- Cancel cancel = new Cancel(info.getMessageProxy().getDeliveryId(), info.getMessageProxy().getDeliveryCount());
- toCancel.addFirst(cancel);
+ // This is ok. The original consumer has closed, so we cancel the message
+
+ cancelDelivery(del, info);
}
else
{
+ if (trace) { log.trace("Adding proxy back to front of buffer"); }
handler.addToFrontOfBuffer(proxy);
}
}
-
- if (!toCancel.isEmpty())
- {
- // Cancel the messages that can't be redelivered locally
-
- SessionDelegate del = (SessionDelegate)mi.getTargetObject();
- del.cancelDeliveries(toCancel);
- }
-
+
return null;
}
Modified: trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -27,6 +27,7 @@
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
+import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.client.state.HierarchicalState;
@@ -194,7 +195,16 @@
if (trace) { log.trace("sending acknowlegment transactionally, queueing on resource manager"); }
- connState.getResourceManager().addAck(txID, state.getSessionId(), info);
+ //If the ack is for a delivery that came through via a connection consumer then we
+ //use the connectionConsumer session as the session id, otherwise we use this session's
+ //session id
+
+ ClientSessionDelegate connectionConsumerDelegate =
+ (ClientSessionDelegate)info.getConnectionConsumerSession();
+
+ int sessionId = connectionConsumerDelegate != null ? connectionConsumerDelegate.getID() : state.getSessionId();
+
+ connState.getResourceManager().addAck(txID, sessionId, info);
}
return null;
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -44,6 +44,7 @@
import org.jboss.jms.message.StreamMessageProxy;
import org.jboss.jms.message.TextMessageProxy;
import org.jboss.jms.server.endpoint.Ack;
+import org.jboss.jms.server.endpoint.Cancel;
import org.jboss.jms.server.endpoint.DeliveryInfo;
import org.jboss.remoting.Client;
@@ -112,7 +113,7 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void acknowledge(Ack ack) throws JMSException
+ public void acknowledgeDelivery(Ack ack) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
@@ -121,7 +122,7 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void acknowledgeBatch(List acks) throws JMSException
+ public void acknowledgeDeliveries(List acks) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
@@ -405,7 +406,8 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void addAsfMessage(MessageProxy m, int consumerID, long channelId, int maxDeliveries)
+ public void addAsfMessage(MessageProxy m, int consumerID, long channelId, int maxDeliveries,
+ SessionDelegate connectionConsumerSession)
{
throw new IllegalStateException("This invocation should not be handled here!");
}
@@ -423,7 +425,7 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void cancelDeliveries(List ackInfos)
+ public void cancelDeliveries(List cancels)
{
throw new IllegalStateException("This invocation should not be handled here!");
}
@@ -432,6 +434,15 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
+ public void cancelDelivery(Cancel cancel)
+ {
+ throw new IllegalStateException("This invocation should not be handled here!");
+ }
+
+ /**
+ * This invocation should either be handled by the client-side interceptor chain or by the
+ * server-side endpoint.
+ */
public void recoverDeliveries(List ackInfos) throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -34,7 +34,7 @@
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.message.MessageProxy;
-import org.jboss.jms.server.endpoint.Cancel;
+import org.jboss.jms.server.endpoint.DefaultCancel;
import org.jboss.jms.server.endpoint.DeliveryInfo;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.Future;
@@ -65,6 +65,7 @@
trace = log.isTraceEnabled();
}
+ //This is static so it can be called by the asf layer too
public static void callOnMessage(SessionDelegate sess,
MessageListener listener,
int consumerID,
@@ -72,10 +73,16 @@
boolean isConnectionConsumer,
MessageProxy m,
int ackMode,
- int maxDeliveries)
+ int maxDeliveries,
+ SessionDelegate connectionConsumerSession)
throws JMSException
{
- preDeliver(sess, consumerID, channelID, m, isConnectionConsumer);
+ // If this is the callback-handler for a connection consumer we don't want to acknowledge or
+ // add anything to the tx for this session.
+ if (!isConnectionConsumer)
+ {
+ sess.preDeliver(new DeliveryInfo(m, consumerID, channelID, connectionConsumerSession));
+ }
int tries = 0;
@@ -136,36 +143,16 @@
if (!sess.isClosed())
{
// postDeliver only if the session is not closed
- postDeliver(sess, isConnectionConsumer, cancel);
+
+ // If this is the callback-handler for a connection consumer we don't want to acknowledge or
+ // add anything to the tx for this session
+ if (!isConnectionConsumer)
+ {
+ sess.postDeliver(cancel);
+ }
}
}
- protected static void preDeliver(SessionDelegate sess,
- int consumerID,
- long channelID,
- MessageProxy m,
- boolean isConnectionConsumer)
- throws JMSException
- {
- // If this is the callback-handler for a connection consumer we don't want to acknowledge or
- // add anything to the tx for this session.
- if (!isConnectionConsumer)
- {
- sess.preDeliver(new DeliveryInfo(m, consumerID, channelID));
- }
- }
-
- protected static void postDeliver(SessionDelegate sess, boolean isConnectionConsumer,
- boolean cancel) throws JMSException
- {
- // If this is the callback-handler for a connection consumer we don't want to acknowledge or
- // add anything to the tx for this session
- if (!isConnectionConsumer)
- {
- sess.postDeliver(cancel);
- }
- }
-
// Attributes ----------------------------------------------------
private LinkedList buffer;
@@ -331,7 +318,7 @@
for(Iterator i = buffer.iterator(); i.hasNext();)
{
MessageProxy mp = (MessageProxy)i.next();
- Cancel ack = new Cancel(mp.getDeliveryId(), mp.getDeliveryCount());
+ DefaultCancel ack = new DefaultCancel(mp.getDeliveryId(), mp.getDeliveryCount());
cancels.add(ack);
}
@@ -450,9 +437,15 @@
// If message is expired we still call pre and post deliver. This makes sure the
// message is acknowledged so it gets removed from the queue/subscription.
- preDeliver(sessionDelegate, consumerID, channelID, m, isConnectionConsumer);
+
+ if (!isConnectionConsumer)
+ {
+ sessionDelegate.preDeliver(new DeliveryInfo(m, consumerID, channelID, null));
+
+ sessionDelegate.postDeliver(false);
+ }
- postDeliver(sessionDelegate, isConnectionConsumer, false);
+ //postDeliver(sessionDelegate, isConnectionConsumer, false);
if (!m.getMessage().isExpired())
{
@@ -731,7 +724,7 @@
{
try
{
- callOnMessage(sessionDelegate, listener, consumerID, channelID, false, mp, ackMode, maxDeliveries);
+ callOnMessage(sessionDelegate, listener, consumerID, channelID, false, mp, ackMode, maxDeliveries, null);
}
catch (JMSException e)
{
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -32,6 +32,7 @@
import org.jboss.jms.client.remoting.MessageCallbackHandler;
import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.server.Version;
+import org.jboss.jms.server.endpoint.DeliveryInfo;
import org.jboss.jms.tx.MessagingXAResource;
import org.jboss.logging.Logger;
@@ -68,14 +69,16 @@
private boolean recoverCalled;
- // List<AckInfo>
- private List toAck;
+ // List<DeliveryInfo>
+ private List ClientAckList;
+
+ private DeliveryInfo autoAckInfo;
private ConnectionState parent;
private SessionDelegate delegate;
- private Map callbackHandlers;
+ private Map callbackHandlers;
public SessionState(ConnectionState parent, ClientSessionDelegate delegate,
boolean transacted, int ackMode, boolean xa)
@@ -106,7 +109,7 @@
executor = new QueuedExecutor(new LinkedQueue());
- toAck = new ArrayList();
+ ClientAckList = new ArrayList();
// TODO could optimise this to use the same map of callbackmanagers (which holds refs
// to callbackhandlers) in the connection, instead of maintaining another map
@@ -137,11 +140,25 @@
/**
* @return List<AckInfo>
*/
- public List getToAck()
+ public List getClientAckList()
{
- return toAck;
+ return ClientAckList;
}
+ public DeliveryInfo getAutoAckInfo()
+ {
+ return autoAckInfo;
+ }
+
+ public void setAutoAckInfo(DeliveryInfo info)
+ {
+ if (info != null && autoAckInfo != null)
+ {
+ throw new IllegalStateException("There is already a delivery set for auto ack");
+ }
+ autoAckInfo = info;
+ }
+
public int getAcknowledgeMode()
{
return acknowledgeMode;
Modified: trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -78,7 +78,8 @@
XAResource getXAResource();
- void addAsfMessage(MessageProxy m, int consumerID, long channelID, int maxDeliveries);
+ void addAsfMessage(MessageProxy m, int consumerID, long channelID,
+ int maxDeliveries, SessionDelegate connectionConsumerDelegate);
boolean getTransacted();
Modified: trunk/src/main/org/jboss/jms/message/MessageProxy.java
===================================================================
--- trunk/src/main/org/jboss/jms/message/MessageProxy.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/message/MessageProxy.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -65,6 +65,8 @@
// Attributes ----------------------------------------------------
+ //The actual session delegate for the message - needed for doing recovery
+ //so we can recover locally
private transient SessionDelegate delegate;
private transient boolean cc;
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -37,6 +37,7 @@
import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
import org.jboss.jms.server.connectormanager.SimpleConnectorManager;
+import org.jboss.jms.server.endpoint.ServerSessionEndpoint;
import org.jboss.jms.server.plugin.contract.JMSUserManager;
import org.jboss.jms.server.remoting.JMSServerInvocationHandler;
import org.jboss.jms.server.remoting.JMSWireFormat;
@@ -65,6 +66,8 @@
import org.jboss.system.ServiceMBeanSupport;
import org.w3c.dom.Element;
+import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+
/**
* A JMS server peer.
*
@@ -112,6 +115,8 @@
private long failoverStartTimeout = 3000;
private long failoverCompleteTimeout = 12000;
+
+ private Map sessions;
// wired components
@@ -159,9 +164,11 @@
version = Version.instance();
failoverStatusLock = new Object();
+
+ sessions = new ConcurrentReaderHashMap();
started = false;
- }
+ }
// ServiceMBeanSupport overrides ---------------------------------
@@ -566,6 +573,24 @@
}
// Public --------------------------------------------------------
+
+ public ServerSessionEndpoint getSession(Integer sessionID)
+ {
+ return (ServerSessionEndpoint)sessions.get(sessionID);
+ }
+
+ public void addSession(Integer id, ServerSessionEndpoint session)
+ {
+ sessions.put(id, session);
+ }
+
+ public void removeSession(Integer id)
+ {
+ if (sessions.remove(id) == null)
+ {
+ throw new IllegalStateException("Cannot find session with id " + id + " to remove");
+ }
+ }
public Queue getDLQ() throws Exception
{
Deleted: trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -1,105 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.jms.server.endpoint;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
-import org.jboss.messaging.util.Streamable;
-
-/**
- *
- * A Cancel.
- *
- * Used to send a cancel (NACK) to the server
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @version <tt>$Revision: 1.1 $</tt>
- *
- * $Id$
- *
- */
-public class Cancel implements Streamable
-{
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private long deliveryId;
-
- private int deliveryCount;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- public Cancel()
- {
- }
-
- public Cancel(long deliveryId, int deliveryCount)
- {
- this.deliveryId = deliveryId;
-
- this.deliveryCount = deliveryCount;
- }
-
- // Public --------------------------------------------------------
-
- public long getDeliveryId()
- {
- return deliveryId;
- }
-
- public int getDeliveryCount()
- {
- return deliveryCount;
- }
-
- // Streamable implementation -------------------------------------
-
- public void read(DataInputStream in) throws Exception
- {
- deliveryId = in.readLong();
-
- deliveryCount = in.readInt();
- }
-
- public void write(DataOutputStream out) throws Exception
- {
- out.writeLong(deliveryId);
-
- out.writeInt(deliveryCount);
- }
-
- // Class YYY overrides -------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Package Private -----------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner Classes -------------------------------------------------
-
-}
-
Copied: trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java (from rev 1798, trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java)
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/Cancel.java 2006-12-15 08:59:37 UTC (rev 1798)
+++ trunk/src/main/org/jboss/jms/server/endpoint/DefaultCancel.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.server.endpoint;
+
+
+/**
+ *
+ * A Cancel.
+ *
+ * Used to send a cancel (NACK) to the server
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 1.1 $</tt>
+ *
+ * $Id$
+ *
+ */
+public class DefaultCancel implements Cancel
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private long deliveryId;
+
+ private int deliveryCount;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public DefaultCancel()
+ {
+ }
+
+ public DefaultCancel(long deliveryId, int deliveryCount)
+ {
+ this.deliveryId = deliveryId;
+
+ this.deliveryCount = deliveryCount;
+ }
+
+ // Public --------------------------------------------------------
+
+ public long getDeliveryId()
+ {
+ return deliveryId;
+ }
+
+ public int getDeliveryCount()
+ {
+ return deliveryCount;
+ }
+
+ // Class YYY overrides -------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Package Private -----------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner Classes -------------------------------------------------
+
+}
+
Modified: trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/DeliveryInfo.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -21,6 +21,7 @@
*/
package org.jboss.jms.server.endpoint;
+import org.jboss.jms.delegate.SessionDelegate;
import org.jboss.jms.message.MessageProxy;
/**
@@ -48,18 +49,29 @@
private MessageProxy msg;
+ //When using the evil abomination known as a ConnectionConsumer, the connection consumer
+ //will get messages from a session that it created, then pass them on to sessions obtained from the pool;
+ //this means when the messages are acked/cancelled then this needs to be done against
+ //the connection consumer's session not the session from the pool, since that session won't know
+ //about the deliveries on the server side
+ //Therefore if this delivery was done using a connection consumer then this attribute is set
+ //to the connection consumer's session, otherwise it will be null
+ private SessionDelegate connectionConsumerSession;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
- public DeliveryInfo(MessageProxy msg, int consumerId, long channelID)
+ public DeliveryInfo(MessageProxy msg, int consumerId, long channelID,
+ SessionDelegate connectionConsumerSession)
{
this.msg = msg;
this.consumerId = consumerId;
this.channelID = channelID;
+
+ this.connectionConsumerSession = connectionConsumerSession;
}
// Public --------------------------------------------------------
@@ -79,6 +91,11 @@
return msg;
}
+ public SessionDelegate getConnectionConsumerSession()
+ {
+ return connectionConsumerSession;
+ }
+
// Ack Implementation -------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -189,7 +189,11 @@
SessionAdvised sessionAdvised = new SessionAdvised(ep);
- JMSDispatcher.instance.registerTarget(new Integer(sessionID), sessionAdvised);
+ Integer iSessionID = new Integer(sessionID);
+
+ serverPeer.addSession(iSessionID, ep);
+
+ JMSDispatcher.instance.registerTarget(iSessionID, sessionAdvised);
ClientSessionDelegate d = new ClientSessionDelegate(sessionID);
@@ -649,8 +653,14 @@
List acks = sessionState.getAcks();
- ServerSessionEndpoint session = (ServerSessionEndpoint)sessions.get(new Integer(sessionState.getSessionId()));
+ //We need to lookup the session in a global map maintained on the server peer.
+ //We can't just assume it's one of the sessions in the connection.
+ //This is because in the case of a connection consumer, the message might be delivered through one
+ //connection and the transaction committed/rolled back through another.
+ //ConnectionConsumers suck.
+ ServerSessionEndpoint session = serverPeer.getSession(new Integer(sessionState.getSessionId()));
+
session.acknowledgeTransactionally(acks, tx);
}
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -83,8 +83,17 @@
import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
/**
- * Concrete implementation of SessionEndpoint.
+ * The server side representation of a JMS session.
*
+ * A user must not invoke methods of a session concurrently on different threads, however
+ * there are situations where multiple threads may access this object concurrently, for instance:
+ *
+ * A session can be closed when its connection is closed by the user, which might happen on a different thread
+ * A session can be closed when the server determines the connection is dead.
+ * If the session represents a connection consumer's session then the connection consumer will farm off
+ * messages to different sessions obtained from a pool, these may then cancel/ack etc on different threads, but
+ * the acks/cancels/etc will end up back here on the connection consumer session instance.
+ *
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
@@ -109,6 +118,8 @@
private volatile boolean closed;
private ServerConnectionEndpoint connectionEndpoint;
+
+ private ServerPeer sp;
private Map consumers;
private Map browsers;
@@ -140,7 +151,7 @@
this.connectionEndpoint = connectionEndpoint;
- ServerPeer sp = connectionEndpoint.getServerPeer();
+ sp = connectionEndpoint.getServerPeer();
pm = sp.getPersistenceManagerInstance();
ms = sp.getMessageStore();
@@ -327,24 +338,31 @@
}
}
- public void acknowledgeBatch(List acks) throws JMSException
- {
+ public void acknowledgeDelivery(Ack ack) throws JMSException
+ {
try
{
+ acknowledgeDeliveryInternal(ack);
+ }
+ catch (Throwable t)
+ {
+ throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledge");
+ }
+ }
+
+ public void acknowledgeDeliveries(List acks) throws JMSException
+ {
+ if (trace) {log.trace(this + " acknowledgeDeliveries " + acks); }
+
+ try
+ {
Iterator iter = acks.iterator();
while (iter.hasNext())
{
Ack ack = (Ack)iter.next();
- Delivery del = (Delivery)deliveries.remove(new Long(ack.getDeliveryId()));
-
- if (del == null)
- {
- throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
- }
-
- del.acknowledge(null);
+ acknowledgeDeliveryInternal(ack);
}
}
catch (Throwable t)
@@ -352,107 +370,46 @@
throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledgeBatch");
}
}
-
- public void acknowledge(Ack ack) throws JMSException
+
+ public void cancelDelivery(Cancel cancel) throws JMSException
{
+ if (trace) {log.trace(this + " cancelDelivery " + cancel); }
+
try
{
- Delivery del = (Delivery)deliveries.remove(new Long(ack.getDeliveryId()));
+ Delivery del = cancelDeliveryInternal(cancel);
- if (del == null)
- {
- throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
- }
-
- del.acknowledge(null);
+ //Prompt delivery
+ ((Channel)del.getObserver()).deliver(false);
}
catch (Throwable t)
{
- throw ExceptionUtil.handleJMSInvocation(t, this + " acknowledge");
+ throw ExceptionUtil.handleJMSInvocation(t, this + " cancelDelivery");
}
- }
+
+ }
public void cancelDeliveries(List cancels) throws JMSException
{
+ if (trace) {log.trace(this + " cancelDeliveries " + cancels); }
+
try
{
// deliveries must be cancelled in reverse order
-
- List forDLQ = null;
-
+
Set channels = new HashSet();
for (int i = cancels.size() - 1; i >= 0; i--)
{
- Cancel cancel = (Cancel)cancels.get(i);
+ Cancel cancel = (Cancel)cancels.get(i);
- Delivery del = (Delivery)deliveries.remove(new Long(cancel.getDeliveryId()));
+ if (trace) { log.trace("Cancelling delivery " + cancel.getDeliveryId()); }
- if (del == null)
- {
- throw new IllegalStateException("Cannot find delivery to cancel " + cancel.getDeliveryId());
- }
-
- if (cancel.getDeliveryCount() >= maxDeliveryAttempts)
- {
- if (forDLQ == null)
- {
- forDLQ = new ArrayList();
- }
-
- forDLQ.add(del);
- }
- else
- {
- del.getReference().setDeliveryCount(cancel.getDeliveryCount());
-
- del.cancel();
-
- channels.add(del.getObserver());
- }
+ Delivery del = cancelDeliveryInternal(cancel);
+
+ channels.add(del.getObserver());
}
-
- //Send stuff to DLQ
-
- if (forDLQ != null)
- {
- //We do this in a tx so we don't end up with the message in both the original queue
- //and the dlq if it fails half way through
- Transaction tx = tr.createTransaction();
-
- try
- {
- for (int i = forDLQ.size() - 1; i >= 0; i--)
- {
- Delivery del = (Delivery)forDLQ.get(i);
- if (dlq != null)
- {
- //reset delivery count to zero
- del.getReference().setDeliveryCount(0);
-
- dlq.handle(null, del.getReference(), tx);
-
- del.acknowledge(tx);
- }
- else
- {
- log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
-
- del.acknowledge(tx);
- }
- }
-
- tx.commit();
- }
- catch (Throwable t)
- {
- tx.rollback();
-
- throw t;
- }
- }
-
// need to prompt delivery for all affected channels
promptDelivery(channels);
@@ -767,21 +724,7 @@
}
}
}
-
-// void promptDeliveryOnConsumers()
-// {
-// if (trace) { log.trace(this + " promptDeliveryOnConsumers(), there are " + consumers.size() + " consumers"); }
-// synchronized (consumers)
-// {
-// for (Iterator i = consumers.values().iterator(); i.hasNext(); )
-// {
-// ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)i.next();
-//
-// consumer.promptDelivery();
-// }
-// }
-// }
-
+
void localClose() throws Throwable
{
if (closed)
@@ -853,6 +796,8 @@
promptDelivery(channels);
deliveries.clear();
+
+ sp.removeSession(new Integer(id));
JMSDispatcher.instance.unregisterTarget(new Integer(id));
@@ -940,7 +885,75 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
+
+ private void acknowledgeDeliveryInternal(Ack ack) throws Throwable
+ {
+ if (trace) { log.trace("Acknowledging delivery " + ack.getDeliveryId()); }
+ Delivery del = (Delivery)deliveries.remove(new Long(ack.getDeliveryId()));
+
+ if (del == null)
+ {
+ throw new IllegalStateException("Cannot find delivery to acknowledge: " + ack.getDeliveryId());
+ }
+
+ del.acknowledge(null);
+ }
+
+ private Delivery cancelDeliveryInternal(Cancel cancel) throws Throwable
+ {
+ Delivery del = (Delivery)deliveries.remove(new Long(cancel.getDeliveryId()));
+
+ if (del == null)
+ {
+ throw new IllegalStateException("Cannot find delivery to cancel " + cancel.getDeliveryId());
+ }
+
+ if (cancel.getDeliveryCount() >= maxDeliveryAttempts)
+ {
+ //Send to DLQ
+
+ //We do this in a tx so we don't end up with the message in both the original queue
+ //and the dlq if it fails half way through
+ Transaction tx = tr.createTransaction();
+
+ try
+ {
+ if (dlq != null)
+ {
+ //reset delivery count to zero
+ del.getReference().setDeliveryCount(0);
+
+ dlq.handle(null, del.getReference(), tx);
+
+ del.acknowledge(tx);
+ }
+ else
+ {
+ log.warn("Cannot send to DLQ since DLQ has not been deployed! The message will be removed");
+
+ del.acknowledge(tx);
+ }
+
+ tx.commit();
+ }
+ catch (Throwable t)
+ {
+ tx.rollback();
+
+ throw t;
+ }
+ }
+ else
+ {
+ del.getReference().setDeliveryCount(cancel.getDeliveryCount());
+
+ del.cancel();
+ }
+
+ return del;
+ }
+
private ConsumerDelegate failoverConsumer(JBossDestination jmsDestination,
String selectorString,
boolean noLocal, String subscriptionName,
@@ -1277,6 +1290,8 @@
}
+
+
// Inner classes -------------------------------------------------
/**
Modified: trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/SessionEndpoint.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -68,20 +68,33 @@
JBossTopic createTopic(String topicName) throws JMSException;
/**
- * Acknowledge a batch of messages - used with client acknowledge or dups_ok acknowledge
+ * Acknowledge a list of deliveries
* @param ackInfos
* @throws JMSException
*/
- void acknowledgeBatch(List deliveryIds) throws JMSException;
+ void acknowledgeDeliveries(List deliveryIds) throws JMSException;
/**
- * Acknowledge a message - used for auto acknowledge
+ * Acknowledge a delivery
* @param deliveryId
* @throws JMSException
*/
- void acknowledge(Ack ack) throws JMSException;
+ void acknowledgeDelivery(Ack ack) throws JMSException;
/**
+ * Cancel a list of deliveries.
+ * @param cancelInfos
+ */
+ void cancelDeliveries(List cancelInfos) throws JMSException;
+
+ /**
+ * Cancel a delivery
+ * @param cancel
+ * @throws JMSException
+ */
+ void cancelDelivery(Cancel cancel) throws JMSException;
+
+ /**
* Add a temporary destination.
*/
void addTemporaryDestination(JBossDestination destination) throws JMSException;
@@ -108,14 +121,6 @@
void send(JBossMessage message) throws JMSException;
/**
- * Cancel some deliveries.
- * This used at consumer close to cancel any undelivered messages left in the client buffer
- * or at session recovery to cancel any messages that couldn't be redelivered locally
- * @param ackInfos
- */
- void cancelDeliveries(List cancelInfos) throws JMSException;
-
- /**
* Send delivery info to the server so the delivery lists can be repopulated
* used at failover
* @param ackInfos
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -32,6 +32,7 @@
import org.jboss.jms.destination.JBossTopic;
import org.jboss.jms.message.JBossMessage;
import org.jboss.jms.server.endpoint.Ack;
+import org.jboss.jms.server.endpoint.Cancel;
import org.jboss.jms.server.endpoint.SessionEndpoint;
/**
@@ -102,14 +103,14 @@
return endpoint.createTopic(topicName);
}
- public void acknowledgeBatch(List acks) throws JMSException
+ public void acknowledgeDeliveries(List acks) throws JMSException
{
- endpoint.acknowledgeBatch(acks);
+ endpoint.acknowledgeDeliveries(acks);
}
- public void acknowledge(Ack ack) throws JMSException
+ public void acknowledgeDelivery(Ack ack) throws JMSException
{
- endpoint.acknowledge(ack);
+ endpoint.acknowledgeDelivery(ack);
}
public void addTemporaryDestination(JBossDestination destination) throws JMSException
@@ -132,6 +133,11 @@
endpoint.cancelDeliveries(ackInfos);
}
+ public void cancelDelivery(Cancel cancel) throws JMSException
+ {
+ endpoint.cancelDelivery(cancel);
+ }
+
public void recoverDeliveries(List ackInfos) throws JMSException
{
endpoint.recoverDeliveries(ackInfos);
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSWireFormat.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -46,6 +46,7 @@
import org.jboss.jms.server.endpoint.Cancel;
import org.jboss.jms.server.endpoint.ClientDelivery;
import org.jboss.jms.server.endpoint.DefaultAck;
+import org.jboss.jms.server.endpoint.DefaultCancel;
import org.jboss.jms.server.endpoint.DeliveryRecovery;
import org.jboss.jms.tx.TransactionRequest;
import org.jboss.logging.Logger;
@@ -92,14 +93,15 @@
protected static final byte SERIALIZED = 0;
protected static final byte ACKNOWLEDGE = 1;
- protected static final byte ACKNOWLEDGE_BATCH = 2;
- protected static final byte SEND = 3;
- protected static final byte CANCEL_DELIVERIES = 4;
- protected static final byte MORE = 5;
- protected static final byte SEND_TRANSACTION = 6;
- protected static final byte GET_ID_BLOCK = 7;
- protected static final byte RECOVER_DELIVERIES = 8;
- protected static final byte CONFIRM_DELIVERY = 9;
+ protected static final byte ACKNOWLEDGE_LIST = 2;
+ protected static final byte CANCEL = 3;
+ protected static final byte CANCEL_LIST = 4;
+ protected static final byte SEND = 5;
+ protected static final byte MORE = 6;
+ protected static final byte SEND_TRANSACTION = 7;
+ protected static final byte GET_ID_BLOCK = 8;
+ protected static final byte RECOVER_DELIVERIES = 9;
+ protected static final byte CONFIRM_DELIVERY = 10;
// The response codes - start from 100
@@ -231,7 +233,7 @@
if (trace) { log.trace("wrote activate()"); }
}
- else if ("acknowledge".equals(methodName))
+ else if ("acknowledgeDelivery".equals(methodName))
{
dos.writeByte(ACKNOWLEDGE);
@@ -243,11 +245,11 @@
dos.flush();
- if (trace) { log.trace("wrote acknowledge()"); }
+ if (trace) { log.trace("wrote acknowledgeDelivery()"); }
}
- else if ("acknowledgeBatch".equals(methodName))
+ else if ("acknowledgeDeliveries".equals(methodName))
{
- dos.writeByte(ACKNOWLEDGE_BATCH);
+ dos.writeByte(ACKNOWLEDGE_LIST);
writeHeader(mi, dos);
@@ -265,39 +267,27 @@
dos.flush();
- if (trace) { log.trace("wrote acknowledge()"); }
+ if (trace) { log.trace("wrote acknowledgeDeliveries()"); }
}
- else if ("sendTransaction".equals(methodName))
+ else if ("cancelDelivery".equals(methodName))
{
- dos.writeByte(SEND_TRANSACTION);
+ dos.writeByte(CANCEL);
writeHeader(mi, dos);
-
- TransactionRequest request = (TransactionRequest)mi.getArguments()[0];
-
- request.write(dos);
-
+
+ Cancel cancel = (Cancel)mi.getArguments()[0];
+
+ dos.writeLong(cancel.getDeliveryId());
+
+ dos.writeInt(cancel.getDeliveryCount());
+
dos.flush();
- if (trace) { log.trace("wrote getMessageNow()"); }
+ if (trace) { log.trace("wrote cancelDelivery()"); }
}
- else if ("getIdBlock".equals(methodName))
- {
- dos.writeByte(GET_ID_BLOCK);
-
- writeHeader(mi, dos);
-
- int size = ((Integer)mi.getArguments()[0]).intValue();
-
- dos.writeInt(size);
-
- dos.flush();
-
- if (trace) { log.trace("wrote getIdBlock()"); }
- }
else if ("cancelDeliveries".equals(methodName) && mi.getArguments() != null)
{
- dos.writeByte(CANCEL_DELIVERIES);
+ dos.writeByte(CANCEL_LIST);
writeHeader(mi, dos);
@@ -310,7 +300,10 @@
while (iter.hasNext())
{
Cancel cancel = (Cancel)iter.next();
- cancel.write(dos);
+
+ dos.writeLong(cancel.getDeliveryId());
+
+ dos.writeInt(cancel.getDeliveryCount());
}
dos.flush();
@@ -353,6 +346,35 @@
if (trace) { log.trace("wrote confirmDelivery()"); }
}
+ else if ("sendTransaction".equals(methodName))
+ {
+ dos.writeByte(SEND_TRANSACTION);
+
+ writeHeader(mi, dos);
+
+ TransactionRequest request = (TransactionRequest)mi.getArguments()[0];
+
+ request.write(dos);
+
+ dos.flush();
+
+ if (trace) { log.trace("wrote getMessageNow()"); }
+ }
+ else if ("getIdBlock".equals(methodName))
+ {
+ dos.writeByte(GET_ID_BLOCK);
+
+ writeHeader(mi, dos);
+
+ int size = ((Integer)mi.getArguments()[0]).intValue();
+
+ dos.writeInt(size);
+
+ dos.flush();
+
+ if (trace) { log.trace("wrote getIdBlock()"); }
+ }
+
else
{
dos.write(SERIALIZED);
@@ -669,11 +691,11 @@
new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
new MessagingMarshallable(version, mi), null, null, null);
- if (trace) { log.trace("read acknowledge()"); }
+ if (trace) { log.trace("read acknowledgeDelivery()"); }
return request;
}
- case ACKNOWLEDGE_BATCH:
+ case ACKNOWLEDGE_LIST:
{
MethodInvocation mi = readHeader(dis);
@@ -696,24 +718,46 @@
new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
new MessagingMarshallable(version, mi), null, null, null);
- if (trace) { log.trace("read acknowledge()"); }
+ if (trace) { log.trace("read acknowledgeDeliveries()"); }
return request;
}
- case CANCEL_DELIVERIES:
+ case CANCEL:
{
MethodInvocation mi = readHeader(dis);
+
+ long deliveryId = dis.readLong();
+
+ int deliveryCount = dis.readInt();
+
+ Object[] args = new Object[] {new DefaultCancel(deliveryId, deliveryCount)};
+ mi.setArguments(args);
+
+ InvocationRequest request =
+ new InvocationRequest(null, ServerPeer.REMOTING_JMS_SUBSYSTEM,
+ new MessagingMarshallable(version, mi), null, null, null);
+
+ if (trace) { log.trace("read cancelDelivery()"); }
+
+ return request;
+ }
+ case CANCEL_LIST:
+ {
+ MethodInvocation mi = readHeader(dis);
+
int size = dis.readInt();
List acks = new ArrayList(size);
for (int i = 0; i < size; i++)
- {
- Cancel cancel = new Cancel();
+ {
+ long deliveryId = dis.readLong();
- cancel.read(dis);
+ int deliveryCount = dis.readInt();
+ DefaultCancel cancel = new DefaultCancel(deliveryId, deliveryCount);
+
acks.add(cancel);
}
Modified: trunk/src/main/org/jboss/jms/tx/ClientTransaction.java
===================================================================
--- trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/src/main/org/jboss/jms/tx/ClientTransaction.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -25,6 +25,7 @@
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -139,7 +140,7 @@
}
else
{
- return sessionStatesMap.values();
+ return sessionStatesMap == null ? Collections.emptySet() : sessionStatesMap.values();
}
}
Modified: trunk/tests/build.xml
===================================================================
--- trunk/tests/build.xml 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/tests/build.xml 2006-12-15 21:28:18 UTC (rev 1802)
@@ -759,7 +759,7 @@
<include name="**/jms/clustering/*Test.class"/>
<include name="org/jboss/test/messaging/util/ServerManagementTest.class"/>
-->
- <include name="**/jms/clustering/HATest.class"/>
+ <include name="**/jms/clustering/*Test.class"/>
</fileset>
</batchtest>
</junit>
Modified: trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/tests/src/org/jboss/test/messaging/jms/AcknowledgementTest.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -894,6 +894,8 @@
count++;
TextMessage tm = (TextMessage)m;
+
+ log.trace("Got message: " + tm.getText());
// Receive first three messages then recover() session
// Only last message should be redelivered
Modified: trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/tests/src/org/jboss/test/messaging/jms/ConnectionConsumerTest.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -239,6 +239,84 @@
if (connConsumer != null) connProducer.close();
}
}
+
+
+
+ public void testRedeliveryTransactedDifferentConnection() throws Exception
+ {
+ if (ServerManagement.isRemote()) return;
+
+ Connection connConnectionConsumer = null;
+
+ Connection connConsumer = null;
+
+ Connection connProducer = null;
+
+ try
+ {
+ connConsumer = cf.createConnection();
+
+ connConsumer.start();
+
+ Session sessCons = connConsumer.createSession(true, Session.SESSION_TRANSACTED);
+
+ RedelMessageListener listener = new RedelMessageListener(sessCons);
+
+ sessCons.setMessageListener(listener);
+
+ ServerSessionPool pool = new MockServerSessionPool(sessCons);
+
+ connConnectionConsumer = cf.createConnection();
+
+ connConnectionConsumer.start();
+
+ JBossConnectionConsumer cc = (JBossConnectionConsumer)connConnectionConsumer.createConnectionConsumer(queue, null, pool, 1);
+
+ log.trace("Started connection consumer");
+
+ connProducer = cf.createConnection();
+
+ Session sessProd = connProducer.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sessProd.createProducer(queue);
+
+ TextMessage m1 = sessProd.createTextMessage("a");
+ TextMessage m2 = sessProd.createTextMessage("b");
+ TextMessage m3 = sessProd.createTextMessage("c");
+ prod.send(m1);
+ prod.send(m2);
+ prod.send(m3);
+
+
+ log.trace("Sent messages");
+
+ //Wait for messages
+
+ listener.waitForLatch(10000);
+
+ if (listener.failed)
+ {
+ fail ("Didn't receive correct messages");
+ }
+
+ cc.close();
+
+ log.trace("Closed connection consumer");
+
+ connProducer.close();
+ connProducer = null;
+ connConsumer.close();
+ connConsumer = null;
+ connConnectionConsumer.close();
+ connConnectionConsumer = null;
+
+ }
+ finally
+ {
+ if (connConsumer != null) connConsumer.close();
+ if (connConsumer != null) connProducer.close();
+ if (connConnectionConsumer != null) connConnectionConsumer.close();
+ }
+ }
public void testCloseWhileProcessing() throws Exception
{
Modified: trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-12-15 17:49:28 UTC (rev 1801)
+++ trunk/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2006-12-15 21:28:18 UTC (rev 1802)
@@ -48,6 +48,7 @@
import org.jboss.jms.server.endpoint.Cancel;
import org.jboss.jms.server.endpoint.ClientDelivery;
import org.jboss.jms.server.endpoint.DefaultAck;
+import org.jboss.jms.server.endpoint.DefaultCancel;
import org.jboss.jms.server.endpoint.DeliveryInfo;
import org.jboss.jms.server.remoting.JMSWireFormat;
import org.jboss.jms.server.remoting.MessagingMarshallable;
@@ -93,10 +94,12 @@
protected Method sendMethod;
- protected Method acknowledgeMethod;
+ protected Method acknowledgeDeliveryMethod;
- protected Method acknowledgeBatchMethod;
+ protected Method acknowledgeDeliveriesMethod;
+ protected Method cancelDeliveryMethod;
+
protected Method cancelDeliveriesMethod;
//Consumer
@@ -135,10 +138,12 @@
sendMethod = sessionDelegate.getMethod("send", new Class[] { JBossMessage.class });
- acknowledgeMethod = sessionDelegate.getMethod("acknowledge", new Class[] { Ack.class });
+ acknowledgeDeliveryMethod = sessionDelegate.getMethod("acknowledgeDelivery", new Class[] { Ack.class });
- acknowledgeBatchMethod = sessionDelegate.getMethod("acknowledgeBatch", new Class[] { java.util.List.class });
+ acknowledgeDeliveriesMethod = sessionDelegate.getMethod("acknowledgeDeliveries", new Class[] { java.util.List.class });
+ cancelDeliveryMethod = sessionDelegate.getMethod("cancelDelivery", new Class[] { Cancel.class });
+
cancelDeliveriesMethod = sessionDelegate.getMethod("cancelDeliveries", new Class[] { java.util.List.class });
//TODO - this isn't complete - there are other methods to test
@@ -161,16 +166,26 @@
//Session
- public void testAcknowledge() throws Exception
+ public void testAcknowledgeDelivery() throws Exception
{
- wf.testAcknowledge();
+ wf.testAcknowledgeDelivery();
}
- public void testAcknowledgeBatch() throws Exception
+ public void testAcknowledgeDeliveries() throws Exception
{
- wf.testAcknowledgeBatch();
+ wf.testAcknowledgeDeliveries();
}
+ public void testCancelDelivery() throws Exception
+ {
+ wf.testCancelDelivery();
+ }
+
+ public void testCancelDeliveries() throws Exception
+ {
+ wf.testCancelDeliveries();
+ }
+
public void testSend() throws Exception
{
wf.testSend();
@@ -183,10 +198,6 @@
wf.testMore();
}
- public void testCancelDeliveries() throws Exception
- {
- wf.testCancelDeliveries();
- }
//Connection
@@ -260,13 +271,13 @@
*/
class TestWireFormat extends JMSWireFormat
{
- public void testAcknowledge() throws Exception
+ public void testAcknowledgeDelivery() throws Exception
{
long methodHash = 62365354;
int objectId = 54321;
- MethodInvocation mi = new MethodInvocation(null, methodHash, acknowledgeMethod, acknowledgeMethod, null);
+ MethodInvocation mi = new MethodInvocation(null, methodHash, acknowledgeDeliveryMethod, acknowledgeDeliveryMethod, null);
mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
@@ -347,13 +358,13 @@
}
- public void testAcknowledgeBatch() throws Exception
+ public void testAcknowledgeDeliveries() throws Exception
{
long methodHash = 62365354;
int objectId = 54321;
- MethodInvocation mi = new MethodInvocation(null, methodHash, acknowledgeBatchMethod, acknowledgeBatchMethod, null);
+ MethodInvocation mi = new MethodInvocation(null, methodHash, acknowledgeDeliveriesMethod, acknowledgeDeliveriesMethod, null);
mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
@@ -396,7 +407,7 @@
assertEquals(77, dis.readByte());
//First byte should be ACKNOWLEDGE
- assertEquals(JMSWireFormat.ACKNOWLEDGE_BATCH, dis.readByte());
+ assertEquals(JMSWireFormat.ACKNOWLEDGE_LIST, dis.readByte());
//Next int should be objectId
assertEquals(objectId, dis.readInt());
@@ -456,7 +467,104 @@
}
+ public void testCancelDelivery() throws Exception
+ {
+ long methodHash = 6236354;
+
+ int objectId = 543271;
+
+ MethodInvocation mi = new MethodInvocation(null, methodHash, cancelDeliveryMethod, cancelDeliveryMethod, null);
+
+ mi.getMetaData().addMetaData(Dispatcher.DISPATCHER, Dispatcher.OID, new Integer(objectId));
+
+ long deliveryID = 765;
+
+ int deliveryCount = 12;
+
+ Cancel cancel = new DefaultCancel(deliveryID, deliveryCount);
+
+ Object[] args = new Object[] { cancel };
+
+ mi.setArguments(args);
+
+ MessagingMarshallable mm = new MessagingMarshallable((byte)77, mi);
+
+ InvocationRequest ir = new InvocationRequest(null, null, mm, null, null, null);
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+ OutputStream oos = new DataOutputStream(bos);
+
+ wf.write(ir, oos);
+
+ oos.flush();
+
+ byte[] bytes = bos.toByteArray();
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+
+ DataInputStream dis = new DataInputStream(bis);
+
+ //Check the bytes
+
+ //First byte should be version
+ assertEquals(77, dis.readByte());
+
+ //Next byte should be CANCEL
+ assertEquals(JMSWireFormat.CANCEL, dis.readByte());
+
+ //Next int should be objectId
+ assertEquals(objectId, dis.readInt());
+
+ //Next long should be methodHash
+ assertEquals(methodHash, dis.readLong());
+
+ //Next should be the deliveryid
+ long l = dis.readLong();
+
+ //Then delivery count
+ int count = dis.readInt();
+
+ assertEquals(deliveryID, l);
+
+ assertEquals(deliveryCount, count);
+
+ //Now eos
+ try
+ {
+ dis.readByte();
+ fail("End of stream expected");
+ }
+ catch (EOFException e)
+ {
+ //Ok
+ }
+
+ bis.reset();
+
+ InputStream ois = new DataInputStream(bis);
+
+ InvocationRequest ir2 = (InvocationRequest)wf.read(ois, null);
+
+ mm = (MessagingMarshallable)ir2.getParameter();
+
+ assertEquals(77, mm.getVersion());
+
+ MethodInvocation mi2 = (MethodInvocation)mm.getLoad();
+
+ assertEquals(methodHash, mi2.getMethodHash());
+
+ assertEquals(objectId, ((Integer)mi2.getMetaData().getMetaData(Dispatcher.DISPATCHER, Dispatcher.OID)).intValue());
+
+ Cancel l2 = (Cancel)mi2.getArguments()[0];
+
+ assertEquals(deliveryID, l2.getDeliveryId());
+
+ assertEquals(deliveryCount, l2.getDeliveryCount());
+
+ }
+
/*
* Test that general serializable invocation requests are marshalled correctky
*/
@@ -729,7 +837,7 @@
MessageProxy proxy = JBossMessage.createThinDelegate(deliveryId, m, deliveryCount);
- DeliveryInfo info = new DeliveryInfo(proxy, 76762, 98982);
+ DeliveryInfo info = new DeliveryInfo(proxy, 76762, 98982, null);
int sessionId = 8787;
@@ -862,8 +970,8 @@
List cancels = new ArrayList();
- Cancel cancel1 = new Cancel(65654, 43);
- Cancel cancel2 = new Cancel(65765, 2);
+ DefaultCancel cancel1 = new DefaultCancel(65654, 43);
+ DefaultCancel cancel2 = new DefaultCancel(65765, 2);
cancels.add(cancel1);
cancels.add(cancel2);
@@ -897,7 +1005,7 @@
assertEquals(77, dis.readByte());
//Next byte should be CANCEL_MESSAGES
- assertEquals(JMSWireFormat.CANCEL_DELIVERIES, dis.readByte());
+ assertEquals(JMSWireFormat.CANCEL_LIST, dis.readByte());
//Next int should be objectId
assertEquals(objectId, dis.readInt());
@@ -912,14 +1020,14 @@
assertEquals(2, size);
//then the AckInfos
- Cancel rcancel1 = new Cancel();
+ long deliveryId = dis.readLong();
+ int deliveryCount = dis.readInt();
+ DefaultCancel rcancel1 = new DefaultCancel(deliveryId, deliveryCount);
- Cancel rcancel2 = new Cancel();
-
- rcancel1.read(dis);
-
- rcancel2.read(dis);
-
+ deliveryId = dis.readLong();
+ deliveryCount = dis.readInt();
+ DefaultCancel rcancel2 = new DefaultCancel(deliveryId, deliveryCount);
+
assertEquals(cancel1.getDeliveryCount(), rcancel1.getDeliveryCount());
assertEquals(cancel1.getDeliveryId(), cancel1.getDeliveryId());
@@ -961,8 +1069,8 @@
assertEquals(2, list.size());
- Cancel xack1 = (Cancel)list.get(0);
- Cancel xack2 = (Cancel)list.get(1);
+ DefaultCancel xack1 = (DefaultCancel)list.get(0);
+ DefaultCancel xack2 = (DefaultCancel)list.get(1);
assertEquals(cancel1.getDeliveryId(), xack1.getDeliveryId());
More information about the jboss-cvs-commits
mailing list