[jboss-cvs] JBoss Messaging SVN: r1571 - in branches/Branch_1_0_1_SP: . src/main/org/jboss/jms/client/container src/main/org/jboss/jms/client/delegate src/main/org/jboss/jms/client/remoting src/main/org/jboss/jms/client/state src/main/org/jboss/jms/message src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint tests/src/org/jboss/test/messaging/jms
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Nov 16 22:31:27 EST 2006
Author: ovidiu.feodorov at jboss.com
Date: 2006-11-16 22:31:19 -0500 (Thu, 16 Nov 2006)
New Revision: 1571
Modified:
branches/Branch_1_0_1_SP/
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/ConsumerState.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/message/MessageProxy.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/DLQTest.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/SessionTest.java
Log:
minor refactoring and reformatting
Property changes on: branches/Branch_1_0_1_SP
___________________________________________________________________
Name: svn:ignore
- output
thirdparty
messaging.iws
+ output
thirdparty
messaging.iws
messaging-Branch_1_0_1_SP.ipr
messaging-Branch_1_0_1_SP.iws
messaging-Branch_1_0_1_SP.iml
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/SessionAspect.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -92,12 +92,11 @@
i.remove();
}
- //On closing we acknowlege any client or dups ok, since the session might have closed
- //before the onMessage had finished executing
-
- //And any client ack, or transactional we cancel, we do this explicitly so we can pass the
- //updated delivery count information from client to server.
- //We could just do this on the server but we would lose delivery count info
+ // On closing we acknowledge any AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE, since the session
+ // might have closed before the onMessage had finished executing.
+ // We cancel any client ack or transactional, we do this explicitly so we can pass the updated
+ // delivery count information from client to server. We could just do this on the server but
+ // we would lose delivery count info.
if (!acks.isEmpty())
{
@@ -183,7 +182,7 @@
if (cancel && ackMode != Session.AUTO_ACKNOWLEDGE && ackMode != Session.DUPS_OK_ACKNOWLEDGE)
{
- throw new IllegalStateException("Ack mode must be auto ack or dups ok");
+ throw new IllegalStateException("Ack mode must be AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE");
}
if (ackMode == Session.AUTO_ACKNOWLEDGE ||
@@ -201,13 +200,15 @@
List acks = state.getToAck();
- //Sanity check
+ // Sanity check
if (acks.size() != 1)
{
- throw new IllegalStateException("Should only be one entry in list. There are " + acks.size());
+ throw new IllegalStateException("Should only be one entry in list. " +
+ "There are " + acks.size());
}
AckInfo ack = (AckInfo)acks.get(0);
+
if (cancel)
{
sd.cancelDeliveries(acks);
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/container/TransactionAspect.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -24,7 +24,6 @@
import javax.jms.IllegalStateException;
import javax.jms.Message;
import javax.jms.TransactionInProgressException;
-import javax.jms.Session;
import org.jboss.aop.joinpoint.Invocation;
import org.jboss.aop.joinpoint.MethodInvocation;
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/delegate/ClientConsumerDelegate.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -51,7 +51,6 @@
// Attributes ----------------------------------------------------
protected int bufferSize;
-
protected int maxDeliveries;
// Static --------------------------------------------------------
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -401,7 +401,8 @@
* This invocation should either be handled by the client-side interceptor chain or by the
* server-side endpoint.
*/
- public void addAsfMessage(MessageProxy m, int consumerID, ConsumerDelegate cons, int maxDeliveries)
+ public void addAsfMessage(MessageProxy m, int consumerID,
+ ConsumerDelegate cons, int maxDeliveries)
{
throw new IllegalStateException("This invocation should not be handled here!");
}
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -112,8 +112,8 @@
{
log.error("Max redeliveries has occurred for message: " + m.getJMSMessageID());
- //postdeliver will do a cancel rather than an ack which will cause the ref to end
- //up in the dlq
+ // postdeliver will do a cancel rather than an ack which will cause the message
+ // to end up in the DLQ
cancel = true;
@@ -152,8 +152,8 @@
}
}
- protected static void postDeliver(SessionDelegate sess, boolean isConnectionConsumer, boolean cancel)
- throws JMSException
+ protected static void postDeliver(SessionDelegate sess, boolean isConnectionConsumer,
+ boolean cancel) throws JMSException
{
// If this is the callback-handler for a connection consumer we don't want to acknowledge or
// add anything to the tx for this session
@@ -166,33 +166,19 @@
// Attributes ----------------------------------------------------
private LinkedList buffer;
-
private SessionDelegate sessionDelegate;
-
private ConsumerDelegate consumerDelegate;
-
private int consumerID;
-
private boolean isConnectionConsumer;
-
private volatile Thread receiverThread;
-
private MessageListener listener;
-
private int ackMode;
-
private boolean closed;
-
private Object mainLock;
-
private boolean serverSending;
-
private int bufferSize;
-
private QueuedExecutor sessionExecutor;
-
private boolean listenerRunning;
-
private int maxDeliveries;
// Constructors --------------------------------------------------
@@ -208,25 +194,15 @@
}
this.bufferSize = bufferSize;
-
buffer = new LinkedList();
-
isConnectionConsumer = isCC;
-
this.ackMode = ackMode;
-
this.sessionDelegate = sess;
-
this.consumerDelegate = cons;
-
this.consumerID = consumerID;
-
this.serverSending = true;
-
- mainLock = new Object();
-
+ mainLock = new Object();
this.sessionExecutor = sessionExecutor;
-
this.maxDeliveries = maxDeliveries;
}
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/ConsumerState.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/ConsumerState.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/client/state/ConsumerState.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -28,7 +28,6 @@
import org.jboss.jms.client.remoting.MessageCallbackHandler;
import org.jboss.jms.delegate.ConsumerDelegate;
import org.jboss.jms.server.Version;
-import org.jboss.logging.Logger;
/**
* State corresponding to a Consumer. This state is acessible inside aspects/interceptors.
@@ -40,9 +39,6 @@
*/
public class ConsumerState extends HierarchicalStateSupport
{
- private static final Logger log = Logger.getLogger(ConsumerState.class);
-
-
private Destination destination;
private String selector;
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/message/MessageProxy.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/message/MessageProxy.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/message/MessageProxy.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -80,7 +80,6 @@
protected transient boolean bodyReadOnly;
protected int deliveryCount;
- //protected transient boolean jmsRedelivered;
// Constructors --------------------------------------------------
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/ServerPeer.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/ServerPeer.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -35,7 +35,6 @@
import javax.naming.NamingException;
import org.jboss.aop.AspectXmlLoader;
-import org.jboss.jms.destination.JBossDestination;
import org.jboss.jms.destination.JBossQueue;
import org.jboss.jms.server.connectionfactory.ConnectionFactoryJNDIMapper;
import org.jboss.jms.server.connectionmanager.SimpleConnectionManager;
@@ -170,7 +169,7 @@
{
try
{
- log.info("starting serverpeer");
+ log.debug("starting ServerPeer");
if (started)
{
@@ -523,24 +522,7 @@
return null;
}
- CoreDestination dlq = channelMapper.getCoreDestination(new JBossQueue(dlqName));
-
-// if (dlq == null)
-// {
-// //DLQ not deployed - so deploy default one
-// log.info("DLQ not deployed so deploying default one");
-//
-// createDestinationDefault(true, dlqName, null);
-//
-// dlq = channelMapper.getCoreDestination(new JBossQueue(dlqName));
-//
-// if (dlq == null)
-// {
-// throw new IllegalStateException("Cannot find dlq!");
-// }
-// }
-
- return dlq;
+ return channelMapper.getCoreDestination(new JBossQueue(dlqName));
}
public boolean isDeployed(boolean isQueue, String name)
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -54,10 +54,8 @@
import org.jboss.messaging.core.SimpleDelivery;
import org.jboss.messaging.core.SingleReceiverDelivery;
import org.jboss.messaging.core.local.CoreDestination;
-import org.jboss.messaging.core.plugin.contract.PersistenceManager;
import org.jboss.messaging.core.tx.Transaction;
import org.jboss.messaging.core.tx.TransactionException;
-import org.jboss.messaging.core.tx.TransactionRepository;
import org.jboss.messaging.core.tx.TxCallback;
import org.jboss.messaging.util.Future;
@@ -125,20 +123,15 @@
private Map deliveries;
- private int maxDeliveryAttempts;
-
private CoreDestination dlq;
- private TransactionRepository tr;
-
// Constructors --------------------------------------------------
protected ServerConsumerEndpoint(int id, Channel channel,
ServerSessionEndpoint sessionEndpoint,
String selector, boolean noLocal, JBossDestination dest,
- int prefetchSize, int maxDeliveryAttempts,
- CoreDestination dlq, TransactionRepository tr)
- throws InvalidSelectorException
+ int prefetchSize, CoreDestination dlq)
+ throws InvalidSelectorException
{
if (trace) { log.trace("constructing consumer endpoint " + id); }
@@ -146,9 +139,7 @@
this.channel = channel;
this.sessionEndpoint = sessionEndpoint;
this.prefetchSize = prefetchSize;
- this.maxDeliveryAttempts = maxDeliveryAttempts;
this.dlq = dlq;
- this.tr = tr;
// We always created with clientConsumerFull = true. This prevents the SCD sending messages to
// the client before the client has fully finished creating the MessageCallbackHandler.
Modified: branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -263,8 +263,7 @@
ServerConsumerEndpoint ep =
new ServerConsumerEndpoint(consumerID,
subscription == null ? (Channel)coreDestination : subscription,
- this, selector, noLocal, jmsDestination, prefetchSize,
- maxDeliveryAttempts, dlq, tr);
+ this, selector, noLocal, jmsDestination, prefetchSize, dlq);
JMSDispatcher.instance.registerTarget(new Integer(consumerID), new ConsumerAdvised(ep));
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/DLQTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/DLQTest.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/DLQTest.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -21,8 +21,6 @@
*/
package org.jboss.test.messaging.jms;
-import java.util.Enumeration;
-
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@@ -31,7 +29,6 @@
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
-import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.InitialContext;
@@ -53,42 +50,25 @@
*/
public class DLQTest extends MessagingTestCase
{
+ // Constants -----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
protected InitialContext ic;
-
protected ConnectionFactory cf;
-
protected Queue queue;
- protected void setUp() throws Exception
- {
- super.setUp();
-
- ServerManagement.start("all");
-
- ic = new InitialContext(ServerManagement.getJNDIEnvironment());
-
- cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
-
- ServerManagement.deployQueue("Queue");
-
- queue = (Queue)ic.lookup("/queue/Queue");
-
- }
+ // Constructors --------------------------------------------------
- protected void tearDown() throws Exception
- {
- super.tearDown();
-
- ServerManagement.undeployQueue("Queue");
-
- if (ic != null) ic.close();
- }
-
public DLQTest(String name)
{
super(name);
}
-
+
+ // Public --------------------------------------------------------
+
public void testDLQAlreadyDeployed() throws Exception
{
if (ServerManagement.isRemote())
@@ -96,33 +76,33 @@
//This test can only run in local mode
return;
}
-
+
ServerManagement.deployQueue("DLQ");
-
+
CoreDestination dlq = ServerManagement.getServer().getServerPeer().getDLQ();
-
+
assertNotNull(dlq);
-
+
InitialContext ic = null;
-
- try
+
+ try
{
ic = new InitialContext(ServerManagement.getJNDIEnvironment());
-
+
JBossQueue q = (JBossQueue)ic.lookup("/queue/DLQ");
-
+
assertNotNull(q);
-
+
assertEquals("DLQ", q.getName());
}
finally
- {
+ {
if (ic != null) ic.close();
-
+
ServerManagement.undeployQueue("DLQ");
- }
+ }
}
-
+
public void testDLQNotAlreadyDeployed() throws Exception
{
if (ServerManagement.isRemote())
@@ -130,21 +110,21 @@
//This test can only run in local mode
return;
}
-
+
CoreDestination dlq = ServerManagement.getServer().getServerPeer().getDLQ();
-
+
assertNull(dlq);
-
+
InitialContext ic = null;
-
- try
+
+ try
{
ic = new InitialContext(ServerManagement.getJNDIEnvironment());
-
+
try
{
- JBossQueue q = (JBossQueue)ic.lookup("/queue/DLQ");
-
+ ic.lookup("/queue/DLQ");
+
fail();
}
catch (NameNotFoundException e)
@@ -153,277 +133,309 @@
}
}
finally
- {
+ {
if (ic != null) ic.close();
- }
+ }
}
-
+
public void testSendToDLQWithMessageListenerPersistent() throws Exception
{
sendToDLQWithMessageListener(true);
}
-
+
public void testSendToDLQWithMessageListenerNonPersistent() throws Exception
{
sendToDLQWithMessageListener(false);
}
-
+
public void testSendToDLQWithReceivePersistent() throws Exception
{
sendToDLQWithReceive(true);
}
-
+
public void testSendToDLQWithReceiveNonPersistent() throws Exception
{
sendToDLQWithReceive(false);
}
-
+
public void testSendToDLQWithReceivePartialPersistent() throws Exception
{
sendToDLQWithReceivePartial(true);
}
-
+
public void testSendToDLQWithReceivePartialNonPersistent() throws Exception
{
sendToDLQWithReceivePartial(false);
}
-
+
public void sendToDLQWithMessageListener(boolean persistent) throws Exception
{
Connection conn = null;
-
+
ServerManagement.deployQueue("DLQ");
-
+
Queue dlq = (Queue)ic.lookup("/queue/DLQ");
-
+
try
- {
+ {
conn = cf.createConnection();
-
+
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sess.createProducer(queue);
-
+
prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
+
for (int i = 0; i < 10; i++)
{
TextMessage tm = sess.createTextMessage("Message:" + i);
-
+
prod.send(tm);
}
-
+
MessageConsumer cons = sess.createConsumer(queue);
-
+
cons.setMessageListener(new FailingMessageListener());
-
+
conn.start();
-
+
Thread.sleep(4000);
-
- cons.setMessageListener(null);
-
+
+ cons.setMessageListener(null);
+
Message m = cons.receive(1000);
-
+
assertNull(m);
-
+
//Message should all be in the dlq - let's check
-
+
MessageConsumer cons2 = sess.createConsumer(dlq);
-
+
for (int i = 0; i < 10; i++)
{
TextMessage tm = (TextMessage)cons2.receive(1000);
-
+
assertNotNull(tm);
-
+
assertEquals("Message:" + i, tm.getText());
}
-
+
}
finally
{
ServerManagement.undeployQueue("DLQ");
-
+
if (conn != null) conn.close();
}
}
-
+
public void sendToDLQWithReceive(boolean persistent) throws Exception
{
Connection conn = null;
-
+
ServerManagement.deployQueue("DLQ");
-
+
Queue dlq = (Queue)ic.lookup("/queue/DLQ");
-
+
try
- {
+ {
conn = cf.createConnection();
-
+
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sess.createProducer(queue);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
for (int i = 0; i < 10; i++)
{
TextMessage tm = sess.createTextMessage("Message:" + i);
-
+
prod.send(tm);
}
-
+
Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-
+
MessageConsumer cons = sess2.createConsumer(queue);
-
+
conn.start();
-
+
for (int i = 0; i < 10; i++) // retries - default is 10
{
for (int j = 0; j < 10; j++)
{
TextMessage tm = (TextMessage)cons.receive(1000);
-
+
assertNotNull(tm);
-
+
assertEquals("Message:" + j, tm.getText());
}
-
+
//rollback should cause redelivery
sess2.rollback();
}
-
+
cons.close();
-
+
MessageConsumer cons2 = sess2.createConsumer(queue);
-
+
Message m = cons2.receive(1000);
-
+
assertNull(m);
//Message should all be in the dlq - let's check
-
+
MessageConsumer cons3 = sess.createConsumer(dlq);
-
+
for (int i = 0; i < 10; i++)
{
TextMessage tm = (TextMessage)cons3.receive(1000);
-
+
assertNotNull(tm);
-
+
assertEquals("Message:" + i, tm.getText());
}
-
+
}
finally
{
ServerManagement.undeployQueue("DLQ");
-
+
if (conn != null) conn.close();
}
}
-
+
public void sendToDLQWithReceivePartial(boolean persistent) throws Exception
{
Connection conn = null;
-
+
ServerManagement.deployQueue("DLQ");
-
+
Queue dlq = (Queue)ic.lookup("/queue/DLQ");
-
+
try
- {
+ {
conn = cf.createConnection();
-
+
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
+
MessageProducer prod = sess.createProducer(queue);
-
- prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
-
+
+ prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
for (int i = 0; i < 10; i++)
{
TextMessage tm = sess.createTextMessage("Message:" + i);
-
+
prod.send(tm);
}
-
+
Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);
-
+
MessageConsumer cons = sess2.createConsumer(queue);
-
+
conn.start();
-
+
for (int i = 0; i < 5; i++) // retries - default is 10
{
for (int j = 0; j < 10; j++)
{
TextMessage tm = (TextMessage)cons.receive(1000);
-
+
assertNotNull(tm);
-
+
assertEquals("Message:" + j, tm.getText());
}
-
+
//rollback should cause redelivery
sess2.rollback();
}
-
+
//They should now be cancelled back to the server
cons.close();
-
+
cons = sess2.createConsumer(queue);
-
+
for (int i = 0; i < 5; i++) // retries - default is 10
{
for (int j = 0; j < 10; j++)
{
TextMessage tm = (TextMessage)cons.receive(1000);
-
+
assertNotNull(tm);
-
+
assertEquals("Message:" + j, tm.getText());
}
-
+
//rollback should cause redelivery
sess2.rollback();
}
-
+
cons.close();
-
+
//Now they should be in DLQ
-
+
MessageConsumer cons2 = sess2.createConsumer(queue);
-
+
Message m = cons2.receive(1000);
-
+
assertNull(m);
//Message should all be in the dlq - let's check
-
+
MessageConsumer cons3 = sess.createConsumer(dlq);
-
+
for (int i = 0; i < 10; i++)
{
TextMessage tm = (TextMessage)cons3.receive(1000);
-
+
assertNotNull(tm);
-
+
assertEquals("Message:" + i, tm.getText());
}
-
+
}
finally
{
ServerManagement.undeployQueue("DLQ");
-
+
if (conn != null) conn.close();
}
}
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ ServerManagement.start("all");
+
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+
+ cf = (ConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ ServerManagement.deployQueue("Queue");
+
+ queue = (Queue)ic.lookup("/queue/Queue");
+
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ ServerManagement.undeployQueue("Queue");
+
+ if (ic != null) ic.close();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
-
class FailingMessageListener implements MessageListener
{
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/MessageConsumerTest.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -48,7 +48,6 @@
import javax.naming.InitialContext;
import org.jboss.jms.destination.JBossTopic;
-import org.jboss.jms.message.MessageProxy;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
Modified: branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/SessionTest.java
===================================================================
--- branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/SessionTest.java 2006-11-17 03:24:50 UTC (rev 1570)
+++ branches/Branch_1_0_1_SP/tests/src/org/jboss/test/messaging/jms/SessionTest.java 2006-11-17 03:31:19 UTC (rev 1571)
@@ -520,7 +520,7 @@
conn.start();
TextMessage rm = (TextMessage)s.createConsumer(queue).receive(1000);
- assertEquals("bex", m.getText());
+ assertEquals("bex", rm.getText());
conn.close();
}
More information about the jboss-cvs-commits
mailing list