[jboss-cvs] JBoss Messaging SVN: r4459 - in branches/Branch_Experimental_JBMESSAGING_1356_2: src/main/org/jboss/jms/server and 7 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Jun 12 21:08:10 EDT 2008


Author: ovidiu.feodorov at jboss.com
Date: 2008-06-12 21:08:10 -0400 (Thu, 12 Jun 2008)
New Revision: 4459

Added:
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/DirectSerialExecutor.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/PooledSerialExecutor.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/QueuedSerialExecutor.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/SerialExecutor.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/JBMExecutorTest.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/NamedThreadQueuedExecutorTest.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/QueuedSerialExecutorTest.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/
   branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java
Modified:
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
   branches/Branch_Experimental_JBMESSAGING_1356_2/tests/lib/jdbc-drivers/
Log:
applying the 4435, 4455 JBMESSAGING-1356 changes to Branch_Experimental_JBMESSAGING_1356_2 (rooted of 1.4.0.SP3.CP2)

Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -42,10 +42,11 @@
 import org.jboss.logging.Logger;
 import org.jboss.messaging.core.contract.Message;
 import org.jboss.messaging.util.Future;
+import org.jboss.messaging.util.SerialExecutor;
 import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
 import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
 
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
@@ -206,7 +207,7 @@
    private int ackMode;
    private boolean closed;
    private Object mainLock;
-   private QueuedExecutor sessionExecutor;
+   private SerialExecutor sessionExecutor;
    private boolean listenerRunning;
    private int maxDeliveries;
    private String queueName;
