[jboss-cvs] JBoss Messaging SVN: r3238 - in trunk: src/main/org/jboss/jms/client/delegate and 8 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Oct 21 19:29:01 EDT 2007
Author: timfox
Date: 2007-10-21 19:29:00 -0400 (Sun, 21 Oct 2007)
New Revision: 3238
Added:
trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryResponse.java
Modified:
trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
trunk/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java
trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryRequest.java
trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
Log:
Ack tweaks
Modified: trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/client/container/ClientConsumer.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -461,7 +461,12 @@
sessionDelegate.preDeliver(info);
- sessionDelegate.postDeliver();
+ //If post deliver didn't succeed and acknowledgement mode is auto_ack
+ //That means the ref wasn't acked since it couldn't be found.
+ //In order to maintain at most once semantics we must therefore not return
+ //the message
+
+ ignore = !sessionDelegate.postDeliver();
}
if (!ignore)
Modified: trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/client/container/FailoverValveInterceptor.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -12,6 +12,7 @@
import org.jboss.jms.client.FailoverCommandCenter;
import org.jboss.jms.client.FailoverValve2;
import org.jboss.jms.client.FailureDetector;
+import org.jboss.jms.client.delegate.ClientConnectionDelegate;
import org.jboss.jms.client.delegate.ClientSessionDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.remoting.JMSRemotingConnection;
@@ -119,8 +120,8 @@
// Set retry flag as true on send() and sendTransaction()
// more details at http://jira.jboss.org/jira/browse/JBMESSAGING-809
- if (invocation.getTargetObject() instanceof ClientSessionDelegate &&
- (methodName.equals("send") || methodName.equals("sendTransaction")))
+ if ((invocation.getTargetObject() instanceof ClientSessionDelegate && methodName.equals("send")) ||
+ (invocation.getTargetObject() instanceof ClientConnectionDelegate && methodName.equals("sendTransaction")))
{
log.trace(this + " caught " + methodName + "() invocation, enabling check for duplicates");
Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -305,6 +305,8 @@
int ackMode = state.getAcknowledgeMode();
SessionDelegate sd = (SessionDelegate)mi.getTargetObject();
+
+ boolean res = true;
// if XA and there is no transaction enlisted on XA we will act as AutoAcknowledge
// However if it's a MDB (if there is a DistinguishedListener) we should behaved as transacted
@@ -333,7 +335,7 @@
try
{
- ackDelivery(sd, delivery);
+ res = ackDelivery(sd, delivery);
}
finally
{
@@ -375,11 +377,10 @@
state.setRecoverCalled(false);
}
- state.setAutoAckInfo(null);
-
+ state.setAutoAckInfo(null);
}
- return null;
+ return Boolean.valueOf(res);
}
/**
@@ -829,7 +830,7 @@
return (SessionState)((DelegateSupport)inv.getTargetObject()).getState();
}
- private void ackDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
+ private boolean ackDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
{
if (delivery.isShouldAck())
{
@@ -840,8 +841,12 @@
SessionDelegate sessionToUse = connectionConsumerSession != null ? connectionConsumerSession : sess;
- sessionToUse.acknowledgeDelivery(delivery);
+ return sessionToUse.acknowledgeDelivery(delivery);
}
+ else
+ {
+ return false;
+ }
}
private void cancelDelivery(SessionDelegate sess, DeliveryInfo delivery) throws Exception
Modified: trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -161,11 +161,11 @@
// SessionDelegate implementation ---------------------------------------------------------------
- public void acknowledgeDelivery(Ack ack) throws JMSException
+ public boolean acknowledgeDelivery(Ack ack) throws JMSException
{
RequestSupport req = new SessionAcknowledgeDeliveryRequest(id, version, ack);
- doInvoke(client, req);
+ return ((Boolean)doInvoke(client, req)).booleanValue();
}
public void acknowledgeDeliveries(List acks) throws JMSException
@@ -347,7 +347,7 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void postDeliver() throws JMSException
+ public boolean postDeliver() throws JMSException
{
throw new IllegalStateException("This invocation should not be handled here!");
}
Modified: trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/delegate/SessionDelegate.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -66,7 +66,7 @@
void preDeliver(DeliveryInfo deliveryInfo) throws JMSException;
- void postDeliver() throws JMSException;
+ boolean postDeliver() throws JMSException;
MessageListener getMessageListener() throws JMSException;
Modified: trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/delegate/SessionEndpoint.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -73,7 +73,7 @@
* Acknowledge a delivery
* @throws JMSException
*/
- void acknowledgeDelivery(Ack ack) throws JMSException;
+ boolean acknowledgeDelivery(Ack ack) throws JMSException;
/**
* Cancel a list of deliveries.
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -651,6 +651,16 @@
{
if (trace) { log.trace(this + " sending message " + msg + (tx == null ? " non-transactionally" : " in " + tx)); }
+ if (checkForDuplicates && msg.isReliable())
+ {
+ // Message is already stored... so just ignoring the call
+ if (serverPeer.getPersistenceManagerInstance().referenceExists(msg.getMessageID()))
+ {
+ if (trace) { log.trace("Duplicate of " + msg + " exists in database - probably sent before failover"); }
+ return;
+ }
+ }
+
JBossDestination dest = (JBossDestination)msg.getJMSDestination();
// This allows the no-local consumers to filter out the messages that come from the same
@@ -659,14 +669,6 @@
// TODO Do we want to set this for ALL messages. Optimisation is possible here.
msg.setConnectionID(id);
- if (checkForDuplicates)
- {
- // Message is already stored... so just ignoring the call
- if (serverPeer.getPersistenceManagerInstance().referenceExists(msg.getMessageID()))
- {
- return;
- }
- }
// We must reference the message *before* we send it the destination to be handled. This is
// so we can guarantee that the message doesn't disappear from the store before the
@@ -763,10 +765,6 @@
{
if (trace) { log.trace(this + " processing transaction " + tx); }
- // used on checkForDuplicates...
- // we only check the first iteration
- boolean firstIteration = true;
-
for (Iterator i = txState.getSessionStates().iterator(); i.hasNext(); )
{
SessionTxState sessionState = (SessionTxState)i.next();
@@ -776,19 +774,8 @@
for (Iterator j = sessionState.getMsgs().iterator(); j.hasNext(); )
{
JBossMessage message = (JBossMessage)j.next();
- if (checkForDuplicates && firstIteration)
- {
- firstIteration = false;
- if (serverPeer.getPersistenceManagerInstance().
- referenceExists(message.getMessageID()))
- {
- // This means the transaction was previously completed...
- // we are done here then... no need to even check for ACKs or anything else
- log.trace("Transaction " + tx + " was previously completed, ignoring call");
- return;
- }
- }
- sendMessage(message, tx, false);
+
+ sendMessage(message, tx, checkForDuplicates);
}
// send the acks
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -93,7 +93,6 @@
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
/**
@@ -399,7 +398,7 @@
{
do
{
- connectionEndpoint.sendMessage(message, null, false);
+ connectionEndpoint.sendMessage(message, null, checkForDuplicates);
expectedSequence++;
@@ -428,11 +427,11 @@
}
}
- public void acknowledgeDelivery(Ack ack) throws JMSException
+ public boolean acknowledgeDelivery(Ack ack) throws JMSException
{
try
{
- acknowledgeDeliveryInternal(ack);
+ return acknowledgeDeliveryInternal(ack);
}
catch (Throwable t)
{
@@ -1740,16 +1739,17 @@
}
}
- private void acknowledgeDeliveryInternal(Ack ack) throws Throwable
+ private boolean acknowledgeDeliveryInternal(Ack ack) throws Throwable
{
if (trace) { log.trace(this + " acknowledging delivery " + ack); }
-
+
DeliveryRecord rec = (DeliveryRecord)deliveries.remove(new Long(ack.getDeliveryID()));
if (rec == null)
{
+ //This can happen if an ack comes in after failover
log.debug("Cannot find " + ack + " to acknowledge, it was probably acknowledged before");
- return;
+ return false;
}
rec.del.acknowledge(null);
@@ -1762,6 +1762,8 @@
}
if (trace) { log.trace(this + " acknowledged delivery " + ack); }
+
+ return true;
}
/* TODO We can combine this with createConsumerDelegateInternal once we move the distinction between queues and topics
Modified: trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -115,9 +115,9 @@
endpoint.acknowledgeDeliveries(acks);
}
- public void acknowledgeDelivery(Ack ack) throws JMSException
+ public boolean acknowledgeDelivery(Ack ack) throws JMSException
{
- endpoint.acknowledgeDelivery(ack);
+ return endpoint.acknowledgeDelivery(ack);
}
public void addTemporaryDestination(JBossDestination destination) throws JMSException
Modified: trunk/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/server/security/SecurityMetadataStore.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -310,7 +310,7 @@
{
log.warn("WARNING! POTENTIAL SECURITY RISK. It has been detected that the MessageSucker component " +
"which sucks messages from one node to another has not had its password changed from the installation default. " +
- "Please see the JBoss Messaging userguide for instructions on how to do this.");
+ "Please see the JBoss Messaging user guide for instructions on how to do this.");
}
}
Modified: trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/wireformat/PacketSupport.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -156,6 +156,7 @@
public static final int RESP_SESSION_CREATEBROWSERDELEGATE = 100301;
public static final int RESP_SESSION_CREATEQUEUE = 100302;
public static final int RESP_SESSION_CREATETOPIC = 100303;
+ public static final int RESP_SESSION_ACKNOWLEDGEDELIVERY = 100304;
// Browser
// -----------------------
@@ -344,6 +345,10 @@
case RESP_SESSION_CREATETOPIC:
packet = new SessionCreateTopicResponse();
break;
+ case RESP_SESSION_ACKNOWLEDGEDELIVERY:
+ packet = new SessionAcknowledgeDeliveryResponse();
+ break;
+
// Browser
case RESP_BROWSER_NEXTMESSAGE:
Modified: trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryRequest.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryRequest.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryRequest.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -75,9 +75,9 @@
throw new IllegalStateException("Cannot find object in dispatcher with id " + objectId);
}
- endpoint.acknowledgeDelivery(ack);
+ boolean res = endpoint.acknowledgeDelivery(ack);
- return null;
+ return new SessionAcknowledgeDeliveryResponse(res);
}
public void write(DataOutputStream os) throws Exception
Added: trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryResponse.java
===================================================================
--- trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryResponse.java (rev 0)
+++ trunk/src/main/org/jboss/jms/wireformat/SessionAcknowledgeDeliveryResponse.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.jms.wireformat;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+
+
+public class SessionAcknowledgeDeliveryResponse extends ResponseSupport
+{
+ private boolean res;
+
+ public SessionAcknowledgeDeliveryResponse()
+ {
+ }
+
+ public SessionAcknowledgeDeliveryResponse(boolean res)
+ {
+ super(PacketSupport.RESP_SESSION_ACKNOWLEDGEDELIVERY);
+
+ this.res = res;
+ }
+
+ public Object getResponse()
+ {
+ return Boolean.valueOf(res);
+ }
+
+ public void write(DataOutputStream os) throws Exception
+ {
+ super.write(os);
+
+ os.writeBoolean(res);
+
+ os.flush();
+ }
+
+ public void read(DataInputStream is) throws Exception
+ {
+ res = is.readBoolean();
+ }
+
+}
+
Modified: trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/messaging/core/impl/JDBCPersistenceManager.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -118,8 +118,6 @@
{
super(ds, tm, sqlProperties, createTablesOnStartup);
- log.info("Set blob " + supportsBlobSelect);
-
//usingBatchUpdates is currently ignored due to sketchy support from databases
this.usingBinaryStream = usingBinaryStream;
Modified: trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/src/main/org/jboss/messaging/core/jmx/JDBCPersistenceManagerService.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -188,7 +188,6 @@
public void setSupportsBlobOnSelect(boolean b)
{
- log.info("Calling set blob on select " + b);
this.supportsBlobOnSelect = b;
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-10-21 15:27:51 UTC (rev 3237)
+++ trunk/tests/src/org/jboss/test/messaging/jms/clustering/FailoverTest.java 2007-10-21 23:29:00 UTC (rev 3238)
@@ -1135,7 +1135,7 @@
conn.close();
}
}
- }
+ }
public void testFailoverMessageOnServer() throws Exception
{
@@ -1173,7 +1173,7 @@
}
if (event == null)
{
- fail("Did not get expected FAILOVER_COMPLETED event");
+ fail("Did not get expected FAILOVER_STARTED event");
}
}
@@ -1181,8 +1181,12 @@
// test the client-side failover valve
TextMessage tm = (TextMessage)cons.receive(60000);
+
assertNotNull(tm);
assertEquals("blip", tm.getText());
+
+ tm = (TextMessage)cons.receive(1000);
+ assertNull(tm);
}
finally
{
@@ -1400,64 +1404,64 @@
failureOnInvocation(PoisonInterceptor.FAIL_AFTER_SEND);
}
- // This test is commented out until http://jira.jboss.com/jira/browse/JBMESSAGING-604 is complete
-// public void testFailureRightAfterSendTransaction() throws Exception
-// {
-// Connection conn = null;
-//
-// try
-// {
-// conn = this.createConnectionOnServer(cf, 1);
-//
-// assertEquals(1, getServerId(conn));
-//
-// // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
-// // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
-// JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
-// getDelegate()).getRemotingConnection();
-// rc.removeConnectionListener();
-//
-// // poison the server
-// ServerManagement.poisonTheServer(1, PoisonInterceptor.FAIL_AFTER_SENDTRANSACTION);
-//
-// Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
-//
-// conn.start();
-//
-// MessageProducer producer = session.createProducer(queue[0]);
-//
-// producer.setDeliveryMode(DeliveryMode.PERSISTENT);
-//
-// MessageConsumer consumer = session.createConsumer(queue[0]);
-//
-// producer.send(session.createTextMessage("before-poison1"));
-// producer.send(session.createTextMessage("before-poison2"));
-// producer.send(session.createTextMessage("before-poison3"));
-// session.commit();
-//
-// Thread.sleep(2000);
-//
-// for (int i = 1; i <= 3; i++)
-// {
-// TextMessage tm = (TextMessage) consumer.receive(5000);
-//
-// assertNotNull(tm);
-//
-// assertEquals("before-poison" + i, tm.getText());
-// }
-//
-// assertNull(consumer.receive(3000));
-//
-// }
-// finally
-// {
-// if (conn != null)
-// {
-// conn.close();
-// }
-// }
-// }
+ public void testFailureRightAfterSendTransaction() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = this.createConnectionOnServer(cf, 1);
+ assertEquals(1, getServerId(conn));
+
+ // we "cripple" the remoting connection by removing ConnectionListener. This way, failures
+ // cannot be "cleanly" detected by the client-side pinger, and we'll fail on an invocation
+ JMSRemotingConnection rc = ((ClientConnectionDelegate)((JBossConnection)conn).
+ getDelegate()).getRemotingConnection();
+ rc.removeConnectionListener();
+
+ // poison the server
+ ServerManagement.poisonTheServer(1, PoisonInterceptor.FAIL_AFTER_SENDTRANSACTION);
+
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ conn.start();
+
+ MessageProducer producer = session.createProducer(queue[0]);
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ MessageConsumer consumer = session.createConsumer(queue[0]);
+
+ producer.send(session.createTextMessage("before-poison1"));
+ producer.send(session.createTextMessage("before-poison2"));
+ producer.send(session.createTextMessage("before-poison3"));
+ session.commit();
+
+ Thread.sleep(2000);
+
+ for (int i = 1; i <= 3; i++)
+ {
+ TextMessage tm = (TextMessage) consumer.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("before-poison" + i, tm.getText());
+ }
+
+ assertNull(consumer.receive(3000));
+
+ session.commit();
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
public void testCloseConsumer() throws Exception
{
Connection conn0 = null;
@@ -1831,9 +1835,17 @@
TextMessage tm = (TextMessage)consumer.receive(5000);
- assertNotNull(tm);
+ if(typeOfFailure == PoisonInterceptor.FAIL_AFTER_ACKNOWLEDGE_DELIVERY)
+ {
+ //With auto_ack we won't the message - remember auto ack is "at most once"
+ assertNull(tm);
+ }
+ else
+ {
+ assertNotNull(tm);
- assertEquals("before-poison", tm.getText());
+ assertEquals("before-poison", tm.getText());
+ }
checkEmpty(queue[1], 0);
}
More information about the jboss-cvs-commits
mailing list