[jboss-cvs] JBoss Messaging SVN: r2761 - in trunk: src/main/org/jboss/jms/client/container and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 6 16:06:54 EDT 2007
Author: timfox
Date: 2007-06-06 16:06:54 -0400 (Wed, 06 Jun 2007)
New Revision: 2761
Added:
trunk/tests/src/org/jboss/test/messaging/jms/stress/OpenCloseStressTest.java
Modified:
trunk/docs/userguide/en/modules/c_configuration.xml
trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
Log:
tests/src/org/jboss/test/messaging/jms/stress/OpenCloseStressTest.java
Modified: trunk/docs/userguide/en/modules/c_configuration.xml
===================================================================
--- trunk/docs/userguide/en/modules/c_configuration.xml 2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/docs/userguide/en/modules/c_configuration.xml 2007-06-06 20:06:54 UTC (rev 2761)
@@ -12,7 +12,7 @@
<itemizedlist>
- <listitem>Choosing the connetion factory</listitem>
+ <listitem>Choosing the connection factory</listitem>
<listitem>Choosing the cluster router policy</listitem>
@@ -28,7 +28,8 @@
</para>
<para>See the section <xref linkend="conf.connectionfactory">Connection Factory configuration</xref> for more information.
- </para>
+ </para>
+
</section>
@@ -126,4 +127,4 @@
-</chapter>
\ No newline at end of file
+</chapter>
Modified: trunk/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/jms/client/container/SessionAspect.java 2007-06-06 20:06:54 UTC (rev 2761)
@@ -132,7 +132,7 @@
if (ackMode == Session.AUTO_ACKNOWLEDGE || isXAAndConsideredNonTransacted(state))
{
//Acknowledge or cancel any outstanding auto ack
-
+
DeliveryInfo remainingAutoAck = state.getAutoAckInfo();
if (remainingAutoAck != null)
Modified: trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/jms/client/remoting/MessageCallbackHandler.java 2007-06-06 20:06:54 UTC (rev 2761)
@@ -325,14 +325,19 @@
public void close(long lastDeliveryId) throws JMSException
{
+ log.debug(this + " closing");
+
+ //Wait for the last delivery to arrive
waitForLastDelivery(lastDeliveryId);
+ //Important! We set the listener to null so the next ListenerRunner won't run
+ setMessageListener(null);
+
+ //Now we wait for any current listener runners to run.
waitForOnMessageToComplete();
synchronized (mainLock)
- {
- log.debug(this + " closing");
-
+ {
if (closed)
{
return;
@@ -837,51 +842,48 @@
{
MessageProxy mp = null;
- boolean again = false;
-
+ MessageListener theListener = null;
+
synchronized (mainLock)
{
- if (listener == null)
+ if (listener == null || buffer.isEmpty())
{
listenerRunning = false;
- if (trace) { log.trace("no listener, returning"); }
+ if (trace) { log.trace("no listener or buffer is empty, returning"); }
return;
}
+ theListener = listener;
+
// remove a message from the buffer
- if (buffer.isEmpty())
+ mp = (MessageProxy)buffer.removeFirst();
+
+ if (!buffer.isEmpty())
{
- listenerRunning = false;
-
- if (trace) { log.trace("no messages in buffer, marking listener as not running"); }
+ //Queue up the next runner to run
+
+ if (trace) { log.trace("More messages in buffer so queueing next onMessage to run"); }
+
+ queueRunner(this);
+
+ if (trace) { log.trace("Queued next onMessage to run"); }
}
else
- {
- mp = (MessageProxy)buffer.removeFirst();
-
- if (mp == null)
- {
- throw new java.lang.IllegalStateException("Cannot find message in buffer!");
- }
-
- again = !buffer.isEmpty();
-
- if (!again)
- {
- listenerRunning = false;
- if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
- }
- }
+ {
+ if (trace) { log.trace("no more messages in buffer, marking listener as not running"); }
+
+ listenerRunning = false;
+ }
}
if (mp != null)
{
try
{
- callOnMessage(sessionDelegate, listener, consumerID, queueName,
+ callOnMessage(sessionDelegate, theListener, consumerID, queueName,
false, mp, ackMode, maxDeliveries, null);
}
catch (JMSException e)
@@ -890,13 +892,7 @@
}
}
- checkStart();
-
- if (again)
- {
- // Queue it up again
- queueRunner(this);
- }
+ checkStart();
}
}
}
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2007-06-06 20:06:54 UTC (rev 2761)
@@ -227,7 +227,7 @@
// queue for delivery later.
if (!started)
{
- if (trace) { log.trace(this + " NOT started yet!"); }
+ if (trace) { log.trace(this + " NOT started!"); }
return null;
}
Modified: trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/jms/server/remoting/JMSServerInvocationHandler.java 2007-06-06 20:06:54 UTC (rev 2761)
@@ -39,6 +39,9 @@
import org.jboss.remoting.callback.InvokerCallbackHandler;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
+import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
+import EDU.oswego.cs.dl.util.concurrent.WriterPreferenceReadWriteLock;
+
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -54,29 +57,49 @@
// Static ---------------------------------------------------------------------------------------
+ private static boolean closed = true;
+
+ private static ReadWriteLock invokeLock;
+
+ public static void setClosed(boolean b)
+ {
+ try
+ {
+ invokeLock.writeLock().acquire();
+
+ try
+ {
+ closed = b;
+ }
+ finally
+ {
+ invokeLock.writeLock().release();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ log.error("Failed to set closed to " + closed, e);
+ }
+ }
+
// Attributes -----------------------------------------------------------------------------------
private ServerInvoker invoker;
+
private MBeanServer server;
protected Map callbackHandlers;
private boolean trace;
-
- //We need some way the server peer can call the invocation handler to make it open/closed
- private static boolean closed = true;
-
- public static synchronized void setClosed(boolean closed)
- {
- JMSServerInvocationHandler.closed = closed;
- }
-
+
// Constructors ---------------------------------------------------------------------------------
public JMSServerInvocationHandler()
{
callbackHandlers = new HashMap();
trace = log.isTraceEnabled();
+
+ invokeLock = new WriterPreferenceReadWriteLock();
}
// ServerInvocationHandler ----------------------------------------------------------------------
@@ -102,8 +125,9 @@
{
if (trace) { log.trace("invoking " + invocation); }
- synchronized (JMSServerInvocationHandler.class)
- {
+ invokeLock.readLock().acquire();
+ try
+ {
if (closed)
{
throw new MessagingJMSException("Cannot handle invocation since messaging server is not active (it is either starting up or shutting down)");
@@ -140,6 +164,11 @@
return request.serverInvoke();
}
+ finally
+ {
+ invokeLock.readLock().release();
+ }
+
}
public void addListener(InvokerCallbackHandler callbackHandler)
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2007-06-06 20:06:54 UTC (rev 2761)
@@ -82,7 +82,7 @@
protected PersistenceManager pm;
- protected Object refLock;
+ protected Object lock;
protected boolean active = true;
@@ -114,6 +114,7 @@
{
throw new IllegalArgumentException("ChannelSupport requires a non-null message store");
}
+
if (pm == null)
{
throw new IllegalArgumentException("ChannelSupport requires a " +
@@ -132,7 +133,7 @@
messageRefs = new BasicPriorityLinkedList(10);
- refLock = new Object();
+ lock = new Object();
deliveringCount = new SynchronizedInt(0);
@@ -147,7 +148,7 @@
public Delivery handle(DeliveryObserver sender, MessageReference ref, Transaction tx)
{
- if (!active)
+ if (!isActive())
{
return null;
}
@@ -176,9 +177,9 @@
{
pm.updateDeliveryCount(this.channelID, ref);
}
-
+
deliveringCount.decrement();
-
+
if (!checkAndSchedule(ref))
{
cancelInternal(ref);
@@ -190,59 +191,69 @@
public boolean add(Receiver r)
{
if (trace) { log.trace(this + " attempting to add receiver " + r); }
-
- boolean added = router.add(r);
-
- if (trace) { log.trace("receiver " + r + (added ? "" : " NOT") + " added"); }
-
- synchronized (refLock)
- {
- receiversReady = true;
+
+ synchronized (lock)
+ {
+ boolean added = router.add(r);
+
+ if (trace) { log.trace("receiver " + r + (added ? "" : " NOT") + " added"); }
+
+ receiversReady = true;
+
+
+ return added;
}
-
- return added;
}
public boolean remove(Receiver r)
{
- boolean removed = router.remove(r);
-
- if (removed && !router.iterator().hasNext())
- {
- synchronized (refLock)
- {
- receiversReady = false;
- }
- }
-
- if (trace) { log.trace(this + (removed ? " removed " : " did NOT remove ") + r); }
-
- return removed;
+ synchronized (lock)
+ {
+ boolean removed = router.remove(r);
+
+ if (removed && !router.iterator().hasNext())
+ {
+ receiversReady = false;
+ }
+
+ if (trace) { log.trace(this + (removed ? " removed " : " did NOT remove ") + r); }
+
+ return removed;
+ }
}
public void clear()
{
- router.clear();
-
- synchronized (refLock)
+ synchronized (lock)
{
+ router.clear();
+
receiversReady = false;
}
}
public boolean contains(Receiver r)
{
- return router.contains(r);
+ synchronized (lock)
+ {
+ return router.contains(r);
+ }
}
public Iterator iterator()
{
- return router.iterator();
+ synchronized (lock)
+ {
+ return router.iterator();
+ }
}
public int getNumberOfReceivers()
{
- return router.getNumberOfReceivers();
+ synchronized (lock)
+ {
+ return router.getNumberOfReceivers();
+ }
}
// Channel implementation -----------------------------------------------------------------------
@@ -271,7 +282,7 @@
{
if (trace) { log.trace(this + " browse" + (filter == null ? "" : ", filter = " + filter)); }
- synchronized (refLock)
+ synchronized (lock)
{
//FIXME - This is currently broken since it doesn't take into account
// refs paged into persistent storage
@@ -297,28 +308,30 @@
{
checkClosed();
- if (router.getNumberOfReceivers() > 0)
- {
- synchronized (refLock)
- {
- receiversReady = true;
-
- deliverInternal();
- }
-
- }
+ synchronized (lock)
+ {
+ if (router != null && router.getNumberOfReceivers() > 0)
+ {
+ receiversReady = true;
+
+ deliverInternal();
+ }
+ }
}
public void close()
{
- if (router != null)
- {
- router.clear();
-
- router = null;
- }
-
- clearAllScheduledDeliveries();
+ synchronized (lock)
+ {
+ if (router != null)
+ {
+ router.clear();
+
+ router = null;
+ }
+
+ clearAllScheduledDeliveries();
+ }
}
/*
@@ -333,11 +346,12 @@
{
log.debug(this + " removing all references");
- synchronized (refLock)
+ synchronized (lock)
{
if (deliveringCount.get() > 0)
{
- throw new IllegalStateException("Cannot remove references while deliveries are in progress");
+ throw new IllegalStateException("Cannot remove references while deliveries are in progress, there are " +
+ deliveringCount.get());
}
//Now we consume the rest of the messages
@@ -370,7 +384,7 @@
{
List undelivered = new ArrayList();
- synchronized (refLock)
+ synchronized (lock)
{
Iterator iter = messageRefs.getAll().iterator();
@@ -401,7 +415,7 @@
*/
public int getMessageCount()
{
- synchronized (refLock)
+ synchronized (lock)
{
return messageRefs.size() + getDeliveringCount() + getScheduledCount();
}
@@ -422,7 +436,7 @@
public void activate()
{
- synchronized (refLock)
+ synchronized (lock)
{
active = true;
}
@@ -430,7 +444,7 @@
public void deactivate()
{
- synchronized (refLock)
+ synchronized (lock)
{
active = false;
}
@@ -438,7 +452,7 @@
public boolean isActive()
{
- synchronized (refLock)
+ synchronized (lock)
{
return active;
}
@@ -451,7 +465,7 @@
List dels = new ArrayList();
- synchronized (refLock)
+ synchronized (lock)
{
ListIterator liter = messageRefs.iterator();
@@ -491,7 +505,7 @@
public int getMaxSize()
{
- synchronized (refLock)
+ synchronized (lock)
{
return maxSize;
}
@@ -499,7 +513,7 @@
public void setMaxSize(int newSize)
{
- synchronized (refLock)
+ synchronized (lock)
{
int count = getMessageCount();
@@ -524,7 +538,7 @@
//Only used for testing
public int memoryRefCount()
{
- synchronized (refLock)
+ synchronized (lock)
{
return messageRefs.size();
}
@@ -564,7 +578,7 @@
{
if (trace) { log.trace(this + " cancelling " + ref + " in memory"); }
- synchronized (refLock)
+ synchronized (lock)
{
messageRefs.addFirst(ref, ref.getMessage().getPriority());
}
@@ -633,7 +647,7 @@
// Receiver accepted the reference
- synchronized (refLock)
+ synchronized (lock)
{
if (iter == null)
{
@@ -648,7 +662,7 @@
iter.remove();
}
}
-
+
deliveringCount.increment();
}
}
@@ -673,7 +687,7 @@
{
// We synchonize on the ref lock to prevent scheduled deivery kicking in before
// load has finished
- synchronized (refLock)
+ synchronized (lock)
{
// Attempt to push the ref to a receiver
@@ -762,7 +776,7 @@
if (!checkAndSchedule(ref))
{
- synchronized (refLock)
+ synchronized (lock)
{
addReferenceInMemory(ref);
@@ -851,7 +865,7 @@
}
d.getReference().releaseMemoryReference();
-
+
deliveringCount.decrement();
}
else
@@ -992,7 +1006,7 @@
try
{
- synchronized (refLock)
+ synchronized (lock)
{
addReferenceInMemory(ref);
}
@@ -1022,7 +1036,7 @@
// prompt delivery
if (promptDelivery)
{
- synchronized (refLock)
+ synchronized (lock)
{
deliverInternal();
}
Modified: trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/messaging/core/PagingChannelSupport.java 2007-06-06 20:06:54 UTC (rev 2761)
@@ -147,7 +147,7 @@
// Also need to add the paged refs
- synchronized (refLock)
+ synchronized (lock)
{
count += nextPagingOrder - firstPagingOrder;
}
@@ -160,7 +160,7 @@
//Only used in testing
public int downCacheCount()
{
- synchronized (refLock)
+ synchronized (lock)
{
return downCache.size();
}
@@ -169,7 +169,7 @@
//Only used in testing
public boolean isPaging()
{
- synchronized (refLock)
+ synchronized (lock)
{
return paging;
}
@@ -177,7 +177,7 @@
public void setPagingParams(int fullSize, int pageSize, int downCacheSize)
{
- synchronized (refLock)
+ synchronized (lock)
{
if (active)
{
@@ -194,7 +194,7 @@
public void load() throws Exception
{
- synchronized (refLock)
+ synchronized (lock)
{
if (active)
{
@@ -221,7 +221,7 @@
public void unload() throws Exception
{
- synchronized (refLock)
+ synchronized (lock)
{
if (active)
{
@@ -240,13 +240,6 @@
}
}
- public boolean isActive()
- {
- synchronized (refLock)
- {
- return active;
- }
- }
// Protected ------------------------------------------------------------------------------------
@@ -320,7 +313,7 @@
protected void cancelInternal(MessageReference ref) throws Exception
{
- synchronized (refLock)
+ synchronized (lock)
{
super.cancelInternal(ref);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2007-06-06 08:17:48 UTC (rev 2760)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/LocalClusteredQueue.java 2007-06-06 20:06:54 UTC (rev 2761)
@@ -253,7 +253,7 @@
{
if (trace) { log.trace("Merging queue " + remoteQueue + " into " + this); }
- synchronized (refLock)
+ synchronized (lock)
{
flushDownCache();
@@ -421,7 +421,7 @@
{
MessageReference ref;
- synchronized (refLock)
+ synchronized (lock)
{
ref = removeFirstInMemory();
Added: trunk/tests/src/org/jboss/test/messaging/jms/stress/OpenCloseStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/OpenCloseStressTest.java (rev 0)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/OpenCloseStressTest.java 2007-06-06 20:06:54 UTC (rev 2761)
@@ -0,0 +1,462 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.jms.stress;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.naming.InitialContext;
+
+import org.jboss.jms.client.JBossConnectionFactory;
+import org.jboss.test.messaging.MessagingTestCase;
+import org.jboss.test.messaging.tools.ServerManagement;
+import org.jboss.util.id.GUID;
+
+/**
+ *
+ * A OpenCloseStressTest.
+ *
+ * This stress test starts several publisher connections and several subscriber connections, then sends and consumes
+ * messages while concurrently closing the sessions.
+ *
+ * This test will help catch race conditions that occurred with rapid open/closing of sessions when messages are being
+ * sent/received
+ *
+ * E.g. http://jira.jboss.com/jira/browse/JBMESSAGING-982
+ *
+ * @author <a href="tim.fox at jboss.com">Tim Fox</a>
+ * @version <tt>$Revision: 2349 $</tt>
+ *
+ * $Id: StressTest.java 2349 2007-02-19 14:15:53Z timfox $
+ */
+public class OpenCloseStressTest extends MessagingTestCase
+{
+ public OpenCloseStressTest(String name)
+ {
+ super(name);
+ }
+
+ InitialContext ic;
+ JBossConnectionFactory cf;
+ Topic topic;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ ServerManagement.start("all");
+
+ ic = new InitialContext(ServerManagement.getJNDIEnvironment());
+ cf = (JBossConnectionFactory)ic.lookup("/ConnectionFactory");
+
+ ServerManagement.undeployTopic("TestTopic");
+ ServerManagement.deployTopic("TestTopic");
+
+ topic = (Topic) ic.lookup("topic/TestTopic");
+
+ log.debug("setup done");
+ }
+
+ public void tearDown() throws Exception
+ {
+ ServerManagement.undeployQueue("TestQueue");
+
+ super.tearDown();
+
+ log.debug("tear down done");
+ }
+
+ public void testOpenClose() throws Exception
+ {
+ Connection conn1 = null;
+ Connection conn2 = null;
+ Connection conn3 = null;
+
+ Connection conn4 = null;
+ Connection conn5 = null;
+ Connection conn6 = null;
+ Connection conn7 = null;
+ Connection conn8 = null;
+
+ try
+ {
+ Publisher[] publishers = new Publisher[3];
+
+ final int MSGS_PER_PUBLISHER = 10000;
+
+ conn1 = cf.createConnection();
+ Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod1 = sess1.createProducer(topic);
+ prod1.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ publishers[0] = new Publisher(sess1, prod1, MSGS_PER_PUBLISHER, 2);
+
+ conn2 = cf.createConnection();
+ Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod2 = sess2.createProducer(topic);
+ prod2.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ publishers[1] = new Publisher(sess2, prod2, MSGS_PER_PUBLISHER, 5);
+
+ conn3 = cf.createConnection();
+ Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer prod3 = sess3.createProducer(topic);
+ prod3.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+ publishers[2] = new Publisher(sess3, prod3, MSGS_PER_PUBLISHER, 1);
+
+ Subscriber[] subscribers = new Subscriber[5];
+
+ conn4 = cf.createConnection();
+ subscribers[0] = new Subscriber(conn4, 3 * MSGS_PER_PUBLISHER, 500, 1000 * 60 * 5, topic, false);
+
+ conn5 = cf.createConnection();
+ subscribers[1] = new Subscriber(conn5, 3 * MSGS_PER_PUBLISHER, 2000, 1000 * 60 * 5, topic, false);
+
+ conn6 = cf.createConnection();
+ subscribers[2] = new Subscriber(conn6, 3 * MSGS_PER_PUBLISHER, 700, 1000 * 60 * 5, topic, false);
+
+ conn7 = cf.createConnection();
+ subscribers[3] = new Subscriber(conn7, 3 * MSGS_PER_PUBLISHER, 1500, 1000 * 60 * 5, topic, true);
+
+ conn8 = cf.createConnection();
+ subscribers[4] = new Subscriber(conn8, 3 * MSGS_PER_PUBLISHER, 1200, 1000 * 60 * 5, topic, true);
+
+ Thread[] threads = new Thread[8];
+
+ //subscribers
+ threads[0] = new Thread(subscribers[0]);
+
+ threads[1] = new Thread(subscribers[1]);
+
+ threads[2] = new Thread(subscribers[2]);
+
+ threads[3] = new Thread(subscribers[3]);
+
+ threads[4] = new Thread(subscribers[4]);
+
+ //publishers
+
+ threads[5] = new Thread(publishers[0]);
+
+ threads[6] = new Thread(publishers[1]);
+
+ threads[7] = new Thread(publishers[2]);
+
+ for (int i = 0; i < subscribers.length; i++)
+ {
+ threads[i].start();
+ }
+
+ // Pause before creating producers otherwise subscribers might not get all messages
+
+ Thread.sleep(5000);
+
+ for (int i = subscribers.length; i < threads.length; i++)
+ {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < threads.length; i++)
+ {
+ threads[i].join();
+ }
+
+ for (int i = 0; i < subscribers.length; i++)
+ {
+ if (subscribers[i].isDurable())
+ {
+ assertEquals(3 * MSGS_PER_PUBLISHER, subscribers[i].getMessagesReceived());
+ }
+ else
+ {
+ //Note that for a non durable subscriber the number of messages received in total
+ //will be somewhat less than the total number received since when recycling the session
+ //there is a period of time after closing the previous session and starting the next one
+ //when messages are being sent and won't be received (since there is no consumer)
+ }
+
+
+ assertFalse(subscribers[i].isFailed());
+ }
+
+ for (int i = 0; i < publishers.length; i++)
+ {
+ assertFalse(publishers[i].isFailed());
+ }
+
+ }
+ finally
+ {
+ if (conn1 != null)
+ {
+ conn1.close();
+ }
+ if (conn2 != null)
+ {
+ conn2.close();
+ }
+ if (conn3 != null)
+ {
+ conn3.close();
+ }
+ if (conn4 != null)
+ {
+ conn4.close();
+ }
+ if (conn5 != null)
+ {
+ conn5.close();
+ }
+ if (conn6 != null)
+ {
+ conn6.close();
+ }
+ if (conn7 != null)
+ {
+ conn7.close();
+ }
+ if (conn8 != null)
+ {
+ conn8.close();
+ }
+ }
+
+ }
+
+ class Publisher implements Runnable
+ {
+ private Session sess;
+
+ private int numMessages;
+
+ private int delay;
+
+ private MessageProducer prod;
+
+ private boolean failed;
+
+ boolean isFailed()
+ {
+ return failed;
+ }
+
+ Publisher(Session sess, MessageProducer prod, int numMessages, int delay)
+ {
+ this.sess = sess;
+
+ this.prod = prod;
+
+ this.numMessages = numMessages;
+
+ this.delay = delay;
+ }
+
+ public void run()
+ {
+ try
+ {
+ for (int i = 0; i < numMessages; i++)
+ {
+ TextMessage tm = sess.createTextMessage("message" + i);
+
+
+// if (i % 1000 == 0)
+// {
+// log.info("Publisher " + this + " sending message " + i);
+// }
+
+ prod.send(tm);
+
+ try
+ {
+ Thread.sleep(delay);
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+
+ log.info("Publisher " + this + " sent all messages");
+ }
+ catch (JMSException e)
+ {
+ log.error("Failed to send message", e);
+ failed = true;
+ }
+ }
+
+ }
+
+
+ class Subscriber implements Runnable
+ {
+ private Session sess;
+
+ private MessageConsumer cons;
+
+ private int msgsReceived;
+
+ private int numMessages;
+
+ private int delay;
+
+ private Connection conn;
+
+ private boolean failed;
+
+ private long timeout;
+
+ private Destination dest;
+
+ private boolean durable;
+
+ private String subname;
+
+ boolean isFailed()
+ {
+ return failed;
+ }
+
+ boolean isDurable()
+ {
+ return durable;
+ }
+
+
+ synchronized void msgReceived()
+ {
+ msgsReceived++;
+
+// if (msgsReceived % 1000 == 0)
+// {
+// log.info("Subscriber " + this + " received " + msgsReceived + " messages");
+// }
+ }
+
+ synchronized int getMessagesReceived()
+ {
+ return msgsReceived;
+ }
+
+ class Listener implements MessageListener
+ {
+
+ public void onMessage(Message msg)
+ {
+ msgReceived();
+ }
+
+ }
+
+
+
+ Subscriber(Connection conn, int numMessages, int delay, long timeout, Destination dest, boolean durable) throws Exception
+ {
+ this.conn = conn;
+
+ this.numMessages = numMessages;
+
+ this.delay = delay;
+
+ this.timeout = timeout;
+
+ this.dest = dest;
+
+ this.durable = durable;
+
+ if (durable)
+ {
+ conn.setClientID(new GUID().toString());
+
+ this.subname = new GUID().toString();
+ }
+ }
+
+ public void run()
+ {
+ try
+ {
+ long start = System.currentTimeMillis();
+
+ while (((System.currentTimeMillis() - start) < timeout) && msgsReceived < numMessages)
+ {
+ //recycle the session
+
+ recycleSession();
+
+ Thread.sleep(delay);
+ }
+
+ //Delete the durable sub
+
+ if (durable)
+ {
+ recycleSession();
+
+ cons.close();
+
+ sess.unsubscribe(subname);
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failed in subscriber", e);
+ failed = true;
+ }
+
+ }
+
+ void recycleSession() throws Exception
+ {
+ conn.stop();
+
+ if (sess != null)
+ {
+ sess.close();
+ }
+
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ if (durable)
+ {
+ cons = sess.createDurableSubscriber((Topic)dest, subname);
+ }
+ else
+ {
+ cons = sess.createConsumer(dest);
+ }
+
+ cons.setMessageListener(new Listener());
+
+ conn.start();
+ }
+
+ }
+
+}
+
More information about the jboss-cvs-commits
mailing list