@@ -230,7 +231,7 @@
    public ClientConsumer(boolean isCC, int ackMode,                                
                          SessionDelegate sess, ConsumerDelegate cons, String consumerID,
                          String queueName,
-                         int bufferSize, QueuedExecutor sessionExecutor,
+                         int bufferSize, SerialExecutor sessionExecutor,
                          int maxDeliveries, boolean shouldAck,
                          long redeliveryDelay)
    {
@@ -693,7 +694,7 @@
    {
       // Wait for any onMessage() executions to complete
 
-      if (Thread.currentThread().equals(sessionExecutor.getThread()))
+      if (Thread.currentThread().equals(sessionExecutor.getExecutorThread()))
       {
          // the current thread already closing this ClientConsumer (this happens when the
          // session is closed from within the MessageListener.onMessage(), for example), so no need

Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -36,9 +36,8 @@
 import org.jboss.jms.exception.MessagingShutdownException;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.util.MessageQueueNameHelper;
+import org.jboss.messaging.util.SerialExecutor;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * 
  * Handles operations related to the consumer.
@@ -82,7 +81,7 @@
       ConsumerState consumerState = (ConsumerState)((DelegateSupport)consumerDelegate).getState();
       String consumerID = consumerState.getConsumerID();
       int prefetchSize = consumerState.getBufferSize();
-      QueuedExecutor sessionExecutor = sessionState.getExecutor();
+      SerialExecutor sessionExecutor = sessionState.getExecutor();
       int maxDeliveries = consumerState.getMaxDeliveries();
       long redeliveryDelay = consumerState.getRedeliveryDelay();
       

Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java	2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/ServerPeer.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -86,6 +86,7 @@
 import org.w3c.dom.Element;
 
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentReaderHashMap;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
 
 /**
  * A JMS server peer.
@@ -1383,6 +1384,11 @@
       return "ServerPeer[" + getServerPeerID() + "]";
    }
 
+    public PooledExecutor getPooledExecutor()
+    {
+        return new PooledExecutor(10);
+    }
+
    // Package protected ----------------------------------------------------------------------------
 
    // Protected ------------------------------------------------------------------------------------

Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -54,10 +54,9 @@
 import org.jboss.messaging.util.JNDIUtil;
 import org.jboss.messaging.util.NamedThreadQueuedExecutor;
 import org.jboss.messaging.util.Version;
+import org.jboss.messaging.util.SerialExecutor;
 import org.jboss.remoting.InvokerLocator;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -90,7 +89,7 @@
 
    private Replicator replicator;
    
-   private QueuedExecutor notifyExecutor;
+   private SerialExecutor notifyExecutor;
 
    // Constructors ---------------------------------------------------------------------------------
 

Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -245,9 +245,10 @@
          // create the corresponding server-side session endpoint and register it with this
          // connection endpoint instance
 
-         //Note we only replicate transacted and client acknowledge sessions.
-         ServerSessionEndpoint ep = new ServerSessionEndpoint(sessionID, this,
-         		                     transacted || acknowledgmentMode == Session.CLIENT_ACKNOWLEDGE);
+          //Note we only replicate transacted and client acknowledge sessions.
+          ServerSessionEndpoint ep = new ServerSessionEndpoint(
+              sessionID, this, transacted || acknowledgmentMode == Session.CLIENT_ACKNOWLEDGE,
+              serverPeer.getPooledExecutor());
 
          synchronized (sessions)
          {

Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -86,12 +86,14 @@
 import org.jboss.messaging.util.GUIDGenerator;
 import org.jboss.messaging.util.MessageQueueNameHelper;
 import org.jboss.messaging.util.NamedThreadQueuedExecutor;
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.messaging.util.PooledSerialExecutor;
 import org.jboss.remoting.callback.Callback;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
 
 /**
  * The server side representation of a JMS session.
@@ -169,7 +171,7 @@
    private long deliveryIdSequence;
 
    //Temporary until we have our own NIO transport
-   QueuedExecutor executor;
+   SerialExecutor executor;
 
    private LinkedQueue toDeliver = new LinkedQueue();
 
@@ -182,12 +184,24 @@
 
    // Constructors ---------------------------------------------------------------------------------
 
+    /**
+     * @param pooledExecutor - a pre-configured pooled executor for the case we want to bound
+     *        the number of threads used by session endpoints throughout a VM. If null, we'll fall
+     *        back to a thread per session endpoint (see JBMESSAGING-1356)
+     */
    ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
-   		                boolean replicating) throws Exception
+                         boolean replicating, PooledExecutor pooledExecutor) throws Exception
    {
       this.id = sessionID;
       
-      this.executor = new NamedThreadQueuedExecutor("jbm-server-session-" + sessionID);
+       if (pooledExecutor != null)
+       {
+           this.executor = new PooledSerialExecutor(pooledExecutor);
+       }
+       else
+       {
+           this.executor = new NamedThreadQueuedExecutor("jbm-server-session-" + sessionID);
+       }
 
       this.connectionEndpoint = connectionEndpoint;
 

Added: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/DirectSerialExecutor.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/DirectSerialExecutor.java	                        (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/DirectSerialExecutor.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,61 @@
+package org.jboss.messaging.util;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ * A SerialExecutor that uses the threads on which tasks are submitted to it.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class DirectSerialExecutor implements SerialExecutor
+{
+    // Constants -----------------------------------------------------------------------------------
+
+    // Static --------------------------------------------------------------------------------------
+
+    // Attributes ----------------------------------------------------------------------------------
+
+    // Constructors --------------------------------------------------------------------------------
+
+    public DirectSerialExecutor()
+    {
+    }
+
+    // Executor implementation ---------------------------------------------------------------------
+
+    public void execute(Runnable runnable) throws InterruptedException
+    {
+        throw new RuntimeException("NOT YET IMPLEMENTED");
+    }
+
+    // SerialExecutor implementation ---------------------------------------------------------------
+
+    public void shutdownAfterProcessingCurrentlyQueuedTasks()
+    {
+        throw new RuntimeException("NOT YET IMPLEMENTED");
+    }
+
+    public void shutdownNow()
+    {
+        throw new RuntimeException("NOT YET IMPLEMENTED");
+    }
+
+    public Thread getExecutorThread()
+    {
+        throw new RuntimeException("NOT YET IMPLEMENTED");
+    }
+
+    // Public --------------------------------------------------------------------------------------
+
+    // Package protected ---------------------------------------------------------------------------
+
+    // Protected -----------------------------------------------------------------------------------
+
+    // Private -------------------------------------------------------------------------------------
+
+    // Inner classes -------------------------------------------------------------------------------
+}


Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/DirectSerialExecutor.java
___________________________________________________________________
Name: svn:executable
   + *

Modified: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	2008-06-13 00:02:28 UTC (rev 4458)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/NamedThreadQueuedExecutor.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -28,39 +28,44 @@
 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
 
 /**
- * 
+ *
  * A NamedThreadQueuedExecutor
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  *
  */
-public class NamedThreadQueuedExecutor extends QueuedExecutor
-{
+public class NamedThreadQueuedExecutor extends QueuedExecutor implements SerialExecutor {
 	private static final Logger log = Logger.getLogger(NamedThreadQueuedExecutor.class);
-	  	  
+
 	private final String name;
-	
+
 	private static final ThreadGroup jbmGroup = new ThreadGroup("JBM-threads");
-	
-	public NamedThreadQueuedExecutor(String name)
+
+    public Thread getExecutorThread()
+    {
+        return super.getThread();
+    }
+
+    public NamedThreadQueuedExecutor(String name)
 	{
 		super(new LinkedQueue());
-		
+
 		this.name = name;
-		
+
 		setThreadFactory(new Factory());
-		
+
 		clearThread();
-		
+
 		restart();
 	}
-	
+
 	private class Factory implements ThreadFactory
 	{
 		public Thread newThread(Runnable command)
-		{			
+		{
 			return new Thread(jbmGroup, command, name);
 		}
-		
+
 	}
 }

Added: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/PooledSerialExecutor.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/PooledSerialExecutor.java	                        (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/PooledSerialExecutor.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,226 @@
+package org.jboss.messaging.util;
+
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
+import org.jboss.logging.Logger;
+
+/**
+ * A SerialExecutor implementation that honors the contract of serial task execution while using
+ * an internal or external pool of threads to carry out the actual executions.
+ *
+ * In case of using an external thread pool, this implementation allows creation of a large number
+ * of serial executors that use a relatively small number of threads, thus bounding uncontrolled
+ * proliferation of threads.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class PooledSerialExecutor implements SerialExecutor
+{
+    // Constants -----------------------------------------------------------------------------------
+
+    private static final Logger log = Logger.getLogger(PooledSerialExecutor.class);
+
+    public static final int DEFAULT_TASK_QUEUE_SIZE = 50;
+    public static final int DEFAULT_THREAD_COUNT = 3;
+
+    private static final Runnable SHUTDOWN_TASK = new Runnable() { public void run() {}};
+
+    // Static --------------------------------------------------------------------------------------
+
+    // Attributes ----------------------------------------------------------------------------------
+
+    private BoundedBuffer taskQueue;
+    private PooledExecutor delegate;
+    private boolean taskExecuting;
+    private volatile Thread currentlyExecuting;
+    private boolean shuttingDown;
+
+    // Constructors --------------------------------------------------------------------------------
+
+    public PooledSerialExecutor()
+    {
+        this(DEFAULT_TASK_QUEUE_SIZE);
+    }
+
+    public PooledSerialExecutor(int taskQueueSize)
+    {
+        this(taskQueueSize, new PooledExecutor(DEFAULT_THREAD_COUNT));
+    }
+
+    public PooledSerialExecutor(int taskQueueSize, int threadCount)
+    {
+        this(taskQueueSize, new PooledExecutor(threadCount));
+    }
+
+    public PooledSerialExecutor(PooledExecutor delegate)
+    {
+        this(DEFAULT_TASK_QUEUE_SIZE, delegate);
+    }
+
+    public PooledSerialExecutor(int taskQueueSize, PooledExecutor delegate)
+    {
+        this.delegate = delegate;
+        taskQueue = new BoundedBuffer(taskQueueSize);
+    }
+
+    // Executor implementation ---------------------------------------------------------------------
+
+    /**
+     * Note: if the tasks are queued faster than executed, this method will eventually block,
+     *       until downstream execution creates space in the queue again.
+     *
+     * @throws InterruptedException
+     */
+    public void execute(Runnable task) throws InterruptedException
+    {
+        taskQueue.put(task);
+        executeNext();
+    }
+
+    // SerialExecutor implementation ---------------------------------------------------------------
+
+    public void shutdownAfterProcessingCurrentlyQueuedTasks()
+    {
+        while(true)
+        {
+            try
+            {
+                // queue a shutdown task
+                taskQueue.put(SHUTDOWN_TASK);
+                return;
+            }
+            catch(InterruptedException e)
+            {
+                log.warn("failed to queue shutdown task, retrying ...", e);
+            }
+        }
+    }
+
+    public synchronized void shutdownNow()
+    {
+        if (shuttingDown)
+        {
+            return;
+        }
+
+        shuttingDown = true;
+
+        if (taskExecuting && currentlyExecuting != null)
+        {
+            currentlyExecuting.interrupt();
+        }
+
+        // clear the rest of the tasks
+        try
+        {
+            while(taskQueue.poll(0) != null)
+            {
+            }
+        }
+        catch(InterruptedException e)
+        {
+            log.warn(e);
+        }
+    }
+
+    public Thread getExecutorThread()
+    {
+        return currentlyExecuting;
+    }
+
+    // Public --------------------------------------------------------------------------------------
+
+    // Package protected ---------------------------------------------------------------------------
+
+    // Protected -----------------------------------------------------------------------------------
+
+    // Private -------------------------------------------------------------------------------------
+
+    private synchronized void executeNext() throws InterruptedException
+    {
+        if (taskExecuting)
+        {
+            // there's another active task already, get out
+            return;
+        }
+
+        // dequeue the oldest task
+        Runnable task = (Runnable)taskQueue.poll(0);
+
+        if (task == null)
+        {
+            // no tasks, get out
+            return;
+        }
+        else if (task == SHUTDOWN_TASK)
+        {
+            delegate.shutdownAfterProcessingCurrentlyQueuedTasks();
+            return;
+        }
+
+        taskExecuting = true; // TODO - what if .execute() throws exception, clean taskExecuting flag
+
+        // fire a new thread with the task we've just dequeued
+        // TODO Warning: for one-thread PooledExecutor, we will end up in recursion here
+        delegate.execute(new TaskWrapper(task));
+
+        // TODO - what happens if execution fails, the task will be lost, because it is dequeued
+    }
+
+    // Inner classes -------------------------------------------------------------------------------
+
+    private class TaskWrapper implements Runnable
+    {
+        private Runnable task;
+
+        private TaskWrapper(Runnable task)
+        {
+            this.task = task;
+        }
+
+        public void run()
+        {
+            try
+            {
+                currentlyExecuting = Thread.currentThread();
+
+                task.run();
+            }
+            catch(Throwable t)
+            {
+                // task failed, log and get out
+                log.error("task " + task + " failed", t);
+            }
+            finally
+            {
+                synchronized(PooledSerialExecutor.this)
+                {
+                    taskExecuting = false;
+                    currentlyExecuting = null;
+
+                    if (shuttingDown)
+                    {
+                        return;
+                    }
+
+                    while(true)
+                    {
+                        try
+                        {
+                            executeNext();
+                            break;
+                        }
+                        catch(InterruptedException e)
+                        {
+                            log.warn("temporary failure to submit next task, retrying ...", e);
+                        }
+                    }
+                }
+            }
+        }
+    }
+}


Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/PooledSerialExecutor.java
___________________________________________________________________
Name: svn:executable
   + *

Added: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/QueuedSerialExecutor.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/QueuedSerialExecutor.java	                        (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/QueuedSerialExecutor.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,67 @@
+package org.jboss.messaging.util;
+
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
+
+/**
+ * A SerialExecutor that just simply delegates to a QueuedExecutor.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class QueuedSerialExecutor implements SerialExecutor
+{
+    // Constants -----------------------------------------------------------------------------------
+
+    // Static --------------------------------------------------------------------------------------
+
+    // Attributes ----------------------------------------------------------------------------------
+
+    private QueuedExecutor delegate;
+
+    // Constructors --------------------------------------------------------------------------------
+
+    /**
+     * Creates its delegate QueuedExecutor.
+     */
+    public QueuedSerialExecutor()
+    {
+        delegate = new QueuedExecutor();
+    }
+
+    // Executor implementation ---------------------------------------------------------------------
+
+    public void execute(Runnable runnable) throws InterruptedException
+    {
+        delegate.execute(runnable);
+    }
+
+    // SerialExecutor implementation ---------------------------------------------------------------
+
+    public void shutdownAfterProcessingCurrentlyQueuedTasks()
+    {
+        delegate.shutdownAfterProcessingCurrentlyQueuedTasks();
+    }
+
+    public void shutdownNow()
+    {
+        delegate.shutdownNow();
+    }
+
+    public Thread getExecutorThread()
+    {
+        return delegate.getThread();
+    }
+
+    // Public --------------------------------------------------------------------------------------
+
+    // Package protected ---------------------------------------------------------------------------
+
+    // Protected -----------------------------------------------------------------------------------
+
+    // Private -------------------------------------------------------------------------------------
+
+    // Inner classes -------------------------------------------------------------------------------
+}


Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/QueuedSerialExecutor.java
___________________________________________________________________
Name: svn:executable
   + *

Added: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/SerialExecutor.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/SerialExecutor.java	                        (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/SerialExecutor.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,47 @@
+package org.jboss.messaging.util;
+
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+
+/**
+ * An Executor type that guarantees serial execution of tasks submitted to it: If task B is
+ * submitted after task A, on the same thread or different threads, then B's execution is
+ * guaranteed not to start unless A's execution finishes (successfully or otherwise).
+ *
+ * A QueuedExecutor is a possible implementation of such type, but the implementation is quite
+ * inefficient if we need a large number of such serial executors, in that it wastefully "books" a
+ * thread per instance. A different implementation may "borrow" execution threads from a common
+ * pool, while maintaining the serial execution guarantee.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public interface SerialExecutor extends Executor
+{
+    /**
+     * Disassociates itself from all background threads after it processes all tasks currently in
+     * queue. Any tasks entered after this point will not be processed. A shut down serial executor
+     * may not be reused. This method may block if the task queue is finite and full.
+     */
+    void shutdownAfterProcessingCurrentlyQueuedTasks();
+
+    /**
+     * Terminate background thread even if it is currently processing a task. This method uses
+     * Thread.interrupt, so relies on tasks themselves responding appropriately to interruption. If
+     * the current task does not terminate on interruption, then the thread will not terminate
+     * until it finishes processing the current task. A shut down thread cannot be restarted.
+     **/
+    void shutdownNow();
+
+    /**
+     * 
+     * @return the thread that is executing the current serial tasks, or is scheduled to execute
+     *         the next task, whenever that task may be submitted. Some SerialExecutor
+     *         implementations may choose to return null if no current thread is executing and the
+     *         thread that is going to execute next task is not known at the moment.
+     */
+    Thread getExecutorThread();
+    
+}


Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/src/main/org/jboss/messaging/util/SerialExecutor.java
___________________________________________________________________
Name: svn:executable
   + *


Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/lib/jdbc-drivers
___________________________________________________________________
Name: svn:ignore
   + *.jar


Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java	                        (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,135 @@
+package org.jboss.test.messaging.jms;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.DeliveryMode;
+import javax.jms.TextMessage;
+import javax.jms.MessageConsumer;
+
+/**
+ * This is a container for all test cases related to
+ * http://jira.jboss.com/jira/browse/JBMESSAGING-1356: an attempt to minimize the number of threads 
+ * used by JBM sessions.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class PooledThreadsDeliveryTest extends JMSTestCase
+{
+    // Constants -----------------------------------------------------------------------------------
+
+    /**
+     * Upper bound, in milliseconds, on the wait for a message that was already sent. A bounded
+     * wait makes a delivery failure fail the test instead of hanging the whole test run.
+     */
+    private static final long RECEIVE_TIMEOUT = 5000;
+
+    // Static --------------------------------------------------------------------------------------
+
+    // Attributes ----------------------------------------------------------------------------------
+
+    // Constructors --------------------------------------------------------------------------------
+
+    public PooledThreadsDeliveryTest(String name)
+    {
+       super(name);
+    }
+
+    // Public --------------------------------------------------------------------------------------
+
+    /**
+     * Sends one non-persistent text message to queue1, then receives it through a consumer created
+     * on a second session of the same connection.
+     */
+    public void testOneMessage() throws Exception
+    {
+        Connection conn = null;
+
+        try
+        {
+            // cf and queue1 are not declared in this class - presumably inherited from JMSTestCase
+            conn = cf.createConnection();
+
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageProducer prod = session.createProducer(queue1);
+            prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            prod.send(session.createTextMessage("1"));
+
+            session.close();
+
+            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageConsumer cons = session.createConsumer(queue1);
+
+            conn.start();
+
+            assertEquals("1", receiveText(cons));
+        }
+        finally
+        {
+            if (conn != null)
+            {
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Sends three non-persistent text messages ("1", "2", "3") to queue1 and verifies that a
+     * consumer created afterwards receives them in the order they were sent.
+     */
+    public void testMessageSequence() throws Exception
+    {
+        Connection conn = null;
+
+        try
+        {
+            conn = cf.createConnection();
+
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageProducer prod = session.createProducer(queue1);
+            prod.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+
+            prod.send(session.createTextMessage("1"));
+            prod.send(session.createTextMessage("2"));
+            prod.send(session.createTextMessage("3"));
+
+            session.close();
+
+            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            MessageConsumer cons = session.createConsumer(queue1);
+
+            conn.start();
+
+            assertEquals("1", receiveText(cons));
+            assertEquals("2", receiveText(cons));
+            assertEquals("3", receiveText(cons));
+        }
+        finally
+        {
+            if (conn != null)
+            {
+                conn.close();
+            }
+        }
+    }
+
+    // Package protected ---------------------------------------------------------------------------
+
+    // Protected -----------------------------------------------------------------------------------
+
+    // Private -------------------------------------------------------------------------------------
+
+    /**
+     * Receives the next message from the given consumer, waiting at most RECEIVE_TIMEOUT
+     * milliseconds, and fails the test if nothing arrives in that time.
+     *
+     * @param cons the consumer to read from
+     * @return the text carried by the received message
+     */
+    private String receiveText(MessageConsumer cons) throws Exception
+    {
+        // receive(long) returns null on timeout, unlike receive() which blocks indefinitely
+        TextMessage m = (TextMessage)cons.receive(RECEIVE_TIMEOUT);
+        assertNotNull("no message received within " + RECEIVE_TIMEOUT + " ms", m);
+        return m.getText();
+    }
+
+    // Inner classes -------------------------------------------------------------------------------
+}


Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/jms/PooledThreadsDeliveryTest.java
___________________________________________________________________
Name: svn:executable
   + *

Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/JBMExecutorTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/JBMExecutorTest.java	                        (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/JBMExecutorTest.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,39 @@
+package org.jboss.test.messaging.util;
+
+import org.jboss.test.messaging.util.base.SerialExecutorTestBase;
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.messaging.util.QueuedSerialExecutor;
+import org.jboss.messaging.util.JBMExecutor;
+
+/**
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class JBMExecutorTest extends SerialExecutorTestBase
+{
+    // Constants -----------------------------------------------------------------------------------
+
+    // Static --------------------------------------------------------------------------------------
+
+    // Attributes ----------------------------------------------------------------------------------
+
+    // Constructors --------------------------------------------------------------------------------
+
+    // Public --------------------------------------------------------------------------------------
+
+    // Package protected ---------------------------------------------------------------------------
+
+    // Protected -----------------------------------------------------------------------------------
+
+    /**
+     * Creates the executor under test.
+     *
+     * @return a new JBMExecutor named "test-instance"
+     */
+    @Override
+    protected SerialExecutor getSerialExecutorToTest()
+    {
+        return new JBMExecutor("test-instance");
+    }
+
+    // Private -------------------------------------------------------------------------------------
+
+    // Inner classes -------------------------------------------------------------------------------
+}


Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/JBMExecutorTest.java
___________________________________________________________________
Name: svn:executable
   + *

Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/NamedThreadQueuedExecutorTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/NamedThreadQueuedExecutorTest.java	                        (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/NamedThreadQueuedExecutorTest.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,39 @@
+package org.jboss.test.messaging.util;
+
+import org.jboss.test.messaging.util.base.SerialExecutorTestBase;
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.messaging.util.QueuedSerialExecutor;
+import org.jboss.messaging.util.NamedThreadQueuedExecutor;
+
+/**
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class NamedThreadQueuedExecutorTest extends SerialExecutorTestBase
+{
+    // Constants -----------------------------------------------------------------------------------
+
+    // Static --------------------------------------------------------------------------------------
+
+    // Attributes ----------------------------------------------------------------------------------
+
+    // Constructors --------------------------------------------------------------------------------
+
+    // Public --------------------------------------------------------------------------------------
+
+    // Package protected ---------------------------------------------------------------------------
+
+    // Protected -----------------------------------------------------------------------------------
+
+    /**
+     * Creates the executor under test.
+     *
+     * @return a new NamedThreadQueuedExecutor named "test-instance"
+     */
+    @Override
+    protected SerialExecutor getSerialExecutorToTest()
+    {
+        return new NamedThreadQueuedExecutor("test-instance");
+    }
+
+    // Private -------------------------------------------------------------------------------------
+
+    // Inner classes -------------------------------------------------------------------------------
+}


Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/NamedThreadQueuedExecutorTest.java
___________________________________________________________________
Name: svn:executable
   + *

Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java	                        (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,694 @@
+package org.jboss.test.messaging.util;
+
+import org.jboss.test.messaging.util.base.SerialExecutorTestBase;
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.messaging.util.PooledSerialExecutor;
+import org.jboss.logging.Logger;
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collections;
+
+/**
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class PooledSerialExecutorTest extends SerialExecutorTestBase
+{
+    // Constants -----------------------------------------------------------------------------------
+
+    private static final Logger log = Logger.getLogger(PooledSerialExecutorTest.class);
+
+    // Static --------------------------------------------------------------------------------------
+
+    // Attributes ----------------------------------------------------------------------------------
+
+    // Constructors --------------------------------------------------------------------------------
+
+    // Public --------------------------------------------------------------------------------------
+
+    /**
+     * Verifies that a submission to a PooledSerialExecutor(1) blocks while the executor is
+     * clogged by earlier tasks, and completes once the first task is allowed to run.
+     */
+    public void testQueueingFasterThanExecution() throws Exception
+    {
+        // we force the task queue to be the smallest possible
+        final SerialExecutor se = new PooledSerialExecutor(1);
+
+        Task a = new Task("A");
+        se.execute(a); // this will be submitted to execution and will keep the executor "occupied"
+
+        Task b = new Task("B");
+        se.execute(b); // this will clog the queue
+
+        final Task c = new Task("C"); // this one will block at submission
+        final Latch cSubmissionLatch = new Latch();
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    se.execute(c);
+                    // only reached once the submission of C went through
+                    cSubmissionLatch.release();
+                }
+                catch(Exception e)
+                {
+                    log.error(e);
+                }
+            }
+        }, "This thread will block").start();
+
+
+        // at this point, the executor is clogged
+        assertFalse(cSubmissionLatch.attempt(2000));
+
+        // allow A to execute
+        a.enableExecution();
+
+        // this will unclog the executor; wait for a bounded time so that a regression fails the
+        // test instead of blocking it forever (an unbounded acquire() could never fail)
+        assertTrue(cSubmissionLatch.attempt(10000));
+    }
+
+    /**
+     * Queues 500 not-yet-enabled tasks on a PooledSerialExecutor(1000, 1), enables them in reverse
+     * submission order, and checks that the shared record lists them in submission order.
+     */
+    public void testFloodWithOneThread() throws Exception
+    {
+        // NOTE(review): the second constructor argument is presumably the thread count and the
+        // first the queue capacity - confirm against PooledSerialExecutor
+        SerialExecutor executor = new PooledSerialExecutor(1000, 1);
+
+        final List<String> completionLog = Collections.synchronizedList(new ArrayList<String>());
+
+        final int total = 500;
+        final List<Task> submitted = new ArrayList<Task>(total);
+        int next = 0;
+        while (next < total)
+        {
+            // tasks are created with execution not enabled
+            Task pending = new Task(Integer.toString(next), completionLog);
+            submitted.add(pending);
+            executor.execute(pending);
+            next++;
+        }
+
+        // nothing has been enabled yet, so no task may report completion
+        for(int k = 0; k < total; k++)
+        {
+            assertFalse(submitted.get(k).isCompleted());
+        }
+
+        // enable newest-first, i.e. the inverse of the submission order
+        int idx = total;
+        while (idx-- > 0)
+        {
+            submitted.get(idx).enableExecution();
+        }
+
+        // block until every task has finished
+        for(int k = 0; k < total; k++)
+        {
+            submitted.get(k).waitForCompletion();
+        }
+
+        // the record must read "0", "1", ... in exactly that order
+        assertEquals(total, completionLog.size());
+
+        for(int pos = 0; pos < total; pos++)
+        {
+            assertEquals(Integer.toString(pos), completionLog.get(pos));
+        }
+    }
+
+    public void testStackOverflow() throws Exception
+    {
+        fail("I KNOW THIS TEST FAILS, MUST FIX THE IMPLEMENATION");
+
+        // flood a one-thread executor with tasks
+        SerialExecutor se = new PooledSerialExecutor(100000, 1);
+
+        final List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+
+        final int taskCount = 100000;
+        final List<Task> tasksInOrderOfSubmission = new ArrayList<Task>(taskCount);
+        for(int i = 0; i < taskCount; i++)
+        {
+            // execution is not enabled
+            Task t = new Task(Integer.toString(i), executionRecord);
+            tasksInOrderOfSubmission.add(t);
+            se.execute(t);
+        }
+
+        // enable execution in order inverse to submission - this will throw StackOverflow
+        for(int i = taskCount - 1; i >= 0; i --)
+        {
+            tasksInOrderOfSubmission.get(i).enableExecution();
+        }
+
+        // wait until the last task is executed
+        tasksInOrderOfSubmission.get(taskCount - 1).waitForCompletion();
+    }
+
+    /**
+     * Queues 500 not-yet-enabled tasks on a PooledSerialExecutor(1000, 2), enables them in reverse
+     * submission order, and checks that the shared record lists them in submission order.
+     */
+    public void testFloodWithTwoThreads() throws Exception
+    {
+        // NOTE(review): the second constructor argument is presumably the thread count and the
+        // first the queue capacity - confirm against PooledSerialExecutor
+        SerialExecutor executor = new PooledSerialExecutor(1000, 2);
+
+        final List<String> completionLog = Collections.synchronizedList(new ArrayList<String>());
+
+        final int total = 500;
+        final List<Task> submitted = new ArrayList<Task>(total);
+        int next = 0;
+        while (next < total)
+        {
+            // tasks are created with execution not enabled
+            Task pending = new Task(Integer.toString(next), completionLog);
+            submitted.add(pending);
+            executor.execute(pending);
+            next++;
+        }
+
+        // nothing has been enabled yet, so no task may report completion
+        for(int k = 0; k < total; k++)
+        {
+            assertFalse(submitted.get(k).isCompleted());
+        }
+
+        // enable newest-first, i.e. the inverse of the submission order
+        int idx = total;
+        while (idx-- > 0)
+        {
+            submitted.get(idx).enableExecution();
+        }
+
+        // block until every task has finished
+        for(int k = 0; k < total; k++)
+        {
+            submitted.get(k).waitForCompletion();
+        }
+
+        // even with two threads the record must read "0", "1", ... in exactly that order
+        assertEquals(total, completionLog.size());
+
+        for(int pos = 0; pos < total; pos++)
+        {
+            assertEquals(Integer.toString(pos), completionLog.get(pos));
+        }
+    }
+
+    public void testExternalPoolExecutor_OnePooledThread_TwoSerialExecutors() throws Exception
+    {
+        // Lockstep test: two serial executors (se, se2) are built on one shared PooledExecutor(1).
+        // Tasks A and B go to se, C and D to se2; each record must end up in submission order.
+        List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+        List<String> executionRecord2 = Collections.synchronizedList(new ArrayList<String>());
+
+        PooledExecutor shared = new PooledExecutor(1);
+
+        final PooledSerialExecutor se = new PooledSerialExecutor(shared);
+        final PooledSerialExecutor se2 = new PooledSerialExecutor(shared);
+
+        Task a = new Task("A", executionRecord);
+
+        se.execute(a);
+
+        final Task b = new Task("B", executionRecord);
+
+        // the thread that "launches" b will block temporarily, so do it from a fresh new one
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    se.execute(b);
+                }
+                catch(InterruptedException e)
+                {
+                    log.warn("interrupted", e);
+                }
+            }
+        }, "B Launcher").start();
+
+
+        final Task c = new Task("C", executionRecord2);
+        final Latch afterC = new Latch(); // released once C was handed to se2, gates the launch of D
+
+        // the thread that "launches" c will block temporarily, so do it from a fresh new one
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    se2.execute(c);
+                    afterC.release();
+                }
+                catch(InterruptedException e)
+                {
+                    log.warn("interrupted", e);
+                }
+            }
+        }, "C Launcher").start();
+
+        final Task d = new Task("D", executionRecord2);
+
+        // the thread that "launches" d will block temporarily, so do it from a fresh new one
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    afterC.acquire();
+                    se2.execute(d);
+                }
+                catch(InterruptedException e)
+                {
+                    log.warn("interrupted", e);
+                }
+            }
+        }, "D Launcher").start();
+
+        // no task was enabled yet, so none is expected to report completion
+        // (isCompleted(500) presumably waits up to 500 ms - confirm against Task)
+        assertFalse(a.isCompleted(500));
+        assertFalse(b.isCompleted(500));
+        assertFalse(c.isCompleted(500));
+        assertFalse(d.isCompleted(500));
+
+        // enable execution of everyone, in the inverse order of launching
+        d.enableExecution();
+        c.enableExecution();
+        b.enableExecution();
+        a.enableExecution();
+
+        // block until all four tasks are done
+        a.waitForCompletion();
+        b.waitForCompletion();
+        c.waitForCompletion();
+        d.waitForCompletion();
+
+        assertTrue(a.isCompleted());
+        assertTrue(b.isCompleted());
+        assertTrue(c.isCompleted());
+        assertTrue(d.isCompleted());
+
+        // per executor, the record must show the submission order, not the enabling order
+        assertEquals(2, executionRecord.size());
+        assertEquals("A", executionRecord.get(0));
+        assertEquals("B", executionRecord.get(1));
+
+        assertEquals(2, executionRecord2.size());
+        assertEquals("C", executionRecord2.get(0));
+        assertEquals("D", executionRecord2.get(1));
+    }
+
+    public void testExternalPoolExecutor_OnePooledThreads_Flood() throws Exception
+    {
+        List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+        List<String> executionRecord2 = Collections.synchronizedList(new ArrayList<String>());
+
+        // Flood test: se and se2 share one PooledExecutor(1); each gets 200 tasks, submitted in order
+        PooledExecutor shared = new PooledExecutor(1);
+
+        final PooledSerialExecutor se = new PooledSerialExecutor(shared);
+        final PooledSerialExecutor se2 = new PooledSerialExecutor(shared);
+
+        int taskCount = 200;
+
+        Task firstA = null, lastA = null;
+
+        // latches.get(i) gates the launcher of task i; it is released after task i-1 went to se
+        final List<Latch> latches = new ArrayList<Latch>(taskCount + 1);
+
+        for(int i = 0; i < taskCount + 1; i++)
+        {
+            latches.add(new Latch());
+        }
+
+        for(int i = 0; i < taskCount; i++)
+        {
+            String taskName = "A" + Integer.toString(i);
+            final Task t = new Task(taskName, executionRecord);
+
+            // enable all, but the first
+            if (i == 0)
+            {
+                firstA = t;
+            }
+            else
+            {
+                t.enableExecution();
+
+                if (i == taskCount - 1)
+                {
+                    lastA = t;
+                }
+            }
+
+            // one launcher thread per task; the latch chain makes submissions happen in index order
+            final int j = i;
+            new Thread(new Runnable()
+            {
+                public void run()
+                {
+                    try
+                    {
+                        latches.get(j).acquire();
+                        se.execute(t);
+                        latches.get(j + 1).release();
+                    }
+                    catch(InterruptedException e)
+                    {
+                        log.warn("interrupted", e);
+                    }
+                }
+            }, taskName + " Launcher").start();
+        }
+
+        // start "sending"
+        latches.get(0).release();
+
+        // now the second executor
+
+        Task firstB = null, lastB = null;
+
+        // same latch chain as above, this time ordering the submissions to se2
+        final List<Latch> latches2 = new ArrayList<Latch>(taskCount + 1);
+
+        for(int i = 0; i < taskCount + 1; i++)
+        {
+            latches2.add(new Latch());
+        }
+
+        for(int i = 0; i < taskCount; i++)
+        {
+            String taskName = "B" + Integer.toString(i);
+            final Task t = new Task(taskName, executionRecord2);
+
+            // enable all, but the first
+            if (i == 0)
+            {
+                firstB = t;
+            }
+            else
+            {
+                t.enableExecution();
+
+                if (i == taskCount - 1)
+                {
+                    lastB = t;
+                }
+            }
+
+            final int j = i;
+            new Thread(new Runnable()
+            {
+                public void run()
+                {
+                    try
+                    {
+                        latches2.get(j).acquire();
+                        se2.execute(t);
+                        latches2.get(j + 1).release();
+                    }
+                    catch(InterruptedException e)
+                    {
+                        log.warn("interrupted", e);
+                    }
+                }
+            }, taskName + " Launcher").start();
+        }
+
+        // start "sending"
+        latches2.get(0).release();
+
+
+        // "unclog" executors
+        firstB.enableExecution();
+        firstA.enableExecution();
+
+        // wait for all tasks to complete
+        lastB.waitForCompletion();
+        lastA.waitForCompletion();
+
+        // make sure the order is correct
+        assertEquals(taskCount, executionRecord.size());
+        for(int i = 0; i < taskCount; i ++)
+        {
+            assertEquals("A" + i, executionRecord.get(i));
+        }
+
+        assertEquals(taskCount, executionRecord2.size());
+        for(int i = 0; i < taskCount; i ++)
+        {
+            assertEquals("B" + i, executionRecord2.get(i));
+        }
+    }
+
+    public void testExternalPoolExecutor_TwoPooledThreads_TwoSerialExecutors() throws Exception
+    {
+        // Lockstep test: two serial executors (se, se2) are built on one shared PooledExecutor(2).
+        // Tasks A and B go to se, C and D to se2; each record must end up in submission order.
+        List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+        List<String> executionRecord2 = Collections.synchronizedList(new ArrayList<String>());
+
+        PooledExecutor shared = new PooledExecutor(2);
+
+        final PooledSerialExecutor se = new PooledSerialExecutor(shared);
+        final PooledSerialExecutor se2 = new PooledSerialExecutor(shared);
+
+        Task a = new Task("A", executionRecord);
+
+        se.execute(a);
+
+        final Task b = new Task("B", executionRecord);
+
+        // the thread that "launches" b will block temporarily, so do it from a fresh new one
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    se.execute(b);
+                }
+                catch(InterruptedException e)
+                {
+                    log.warn("interrupted", e);
+                }
+            }
+        }, "B Launcher").start();
+
+
+        final Task c = new Task("C", executionRecord2);
+        final Latch afterC = new Latch(); // released once C was handed to se2, gates the launch of D
+
+        // the thread that "launches" c may block temporarily, so do it from a fresh new one
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    se2.execute(c);
+                    afterC.release();
+                }
+                catch(InterruptedException e)
+                {
+                    log.warn("interrupted", e);
+                }
+            }
+        }, "C Launcher").start();
+
+        final Task d = new Task("D", executionRecord2);
+
+        // the thread that "launches" d will block temporarily, so do it from a fresh new one
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    afterC.acquire();
+                    se2.execute(d);
+                }
+                catch(InterruptedException e)
+                {
+                    log.warn("interrupted", e);
+                }
+            }
+        }, "D Launcher").start();
+
+        // no task was enabled yet, so none is expected to report completion
+        // (isCompleted(500) presumably waits up to 500 ms - confirm against Task)
+        assertFalse(a.isCompleted(500));
+        assertFalse(b.isCompleted(500));
+        assertFalse(c.isCompleted(500));
+        assertFalse(d.isCompleted(500));
+
+        // enable execution of everyone, in the inverse order of launching
+        d.enableExecution();
+        c.enableExecution();
+        b.enableExecution();
+        a.enableExecution();
+
+        // block until all four tasks are done
+        a.waitForCompletion();
+        b.waitForCompletion();
+        c.waitForCompletion();
+        d.waitForCompletion();
+
+        assertTrue(a.isCompleted());
+        assertTrue(b.isCompleted());
+        assertTrue(c.isCompleted());
+        assertTrue(d.isCompleted());
+
+        // per executor, the record must show the submission order, not the enabling order
+        assertEquals(2, executionRecord.size());
+        assertEquals("A", executionRecord.get(0));
+        assertEquals("B", executionRecord.get(1));
+
+        assertEquals(2, executionRecord2.size());
+        assertEquals("C", executionRecord2.get(0));
+        assertEquals("D", executionRecord2.get(1));
+    }
+
+    public void testExternalPoolExecutor_TwoPooledThreads_Flood() throws Exception
+    {
+        List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+        List<String> executionRecord2 = Collections.synchronizedList(new ArrayList<String>());
+
+        // Flood test: se and se2 share one PooledExecutor(2); each gets 200 tasks, submitted in order
+        PooledExecutor shared = new PooledExecutor(2);
+
+        final PooledSerialExecutor se = new PooledSerialExecutor(shared);
+        final PooledSerialExecutor se2 = new PooledSerialExecutor(shared);
+
+        int taskCount = 200;
+
+        Task firstA = null, lastA = null;
+
+        // latches.get(i) gates the launcher of task i; it is released after task i-1 went to se
+        final List<Latch> latches = new ArrayList<Latch>(taskCount + 1);
+
+        for(int i = 0; i < taskCount + 1; i++)
+        {
+            latches.add(new Latch());
+        }
+
+        for(int i = 0; i < taskCount; i++)
+        {
+            String taskName = "A" + Integer.toString(i);
+            final Task t = new Task(taskName, executionRecord);
+
+            // enable all, but the first
+            if (i == 0)
+            {
+                firstA = t;
+            }
+            else
+            {
+                t.enableExecution();
+
+                if (i == taskCount - 1)
+                {
+                    lastA = t;
+                }
+            }
+
+            // one launcher thread per task; the latch chain makes submissions happen in index order
+            final int j = i;
+            new Thread(new Runnable()
+            {
+                public void run()
+                {
+                    try
+                    {
+                        latches.get(j).acquire();
+                        se.execute(t);
+                        latches.get(j + 1).release();
+                    }
+                    catch(InterruptedException e)
+                    {
+                        log.warn("interrupted", e);
+                    }
+                }
+            }, taskName + " Launcher").start();
+        }
+
+        // start "sending"
+        latches.get(0).release();
+
+        // now the second executor
+
+        Task firstB = null, lastB = null;
+
+        // same latch chain as above, this time ordering the submissions to se2
+        final List<Latch> latches2 = new ArrayList<Latch>(taskCount + 1);
+
+        for(int i = 0; i < taskCount + 1; i++)
+        {
+            latches2.add(new Latch());
+        }
+
+        for(int i = 0; i < taskCount; i++)
+        {
+            String taskName = "B" + Integer.toString(i);
+            final Task t = new Task(taskName, executionRecord2);
+
+            // enable all, but the first
+            if (i == 0)
+            {
+                firstB = t;
+            }
+            else
+            {
+                t.enableExecution();
+
+                if (i == taskCount - 1)
+                {
+                    lastB = t;
+                }
+            }
+
+            final int j = i;
+            new Thread(new Runnable()
+            {
+                public void run()
+                {
+                    try
+                    {
+                        latches2.get(j).acquire();
+                        se2.execute(t);
+                        latches2.get(j + 1).release();
+                    }
+                    catch(InterruptedException e)
+                    {
+                        log.warn("interrupted", e);
+                    }
+                }
+            }, taskName + " Launcher").start();
+        }
+
+        // start "sending"
+        latches2.get(0).release();
+
+
+        // "unclog" executors
+        firstB.enableExecution();
+        firstA.enableExecution();
+
+        // wait for all tasks to complete
+        lastB.waitForCompletion();
+        lastA.waitForCompletion();
+
+        // make sure the order is correct
+        assertEquals(taskCount, executionRecord.size());
+        for(int i = 0; i < taskCount; i ++)
+        {
+            assertEquals("A" + i, executionRecord.get(i));
+        }
+
+        assertEquals(taskCount, executionRecord2.size());
+        for(int i = 0; i < taskCount; i ++)
+        {
+            assertEquals("B" + i, executionRecord2.get(i));
+        }
+    }
+
+    // Package protected ---------------------------------------------------------------------------
+
+    // Protected -----------------------------------------------------------------------------------
+
+    // Private -------------------------------------------------------------------------------------
+
+    // Inner classes -------------------------------------------------------------------------------
+
+    @Override
+    protected SerialExecutor getSerialExecutorToTest()
+    {
+        return new PooledSerialExecutor();
+    }
+}


Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/PooledSerialExecutorTest.java
___________________________________________________________________
Name: svn:executable
   + *

Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/QueuedSerialExecutorTest.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/QueuedSerialExecutorTest.java	                        (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/QueuedSerialExecutorTest.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,38 @@
+package org.jboss.test.messaging.util;
+
+import org.jboss.test.messaging.util.base.SerialExecutorTestBase;
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.messaging.util.QueuedSerialExecutor;
+
+/**
+ * Runs the SerialExecutor contract tests inherited from SerialExecutorTestBase against
+ * QueuedSerialExecutor.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public class QueuedSerialExecutorTest extends SerialExecutorTestBase
+{
+    // Constants -----------------------------------------------------------------------------------
+
+    // Static --------------------------------------------------------------------------------------
+
+    // Attributes ----------------------------------------------------------------------------------
+
+    // Constructors --------------------------------------------------------------------------------
+
+    // Public --------------------------------------------------------------------------------------
+
+    // Package protected ---------------------------------------------------------------------------
+
+    // Protected -----------------------------------------------------------------------------------
+
+    // Private -------------------------------------------------------------------------------------
+
+    // Inner classes -------------------------------------------------------------------------------
+
+    /**
+     * Supplies the executor the inherited tests run against.
+     *
+     * @return a newly created QueuedSerialExecutor
+     */
+    @Override
+    protected SerialExecutor getSerialExecutorToTest()
+    {
+        return new QueuedSerialExecutor();
+    }
+}


Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/QueuedSerialExecutorTest.java
___________________________________________________________________
Name: svn:executable
   + *

