[jboss-cvs] JBoss Messaging SVN: r4459 - in branches/Branch_Experimental_JBMESSAGING_1356_2: src/main/org/jboss/jms/server and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 12 21:08:10 EDT 2008
Author: ovidiu.feodorov at jboss.com
Date: 2008-06-12 21:08:10 -0400 (Thu, 12 Jun 2008)
New Revision: 4459
Added:
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/DirectSerialExecutor.java
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/PooledSerialExecutor.java
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/QueuedSerialExecutor.java
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/SerialExecutor.java
branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java
branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/JBMExecutorTest.java
branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/NamedThreadQueuedExecutorTest.java
branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java
branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/QueuedSerialExecutorTest.java
branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/
branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java
Modified:
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ClientConsumer.java
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ConsumerAspect.java
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
branches/Branch_Experimental_JBMESSAGING_1356_2/tests/lib/jdbc-drivers/
Log:
applying the 4435, 4455 JBMESSAGING-1356 changes to Branch_Experimental_JBMESSAGING_1356_2 (rooted of 1.4.0.SP3.CP2)
Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ClientConsumer.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -42,10 +42,11 @@
import org.jboss.logging.Logger;
import org.jboss.messaging.core.contract.Message;
import org.jboss.messaging.util.Future;
+import org.jboss.messaging.util.SerialExecutor;
import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -206,7 +207,7 @@
private int ackMode;
private boolean closed;
private Object mainLock;
- private QueuedExecutor sessionExecutor;
+ private SerialExecutor sessionExecutor;
private boolean listenerRunning;
private int maxDeliveries;
private String queueName;
@@ -230,7 +231,7 @@
public ClientConsumer(boolean isCC, int ackMode,
SessionDelegate sess, ConsumerDelegate cons, String consumerID,
String queueName,
- int bufferSize, QueuedExecutor sessionExecutor,
+ int bufferSize, SerialExecutor sessionExecutor,
int maxDeliveries, boolean shouldAck,
long redeliveryDelay)
{
@@ -693,7 +694,7 @@
{
// Wait for any onMessage() executions to complete
- if (Thread.currentThread().equals(sessionExecutor.getThread()))
+ if (Thread.currentThread().equals(sessionExecutor.getExecutorThread()))
{
// the current thread already closing this ClientConsumer (this happens when the
// session is closed from within the MessageListener.onMessage(), for example), so no need
Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ConsumerAspect.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -36,9 +36,8 @@
import org.jboss.jms.exception.MessagingShutdownException;
import org.jboss.logging.Logger;
import org.jboss.messaging.util.MessageQueueNameHelper;
+import org.jboss.messaging.util.SerialExecutor;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
*
* Handles operations related to the consumer.
@@ -82,7 +81,7 @@
ConsumerState consumerState = (ConsumerState)((DelegateSupport)consumerDelegate).getState();
String consumerID = consumerState.getConsumerID();
int prefetchSize = consumerState.getBufferSize();
- QueuedExecutor sessionExecutor = sessionState.getExecutor();
+ SerialExecutor sessionExecutor = sessionState.getExecutor();
int maxDeliveries = consumerState.getMaxDeliveries();
long redeliveryDelay = consumerState.getRedeliveryDelay();
Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java 2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -86,6 +86,7 @@
import org.w3c.dom.Element;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
/**
* A JMS server peer.
@@ -1383,6 +1384,11 @@
return "ServerPeer[" + getServerPeerID() + "]";
}
+ public PooledExecutor getPooledExecutor()
+ {
+ return new PooledExecutor(10);
+ }
+
// Package protected ----------------------------------------------------------------------------
// Protected ------------------------------------------------------------------------------------
Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -54,10 +54,9 @@
import org.jboss.messaging.util.JNDIUtil;
import org.jboss.messaging.util.NamedThreadQueuedExecutor;
import org.jboss.messaging.util.Version;
+import org.jboss.messaging.util.SerialExecutor;
import org.jboss.remoting.InvokerLocator;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -90,7 +89,7 @@
private Replicator replicator;
- private QueuedExecutor notifyExecutor;
+ private SerialExecutor notifyExecutor;
// Constructors ---------------------------------------------------------------------------------
Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -245,9 +245,10 @@
// create the corresponding server-side session endpoint and register it with this
// connection endpoint instance
- //Note we only replicate transacted and client acknowledge sessions.
- ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this,
- transacted || acknowledgmentMode == Session.CLIENT_ACKNOWLEDGE);
+ //Note we only replicate transacted and client acknowledge sessions.
+ ServerSessionEndpoint ep = new ServerSessionEndpoint(
+ sessionID, this, transacted || acknowledgmentMode == Session.CLIENT_ACKNOWLEDGE,
+ serverPeer.getPooledExecutor());
synchronized (sessions)
{
Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -86,12 +86,14 @@
import org.jboss.messaging.util.GUIDGenerator;
import org.jboss.messaging.util.MessageQueueNameHelper;
import org.jboss.messaging.util.NamedThreadQueuedExecutor;
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.messaging.util.PooledSerialExecutor;
import org.jboss.remoting.callback.Callback;
import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
/**
* The server side representation of a JMS session.
@@ -169,7 +171,7 @@
private long deliveryIdSequence;
//Temporary until we have our own NIO transport
- QueuedExecutor executor;
+ SerialExecutor executor;
private LinkedQueue toDeliver = new LinkedQueue();
@@ -182,12 +184,24 @@
// Constructors ---------------------------------------------------------------------------------
+ /**
+ * @param pooledExecutor - a pre-configured pooled executor for the case we want to bound
+ * the number of threads used by session endpoints throughout a VM. If null, we'll fall
+ * back to a thread per session endpoint (see JBMESSAGING-1356)
+ */
ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
- boolean replicating) throws Exception
+ boolean replicating, PooledExecutor pooledExecutor) throws Exception
{
this.id = sessionID;
- this.executor = new NamedThreadQueuedExecutor("jbm-server-session-" + sessionID);
+ if (pooledExecutor != null)
+ {
+ this.executor = new PooledSerialExecutor(pooledExecutor);
+ }
+ else
+ {
+ this.executor = new NamedThreadQueuedExecutor("jbm-server-session-" + sessionID);
+ }
this.connectionEndpoint = connectionEndpoint;
Added: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/DirectSerialExecutor.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/DirectSerialExecutor.java (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/DirectSerialExecutor.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,61 @@
+package org.jboss.messaging.util;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ * A SerialExecutor that uses the threads tasks are submitted to it.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class DirectSerialExecutor implements SerialExecutor
+{
+ // Constants -----------------------------------------------------------------------------------
+
+ // Static --------------------------------------------------------------------------------------
+
+ // Attributes ----------------------------------------------------------------------------------
+
+ // Constructors --------------------------------------------------------------------------------
+
+ public DirectSerialExecutor()
+ {
+ }
+
+ // Executor implementation ---------------------------------------------------------------------
+
+ public void execute(Runnable runnable) throws InterruptedException
+ {
+ throw new RuntimeException("NOT YET IMPLEMENTED");
+ }
+
+ // SerialExecutor implementation ---------------------------------------------------------------
+
+ public void shutdownAfterProcessingCurrentlyQueuedTasks()
+ {
+ throw new RuntimeException("NOT YET IMPLEMENTED");
+ }
+
+ public void shutdownNow()
+ {
+ throw new RuntimeException("NOT YET IMPLEMENTED");
+ }
+
+ public Thread getExecutorThread()
+ {
+ throw new RuntimeException("NOT YET IMPLEMENTED");
+ }
+
+ // Public --------------------------------------------------------------------------------------
+
+ // Package protected ---------------------------------------------------------------------------
+
+ // Protected -----------------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------------------------------------
+
+ // Inner classes -------------------------------------------------------------------------------
+}
Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/DirectSerialExecutor.java
___________________________________________________________________
Name: svn:executable
+ *
Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java 2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -28,39 +28,44 @@
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
/**
- *
+ *
* A NamedThreadQueuedExecutor
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
*
*/
-public class NamedThreadQueuedExecutor extends QueuedExecutor
-{
+public class NamedThreadQueuedExecutor extends QueuedExecutor implements SerialExecutor {
private static final Logger log = Logger.getLogger(NamedThreadQueuedExecutor.class);
-
+
private final String name;
-
+
private static final ThreadGroup jbmGroup = new ThreadGroup("JBM-threads");
-
- public NamedThreadQueuedExecutor(String name)
+
+ public Thread getExecutorThread()
+ {
+ return super.getThread();
+ }
+
+ public NamedThreadQueuedExecutor(String name)
{
super(new LinkedQueue());
-
+
this.name = name;
-
+
setThreadFactory(new Factory());
-
+
clearThread();
-
+
restart();
}
-
+
private class Factory implements ThreadFactory
{
public Thread newThread(Runnable command)
- {
+ {
return new Thread(jbmGroup, command, name);
}
-
+
}
}
Added: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/PooledSerialExecutor.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/PooledSerialExecutor.java (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/PooledSerialExecutor.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,226 @@
+package org.jboss.messaging.util;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
+import org.jboss.logging.Logger;
+
+/**
+ * A SerialExecutor implementation that honors the contract of serial task execution while using
+ * an internal or externall pool of threads to carry on the actual executions.
+ *
+ * In case of using an external thread pool, this implementation allows creation of a large number
+ * of serial executors that use a relatively small numbers of threas, thus bounding uncontrolled
+ * proliferation of threads.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class PooledSerialExecutor implements SerialExecutor
+{
+ // Constants -----------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(PooledSerialExecutor.class);
+
+ public static final int DEFAULT_TASK_QUEUE_SIZE = 50;
+ public static final int DEFAULT_THREAD_COUNT = 3;
+
+ private static final Runnable SHUTDOWN_TASK = new Runnable() { public void run() {}};
+
+ // Static --------------------------------------------------------------------------------------
+
+ // Attributes ----------------------------------------------------------------------------------
+
+ private BoundedBuffer taskQueue;
+ private PooledExecutor delegate;
+ private boolean taskExecuting;
+ private volatile Thread currentlyExecuting;
+ private boolean shuttingDown;
+
+ // Constructors --------------------------------------------------------------------------------
+
+ public PooledSerialExecutor()
+ {
+ this(DEFAULT_TASK_QUEUE_SIZE);
+ }
+
+ public PooledSerialExecutor(int taskQueueSize)
+ {
+ this(taskQueueSize, new PooledExecutor(DEFAULT_THREAD_COUNT));
+ }
+
+ public PooledSerialExecutor(int taskQueueSize, int threadCount)
+ {
+ this(taskQueueSize, new PooledExecutor(threadCount));
+ }
+
+ public PooledSerialExecutor(PooledExecutor delegate)
+ {
+ this(DEFAULT_TASK_QUEUE_SIZE, delegate);
+ }
+
+ public PooledSerialExecutor(int taskQueueSize, PooledExecutor delegate)
+ {
+ this.delegate = delegate;
+ taskQueue = new BoundedBuffer(taskQueueSize);
+ }
+
+ // Executor implementation ---------------------------------------------------------------------
+
+ /**
+ * Note: if the tasks are queued faster than executed, this method will eventually block,
+ * until downstream execution creates space in the queue again.
+ *
+ * @throws InterruptedException
+ */
+ public void execute(Runnable task) throws InterruptedException
+ {
+ taskQueue.put(task);
+ executeNext();
+ }
+
+ // SerialExecutor implementation ---------------------------------------------------------------
+
+ public void shutdownAfterProcessingCurrentlyQueuedTasks()
+ {
+ while(true)
+ {
+ try
+ {
+ // queue a shutdown task
+ taskQueue.put(SHUTDOWN_TASK);
+ return;
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("failed to queue shutdown task, retrying ...", e);
+ }
+ }
+ }
+
+ public synchronized void shutdownNow()
+ {
+ if (shuttingDown)
+ {
+ return;
+ }
+
+ shuttingDown = true;
+
+ if (taskExecuting && currentlyExecuting != null)
+ {
+ currentlyExecuting.interrupt();
+ }
+
+ // clear the rest of the tasks
+ try
+ {
+ while(taskQueue.poll(0) != null)
+ {
+ }
+ }
+ catch(InterruptedException e)
+ {
+ log.warn(e);
+ }
+ }
+
+ public Thread getExecutorThread()
+ {
+ return currentlyExecuting;
+ }
+
+ // Public --------------------------------------------------------------------------------------
+
+ // Package protected ---------------------------------------------------------------------------
+
+ // Protected -----------------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------------------------------------
+
+ private synchronized void executeNext() throws InterruptedException
+ {
+ if (taskExecuting)
+ {
+ // there's other active task already, get out
+ return;
+ }
+
+ // dequeue the oldest task
+ Runnable task = (Runnable)taskQueue.poll(0);
+
+ if (task == null)
+ {
+ // no tasks, get out
+ return;
+ }
+ else if (task == SHUTDOWN_TASK)
+ {
+ delegate.shutdownAfterProcessingCurrentlyQueuedTasks();
+ return;
+ }
+
+ taskExecuting = true; // TODO - what if .execute() throws exception, clean taskExecuting flag
+
+ // fire a new thread with the task we've just dequeued
+ // TODO Warning: for one-thread PooledExecutor, we will end up in recursion here
+ delegate.execute(new TaskWrapper(task));
+
+ // TODO - what happens if execution failes, the task will be lost, because it is dequeeued
+ }
+
+ // Inner classes -------------------------------------------------------------------------------
+
+ private class TaskWrapper implements Runnable
+ {
+ private Runnable task;
+
+ private TaskWrapper(Runnable task)
+ {
+ this.task = task;
+ }
+
+ public void run()
+ {
+ try
+ {
+ currentlyExecuting = Thread.currentThread();
+
+ task.run();
+ }
+ catch(Throwable t)
+ {
+ // task failed, log and get out
+ log.error("task " + task + " failed", t);
+ }
+ finally
+ {
+ synchronized(PooledSerialExecutor.this)
+ {
+ taskExecuting = false;
+ currentlyExecuting = null;
+
+ if (shuttingDown)
+ {
+ return;
+ }
+
+ while(true)
+ {
+ try
+ {
+ executeNext();
+ break;
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("temporary failure to submit next task, retrying ...", e);
+ }
+ }
+ }
+ }
+ }
+ }
+}
Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/PooledSerialExecutor.java
___________________________________________________________________
Name: svn:executable
+ *
Added: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/QueuedSerialExecutor.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/QueuedSerialExecutor.java (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/QueuedSerialExecutor.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,67 @@
+package org.jboss.messaging.util;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ * A SerialExecutor that just simply delegates to a QueuedExecutor.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class QueuedSerialExecutor implements SerialExecutor
+{
+ // Constants -----------------------------------------------------------------------------------
+
+ // Static --------------------------------------------------------------------------------------
+
+ // Attributes ----------------------------------------------------------------------------------
+
+ private QueuedExecutor delegate;
+
+ // Constructors --------------------------------------------------------------------------------
+
+ /**
+ * Creates its delegate QueueExecutor.
+ */
+ public QueuedSerialExecutor()
+ {
+ delegate = new QueuedExecutor();
+ }
+
+ // Executor implementation ---------------------------------------------------------------------
+
+ public void execute(Runnable runnable) throws InterruptedException
+ {
+ delegate.execute(runnable);
+ }
+
+ // SerialExecutor implementation ---------------------------------------------------------------
+
+ public void shutdownAfterProcessingCurrentlyQueuedTasks()
+ {
+ delegate.shutdownAfterProcessingCurrentlyQueuedTasks();
+ }
+
+ public void shutdownNow()
+ {
+ delegate.shutdownNow();
+ }
+
+ public Thread getExecutorThread()
+ {
+ return delegate.getThread();
+ }
+
+ // Public --------------------------------------------------------------------------------------
+
+ // Package protected ---------------------------------------------------------------------------
+
+ // Protected -----------------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------------------------------------
+
+ // Inner classes -------------------------------------------------------------------------------
+}
Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/QueuedSerialExecutor.java
___________________________________________________________________
Name: svn:executable
+ *
Added: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/SerialExecutor.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/SerialExecutor.java (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/SerialExecutor.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,47 @@
+package org.jboss.messaging.util;
+
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+
+/**
+ * An Executor type that guarantees serial execution of tasks submitted to it: If task B is
+ * submitted after task A, on the same thread or different threads, then B's execution is
+ * guaranteed not to start unless A's execution finishes (successfully or otherwise).
+ *
+ * A QueuedExecutor is a possible implementation of such type, but the implementation is quite
+ * inefficient if we need a large number of such serial executors, in that it wastefully "books" a
+ * thread per instance. A different implementation may "borrow" execution threads from a common
+ * pool, while maintaining serial execution gurarantee.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public interface SerialExecutor extends Executor
+{
+ /**
+ * Disassociates itself from all background threads after it processes all tasks currently in
+ * queue. Any tasks entered after this point will not be processed. A shut down serial executor
+ * may not be reused. This method may block if the task queue is finite and full.
+ */
+ void shutdownAfterProcessingCurrentlyQueuedTasks();
+
+ /**
+ * Terminate background thread even if it is currently processing a task. This method uses
+ * Thread.interrupt, so relies on tasks themselves responding appropriately to interruption. If
+ * the current tasks does not terminate on interruption, then the thread will not terminate
+ * until processing current task. A shut down thread cannot be restarted.
+ **/
+ void shutdownNow();
+
+ /**
+ *
+ * @return the thread that is executing the current serial tasks, or is scheduled to execute
+ * the next task, whenever that task may be submitted. Some SerialExecutor
+ * implementation may chose to return null if no current thread is executing and the
+ * thread that is going to execute next task is not known at the moment.
+ */
+ Thread getExecutorThread();
+
+}
Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/SerialExecutor.java
___________________________________________________________________
Name: svn:executable
+ *
Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/lib/jdbc-drivers
___________________________________________________________________
Name: svn:ignore
+ *.jar
Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,135 @@
+package org.jboss.test.messaging.jms;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.DeliveryMode;
+import javax.jms.TextMessage;
+import javax.jms.MessageConsumer;
+
+/**
+ * This is a container for all test cases related to
+ * http://jira.jboss.com/jira/browse/JBMESSAGING-1356: an attempt to minimize the number of threads
+ * used by JBM sessions.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class PooledThreadsDeliveryTest extends JMSTestCase
+{
+ // Constants -----------------------------------------------------------------------------------
+
+ // Static --------------------------------------------------------------------------------------
+
+ // Attributes ----------------------------------------------------------------------------------
+
+ // Constructors --------------------------------------------------------------------------------
+
+ public PooledThreadsDeliveryTest(String name)
+ {
+ super(name);
+ }
+
+ // Public --------------------------------------------------------------------------------------
+
+ public void testOneMessage() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = session.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ TextMessage m = null;
+
+ m = session.createTextMessage("1");
+ prod.send(m);
+
+ session.close();
+
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = session.createConsumer(queue1);
+
+ conn.start();
+
+ m = (TextMessage)cons.receive();
+ assertEquals("1", m.getText());
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ public void testMessageSequence() throws Exception
+ {
+ Connection conn = null;
+
+ try
+ {
+ conn = cf.createConnection();
+
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer prod = session.createProducer(queue1);
+ prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+ TextMessage m = null;
+
+ m = session.createTextMessage("1");
+ prod.send(m);
+
+ m = session.createTextMessage("2");
+ prod.send(m);
+
+ m = session.createTextMessage("3");
+ prod.send(m);
+
+ session.close();
+
+
+ session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageConsumer cons = session.createConsumer(queue1);
+
+ conn.start();
+
+ m = (TextMessage)cons.receive();
+ assertEquals("1", m.getText());
+
+ m = (TextMessage)cons.receive();
+ assertEquals("2", m.getText());
+
+ m = (TextMessage)cons.receive();
+ assertEquals("3", m.getText());
+
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
+ }
+
+ // Package protected ---------------------------------------------------------------------------
+
+ // Protected -----------------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------------------------------------
+
+ // Inner classes -------------------------------------------------------------------------------
+}
Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java
___________________________________________________________________
Name: svn:executable
+ *
Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/JBMExecutorTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/JBMExecutorTest.java (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/JBMExecutorTest.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,39 @@
+package org.jboss.test.messaging.util;
+
+import org.jboss.test.messaging.util.base.SerialExecutorTestBase;
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.messaging.util.QueuedSerialExecutor;
+import org.jboss.messaging.util.JBMExecutor;
+
+/**
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class JBMExecutorTest extends SerialExecutorTestBase
+{
+ // Constants -----------------------------------------------------------------------------------
+
+ // Static --------------------------------------------------------------------------------------
+
+ // Attributes ----------------------------------------------------------------------------------
+
+ // Constructors --------------------------------------------------------------------------------
+
+ // Public --------------------------------------------------------------------------------------
+
+ // Package protected ---------------------------------------------------------------------------
+
+ // Protected -----------------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------------------------------------
+
+ // Inner classes -------------------------------------------------------------------------------
+
+ protected SerialExecutor getSerialExecutorToTest()
+ {
+ return new JBMExecutor("test-instance");
+ }
+}
Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/JBMExecutorTest.java
___________________________________________________________________
Name: svn:executable
+ *
Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/NamedThreadQueuedExecutorTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/NamedThreadQueuedExecutorTest.java (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/NamedThreadQueuedExecutorTest.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,39 @@
+package org.jboss.test.messaging.util;
+
+import org.jboss.test.messaging.util.base.SerialExecutorTestBase;
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.messaging.util.QueuedSerialExecutor;
+import org.jboss.messaging.util.NamedThreadQueuedExecutor;
+
+/**
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class NamedThreadQueuedExecutorTest extends SerialExecutorTestBase
+{
+ // Constants -----------------------------------------------------------------------------------
+
+ // Static --------------------------------------------------------------------------------------
+
+ // Attributes ----------------------------------------------------------------------------------
+
+ // Constructors --------------------------------------------------------------------------------
+
+ // Public --------------------------------------------------------------------------------------
+
+ // Package protected ---------------------------------------------------------------------------
+
+ // Protected -----------------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------------------------------------
+
+ // Inner classes -------------------------------------------------------------------------------
+
+ protected SerialExecutor getSerialExecutorToTest()
+ {
+ return new NamedThreadQueuedExecutor("test-instance");
+ }
+}
Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/NamedThreadQueuedExecutorTest.java
___________________________________________________________________
Name: svn:executable
+ *
Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,694 @@
+package org.jboss.test.messaging.util;
+
+import org.jboss.test.messaging.util.base.SerialExecutorTestBase;
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.messaging.util.PooledSerialExecutor;
+import org.jboss.logging.Logger;
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collections;
+
+/**
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class PooledSerialExecutorTest extends SerialExecutorTestBase
+{
+ // Constants -----------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(PooledSerialExecutorTest.class);
+
+ // Static --------------------------------------------------------------------------------------
+
+ // Attributes ----------------------------------------------------------------------------------
+
+ // Constructors --------------------------------------------------------------------------------
+
+ // Public --------------------------------------------------------------------------------------
+
+ public void testQueueingFasterThanExecution() throws Exception
+ {
+ // we force the task queue to be the smallest possible
+ final SerialExecutor se = new PooledSerialExecutor(1);
+
+ Task a = new Task("A");
+ se.execute(a); // this will be submitted to execution and will keep the executor "occupied"
+
+ Task b = new Task("B");
+ se.execute(b); // this will clog the queue
+
+ final Task c = new Task("C"); // this one will block at submission
+ final Latch cSubmissionLatch = new Latch();
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ se.execute(c);
+ cSubmissionLatch.release();
+ }
+ catch(Exception e)
+ {
+ log.error(e);
+ }
+ }
+ }, "This thread will block").start();
+
+
+ // at this point, the executor is clogged
+ assertFalse(cSubmissionLatch.attempt(2000));
+
+ // allow A to execute
+ a.enableExecution();
+
+ // this will unclog the executor
+ cSubmissionLatch.acquire();
+ assertTrue(cSubmissionLatch.attempt(0));
+ }
+
+ public void testFloodWithOneThread() throws Exception
+ {
+ // flood a one-thread executor with tasks
+ SerialExecutor se = new PooledSerialExecutor(1000, 1);
+
+ final List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+
+ final int taskCount = 500;
+ final List<Task> tasksInOrderOfSubmission = new ArrayList<Task>(taskCount);
+ for(int i = 0; i < taskCount; i++)
+ {
+ // execution is not enabled
+ Task t = new Task(Integer.toString(i), executionRecord);
+ tasksInOrderOfSubmission.add(t);
+ se.execute(t);
+ }
+
+ for(Task t: tasksInOrderOfSubmission)
+ {
+ assertFalse(t.isCompleted());
+ }
+
+ // enable execution in order inverse to submission
+ for(int i = taskCount - 1; i >= 0; i --)
+ {
+ tasksInOrderOfSubmission.get(i).enableExecution();
+ }
+
+ // wait until all tasks are executed
+ for(Task t: tasksInOrderOfSubmission)
+ {
+ t.waitForCompletion();
+ }
+
+ // make sure the execution order is correct
+ assertEquals(taskCount, executionRecord.size());
+
+ int i = 0;
+ for(String s: executionRecord)
+ {
+ assertEquals(Integer.toString(i), s);
+ i ++;
+ }
+ }
+
+ public void testStackOverflow() throws Exception
+ {
+ fail("I KNOW THIS TEST FAILS, MUST FIX THE IMPLEMENATION");
+
+ // flood a one-thread executor with tasks
+ SerialExecutor se = new PooledSerialExecutor(100000, 1);
+
+ final List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+
+ final int taskCount = 100000;
+ final List<Task> tasksInOrderOfSubmission = new ArrayList<Task>(taskCount);
+ for(int i = 0; i < taskCount; i++)
+ {
+ // execution is not enabled
+ Task t = new Task(Integer.toString(i), executionRecord);
+ tasksInOrderOfSubmission.add(t);
+ se.execute(t);
+ }
+
+ // enable execution in order inverse to submission - this will throw StackOverflow
+ for(int i = taskCount - 1; i >= 0; i --)
+ {
+ tasksInOrderOfSubmission.get(i).enableExecution();
+ }
+
+ // wait until the last task is executed
+ tasksInOrderOfSubmission.get(taskCount - 1).waitForCompletion();
+ }
+
+ public void testFloodWithTwoThreads() throws Exception
+ {
+ // flood a two-thread executor with tasks
+ SerialExecutor se = new PooledSerialExecutor(1000, 2);
+
+ final List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+
+ final int taskCount = 500;
+ final List<Task> tasksInOrderOfSubmission = new ArrayList<Task>(taskCount);
+ for(int i = 0; i < taskCount; i++)
+ {
+ // execution is not enabled
+ Task t = new Task(Integer.toString(i), executionRecord);
+ tasksInOrderOfSubmission.add(t);
+ se.execute(t);
+ }
+
+ for(Task t: tasksInOrderOfSubmission)
+ {
+ assertFalse(t.isCompleted());
+ }
+
+ // enable execution in order inverse to submission
+ for(int i = taskCount - 1; i >= 0; i --)
+ {
+ tasksInOrderOfSubmission.get(i).enableExecution();
+ }
+
+ // wait until all tasks are executed
+ for(Task t: tasksInOrderOfSubmission)
+ {
+ t.waitForCompletion();
+ }
+
+ // make sure the execution order is correct
+ assertEquals(taskCount, executionRecord.size());
+
+ int i = 0;
+ for(String s: executionRecord)
+ {
+ assertEquals(Integer.toString(i), s);
+ i ++;
+ }
+ }
+
+ public void testExternalPoolExecutor_OnePooledThread_TwoSerialExecutors() throws Exception
+ {
+ // lockstep tests, two serial executors, shared pool (1 thread)
+
+ List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+ List<String> executionRecord2 = Collections.synchronizedList(new ArrayList<String>());
+
+ PooledExecutor shared = new PooledExecutor(1);
+
+ final PooledSerialExecutor se = new PooledSerialExecutor(shared);
+ final PooledSerialExecutor se2 = new PooledSerialExecutor(shared);
+
+ Task a = new Task("A", executionRecord);
+
+ se.execute(a);
+
+ final Task b = new Task("B", executionRecord);
+
+ // the thread that "launches" b will block temporarily, so do it from a fresh new one
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ se.execute(b);
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted", e);
+ }
+ }
+ }, "B Launcher").start();
+
+
+ final Task c = new Task("C", executionRecord2);
+ final Latch afterC = new Latch();
+
+ // the thread that "launches" c will block temporarily, so do it from a fresh new one
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ se2.execute(c);
+ afterC.release();
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted", e);
+ }
+ }
+ }, "C Launcher").start();
+
+ final Task d = new Task("D", executionRecord2);
+
+ // the thread that "launches" d will block temporarily, so do it from a fresh new one
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ afterC.acquire();
+ se2.execute(d);
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted", e);
+ }
+ }
+ }, "D Launcher").start();
+
+ assertFalse(a.isCompleted(500));
+ assertFalse(b.isCompleted(500));
+ assertFalse(c.isCompleted(500));
+ assertFalse(d.isCompleted(500));
+
+ // enable execution of everyone, in the inverse order of launching
+ d.enableExecution();
+ c.enableExecution();
+ b.enableExecution();
+ a.enableExecution();
+
+ a.waitForCompletion();
+ b.waitForCompletion();
+ c.waitForCompletion();
+ d.waitForCompletion();
+
+ assertTrue(a.isCompleted());
+ assertTrue(b.isCompleted());
+ assertTrue(c.isCompleted());
+ assertTrue(d.isCompleted());
+
+ assertEquals(2, executionRecord.size());
+ assertEquals("A", executionRecord.get(0));
+ assertEquals("B", executionRecord.get(1));
+
+ assertEquals(2, executionRecord2.size());
+ assertEquals("C", executionRecord2.get(0));
+ assertEquals("D", executionRecord2.get(1));
+ }
+
+ public void testExternalPoolExecutor_OnePooledThreads_Flood() throws Exception
+ {
+ List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+ List<String> executionRecord2 = Collections.synchronizedList(new ArrayList<String>());
+
+ PooledExecutor shared = new PooledExecutor(1);
+
+ final PooledSerialExecutor se = new PooledSerialExecutor(shared);
+ final PooledSerialExecutor se2 = new PooledSerialExecutor(shared);
+
+ int taskCount = 200;
+
+ Task firstA = null, lastA = null;
+
+ final List<Latch> latches = new ArrayList<Latch>(taskCount + 1);
+
+ for(int i = 0; i < taskCount + 1; i++)
+ {
+ latches.add(new Latch());
+ }
+
+ for(int i = 0; i < taskCount; i++)
+ {
+ String taskName = "A" + Integer.toString(i);
+ final Task t = new Task(taskName, executionRecord);
+
+ // enable all, but the first
+ if (i == 0)
+ {
+ firstA = t;
+ }
+ else
+ {
+ t.enableExecution();
+
+ if (i == taskCount - 1)
+ {
+ lastA = t;
+ }
+ }
+
+ final int j = i;
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ latches.get(j).acquire();
+ se.execute(t);
+ latches.get(j + 1).release();
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted", e);
+ }
+ }
+ }, taskName + " Launcher").start();
+ }
+
+ // start "sending"
+ latches.get(0).release();
+
+ // now the second executor
+
+ Task firstB = null, lastB = null;
+
+ final List<Latch> latches2 = new ArrayList<Latch>(taskCount + 1);
+
+ for(int i = 0; i < taskCount + 1; i++)
+ {
+ latches2.add(new Latch());
+ }
+
+ for(int i = 0; i < taskCount; i++)
+ {
+ String taskName = "B" + Integer.toString(i);
+ final Task t = new Task(taskName, executionRecord2);
+
+ // enable all, but the first
+ if (i == 0)
+ {
+ firstB = t;
+ }
+ else
+ {
+ t.enableExecution();
+
+ if (i == taskCount - 1)
+ {
+ lastB = t;
+ }
+ }
+
+ final int j = i;
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ latches2.get(j).acquire();
+ se2.execute(t);
+ latches2.get(j + 1).release();
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted", e);
+ }
+ }
+ }, taskName + " Launcher").start();
+ }
+
+ // start "sending"
+ latches2.get(0).release();
+
+
+ // "unclog" executors
+ firstB.enableExecution();
+ firstA.enableExecution();
+
+ // wait for all tasks to complete
+ lastB.waitForCompletion();
+ lastA.waitForCompletion();
+
+ // make sure the order is correct
+ assertEquals(taskCount, executionRecord.size());
+ for(int i = 0; i < taskCount; i ++)
+ {
+ assertEquals("A" + i, executionRecord.get(i));
+ }
+
+ assertEquals(taskCount, executionRecord2.size());
+ for(int i = 0; i < taskCount; i ++)
+ {
+ assertEquals("B" + i, executionRecord2.get(i));
+ }
+ }
+
+ public void testExternalPoolExecutor_TwoPooledThreads_TwoSerialExecutors() throws Exception
+ {
+ // lockstep tests, two serial executors, shared pool (2 threads)
+
+ List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+ List<String> executionRecord2 = Collections.synchronizedList(new ArrayList<String>());
+
+ PooledExecutor shared = new PooledExecutor(2);
+
+ final PooledSerialExecutor se = new PooledSerialExecutor(shared);
+ final PooledSerialExecutor se2 = new PooledSerialExecutor(shared);
+
+ Task a = new Task("A", executionRecord);
+
+ se.execute(a);
+
+ final Task b = new Task("B", executionRecord);
+
+ // the thread that "launches" b will block temporarily, so do it from a fresh new one
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ se.execute(b);
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted", e);
+ }
+ }
+ }, "B Launcher").start();
+
+
+ final Task c = new Task("C", executionRecord2);
+ final Latch afterC = new Latch();
+
+ // the thread that "launches" c may block temporarily, so do it from a fresh new one
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ se2.execute(c);
+ afterC.release();
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted", e);
+ }
+ }
+ }, "C Launcher").start();
+
+ final Task d = new Task("D", executionRecord2);
+
+ // the thread that "launches" d will block temporarily, so do it from a fresh new one
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ afterC.acquire();
+ se2.execute(d);
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted", e);
+ }
+ }
+ }, "D Launcher").start();
+
+ assertFalse(a.isCompleted(500));
+ assertFalse(b.isCompleted(500));
+ assertFalse(c.isCompleted(500));
+ assertFalse(d.isCompleted(500));
+
+ // enable execution of everyone, in the inverse order of launching
+ d.enableExecution();
+ c.enableExecution();
+ b.enableExecution();
+ a.enableExecution();
+
+ a.waitForCompletion();
+ b.waitForCompletion();
+ c.waitForCompletion();
+ d.waitForCompletion();
+
+ assertTrue(a.isCompleted());
+ assertTrue(b.isCompleted());
+ assertTrue(c.isCompleted());
+ assertTrue(d.isCompleted());
+
+ assertEquals(2, executionRecord.size());
+ assertEquals("A", executionRecord.get(0));
+ assertEquals("B", executionRecord.get(1));
+
+ assertEquals(2, executionRecord2.size());
+ assertEquals("C", executionRecord2.get(0));
+ assertEquals("D", executionRecord2.get(1));
+ }
+
+ public void testExternalPoolExecutor_TwoPooledThreads_Flood() throws Exception
+ {
+ List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+ List<String> executionRecord2 = Collections.synchronizedList(new ArrayList<String>());
+
+ PooledExecutor shared = new PooledExecutor(2);
+
+ final PooledSerialExecutor se = new PooledSerialExecutor(shared);
+ final PooledSerialExecutor se2 = new PooledSerialExecutor(shared);
+
+ int taskCount = 200;
+
+ Task firstA = null, lastA = null;
+
+ final List<Latch> latches = new ArrayList<Latch>(taskCount + 1);
+
+ for(int i = 0; i < taskCount + 1; i++)
+ {
+ latches.add(new Latch());
+ }
+
+ for(int i = 0; i < taskCount; i++)
+ {
+ String taskName = "A" + Integer.toString(i);
+ final Task t = new Task(taskName, executionRecord);
+
+ // enable all, but the first
+ if (i == 0)
+ {
+ firstA = t;
+ }
+ else
+ {
+ t.enableExecution();
+
+ if (i == taskCount - 1)
+ {
+ lastA = t;
+ }
+ }
+
+ final int j = i;
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ latches.get(j).acquire();
+ se.execute(t);
+ latches.get(j + 1).release();
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted", e);
+ }
+ }
+ }, taskName + " Launcher").start();
+ }
+
+ // start "sending"
+ latches.get(0).release();
+
+ // now the second executor
+
+ Task firstB = null, lastB = null;
+
+ final List<Latch> latches2 = new ArrayList<Latch>(taskCount + 1);
+
+ for(int i = 0; i < taskCount + 1; i++)
+ {
+ latches2.add(new Latch());
+ }
+
+ for(int i = 0; i < taskCount; i++)
+ {
+ String taskName = "B" + Integer.toString(i);
+ final Task t = new Task(taskName, executionRecord2);
+
+ // enable all, but the first
+ if (i == 0)
+ {
+ firstB = t;
+ }
+ else
+ {
+ t.enableExecution();
+
+ if (i == taskCount - 1)
+ {
+ lastB = t;
+ }
+ }
+
+ final int j = i;
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ latches2.get(j).acquire();
+ se2.execute(t);
+ latches2.get(j + 1).release();
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted", e);
+ }
+ }
+ }, taskName + " Launcher").start();
+ }
+
+ // start "sending"
+ latches2.get(0).release();
+
+
+ // "unclog" executors
+ firstB.enableExecution();
+ firstA.enableExecution();
+
+ // wait for all tasks to complete
+ lastB.waitForCompletion();
+ lastA.waitForCompletion();
+
+ // make sure the order is correct
+ assertEquals(taskCount, executionRecord.size());
+ for(int i = 0; i < taskCount; i ++)
+ {
+ assertEquals("A" + i, executionRecord.get(i));
+ }
+
+ assertEquals(taskCount, executionRecord2.size());
+ for(int i = 0; i < taskCount; i ++)
+ {
+ assertEquals("B" + i, executionRecord2.get(i));
+ }
+ }
+
+ // Package protected ---------------------------------------------------------------------------
+
+ // Protected -----------------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------------------------------------
+
+ // Inner classes -------------------------------------------------------------------------------
+
+ @Override
+ protected SerialExecutor getSerialExecutorToTest()
+ {
+ return new PooledSerialExecutor();
+ }
+}
Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java
___________________________________________________________________
Name: svn:executable
+ *
Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/QueuedSerialExecutorTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/QueuedSerialExecutorTest.java (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/QueuedSerialExecutorTest.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,38 @@
+package org.jboss.test.messaging.util;
+
+import org.jboss.test.messaging.util.base.SerialExecutorTestBase;
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.messaging.util.QueuedSerialExecutor;
+
+/**
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class QueuedSerialExecutorTest extends SerialExecutorTestBase
+{
+ // Constants -----------------------------------------------------------------------------------
+
+ // Static --------------------------------------------------------------------------------------
+
+ // Attributes ----------------------------------------------------------------------------------
+
+ // Constructors --------------------------------------------------------------------------------
+
+ // Public --------------------------------------------------------------------------------------
+
+ // Package protected ---------------------------------------------------------------------------
+
+ // Protected -----------------------------------------------------------------------------------
+
+ // Private -------------------------------------------------------------------------------------
+
+ // Inner classes -------------------------------------------------------------------------------
+
+ protected SerialExecutor getSerialExecutorToTest()
+ {
+ return new QueuedSerialExecutor();
+ }
+}
Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/QueuedSerialExecutorTest.java
___________________________________________________________________
Name: svn:executable
+ *
Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java 2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,532 @@
+package org.jboss.test.messaging.util.base;
+
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.test.messaging.util.ProxyAssertSupport;
+import org.jboss.logging.Logger;
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+import EDU.oswego.cs.dl.util.concurrent.Slot;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * A base class that tests the contract of SerialExecutor types.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public abstract class SerialExecutorTestBase extends ProxyAssertSupport
+{
+ // Constants -----------------------------------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(SerialExecutorTestBase.class);
+
+ // Static --------------------------------------------------------------------------------------
+
+ // Attributes ----------------------------------------------------------------------------------
+
+ // Constructors --------------------------------------------------------------------------------
+
+ // Public --------------------------------------------------------------------------------------
+
+ public void testSimpleExecution() throws Exception
+ {
+ SerialExecutor se = getSerialExecutorToTest();
+
+ Task task = new Task();
+ task.enableExecution();
+
+ se.execute(task);
+
+ task.waitForCompletion();
+
+ assertTrue(task.isCompleted());
+ }
+
+ public void testSerialExecution_SequentialSubmission() throws Exception
+ {
+ SerialExecutor se = getSerialExecutorToTest();
+ List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+
+ Task a = new Task("A", executionRecord);
+ a.enableExecution();
+
+ se.execute(a);
+
+ Task b = new Task("B", executionRecord);
+ b.enableExecution();
+
+ se.execute(b);
+
+ a.waitForCompletion();
+ b.waitForCompletion();
+
+ assertTrue(a.isCompleted());
+ assertTrue(b.isCompleted());
+
+ assertEquals(2, executionRecord.size());
+ assertEquals("A", executionRecord.get(0));
+ assertEquals("B", executionRecord.get(1));
+ }
+
+ public void testSerialExecution_IntertwinedSubmission() throws Exception
+ {
+ SerialExecutor se = getSerialExecutorToTest();
+ List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+
+ Task a = new Task("A", executionRecord);
+
+ // execution is not enabled yet on "A"
+
+ se.execute(a);
+
+ // submit "B" with execution enabled
+
+ Task b = new Task("B", executionRecord);
+ b.enableExecution();
+
+ se.execute(b);
+
+ // wait a bit, to make sure the scheduler is given a chance to attempt the execution of "B"
+
+ Thread.sleep(1000);
+
+ assertFalse(b.isCompleted());
+
+ // enable execution on "A"
+
+ a.enableExecution();
+
+ a.waitForCompletion();
+ b.waitForCompletion();
+
+ assertTrue(a.isCompleted());
+ assertTrue(b.isCompleted());
+
+ assertEquals(2, executionRecord.size());
+ assertEquals("A", executionRecord.get(0));
+ assertEquals("B", executionRecord.get(1));
+ }
+
+ public void testSerialExecution_IntertwinedSubmission_2() throws Exception
+ {
+ SerialExecutor se = getSerialExecutorToTest();
+ List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+
+ Task a = new Task("A", executionRecord);
+
+ // execution is not enabled yet on "A", but "A" is submitted first
+
+ se.execute(a);
+
+ // execution is not enabled yet on "B", "B" is submitted second
+
+ Task b = new Task("B", executionRecord);
+
+ se.execute(b);
+
+ // enable execution on "B"
+ b.enableExecution();
+
+ // in case of faulty serial executor implementation, there's a chance that "B" passes "A"
+ // so test for that
+
+ assertFalse(b.isCompleted(2000));
+
+ // enable execution on "A"
+
+ a.enableExecution();
+
+ a.waitForCompletion();
+ b.waitForCompletion();
+
+ assertTrue(a.isCompleted());
+ assertTrue(b.isCompleted());
+
+ assertEquals(2, executionRecord.size());
+ assertEquals("A", executionRecord.get(0));
+ assertEquals("B", executionRecord.get(1));
+ }
+
+ public void testSerialExecution_Flood() throws Exception
+ {
+ // flood the executor with a number of tasks and enable execution in the inverse order of
+ // flooding. If the serial order contract is not honored, we'll get a messed up execution
+ // record
+
+ final SerialExecutor se = getSerialExecutorToTest();
+ final List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+
+ final int taskCount = 200;
+ final List<Task> tasksInOrderOfSubmission = new ArrayList<Task>(taskCount);
+ for(int i = 0; i < taskCount; i++)
+ {
+ // execution is not enabled
+ Task t = new Task(Integer.toString(i), executionRecord);
+ tasksInOrderOfSubmission.add(t);
+ }
+
+ final Slot feedingResult = new Slot();
+
+ // feed tasks from a separate thread to prevent test deadlock in case the internal task
+ // queue (if exists at all) is too small
+ new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ for(int i = 0; i < taskCount; i++)
+ {
+ try
+ {
+ Task t = tasksInOrderOfSubmission.get(i);
+ se.execute(t);
+ }
+ catch(Exception e)
+ {
+ log.error(e);
+ feedingResult.put(e);
+ return;
+ }
+ }
+
+ // feeding done
+ feedingResult.put(Boolean.TRUE);
+ }
+ catch(Exception e)
+ {
+ log.error(e);
+ }
+ }
+ }, "Feeder Thread").start();
+
+ for(Task t: tasksInOrderOfSubmission)
+ {
+ assertFalse(t.isCompleted());
+ }
+
+ // enable execution in order inverse to submission
+ for(int i = taskCount - 1; i >= 0; i --)
+ {
+ tasksInOrderOfSubmission.get(i).enableExecution();
+ }
+
+ // by now, the feeding should've been finished, so check
+ Object result = feedingResult.take();
+ assertEquals(Boolean.TRUE, result);
+
+ // wait until all tasks are executed
+ for(Task t: tasksInOrderOfSubmission)
+ {
+ t.waitForCompletion();
+ }
+
+ // make sure the execution order is correct
+ assertEquals(taskCount, executionRecord.size());
+
+ int i = 0;
+ for(String s: executionRecord)
+ {
+ assertEquals(Integer.toString(i), s);
+ i ++;
+ }
+ }
+
+ public void testShutdownAfterProcessingCurrentlyQueuedTasks() throws Exception
+ {
+ // submit a "long-lasting" task and then immediately attempt to shut down
+
+ SerialExecutor se = getSerialExecutorToTest();
+
+ Task a = new Task("A");
+
+ // execution is not enabled yet on "A", but submit it anyway, this will simulate a "long run"
+
+ se.execute(a);
+
+ // this invocation should go through, but no further tasks are executed
+ se.shutdownAfterProcessingCurrentlyQueuedTasks();
+
+ Task b = new Task("B");
+ b.enableExecution();
+
+ se.execute(b);
+
+ assertFalse(b.isCompleted(1000));
+
+ a.enableExecution();
+ a.waitForCompletion();
+
+ // make sure the first task was correctly executed
+ assertTrue(a.isCompleted());
+
+ // make sure the second task was NOT executed
+ assertFalse(b.isCompleted(1000));
+ }
+
+ public void testShutdownAfterProcessingCurrentlyQueuedTasks_TwoQueuedTasks() throws Exception
+ {
+ // submit a "long-lasting" task, then an "enabled" task and then attempt to shut down
+
+ SerialExecutor se = getSerialExecutorToTest();
+
+ Task a = new Task("A");
+
+ // execution is not enabled yet on "A", but submit it anyway, this will simulate a "long run"
+
+ se.execute(a);
+
+ Task b = new Task("B");
+ b.enableExecution();
+ se.execute(b);
+
+ // this invocation should go through, but no further tasks are executed
+ se.shutdownAfterProcessingCurrentlyQueuedTasks();
+
+ Task c = new Task("C");
+ c.enableExecution();
+ se.execute(c);
+
+ assertFalse(b.isCompleted(500));
+ assertFalse(c.isCompleted(500));
+
+ a.enableExecution();
+ a.waitForCompletion();
+
+ // make sure the first task was correctly executed
+ assertTrue(a.isCompleted());
+
+ // make sure the second task was correctly executed
+ b.waitForCompletion();
+ assertTrue(b.isCompleted());
+
+ // make sure the second task was NOT executed
+ assertFalse(c.isCompleted(1000));
+ }
+
+ public void testShutdownNow() throws Exception
+ {
+ SerialExecutor se = getSerialExecutorToTest();
+
+ se.shutdownNow();
+
+ // make sure no new tasks go through
+ Task a = new Task("A");
+ se.execute(a);
+ assertFalse(a.isCompleted(1000));
+ }
+
+ public void testShutdownNow2() throws Exception
+ {
+ SerialExecutor se = getSerialExecutorToTest();
+
+ Task a = new Task("A");
+ se.execute(a);
+
+ Thread.sleep(300);
+
+ se.shutdownNow();
+
+ assertFalse(a.isCompleted(300));
+
+ // make sure no new tasks go through
+ Task b = new Task("B");
+ se.execute(b);
+ assertFalse(b.isCompleted(500));
+ }
+
+ public void testGetExecutorThread() throws Exception
+ {
+ SerialExecutor se = getSerialExecutorToTest();
+
+ Task a = new Task("A", null, false);
+
+ // execution is not enabled yet on "A", nor the execution completion
+ se.execute(a);
+
+ a.enableExecution();
+
+ Thread.sleep(500);
+
+ Thread t = se.getExecutorThread();
+ assertNotNull(t);
+
+ a.enableExecutionCompletion();
+
+ a.waitForCompletion();
+ assertTrue(a.isCompleted());
+
+ Thread let = a.getLastExecutorThread();
+ assertEquals(t, let);
+ }
+
+ // Package protected ---------------------------------------------------------------------------
+
+ // Protected -----------------------------------------------------------------------------------
+
+ protected abstract SerialExecutor getSerialExecutorToTest();
+
+ // Private -------------------------------------------------------------------------------------
+
+ // Inner classes -------------------------------------------------------------------------------
+
+ protected class Task implements Runnable
+ {
+ private Latch executionEnabledLatch;
+ private Latch executionCompletionEnableddLatch;
+ private Latch executionCompletedLatch;
+ private String name;
+ private List<String> executionRecord;
+ private Thread lastExecutorThread;
+
+ /**
+ * By default, execution is disabled, you need to use enableExecution to enable it.
+ */
+ public Task()
+ {
+ this(null, null, true);
+ }
+
+ /**
+ * By default, execution is disabled, you need to use enableExecution to enable it.
+ */
+ public Task(String name)
+ {
+ this(name, null, true);
+ }
+
+ /**
+ * By default, execution is disabled, you need to use enableExecution to enable it.
+ */
+ public Task(String name, List<String> executionRecord)
+ {
+ this(name, executionRecord, true);
+ }
+
+ /**
+ * By default, execution is disabled, you need to use enableExecution to enable it.
+ */
+ public Task(String name, List<String> executionRecord, boolean executionCompletionEnabled)
+ {
+ if (name == null)
+ {
+ this.name = Integer.toHexString(System.identityHashCode(this));
+ }
+ else
+ {
+ this.name = name;
+ }
+
+ this.executionRecord = executionRecord;
+
+ executionEnabledLatch = new Latch();
+ executionCompletedLatch = new Latch();
+ executionCompletionEnableddLatch = new Latch();
+
+ if (executionCompletionEnabled)
+ {
+ executionCompletionEnableddLatch.release();
+ }
+ }
+
+ /**
+ * Allow this task to be executed.
+ */
+ public void enableExecution()
+ {
+ executionEnabledLatch.release();
+ }
+
+ /**
+ * Allow the execution of this task to complete.
+ */
+ public void enableExecutionCompletion()
+ {
+ executionCompletionEnableddLatch.release();
+ }
+
+ /**
+ * The method blocks, possibly forever, until the task is completely executed (the executor
+ * thread exits run()).
+ */
+ public void waitForCompletion() throws InterruptedException
+ {
+ executionCompletedLatch.acquire();
+ }
+
+ /**
+ * Test completion without waiting at all.
+ */
+ public boolean isCompleted() throws InterruptedException
+ {
+ return isCompleted(0);
+ }
+
+ /**
+ * Test completion waiting for a while.
+ */
+ public boolean isCompleted(long timeout) throws InterruptedException
+ {
+ return executionCompletedLatch.attempt(timeout);
+ }
+
+ /**
+ * @return the last thread that executed this task (may be null if the task was never
+ * executed)
+ */
+ public Thread getLastExecutorThread()
+ {
+ return lastExecutorThread;
+ }
+
+
+ public void run()
+ {
+ try
+ {
+ executionEnabledLatch.acquire();
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted when acquiring executionEnabledLatch, exiting", e);
+ return;
+ }
+
+ log.info(this + " executing ...");
+
+ lastExecutorThread = Thread.currentThread();
+
+ if (executionRecord != null)
+ {
+ executionRecord.add(getName());
+ }
+
+ try
+ {
+ executionCompletionEnableddLatch.acquire();
+ }
+ catch(InterruptedException e)
+ {
+ log.warn("interrupted when acquiring executionCompletionEnableddLatch, exiting", e);
+ return;
+ }
+
+ executionCompletedLatch.release();
+ }
+
+ public String getName()
+ {
+ return name;
+ }
+
+ public String toString()
+ {
+ return "Task " + name;
+ }
+ }
+}
Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java
___________________________________________________________________
Name: svn:executable
+ *
More information about the jboss-cvs-commits
mailing list