[jboss-cvs] JBoss Messaging SVN: r6152 - in trunk: tests/src/org/jboss/messaging/tests/integration/client and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Mar 24 14:47:27 EDT 2009
Author: ataylor
Date: 2009-03-24 14:47:26 -0400 (Tue, 24 Mar 2009)
New Revision: 6152
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
Log:
fixed session.stop() and tests
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-03-24 14:45:53 UTC (rev 6151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-03-24 18:47:26 UTC (rev 6152)
@@ -12,14 +12,6 @@
package org.jboss.messaging.core.client.impl;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.Executor;
-
import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.client.ClientFileMessage;
import org.jboss.messaging.core.client.ClientMessage;
@@ -34,6 +26,14 @@
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.Future;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -94,6 +94,8 @@
private volatile ClientMessage lastAckedMessage;
+ private boolean stopped = false;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -153,7 +155,8 @@
synchronized (this)
{
- while ((m = buffer.poll()) == null && !closed && toWait > 0)
+ while ((stopped || (m = buffer.poll()) == null) &&
+ !closed && toWait > 0 )
{
if (start == -1)
{
@@ -251,11 +254,17 @@
if (queueUp)
{
+ stopped = false;
for (int i = 0; i < buffer.size(); i++)
{
queueExecutor();
}
}
+ else
+ {
+ stopped = true;
+ waitForOnMessageToComplete();
+ }
}
public void close() throws MessagingException
@@ -288,6 +297,25 @@
return directory != null;
}
+ public synchronized void stop() throws MessagingException
+ {
+ if(stopped)
+ {
+ return;
+ }
+ stopped = true;
+ waitForOnMessageToComplete();
+ }
+
+ public synchronized void start()
+ {
+ stopped = false;
+ for (int i = 0; i < buffer.size(); i++)
+ {
+ queueExecutor();
+ }
+ }
+
public Exception getLastException()
{
return lastException;
@@ -322,9 +350,11 @@
{
// Execute using executor
- buffer.add(messageToHandle);
-
- queueExecutor();
+ buffer.add(messageToHandle);
+ if(!stopped)
+ {
+ queueExecutor();
+ }
}
else
{
@@ -529,11 +559,10 @@
private void callOnMessage() throws Exception
{
- if (closing)
+ if (closing || stopped)
{
return;
}
-
// We pull the message from the buffer from inside the Runnable so we can ensure priority
// ordering. If we just added a Runnable with the message to the executor immediately as we get it
// we could not do that
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2009-03-24 14:45:53 UTC (rev 6151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2009-03-24 18:47:26 UTC (rev 6152)
@@ -60,4 +60,8 @@
void flushAcks() throws MessagingException;
boolean isFileConsumer();
+
+ void stop() throws MessagingException;
+
+ void start();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-24 14:45:53 UTC (rev 6151)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-03-24 18:47:26 UTC (rev 6152)
@@ -21,21 +21,6 @@
*/
package org.jboss.messaging.core.client.impl;
-import static org.jboss.messaging.utils.SimpleString.toSimpleString;
-
-import java.io.File;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientFileMessage;
@@ -87,8 +72,21 @@
import org.jboss.messaging.utils.OrderedExecutorFactory;
import org.jboss.messaging.utils.SimpleIDGenerator;
import org.jboss.messaging.utils.SimpleString;
+import static org.jboss.messaging.utils.SimpleString.toSimpleString;
import org.jboss.messaging.utils.TokenBucketLimiterImpl;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.io.File;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
/*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -607,6 +605,11 @@
if (!started)
{
+ for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ {
+ clientConsumerInternal.start();
+ }
+
channel.send(new PacketImpl(PacketImpl.SESS_START));
started = true;
@@ -619,6 +622,11 @@
if (started)
{
+ for (ClientConsumerInternal clientConsumerInternal : consumers.values())
+ {
+ clientConsumerInternal.stop();
+ }
+
channel.sendBlocking(new PacketImpl(PacketImpl.SESS_STOP));
started = false;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java 2009-03-24 14:45:53 UTC (rev 6151)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientConsumerTest.java 2009-03-24 18:47:26 UTC (rev 6152)
@@ -21,8 +21,6 @@
*/
package org.jboss.messaging.tests.integration.client;
-import java.util.concurrent.CountDownLatch;
-
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.client.ClientMessage;
import org.jboss.messaging.core.client.ClientProducer;
@@ -37,6 +35,9 @@
import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.utils.SimpleString;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
* @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
@@ -466,8 +467,44 @@
session.close();
}
+ public void testStopStartConsumerSyncReceiveImmediate() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
- public void testStopConsumer() throws Exception
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ session.start();
+
+
+ for(int i = 0; i < numMessages/2; i++)
+ {
+ ClientMessage cm = consumer.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+ session.stop();
+ ClientMessage cm = consumer.receiveImmediate();
+ assertNull(cm);
+
+ session.close();
+ }
+
+ public void testStopStartConsumerSyncReceive() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
@@ -482,13 +519,57 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
producer.send(message);
}
- final ClientConsumer consumer = session.createConsumer(QUEUE, null, true);
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
session.start();
+
+ for(int i = 0; i < numMessages/2; i++)
+ {
+ ClientMessage cm = consumer.receive(5000);
+ assertNotNull(cm);
+ cm.acknowledge();
+ }
+ session.stop();
+ long time = System.currentTimeMillis();
+ ClientMessage cm = consumer.receive(1000);
+ long taken = System.currentTimeMillis() - time;
+ assertTrue(taken >= 1000);
+ assertNull(cm);
+
+ session.close();
+ }
+
+ public void testStopStartConsumerAsyncSync() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
+ int ccount = q.getConsumerCount();
+
+ session.start();
+
final CountDownLatch latch = new CountDownLatch(10);
// Message should be in consumer
@@ -513,12 +594,13 @@
if (latch.getCount() == 0)
{
+
+ message.acknowledge();
session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
started = false;
- consumer.setMessageHandler(null); // If we comment out this line, the test will fail
+ //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
}
- message.acknowledge();
}
catch (Exception e)
{
@@ -538,11 +620,17 @@
// Make sure no exceptions were thrown from onMessage
assertNull(consumer.getLastException());
-
+ consumer.setMessageHandler(null);
+ session.start();
for (int i = 0; i < 90; i++)
{
ClientMessage msg = consumer.receive(1000);
- assertNotNull(msg);
+ ccount = q.getConsumerCount();
+ if(msg == null)
+ {
+ System.out.println("ClientConsumerTest.testStopConsumer");
+ }
+ assertNotNull("message " + i, msg);
msg.acknowledge();
}
@@ -551,6 +639,208 @@
session.close();
}
+ public void testStopStartConsumerAsyncASync() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
+ int ccount = q.getConsumerCount();
+
+ session.start();
+
+ CountDownLatch latch = new CountDownLatch(10);
+
+ // Message should be in consumer
+
+ class MyHandler implements MessageHandler
+ {
+ int messageReceived = 0;
+ boolean failed;
+
+ boolean started = true;
+
+ private final CountDownLatch latch;
+
+ private boolean stop = true;
+
+ public MyHandler(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public MyHandler(CountDownLatch latch, boolean stop)
+ {
+ this(latch);
+ this.stop = stop;
+ }
+
+ public void onMessage(final ClientMessage message)
+ {
+
+ try
+ {
+ if (!started)
+ {
+ failed = true;
+ }
+ messageReceived++;
+ latch.countDown();
+
+ if (stop && latch.getCount() == 0)
+ {
+
+ message.acknowledge();
+ session.stop(); // Shouldn't this alone prevent messages being delivered to this Handler?
+ started = false;
+ //consumer.setMessageHandler(null); // If we comment out this line, the test will fail
+ }
+
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+
+ MyHandler handler = new MyHandler(latch);
+
+ consumer.setMessageHandler(handler);
+
+ latch.await();
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+
+ // Make sure no exceptions were thrown from onMessage
+ assertNull(consumer.getLastException());
+ latch = new CountDownLatch(90);
+ handler = new MyHandler(latch, false);
+ consumer.setMessageHandler(handler);
+ session.start();
+ assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+ assertNull(consumer.getLastException());
+ session.close();
+ }
+
+
+ public void testSetUnsetMessageHandler() throws Exception
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ final ClientSession session = sf.createSession(false, true, true);
+
+ session.createQueue(QUEUE, QUEUE, null, false, false);
+
+ ClientProducer producer = session.createProducer(QUEUE);
+
+ final int numMessages = 100;
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, session);
+ message.putIntProperty(new SimpleString("i"), i);
+ producer.send(message);
+ }
+
+ final ClientConsumer consumer = session.createConsumer(QUEUE);
+
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(QUEUE).getBindable();
+ int ccount = q.getConsumerCount();
+
+ session.start();
+
+ CountDownLatch latch = new CountDownLatch(50);
+
+ // Message should be in consumer
+
+ class MyHandler implements MessageHandler
+ {
+ int messageReceived = 0;
+ boolean failed;
+
+ boolean started = true;
+
+ private final CountDownLatch latch;
+
+ public MyHandler(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ public void onMessage(final ClientMessage message)
+ {
+
+ try
+ {
+ if (!started)
+ {
+ failed = true;
+ }
+ messageReceived++;
+ latch.countDown();
+
+ if (latch.getCount() == 0)
+ {
+
+ message.acknowledge();
+ started = false;
+ consumer.setMessageHandler(null);
+ }
+
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ }
+
+ MyHandler handler = new MyHandler(latch);
+
+ consumer.setMessageHandler(handler);
+
+ latch.await();
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+
+ // Make sure no exceptions were thrown from onMessage
+ assertNull(consumer.getLastException());
+ latch = new CountDownLatch(50);
+ handler = new MyHandler(latch);
+ consumer.setMessageHandler(handler);
+ session.start();
+ assertTrue("message received " + handler.messageReceived, latch.await(5, TimeUnit.SECONDS));
+
+ Thread.sleep(100);
+
+ assertFalse(handler.failed);
+ assertNull(consumer.getLastException());
+ session.close();
+ }
+
public void testConsumerAckImmediateAutoCommitTrue() throws Exception
{
ClientSessionFactory sf = createInVMFactory();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java 2009-03-24 14:45:53 UTC (rev 6151)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/client/ClientEndToEndTest.java 2009-03-24 18:47:26 UTC (rev 6152)
@@ -1093,53 +1093,51 @@
}
}
- //FIXME uncomment when https://jira.jboss.org/jira/browse/JBMESSAGING-1549 is fixed
-// public void testAsyncConsumerRollback() throws Exception
-// {
-// MessagingService messagingService = createService(false);
-// try
-// {
-// messagingService.start();
-// ClientSessionFactory cf = createInVMFactory();
-// cf.setBlockOnAcknowledge(true);
-// cf.setAckBatchSize(0);
-// ClientSession sendSession = cf.createSession(false, true, true);
-// final ClientSession session = cf.createSession(false, true, false);
-// sendSession.createQueue(addressA, queueA, false);
-// ClientProducer cp = sendSession.createProducer(addressA);
-// ClientConsumer cc = session.createConsumer(queueA);
-// int numMessages = 100;
-// for (int i = 0; i < numMessages; i++)
-// {
-// cp.send(sendSession.createClientMessage(false));
-// }
-// CountDownLatch latch = new CountDownLatch(numMessages);
-// session.start();
-// cc.setMessageHandler(new ackHandler(session, latch));
-// assertTrue(latch.await(5, TimeUnit.SECONDS));
-// Queue q = (Queue)messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
-// assertEquals(numMessages, q.getDeliveringCount());
-// assertEquals(numMessages, q.getMessageCount());
-// //Need to stop session first or rollback will cause immediate redelivery
-// session.stop();
-// session.rollback();
-// assertEquals(0, q.getDeliveringCount());
-// assertEquals(numMessages, q.getMessageCount());
-// session.start();
-// latch = new CountDownLatch(numMessages);
-// cc.setMessageHandler(new ackHandler(session, latch));
-// assertTrue(latch.await(5, TimeUnit.SECONDS));
-// sendSession.close();
-// session.close();
-// }
-// finally
-// {
-// if (messagingService.isStarted())
-// {
-// messagingService.stop();
-// }
-// }
-// }
+ public void testAsyncConsumerRollback() throws Exception
+ {
+ MessagingService messagingService = createService(false);
+ try
+ {
+ messagingService.start();
+ ClientSessionFactory cf = createInVMFactory();
+ cf.setBlockOnAcknowledge(true);
+ cf.setAckBatchSize(0);
+ ClientSession sendSession = cf.createSession(false, true, true);
+ final ClientSession session = cf.createSession(false, true, false);
+ sendSession.createQueue(addressA, queueA, false);
+ ClientProducer cp = sendSession.createProducer(addressA);
+ ClientConsumer cc = session.createConsumer(queueA);
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ cp.send(sendSession.createClientMessage(false));
+ }
+ CountDownLatch latch = new CountDownLatch(numMessages);
+ session.start();
+ cc.setMessageHandler(new ackHandler(session, latch));
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ Queue q = (Queue) messagingService.getServer().getPostOffice().getBinding(queueA).getBindable();
+ assertEquals(numMessages, q.getDeliveringCount());
+ assertEquals(numMessages, q.getMessageCount());
+ session.stop();
+ session.rollback();
+ assertEquals(0, q.getDeliveringCount());
+ assertEquals(numMessages, q.getMessageCount());
+ latch = new CountDownLatch(numMessages);
+ cc.setMessageHandler(new ackHandler(session, latch));
+ session.start();
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ sendSession.close();
+ session.close();
+ }
+ finally
+ {
+ if (messagingService.isStarted())
+ {
+ messagingService.stop();
+ }
+ }
+ }
public void testSendDeliveryOrderOnCommit() throws Exception
{
@@ -1412,12 +1410,12 @@
consumers[3] = session.createConsumer(queueA);
consumers[4] = session.createConsumer(queueA);
- ClientSession sendSession = cf.createSession(false, true, true);
- ClientProducer cp = sendSession.createProducer(addressA);
+ //ClientSession sendSession = cf.createSession(false, true, true);
+ ClientProducer cp = session.createProducer(addressA);
int numMessage = 100;
for (int i = 0; i < numMessage; i++)
{
- ClientMessage cm = sendSession.createClientMessage(false);
+ ClientMessage cm = session.createClientMessage(false);
cm.getBody().writeInt(i);
cp.send(cm);
}
@@ -1431,7 +1429,7 @@
assertEquals(currMessage++, cm.getBody().readInt());
}
}
- sendSession.close();
+ //sendSession.close();
session.close();
}
finally
More information about the jboss-cvs-commits mailing list