Added: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java
===================================================================
--- branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java	                        (rev 0)
+++ branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java	2008-06-13 01:08:10 UTC (rev 4459)
@@ -0,0 +1,532 @@
+package org.jboss.test.messaging.util.base;
+
+import org.jboss.messaging.util.SerialExecutor;
+import org.jboss.test.messaging.util.ProxyAssertSupport;
+import org.jboss.logging.Logger;
+import EDU.oswego.cs.dl.util.concurrent.Latch;
+import EDU.oswego.cs.dl.util.concurrent.Slot;
+
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * A base class that tests the contract of SerialExecutor types.
+ *
+ * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ * $Id$
+ */
+public abstract class SerialExecutorTestBase extends ProxyAssertSupport
+{
+    // Constants -----------------------------------------------------------------------------------
+
+    private static final Logger log = Logger.getLogger(SerialExecutorTestBase.class);
+
+    // Static --------------------------------------------------------------------------------------
+
+    // Attributes ----------------------------------------------------------------------------------
+
+    // Constructors --------------------------------------------------------------------------------
+
+    // Public --------------------------------------------------------------------------------------
+
+    /**
+     * Submits a single task whose execution is already enabled and verifies that it runs to
+     * completion.
+     */
+    public void testSimpleExecution() throws Exception
+    {
+        Task onlyTask = new Task();
+        onlyTask.enableExecution();
+
+        SerialExecutor executor = getSerialExecutorToTest();
+        executor.execute(onlyTask);
+
+        // blocks until the task reports completion
+        onlyTask.waitForCompletion();
+
+        boolean completed = onlyTask.isCompleted();
+        assertTrue(completed);
+    }
+
+    /**
+     * Submits two already-enabled tasks one after the other and verifies that both complete and
+     * that they are recorded in submission order.
+     */
+    public void testSerialExecution_SequentialSubmission() throws Exception
+    {
+        List<String> record = Collections.synchronizedList(new ArrayList<String>());
+        SerialExecutor executor = getSerialExecutorToTest();
+
+        Task first = new Task("A", record);
+        first.enableExecution();
+        executor.execute(first);
+
+        Task second = new Task("B", record);
+        second.enableExecution();
+        executor.execute(second);
+
+        first.waitForCompletion();
+        second.waitForCompletion();
+
+        assertTrue(first.isCompleted());
+        assertTrue(second.isCompleted());
+
+        // "A" was submitted first, so it must be recorded first
+        String[] expectedOrder = {"A", "B"};
+        assertEquals(expectedOrder.length, record.size());
+        for(int i = 0; i < expectedOrder.length; i++)
+        {
+            assertEquals(expectedOrder[i], record.get(i));
+        }
+    }
+
+    /**
+     * Submits a blocked task "A" followed by a runnable task "B" and verifies that "B" does not
+     * complete until "A" has been unblocked, and that the recorded order is "A" then "B".
+     */
+    public void testSerialExecution_IntertwinedSubmission() throws Exception
+    {
+        List<String> record = Collections.synchronizedList(new ArrayList<String>());
+        SerialExecutor executor = getSerialExecutorToTest();
+
+        // "A" goes in first but stays blocked: its execution has not been enabled
+        Task blocked = new Task("A", record);
+        executor.execute(blocked);
+
+        // "B" goes in second and is free to run as soon as the executor picks it up
+        Task ready = new Task("B", record);
+        ready.enableExecution();
+        executor.execute(ready);
+
+        // give the executor time to (incorrectly) run "B" ahead of "A"
+        Thread.sleep(1000);
+        boolean overtook = ready.isCompleted();
+        assertFalse(overtook);
+
+        // unblock "A"; both tasks should now finish
+        blocked.enableExecution();
+        blocked.waitForCompletion();
+        ready.waitForCompletion();
+
+        assertTrue(blocked.isCompleted());
+        assertTrue(ready.isCompleted());
+
+        assertEquals(2, record.size());
+        assertEquals("A", record.get(0));
+        assertEquals("B", record.get(1));
+    }
+
+    /**
+     * Submits two blocked tasks, "A" then "B", enables "B" first and verifies that "B" still
+     * does not complete before "A" has been enabled and executed.
+     */
+    public void testSerialExecution_IntertwinedSubmission_2() throws Exception
+    {
+        List<String> record = Collections.synchronizedList(new ArrayList<String>());
+        SerialExecutor executor = getSerialExecutorToTest();
+
+        // both tasks are submitted with execution disabled, "A" ahead of "B"
+        Task first = new Task("A", record);
+        executor.execute(first);
+
+        Task second = new Task("B", record);
+        executor.execute(second);
+
+        // only "B" is allowed to run at this point
+        second.enableExecution();
+
+        // a faulty serial executor could let "B" overtake "A"; allow it some time to do so
+        boolean overtook = second.isCompleted(2000);
+        assertFalse(overtook);
+
+        // now let "A" run; "B" should follow
+        first.enableExecution();
+        first.waitForCompletion();
+        second.waitForCompletion();
+
+        assertTrue(first.isCompleted());
+        assertTrue(second.isCompleted());
+
+        assertEquals(2, record.size());
+        assertEquals("A", record.get(0));
+        assertEquals("B", record.get(1));
+    }
+
+    /**
+     * Floods the executor with blocked tasks, enables them in the inverse order of submission
+     * and verifies that they are nevertheless executed in submission order.
+     */
+    public void testSerialExecution_Flood() throws Exception
+    {
+        // flood the executor with a number of tasks and enable execution in the inverse order of
+        // flooding. If the serial order contract is not honored, we'll get a messed up execution
+        // record
+
+        final SerialExecutor se = getSerialExecutorToTest();
+        final List<String> executionRecord = Collections.synchronizedList(new ArrayList<String>());
+
+        final int taskCount = 200;
+
+        // upper bound on how long we wait for the feeder thread to report, so a feeder that never
+        // finishes fails the test instead of hanging it
+        final long feedingTimeoutMs = 60000;
+
+        final List<Task> tasksInOrderOfSubmission = new ArrayList<Task>(taskCount);
+        for(int i = 0; i < taskCount; i++)
+        {
+            // execution is not enabled
+            Task t = new Task(Integer.toString(i), executionRecord);
+            tasksInOrderOfSubmission.add(t);
+        }
+
+        // carries either Boolean.TRUE (all tasks submitted) or the exception that stopped feeding
+        final Slot feedingResult = new Slot();
+
+        // feed tasks from a separate thread to prevent test deadlock in case the internal task
+        // queue (if exists at all) is too small
+        new Thread(new Runnable()
+        {
+            public void run()
+            {
+                try
+                {
+                    for(int i = 0; i < taskCount; i++)
+                    {
+                        try
+                        {
+                            Task t = tasksInOrderOfSubmission.get(i);
+                            se.execute(t);
+                        }
+                        catch(Exception e)
+                        {
+                            // pass the throwable separately so the stack trace is logged
+                            log.error("failed to submit task " + i, e);
+                            feedingResult.put(e);
+                            return;
+                        }
+                    }
+
+                    // feeding done
+                    feedingResult.put(Boolean.TRUE);
+                }
+                catch(Exception e)
+                {
+                    // the result could not be handed over; the bounded wait below will fail
+                    log.error("feeder thread failed to report its result", e);
+                }
+            }
+        }, "Feeder Thread").start();
+
+        for(Task t: tasksInOrderOfSubmission)
+        {
+            assertFalse(t.isCompleted());
+        }
+
+        // enable execution in order inverse to submission
+        for(int i = taskCount - 1; i >= 0; i --)
+        {
+            tasksInOrderOfSubmission.get(i).enableExecution();
+        }
+
+        // every task is enabled now, so feeding should finish; poll() returns null on timeout,
+        // which makes the assertion below fail rather than block forever
+        Object result = feedingResult.poll(feedingTimeoutMs);
+        assertEquals(Boolean.TRUE, result);
+
+        // wait until all tasks are executed
+        for(Task t: tasksInOrderOfSubmission)
+        {
+            t.waitForCompletion();
+        }
+
+        // make sure the execution order is correct
+        assertEquals(taskCount, executionRecord.size());
+
+        int i = 0;
+        for(String s: executionRecord)
+        {
+            assertEquals(Integer.toString(i), s);
+            i ++;
+        }
+    }
+
+    /**
+     * Requests a graceful shutdown while a blocked task is pending and verifies that the pending
+     * task still completes once unblocked, whereas a task submitted after the shutdown request
+     * is never executed.
+     */
+    public void testShutdownAfterProcessingCurrentlyQueuedTasks() throws Exception
+    {
+        SerialExecutor executor = getSerialExecutorToTest();
+
+        // "A" is submitted with execution disabled, which simulates a long-running task
+        Task longRunning = new Task("A");
+        executor.execute(longRunning);
+
+        // the shutdown request itself is expected to go through while "A" is still pending
+        executor.shutdownAfterProcessingCurrentlyQueuedTasks();
+
+        // "B" arrives after the shutdown request; it is enabled, so it would run if accepted
+        Task late = new Task("B");
+        late.enableExecution();
+        executor.execute(late);
+
+        boolean lateRanEarly = late.isCompleted(1000);
+        assertFalse(lateRanEarly);
+
+        // release "A" and make sure it was correctly executed
+        longRunning.enableExecution();
+        longRunning.waitForCompletion();
+        assertTrue(longRunning.isCompleted());
+
+        // "B" must still not have been executed
+        assertFalse(late.isCompleted(1000));
+    }
+
+    /**
+     * Requests a graceful shutdown while two tasks are pending (a blocked "A" and a runnable
+     * "B") and verifies that both still complete, whereas "C", submitted after the shutdown
+     * request, is never executed.
+     */
+    public void testShutdownAfterProcessingCurrentlyQueuedTasks_TwoQueuedTasks() throws Exception
+    {
+        SerialExecutor executor = getSerialExecutorToTest();
+
+        // "A": execution disabled, stands in for a long-running task
+        Task longRunning = new Task("A");
+        executor.execute(longRunning);
+
+        // "B": runnable, submitted behind "A" before the shutdown request
+        Task queuedBeforeShutdown = new Task("B");
+        queuedBeforeShutdown.enableExecution();
+        executor.execute(queuedBeforeShutdown);
+
+        // the shutdown request itself is expected to go through
+        executor.shutdownAfterProcessingCurrentlyQueuedTasks();
+
+        // "C": runnable, but submitted after the shutdown request
+        Task submittedAfterShutdown = new Task("C");
+        submittedAfterShutdown.enableExecution();
+        executor.execute(submittedAfterShutdown);
+
+        // while "A" is blocked, neither "B" nor "C" may complete
+        assertFalse(queuedBeforeShutdown.isCompleted(500));
+        assertFalse(submittedAfterShutdown.isCompleted(500));
+
+        // release "A" and make sure the first task was correctly executed
+        longRunning.enableExecution();
+        longRunning.waitForCompletion();
+        assertTrue(longRunning.isCompleted());
+
+        // make sure the second task was correctly executed
+        queuedBeforeShutdown.waitForCompletion();
+        assertTrue(queuedBeforeShutdown.isCompleted());
+
+        // make sure the third task, submitted after the shutdown request, was NOT executed
+        assertFalse(submittedAfterShutdown.isCompleted(1000));
+    }
+
+    /**
+     * Shuts the executor down immediately and verifies that a task submitted afterwards is not
+     * executed.
+     */
+    public void testShutdownNow() throws Exception
+    {
+        SerialExecutor se = getSerialExecutorToTest();
+
+        se.shutdownNow();
+
+        // make sure no new tasks go through
+        Task a = new Task("A");
+
+        // the task must be runnable: a task whose execution is not enabled can never complete,
+        // so the assertion below would pass even if the executor wrongly ran it
+        a.enableExecution();
+
+        se.execute(a);
+        assertFalse(a.isCompleted(1000));
+    }
+
+    /**
+     * Shuts the executor down immediately while a blocked task is pending and verifies that
+     * neither the pending task nor a task submitted afterwards completes.
+     */
+    public void testShutdownNow2() throws Exception
+    {
+        SerialExecutor se = getSerialExecutorToTest();
+
+        // "A" is submitted with execution disabled, so it is still pending at shutdown time
+        Task a = new Task("A");
+        se.execute(a);
+
+        // give the executor a chance to pick "A" up before shutting down
+        Thread.sleep(300);
+
+        se.shutdownNow();
+
+        assertFalse(a.isCompleted(300));
+
+        // make sure no new tasks go through
+        Task b = new Task("B");
+
+        // the task must be runnable: a task whose execution is not enabled can never complete,
+        // so the assertion below would pass even if the executor wrongly ran it
+        b.enableExecution();
+
+        se.execute(b);
+        assertFalse(b.isCompleted(500));
+    }
+
+    /**
+     * Verifies that, while a task is being executed, getExecutorThread() returns a non-null
+     * thread and that it is the same thread the task recorded as its executor.
+     */
+    public void testGetExecutorThread() throws Exception
+    {
+        SerialExecutor executor = getSerialExecutorToTest();
+
+        // third argument false: the task can start running but cannot finish until told to
+        Task held = new Task("A", null, false);
+
+        // submitted with neither execution nor execution completion enabled
+        executor.execute(held);
+
+        held.enableExecution();
+
+        // give the executor time to start running the task
+        Thread.sleep(500);
+
+        Thread reported = executor.getExecutorThread();
+        assertNotNull(reported);
+
+        // let the task finish
+        held.enableExecutionCompletion();
+        held.waitForCompletion();
+        assertTrue(held.isCompleted());
+
+        Thread actual = held.getLastExecutorThread();
+        assertEquals(reported, actual);
+    }
+
+    // Package protected ---------------------------------------------------------------------------
+
+    // Protected -----------------------------------------------------------------------------------
+
+    /**
+     * Supplies the SerialExecutor implementation the tests in this class run against. Each test
+     * method in this class calls it once, at its start.
+     *
+     * @return the executor to test
+     */
+    protected abstract SerialExecutor getSerialExecutorToTest();
+
+    // Private -------------------------------------------------------------------------------------
+
+    // Inner classes -------------------------------------------------------------------------------
+
+    /**
+     * A Runnable whose progress through run() is gated by latches, so a test can decide when
+     * the task is allowed to start, when it is allowed to finish, and can observe whether it
+     * has finished.
+     */
+    protected class Task implements Runnable
+    {
+        // gates the start of run()
+        private final Latch executionEnabledLatch = new Latch();
+        // gates the end of run()
+        private final Latch executionCompletionEnableddLatch = new Latch();
+        // released as the last step of run()
+        private final Latch executionCompletedLatch = new Latch();
+        private final String name;
+        // may be null, in which case nothing is recorded
+        private final List<String> executionRecord;
+        private Thread lastExecutorThread;
+
+        /**
+         * Creates a task with a generated name, no execution record and execution completion
+         * enabled. Execution itself is disabled, you need to use enableExecution to enable it.
+         */
+        public Task()
+        {
+            this(null, null, true);
+        }
+
+        /**
+         * Creates a named task with no execution record and execution completion enabled.
+         * Execution itself is disabled, you need to use enableExecution to enable it.
+         *
+         * @param name the task name; if null, a name is generated
+         */
+        public Task(String name)
+        {
+            this(name, null, true);
+        }
+
+        /**
+         * Creates a named task with execution completion enabled. Execution itself is disabled,
+         * you need to use enableExecution to enable it.
+         *
+         * @param name the task name; if null, a name is generated
+         * @param executionRecord the list the task adds its name to when it runs, may be null
+         */
+        public Task(String name, List<String> executionRecord)
+        {
+            this(name, executionRecord, true);
+        }
+
+        /**
+         * Creates a task. Execution is disabled, you need to use enableExecution to enable it.
+         *
+         * @param name the task name; if null, the hexadecimal identity hash code of this
+         *        instance is used
+         * @param executionRecord the list the task adds its name to when it runs, may be null
+         * @param executionCompletionEnabled if false, a running task does not complete until
+         *        enableExecutionCompletion is called
+         */
+        public Task(String name, List<String> executionRecord, boolean executionCompletionEnabled)
+        {
+            this.name =
+                name != null ? name : Integer.toHexString(System.identityHashCode(this));
+            this.executionRecord = executionRecord;
+
+            if (executionCompletionEnabled)
+            {
+                executionCompletionEnableddLatch.release();
+            }
+        }
+
+        /**
+         * Allow this task to be executed.
+         */
+        public void enableExecution()
+        {
+            executionEnabledLatch.release();
+        }
+
+        /**
+         * Allow the execution of this task to complete.
+         */
+        public void enableExecutionCompletion()
+        {
+            executionCompletionEnableddLatch.release();
+        }
+
+        /**
+         * The method blocks, possibly forever, until the task is completely executed (the
+         * completion latch is released as the last step of run()).
+         */
+        public void waitForCompletion() throws InterruptedException
+        {
+            executionCompletedLatch.acquire();
+        }
+
+        /**
+         * Test completion without waiting at all.
+         */
+        public boolean isCompleted() throws InterruptedException
+        {
+            return isCompleted(0);
+        }
+
+        /**
+         * Test completion waiting for a while.
+         *
+         * @param timeout the timeout handed to the completion latch's attempt() call
+         */
+        public boolean isCompleted(long timeout) throws InterruptedException
+        {
+            return executionCompletedLatch.attempt(timeout);
+        }
+
+        /**
+         * @return the last thread that executed this task (may be null if the task was never
+         *         executed)
+         */
+        public Thread getLastExecutorThread()
+        {
+            return lastExecutorThread;
+        }
+
+        /**
+         * Waits until execution is enabled, records the executing thread and, if an execution
+         * record was supplied, the task name, then waits until completion is enabled and
+         * finally signals completion. If interrupted while waiting, the task logs a warning and
+         * returns without signalling completion.
+         */
+        public void run()
+        {
+            if (!acquireOrWarn(executionEnabledLatch,
+                               "interrupted when acquiring executionEnabledLatch, exiting"))
+            {
+                return;
+            }
+
+            log.info(this + " executing ...");
+
+            lastExecutorThread = Thread.currentThread();
+
+            if (executionRecord != null)
+            {
+                executionRecord.add(getName());
+            }
+
+            if (!acquireOrWarn(executionCompletionEnableddLatch,
+                               "interrupted when acquiring executionCompletionEnableddLatch, exiting"))
+            {
+                return;
+            }
+
+            executionCompletedLatch.release();
+        }
+
+        public String getName()
+        {
+            return name;
+        }
+
+        public String toString()
+        {
+            return "Task " + name;
+        }
+
+        /**
+         * Acquires the given latch, logging the given warning if interrupted while waiting.
+         *
+         * @return true if the latch was acquired, false if the wait was interrupted
+         */
+        private boolean acquireOrWarn(Latch latch, String interruptedWarning)
+        {
+            try
+            {
+                latch.acquire();
+                return true;
+            }
+            catch(InterruptedException e)
+            {
+                log.warn(interruptedWarning, e);
+                return false;
+            }
+        }
+    }
+}


Property changes on: branches/Branch_Experimental_JBMESSAGING_1356_2/tests/src/org/jboss/test/messaging/util/base/SerialExecutorTestBase.java
___________________________________________________________________
Name: svn:executable
   + *




More information about the jboss-cvs-commits mailing list