[Jboss-cvs] JBoss Messaging SVN: r1229 - in branches/Branch_1_0: src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/server/endpoint src/main/org/jboss/jms/tx src/main/org/jboss/messaging/core tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Aug 22 17:07:13 EDT 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-08-22 17:07:07 -0400 (Tue, 22 Aug 2006)
New Revision: 1229
Modified:
branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java
branches/Branch_1_0/src/main/org/jboss/jms/tx/ResourceManager.java
branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java
branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
Log:
minor reformatting
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2006-08-22 21:07:07 UTC (rev 1229)
@@ -101,18 +101,17 @@
public Object handleClosing(Invocation invocation) throws Throwable
{
- //First we make sure closing is called on the ServerConsumerEndpoint
- //This ensures that any in transit messages are flushed out to the client side
+ // First we make sure closing is called on the ServerConsumerEndpoint. This ensures that any
+ // in-transit messages are flushed out to the client side.
+
Object res = invocation.invokeNext();
ConsumerState consumerState = getState(invocation);
-
SessionState sessionState = (SessionState)consumerState.getParent();
-
ConnectionState connectionState = (ConnectionState)sessionState.getParent();
- //Then we call close on the messagecallbackhandler which waits for onMessage invocations
- //to complete and then cancels anything in the client buffer
+ // Then we call close on the messagecallbackhandler which waits for onMessage invocations
+ // to complete and then cancels anything in the client buffer.
consumerState.getMessageCallbackHandler().close();
sessionState.removeCallbackHandler(consumerState.getMessageCallbackHandler());
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-08-22 21:07:07 UTC (rev 1229)
@@ -201,29 +201,33 @@
return null;
}
- /*
+ /**
* Redelivery occurs in two situations:
+ *
* 1) When session.recover() is called (JMS1.1 4.4.11)
+ *
* "A session's recover method is used to stop a session and restart it with its first
- * unacknowledged message. In effect, the session's series of delivered messages
- * is reset to the point after its last acknowledged message."
- * An important note here is that session recovery is LOCAL to the session.
- * Session recovery DOES NOT result in delivered messages being cancelled back
- * to the channel where they can be redelivered - since that may result in them being
- * picked up by another session, which would break the semantics of recovery as described
- * in the spec.
- * 2) When session rollback occurs (JMS1.1 4.4.7)
- * On rollback of a session the spec is clear that session recovery occurs:
- * "If a transaction rollback is done, its produced messages
- * are destroyed and its consumed messages are automatically recovered. For
- * more information on session recovery, see Section 4.4.11 'Message
- * Acknowledgment.'"
- * So on rollback we do session recovery (local redelivery) in the same as if
- * session.recover() was called.
+ * unacknowledged message. In effect, the session's series of delivered messages is reset to the
+ * point after its last acknowledged message."
+ *
+ * An important note here is that session recovery is LOCAL to the session. Session recovery DOES
+ * NOT result in delivered messages being cancelled back to the channel where they can be
+ * redelivered - since that may result in them being picked up by another session, which would
+ * break the semantics of recovery as described in the spec.
+ *
+ * 2) When session rollback occurs (JMS1.1 4.4.7). On rollback of a session the spec is clear
+ * that session recovery occurs:
+ *
+ * "If a transaction rollback is done, its produced messages are destroyed and its consumed
+ * messages are automatically recovered. For more information on session recovery, see Section
+ * 4.4.11 'Message Acknowledgment.'"
+ *
+ * So on rollback we do session recovery (local redelivery) in the same way as if
+ * session.recover() was called.
*
- * There is a conflict here though. It seems a CTS test requires messages to be available to OTHER
- * sessions on rollback - see CTSMiscellaneousTest.testContestedQueueOnRollback()
- * Which seems in direct contradiction to the spec.
+ * There is a conflict here though. It seems a CTS test requires messages to be available to
+ * OTHER sessions on rollback - see CTSMiscellaneousTest.testContestedQueueOnRollback(), which
+ * seems in direct contradiction to the spec.
*
* In order to satisfy the test, on session recovery, if there are no local consumers available
* to consume the message, we cancel the message back to the channel.
@@ -233,39 +237,32 @@
if (trace) { log.trace("redeliver called"); }
MethodInvocation mi = (MethodInvocation)invocation;
-
SessionState state = getState(invocation);
- //We put the messages back in the front of their appropriate consumer buffers and
- //set JMSRedelivered to true
+ // We put the messages back in the front of their appropriate consumer buffers and set
+ // JMSRedelivered to true.
List toRedeliver = (List)mi.getArguments()[0];
-
LinkedList toCancel = new LinkedList();
- //Need to be recovered in reverse order
+ // Need to be recovered in reverse order.
for (int i = toRedeliver.size() - 1; i >= 0; i--)
{
AckInfo info = (AckInfo)toRedeliver.get(i);
-
MessageProxy proxy = info.getMessage();
-
proxy.setJMSRedelivered(true);
- //TODO delivery count although optional should be global
- //so we need to send it back to the server
- //but this has performance hit so perhaps we just don't support it?
+ //TODO delivery count although optional should be global so we need to send it back to the
+ // server but this has performance hit so perhaps we just don't support it?
proxy.incDeliveryCount();
MessageCallbackHandler handler = state.getCallbackHandler(info.getConsumerID());
if (handler == null)
{
- // This is ok.
-
- // The original consumer has closed, this message wil get cancelled back to the channel.
-
- toCancel.addFirst(info);
+ // This is ok. The original consumer has closed, this message will get cancelled back
+ // to the channel.
+ toCancel.addFirst(info);
}
else
{
@@ -275,10 +272,9 @@
if (!toCancel.isEmpty())
{
- //Cancel the messages that can't be redelivered locally
+ // Cancel the messages that can't be redelivered locally
SessionDelegate del = (SessionDelegate)mi.getTargetObject();
-
del.cancelDeliveries(toCancel);
}
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-08-22 21:07:07 UTC (rev 1229)
@@ -36,6 +36,7 @@
import org.jboss.jms.tx.AckInfo;
import org.jboss.jms.tx.LocalTx;
import org.jboss.jms.tx.TxState;
+import org.jboss.jms.tx.ResourceManager;
/**
* This aspect handles transaction related logic
@@ -49,183 +50,183 @@
public class TransactionAspect
{
// Constants -----------------------------------------------------
-
+
// Attributes ----------------------------------------------------
-
+
// Static --------------------------------------------------------
-
+
// Constructors --------------------------------------------------
-
+
// Public --------------------------------------------------------
-
+
public Object handleClose(Invocation invocation) throws Throwable
{
Object res = invocation.invokeNext();
-
+
SessionState state = (SessionState)getState(invocation);
-
+
ConnectionState connState = (ConnectionState)state.getParent();
-
+
Object xid = state.getCurrentTxId();
-
+
if (xid != null)
{
//Remove transaction from the resource manager
connState.getResourceManager().removeTx(xid);
- }
-
+ }
+
return res;
}
-
+
public Object handleCommit(Invocation invocation) throws Throwable
{
SessionState state = (SessionState)getState(invocation);
-
+
if (!state.isTransacted())
{
throw new IllegalStateException("Cannot commit a non-transacted session");
}
-
+
if (state.isXA())
{
throw new TransactionInProgressException("Cannot call commit on an XA session");
}
-
+
ConnectionState connState = (ConnectionState)state.getParent();
ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-
+
try
- {
+ {
connState.getResourceManager().commitLocal((LocalTx)state.getCurrentTxId(), conn);
}
finally
{
//Start new local tx
Object xid = connState.getResourceManager().createLocalTx();
-
+
state.setCurrentTxId(xid);
}
-
+
return null;
}
-
+
public Object handleRollback(Invocation invocation) throws Throwable
{
SessionState state = (SessionState)getState(invocation);
-
+
if (!state.isTransacted())
{
throw new IllegalStateException("Cannot rollback a non-transacted session");
}
-
+
if (state.isXA())
{
throw new TransactionInProgressException("Cannot call rollback on an XA session");
}
-
+
ConnectionState connState = (ConnectionState)state.getParent();
+ ResourceManager rm = connState.getResourceManager();
ConnectionDelegate conn = (ConnectionDelegate)connState.getDelegate();
-
- TxState tx = connState.getResourceManager().getTx(state.getCurrentTxId());
-
+
+ TxState tx = rm.getTx(state.getCurrentTxId());
+
if (tx == null)
{
throw new IllegalStateException("Cannot find tx:" + state.getCurrentTxId());
}
-
+
try
{
- connState.getResourceManager().rollbackLocal((LocalTx)state.getCurrentTxId(), conn);
+ rm.rollbackLocal((LocalTx)state.getCurrentTxId(), conn);
}
finally
{
- //Start new local tx
- Object xid = connState.getResourceManager().createLocalTx();
-
+ // start new local tx
+ Object xid = rm.createLocalTx();
state.setCurrentTxId(xid);
- }
-
- return null;
+ }
+
+ return null;
}
-
+
public Object handleSend(Invocation invocation) throws Throwable
{
SessionState sessionState = (SessionState)getState(invocation);
-
+
if (sessionState.isTransacted())
{
//Session is transacted - so we add message to tx instead of sending now
-
+
Object txID = sessionState.getCurrentTxId();
-
+
if (txID == null)
- {
+ {
throw new IllegalStateException("Attempt to send message in tx, but txId is null, XA?" + sessionState.isXA());
}
-
+
ConnectionState connState = (ConnectionState)sessionState.getParent();
-
+
MethodInvocation mi = (MethodInvocation)invocation;
-
- Message m = (Message)mi.getArguments()[0];
-
+
+ Message m = (Message)mi.getArguments()[0];
+
connState.getResourceManager().addMessage(txID, m);
-
+
// ... and we don't invoke any further interceptors in the stack
- return null;
+ return null;
}
else
- {
+ {
return invocation.invokeNext();
}
}
-
+
public Object handlePreDeliver(Invocation invocation) throws Throwable
{
SessionState state = (SessionState)getState(invocation);
-
+
if (state.isTransacted())
{
MethodInvocation mi = (MethodInvocation)invocation;
-
+
MessageProxy proxy = (MessageProxy)mi.getArguments()[0];
-
+
//long messageID = proxy.getMessage().getMessageID();
-
+
int consumerID = ((Integer)mi.getArguments()[1]).intValue();
-
+
AckInfo info = new AckInfo(proxy, consumerID);
-
+
Object txID = state.getCurrentTxId();
-
+
if (txID == null)
{
throw new IllegalStateException("Attempt to send message in tx, but txId is null, XA?" + state.isXA());
}
-
+
ConnectionState connState = (ConnectionState)state.getParent();
-
+
//Add the acknowledgement to the transaction
- connState.getResourceManager().addAck(txID, info);
+ connState.getResourceManager().addAck(txID, info);
}
return null;
}
// Protected ------------------------------------------------------
-
+
// Package Private ------------------------------------------------
-
+
// Private --------------------------------------------------------
-
+
private HierarchicalState getState(Invocation inv)
{
return ((DelegateSupport)inv.getTargetObject()).getState();
}
-
+
// Inner Classes --------------------------------------------------
-
+
}
Modified: branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-08-22 21:07:07 UTC (rev 1229)
@@ -311,29 +311,25 @@
waitForOnMessageToComplete();
- //Now we cancel anything left in the buffer
- //The reason we do this now is that otherwise the deliveries wouldn't get cancelled
- //until session close (since we don't cancel consumer's deliveries until then)
- //which is too late - since we need to preserve the order of messages delivered in a session.
+ // Now we cancel anything left in the buffer. The reason we do this now is that otherwise the
+ // deliveries wouldn't get cancelled until session close (since we don't cancel consumer's
+ // deliveries until then), which is too late - since we need to preserve the order of messages
+ // delivered in a session.
if (!buffer.isEmpty())
{
- //Now we cancel any deliveries that might be waiting in our buffer
- //This is because, otherwise the messages wouldn't get cancelled until
- //the corresponding session died.
- //So if another consumer in another session tried to consume from the channel
- //before that session died it wouldn't receive those messages
- Iterator iter = buffer.iterator();
-
+ // Now we cancel any deliveries that might be waiting in our buffer. This is because
+ // otherwise the messages wouldn't get cancelled until the corresponding session died.
+ // So if another consumer in another session tried to consume from the channel before that
+ // session died it wouldn't receive those messages.
+
List ackInfos = new ArrayList();
- while (iter.hasNext())
- {
- MessageProxy mp = (MessageProxy)iter.next();
-
+
+ for(Iterator i = buffer.iterator(); i.hasNext();)
+ {
+ MessageProxy mp = (MessageProxy)i.next();
AckInfo ack = new AckInfo(mp, consumerID);
-
ackInfos.add(ack);
-
}
sessionDelegate.cancelDeliveries(ackInfos);
@@ -346,7 +342,7 @@
private void waitForOnMessageToComplete()
{
- //Wait for any on message executions to complete
+ // Wait for any on message executions to complete
Future result = new Future();
Modified: branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-08-22 21:07:07 UTC (rev 1229)
@@ -457,7 +457,7 @@
{
try
{
- //Deliveries must be cancelled in reverse order
+ // deliveries must be cancelled in reverse order
Set consumers = new HashSet();
@@ -465,8 +465,9 @@
{
AckInfo ack = (AckInfo)ackInfos.get(i);
- //We look in the global map since the message might have come from connection consumer
- ServerConsumerEndpoint consumer = this.connectionEndpoint.getConsumerEndpoint(ack.getConsumerID());
+ // We look in the global map since the message might have come from connection consumer
+ ServerConsumerEndpoint consumer =
+ this.connectionEndpoint.getConsumerEndpoint(ack.getConsumerID());
if (consumer == null)
{
@@ -474,18 +475,14 @@
}
consumer.cancelDelivery(new Long(ack.getMessageID()));
-
consumers.add(consumer);
}
- //Need to prompt delivery for all consumers
+ // need to prompt delivery for all consumers
- Iterator iter = consumers.iterator();
-
- while (iter.hasNext())
+ for(Iterator i = consumers.iterator(); i.hasNext(); )
{
- ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)iter.next();
-
+ ServerConsumerEndpoint consumer = (ServerConsumerEndpoint)i.next();
consumer.promptDelivery();
}
}
Modified: branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java 2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/tx/AckInfo.java 2006-08-22 21:07:07 UTC (rev 1229)
@@ -48,7 +48,7 @@
protected int consumerID;
- //The actual proxy must not get serialized
+ // The actual proxy must not get serialized
protected transient MessageProxy msg;
// Static --------------------------------------------------------
Modified: branches/Branch_1_0/src/main/org/jboss/jms/tx/ResourceManager.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/jms/tx/ResourceManager.java 2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/jms/tx/ResourceManager.java 2006-08-22 21:07:07 UTC (rev 1229)
@@ -166,19 +166,19 @@
{
if (trace) { log.trace("rolling back local xid " + xid); }
- TxState tx = removeTx(xid);
+ TxState ts = removeTx(xid);
- if (tx == null)
+ if (ts == null)
{
throw new IllegalStateException("Cannot find transaction with xid:" + xid);
}
- //Don't need messages for rollback
- tx.clearMessages();
+ // don't need messages for rollback
+ ts.clearMessages();
- //For one phase rollback there is nothing to do on the server
+ // for one phase rollback there is nothing to do on the server
- redeliverMessages(tx);
+ redeliverMessages(ts);
}
public void commit(Xid xid, boolean onePhase, ConnectionDelegate connection) throws XAException
@@ -275,59 +275,45 @@
/*
* Rollback has occurred so we need to redeliver any unacked messages corresponding to the acks
- * is in the transaction
+ * in the transaction.
*/
- private void redeliverMessages(TxState tx) throws JMSException
+ private void redeliverMessages(TxState ts) throws JMSException
{
- Iterator iter = tx.getAcks().iterator();
-
- //Sort them into lists - one for each session
-
- //We use a LinkedHashMap since we need to preserve the order of the sessions
+ // Sort messages into lists, one for each session. We use a LinkedHashMap since we need to
+ // preserve the order of the sessions.
+
Map toAck = new LinkedHashMap();
-
- while (iter.hasNext())
+
+ for(Iterator i = ts.getAcks().iterator(); i.hasNext(); )
{
- AckInfo ack = (AckInfo)iter.next();
-
+ AckInfo ack = (AckInfo)i.next();
SessionDelegate del = ack.msg.getSessionDelegate();
List acks = (List)toAck.get(del);
-
if (acks == null)
{
acks = new ArrayList();
-
toAck.put(del, acks);
}
-
acks.add(ack);
}
- //Now tell each session to redeliver
+ // Now tell each session to redeliver.
LinkedList l = new LinkedList();
- iter = toAck.entrySet().iterator();
-
- //need to reverse the order
- while (iter.hasNext())
+ for(Iterator i = toAck.entrySet().iterator(); i.hasNext();)
{
- Object entry = iter.next();
-
- l.addFirst(entry);
+ // need to reverse the order
+ Object entry = i.next();
+ l.addFirst(entry);
}
- iter = l.iterator();
-
- while (iter.hasNext())
+ for(Iterator i = l.iterator(); i.hasNext();)
{
- Map.Entry entry = (Map.Entry)iter.next();
-
+ Map.Entry entry = (Map.Entry)i.next();
SessionDelegate sess = (SessionDelegate)entry.getKey();
-
List acks = (List)entry.getValue();
-
sess.redeliver(acks);
}
}
Modified: branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-08-22 21:07:07 UTC (rev 1229)
@@ -939,49 +939,33 @@
if (!removed)
{
- // This is ok
- // This can happen if the message is cancelled before the result of
- // ServerConsumerDelegate.handle
- // has returned, in which case we won't have a record of the delivery
- // in the Set
+ // This is OK. This can happen if the message is cancelled before the result of
+ // ServerConsumerDelegate.handle has returned, in which case we won't have a record of the
+ // delivery in the Set. In this case we don't want to add the message reference back into
+ // the state since it was never removed in the first place.
- // In this case we don't want to add the message reference back into
- // the state
- // since it was never removed in the first place
-
- if (trace)
- {
- log.trace(this + " can't find delivery " + del
- + " in state so not replacing messsage ref");
- }
-
+ if (trace) { log.trace(this + " can't find delivery " + del + " in state so not replacing messsage ref"); }
}
else
{
+ MessageReference ref;
synchronized (refLock)
{
- messageRefs.addFirst(del.getReference(), del.getReference()
- .getPriority());
+ ref = del.getReference();
+ messageRefs.addFirst(ref, ref.getPriority());
if (paging)
{
- // if paging we need to evict the end reference to storage to
- // preserve the number of refs in the queue
+ // If paging we need to evict the end reference to storage to preserve the number of
+ // references in the queue.
- MessageReference ref = (MessageReference) messageRefs
- .removeLast();
-
- addToDownCache(ref);
-
+ MessageReference last = (MessageReference)messageRefs.removeLast();
+ addToDownCache(last);
refsInStorage++;
}
}
- if (trace)
- {
- log.trace(this + " added " + del.getReference()
- + " back into state");
- }
+ if (trace) { log.trace(this + " added " + ref + " back into state"); }
}
}
Modified: branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java
===================================================================
--- branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java 2006-08-22 18:43:19 UTC (rev 1228)
+++ branches/Branch_1_0/tests/src/org/jboss/test/messaging/jms/TransactedSessionTest.java 2006-08-22 21:07:07 UTC (rev 1229)
@@ -506,7 +506,9 @@
}
/**
- * Make sure redelivered flag is set on redelivery via rollback, different setup
+ * Make sure redelivered flag is set on redelivery via rollback, different setup: we close the
+ * rolled back session and we receive the message whose acknowledgment was cancelled on a new
+ * session.
*/
public void testRedeliveredQueue2() throws Exception
{
@@ -516,16 +518,17 @@
{
conn = cf.createConnection();
- Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer prod = sess.createProducer(queue);
- prod.send(sess.createTextMessage("a message"));
+ Session sendSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod = sendSession.createProducer(queue);
+ prod.send(sendSession.createTextMessage("a message"));
+
log.debug("Message was sent to the queue");
conn.close();
conn = cf.createConnection();
- sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer cons = sess.createConsumer(queue);
@@ -539,16 +542,70 @@
sess.rollback();
sess.close();
- sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+ Session sess2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- cons = sess.createConsumer(queue);
+ cons = sess2.createConsumer(queue);
tm = (TextMessage)cons.receive();
assertEquals("a message", tm.getText());
assertTrue(tm.getJMSRedelivered());
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
- sess.commit();
+ /**
+ * Make sure redelivered flag is set on redelivery via rollback, different setup: we don't close
+ * the rolled back session and we receive the message whose acknowledgment was cancelled on a new
+ * session.
+ *
+ * TODO: Is this test semantically correct?
+ */
+ public void testRedeliveredQueue3() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session sendSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = sendSession.createProducer(queue);
+ prod.send(sendSession.createTextMessage("a message"));
+
+ log.debug("Message was sent to the queue");
+
+ conn.close();
+
+ conn = cf.createConnection();
+ Session sess = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageConsumer cons = sess.createConsumer(queue);
+
+ conn.start();
+
+ TextMessage tm = (TextMessage)cons.receive();
+
+ assertEquals("a message", tm.getText());
+ assertFalse(tm.getJMSRedelivered());
+
+ sess.rollback();
+
+ Session sess2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ cons = sess2.createConsumer(queue);
+
+ tm = (TextMessage)cons.receive(3000);
+
+ assertEquals("a message", tm.getText());
+ assertTrue(tm.getJMSRedelivered());
}
finally
{
@@ -559,6 +616,7 @@
}
}
+
public void testReceivedRollbackQueue() throws Exception
{
Connection conn = cf.createConnection();
More information about the jboss-cvs-commits
mailing list