[jboss-cvs] JBoss Messaging SVN: r4984 - in branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417: src/main/org/jboss/jms/client/state and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Thu Sep 18 15:44:06 EDT 2008


Author: jbertram at redhat.com
Date: 2008-09-18 15:44:05 -0400 (Thu, 18 Sep 2008)
New Revision: 4984

Added:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/CompatibleExecutor.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/ExecutorFactory.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/JBMThreadFactory.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/src/org/jboss/test/messaging/util/OrderedExecutorFactoryTest.java
Modified:
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ClientConsumer.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ConsumerAspect.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/SessionAspect.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/state/SessionState.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
   branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/build.properties
Log:
One-off patch work for [JBPAPP-1183]

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ClientConsumer.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ClientConsumer.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.Executor;
 
 import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
@@ -45,8 +46,6 @@
 import org.jboss.messaging.util.prioritylinkedlist.BasicPriorityLinkedList;
 import org.jboss.messaging.util.prioritylinkedlist.PriorityLinkedList;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox/a>
@@ -118,9 +117,10 @@
          return false;
       }
    }
-        
+   
+   
    //This is static so it can be called by the asf layer too
-   public static void callOnMessage(SessionDelegate sess,
+   public static void callOnMessageStatic(SessionDelegate sess,
                                     MessageListener listener,
                                     String consumerID,
                                     String queueName,
@@ -186,6 +186,75 @@
          if (trace) { log.trace("Called postDeliver"); }
       }   
    }
+        
+   //Changed it to non-static
+   public void callOnMessage(SessionDelegate sess,
+                                    MessageListener listener,
+                                    String consumerID,
+                                    String queueName,
+                                    boolean isConnectionConsumer,
+                                    MessageProxy m,
+                                    int ackMode,
+                                    int maxDeliveries,
+                                    SessionDelegate connectionConsumerSession,
+                                    boolean shouldAck)
+      throws JMSException
+   {      
+      if (checkExpiredOrReachedMaxdeliveries(m, connectionConsumerSession!=null?connectionConsumerSession:sess, maxDeliveries, shouldAck))
+      {
+         //Message has been cancelled
+         return;
+      }
+      
+      DeliveryInfo deliveryInfo =
+         new DeliveryInfo(m, consumerID, queueName, connectionConsumerSession, shouldAck);
+            
+      m.incDeliveryCount();
+      
+      // If this is the callback-handler for a connection consumer we don't want to acknowledge or
+      // add anything to the tx for this session.
+      if (!isConnectionConsumer)
+      {
+         //We need to call preDeliver, deliver the message then call postDeliver - this is because
+         //it is legal to call session.recover(), or session.rollback() from within the onMessage()
+         //method in which case the last message needs to be delivered so it needs to know about it
+         sess.preDeliver(deliveryInfo);
+      } 
+      
+      try
+      {
+         if (trace) { log.trace("calling listener's onMessage(" + m + ")"); }
+
+         onMessageThread = Thread.currentThread();
+         listener.onMessage(m);
+
+         if (trace) { log.trace("listener's onMessage() finished"); }
+      }
+      catch (RuntimeException e)
+      {
+         long id = m.getMessage().getMessageID();
+
+         log.error("RuntimeException was thrown from onMessage, " + id + " will be redelivered", e);
+         
+         // See JMS 1.1 spec 4.5.2
+
+         if (ackMode == Session.AUTO_ACKNOWLEDGE || ackMode == Session.DUPS_OK_ACKNOWLEDGE)
+         {              
+            sess.recover();
+         }
+      }   
+
+      // If this is the callback-handler for a connection consumer we don't want to acknowledge
+      // or add anything to the tx for this session
+      if (!isConnectionConsumer)
+      {
+      	if (trace) { log.trace("Calling postDeliver"); }
+      	
+         sess.postDeliver();
+         
+         if (trace) { log.trace("Called postDeliver"); }
+      }   
+   }
    
    // Attributes -----------------------------------------------------------------------------------
       
@@ -206,7 +275,7 @@
    private int ackMode;
    private boolean closed;
    private Object mainLock;
-   private QueuedExecutor sessionExecutor;
+   private Executor sessionExecutor;
    private boolean listenerRunning;
    private int maxDeliveries;
    private String queueName;
@@ -219,7 +288,8 @@
    private boolean paused;      
    private int consumeCount;
    private boolean firstTime = true;
-   
+   private volatile Thread onMessageThread;
+
    public int getBufferSize()
    {
       return buffer.size();
@@ -230,7 +300,7 @@
    public ClientConsumer(boolean isCC, int ackMode,                                
                          SessionDelegate sess, ConsumerDelegate cons, String consumerID,
                          String queueName,
-                         int bufferSize, QueuedExecutor sessionExecutor,
+                         int bufferSize, Executor sessionExecutor,
                          int maxDeliveries, boolean shouldAck,
                          long redeliveryDelay)
    {
@@ -693,7 +763,7 @@
    {
       // Wait for any onMessage() executions to complete
 
-      if (Thread.currentThread().equals(sessionExecutor.getThread()))
+      if (Thread.currentThread() == onMessageThread)
       {
          // the current thread already closing this ClientConsumer (this happens when the
          // session is closed from within the MessageListener.onMessage(), for example), so no need
@@ -702,29 +772,17 @@
       }
 
       Future result = new Future();
-      
-      try
-      {
-         sessionExecutor.execute(new Closer(result));
 
-         if (trace) { log.trace(this + " blocking wait for Closer execution"); }
-         result.getResult();
-         if (trace) { log.trace(this + " got Closer result"); }
-      }
-      catch (InterruptedException e)
-      {         
-      }
+      sessionExecutor.execute(new Closer(result));
+
+      if (trace) { log.trace(this + " blocking wait for Closer execution"); }
+      result.getResult();
+      if (trace) { log.trace(this + " got Closer result"); }
    }
 
    private void queueRunner(ListenerRunner runner)
    {
-      try
-      {
-         this.sessionExecutor.execute(runner);
-      }
-      catch (InterruptedException e)
-      {         
-      }
+      this.sessionExecutor.execute(runner);
    }
    
    private void messageAdded()

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ConsumerAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/ConsumerAspect.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -21,6 +21,8 @@
   */
 package org.jboss.jms.client.container;
 
+import java.util.concurrent.Executor;
+
 import javax.jms.MessageListener;
 
 import org.jboss.aop.joinpoint.Invocation;
@@ -36,6 +38,7 @@
 import org.jboss.jms.exception.MessagingShutdownException;
 import org.jboss.logging.Logger;
 import org.jboss.messaging.util.MessageQueueNameHelper;
+import org.jboss.messaging.util.OrderedExecutorFactory;
 
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
@@ -82,7 +85,7 @@
       ConsumerState consumerState = (ConsumerState)((DelegateSupport)consumerDelegate).getState();
       String consumerID = consumerState.getConsumerID();
       int prefetchSize = consumerState.getBufferSize();
-      QueuedExecutor sessionExecutor = sessionState.getExecutor();
+      Executor executor = sessionState.getExecutor();
       int maxDeliveries = consumerState.getMaxDeliveries();
       long redeliveryDelay = consumerState.getRedeliveryDelay();
       
@@ -109,7 +112,7 @@
       ClientConsumer messageHandler =
          new ClientConsumer(isCC, sessionState.getAcknowledgeMode(),
                             sessionDelegate, consumerDelegate, consumerID, queueName,
-                            prefetchSize, sessionExecutor, maxDeliveries, consumerState.isShouldAck(),
+                            prefetchSize, executor, maxDeliveries, consumerState.isShouldAck(),
                             redeliveryDelay);
       
       sessionState.addCallbackHandler(messageHandler);

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/SessionAspect.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/SessionAspect.java	2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/container/SessionAspect.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -827,8 +827,8 @@
          AsfMessageHolder holder = (AsfMessageHolder)msgs.removeFirst();
 
          if (trace) { log.trace("sending " + holder.msg + " to the message listener" ); }
-         
-         ClientConsumer.callOnMessage(del, state.getDistinguishedListener(), holder.consumerID,
+
+         ClientConsumer.callOnMessageStatic(del, state.getDistinguishedListener(), holder.consumerID,
                                               holder.queueName, false,
                                               holder.msg, ackMode, holder.maxDeliveries,
                                               holder.connectionConsumerDelegate, holder.shouldAck);                          

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/state/SessionState.java	2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/client/state/SessionState.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -29,6 +29,8 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 import javax.jms.MessageListener;
 import javax.jms.Session;
@@ -46,7 +48,10 @@
 import org.jboss.jms.tx.MessagingXAResource;
 import org.jboss.jms.tx.ResourceManager;
 import org.jboss.logging.Logger;
-import org.jboss.messaging.util.JBMExecutor;
+import org.jboss.messaging.util.CompatibleExecutor;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.messaging.util.Version;
 
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
@@ -83,8 +88,10 @@
    private MessagingXAResource xaResource;
    private Object currentTxId;
 
-   // Executor used for executing onMessage methods
-   private JBMExecutor executor;
+   // ExecutorFactory used for executing onMessage methods
+   private static final ExecutorFactory executorFactory = new OrderedExecutorFactory(
+           Executors.newCachedThreadPool(new JBMThreadFactory("session-state")));
+   private CompatibleExecutor executor;
 
    private boolean recoverCalled;
    
@@ -142,13 +149,13 @@
          currentTxId = parent.getResourceManager().createLocalTx();
       }
 
-      executor = new JBMExecutor("jbm-client-session-" + sessionID);
-
       clientAckList = new ArrayList();
 
       // TODO could optimise this to use the same map of callbackmanagers (which holds refs
       // to callbackhandlers) in the connection, instead of maintaining another map
       callbackHandlers = new HashMap();
+      
+      executor = executorFactory.getExecutor("jbm-client-session-threads");
    }
 
    // HierarchicalState implementation -------------------------------------------------------------
@@ -424,7 +431,7 @@
       return xaResource;
    }
 
-   public JBMExecutor getExecutor()
+   public CompatibleExecutor getExecutor()
    {
       return executor;
    }

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/connectionfactory/ConnectionFactoryJNDIMapper.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -29,6 +29,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
 
 import javax.naming.Context;
 import javax.naming.InitialContext;
@@ -51,13 +52,14 @@
 import org.jboss.messaging.core.contract.ClusterNotification;
 import org.jboss.messaging.core.contract.ClusterNotificationListener;
 import org.jboss.messaging.core.contract.Replicator;
+import org.jboss.messaging.util.CompatibleExecutor;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.JBMThreadFactory;
 import org.jboss.messaging.util.JNDIUtil;
-import org.jboss.messaging.util.NamedThreadQueuedExecutor;
+import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.messaging.util.Version;
 import org.jboss.remoting.InvokerLocator;
 
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
 /**
  * @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -77,6 +79,9 @@
 
    private static boolean trace = log.isTraceEnabled();
 
+   private static final ExecutorFactory executorFactory = new OrderedExecutorFactory(
+           Executors.newCachedThreadPool(new JBMThreadFactory("conn-factory-jndi-mapper")));
+   
    // Attributes -----------------------------------------------------------------------------------
 
    protected Context initialContext;
@@ -90,7 +95,7 @@
 
    private Replicator replicator;
    
-   private QueuedExecutor notifyExecutor;
+   private CompatibleExecutor notifyExecutor;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -99,7 +104,7 @@
       this.serverPeer = serverPeer;
       endpoints = new HashMap();
       delegates = new HashMap();          
-      notifyExecutor = new NamedThreadQueuedExecutor("jbm-cf-jndimapper");
+      notifyExecutor = executorFactory.getExecutor("jbm-cf-jndimapper");
    }
 
    // ConnectionFactoryManager implementation ------------------------------------------------------
@@ -424,14 +429,7 @@
       
       //Run on a different thread to prevent distributed deadlock when multiple nodes are starting together
       //and deploying connection factories concurrently
-      try
-      {
-         notifyExecutor.execute(new NotifyRunner());
-      }
-      catch (InterruptedException e)
-      {
-         //Ignore
-      }
+      notifyExecutor.execute(new NotifyRunner());
    }
 
    // Public ---------------------------------------------------------------------------------------

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executors;
 
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
@@ -82,16 +83,18 @@
 import org.jboss.messaging.core.impl.tx.TransactionException;
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
 import org.jboss.messaging.core.impl.tx.TxCallback;
+import org.jboss.messaging.util.CompatibleExecutor;
 import org.jboss.messaging.util.ExceptionUtil;
+import org.jboss.messaging.util.ExecutorFactory;
 import org.jboss.messaging.util.GUIDGenerator;
+import org.jboss.messaging.util.JBMThreadFactory;
 import org.jboss.messaging.util.MessageQueueNameHelper;
-import org.jboss.messaging.util.NamedThreadQueuedExecutor;
+import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.remoting.callback.Callback;
 import org.jboss.remoting.callback.ServerInvokerCallbackHandler;
 
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
 /**
  * The server side representation of a JMS session.
@@ -130,6 +133,9 @@
 
    // Static ---------------------------------------------------------------------------------------
 
+
+   private static final ExecutorFactory executorFactory = new OrderedExecutorFactory(
+           Executors.newCachedThreadPool(new JBMThreadFactory("server-session-endpoint")));
    // Attributes -----------------------------------------------------------------------------------
 
    private boolean trace = log.isTraceEnabled();
@@ -169,7 +175,7 @@
    private long deliveryIdSequence;
 
    //Temporary until we have our own NIO transport
-   QueuedExecutor executor;
+   CompatibleExecutor executor;
 
    private LinkedQueue toDeliver = new LinkedQueue();
 
@@ -187,7 +193,7 @@
    {
       this.id = sessionID;
       
-      this.executor = new NamedThreadQueuedExecutor("jbm-server-session-" + sessionID);
+      this.executor = executorFactory.getExecutor("jbm-server-session-" + sessionID);
 
       this.connectionEndpoint = connectionEndpoint;
 

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -37,6 +37,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.Vector;
+import java.util.concurrent.Executors;
 
 import javax.management.ListenerNotFoundException;
 import javax.management.MBeanNotificationInfo;
@@ -75,14 +76,16 @@
 import org.jboss.messaging.core.impl.tx.TransactionRepository;
 import org.jboss.messaging.core.impl.tx.TxCallback;
 import org.jboss.messaging.util.ClearableSemaphore;
+import org.jboss.messaging.util.CompatibleExecutor;
 import org.jboss.messaging.util.ConcurrentHashSet;
-import org.jboss.messaging.util.NamedThreadQueuedExecutor;
+import org.jboss.messaging.util.ExecutorFactory;
+import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.OrderedExecutorFactory;
 import org.jboss.messaging.util.StreamUtils;
 import org.jgroups.Address;
 import org.jgroups.View;
 
 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
 import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
 
@@ -110,6 +113,8 @@
    
    private static final long SEMAPHORE_ACQUIRE_TIMEOUT = 10000;
    
+   private static final ExecutorFactory executorFactory = new OrderedExecutorFactory(
+           Executors.newCachedThreadPool(new JBMThreadFactory("msg-post-office")));
    //End only used in testing
 
    // Static ---------------------------------------------------------------------------------------
@@ -217,9 +222,9 @@
    private ServerPeer serverPeer;
    
    //Note this MUST be a queued executor to ensure replicate repsonses arrive back in order
-   private QueuedExecutor replyExecutor;
+   private CompatibleExecutor replyExecutor;
    
-   private QueuedExecutor replicateResponseExecutor;
+   private CompatibleExecutor replicateResponseExecutor;
    
    private volatile int failoverNodeID = -1;
    
@@ -1632,10 +1637,9 @@
          leftSet = new ConcurrentHashSet();
       }
       
-      //NOTE, MUST be a QueuedExecutor so we ensure that responses arrive back in order
-      replyExecutor = new NamedThreadQueuedExecutor("jbm-reply-executor");
+      replyExecutor = executorFactory.getExecutor("jbm-reply-executor");
       
-      replicateResponseExecutor = new NamedThreadQueuedExecutor("jbm-response-executor");
+      replicateResponseExecutor = executorFactory.getExecutor("jbm-response-executor");
    }
    
    private void deInit()

Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/CompatibleExecutor.java (from rev 4937, branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/util/CompatibleExecutor.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/CompatibleExecutor.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/CompatibleExecutor.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005, JBoss Inc., and
+ * individual contributors as indicated by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import java.util.concurrent.Executor;
+
+public interface CompatibleExecutor extends Executor
+{
+
+   void clearAllExceptCurrentTask();
+
+   void clearClassLoader();
+
+   void shutdownNow();
+
+   void shutdownAfterProcessingCurrentlyQueuedTasks();
+
+   String getName();
+
+}

Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/ExecutorFactory.java (from rev 4937, branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/util/ExecutorFactory.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/ExecutorFactory.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/ExecutorFactory.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.util;
+
+import java.util.concurrent.Executor;
+
+/*
+ * 
+ * A ExecutorFactory
+ * 
+ * Copied from JBM2.0
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public interface ExecutorFactory
+{
+   CompatibleExecutor getExecutor(String name);
+}

Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/JBMThreadFactory.java (from rev 4937, branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/util/JBMThreadFactory.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/JBMThreadFactory.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/JBMThreadFactory.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.messaging.util;
+
+import java.util.concurrent.ThreadFactory;
+
+/*
+ * 
+ * A JBMThreadFactory
+ * 
+ * Copied from JBM2.0
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ */
+public class JBMThreadFactory implements ThreadFactory
+{
+   private ThreadGroup group;
+
+   public JBMThreadFactory(final String groupName)
+   {
+      this.group = new ThreadGroup(groupName);
+   }
+
+   public Thread newThread(final Runnable command)
+   {
+      Thread t = new Thread(group, command);
+
+      if (command instanceof CompatibleExecutor)
+      {
+         t.setName(((CompatibleExecutor)command).getName());
+      }
+
+      // Don't want to prevent VM from exiting
+      t.setDaemon(true);
+
+      return t;
+   }
+}

Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java (from rev 4937, branches/Branch_JBossMessaging_1_4_0_SP3_CP/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -0,0 +1,189 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.util;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+/*
+ * 
+ * This factory creates a hierarchy of Executor which shares the threads of the
+ * parent Executor (typically, the root parent is a Thread pool).
+ * 
+ * Copied from JBM2.0
+ * 
+ * @author <a href="david.lloyd at jboss.com">David Lloyd</a>
+ * 
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ */
+public final class OrderedExecutorFactory implements ExecutorFactory
+{
+   private final Executor parent;
+
+   private final Set<ChildExecutor> runningChildren = Collections.synchronizedSet(new HashSet<ChildExecutor>());
+
+   public OrderedExecutorFactory(final Executor parent)
+   {
+      this.parent = parent;
+   }
+
+   public CompatibleExecutor getExecutor(String name)
+   {
+      return new ChildExecutor(name);
+   }
+
+   private final class ChildExecutor implements CompatibleExecutor, Runnable
+   {
+      private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
+
+      private boolean needToSetClassLoader = true;
+
+      private ClassLoader tcl;
+
+      private boolean shutdown = false;
+
+      private String name = "default-thread";
+
+      public ChildExecutor(String n)
+      {
+         name = n;
+      }
+
+      public void execute(Runnable command)
+      {
+         if (needToSetClassLoader)
+         {
+            tcl = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+            {
+               public ClassLoader run()
+               {
+                  return Thread.currentThread().getContextClassLoader();
+               }
+            });
+         }
+         synchronized (tasks)
+         {
+            if (!shutdown)
+            {
+               tasks.add(command);
+               if (tasks.size() == 1 && runningChildren.add(this))
+               {
+                  parent.execute(this);
+               }
+            }
+         }
+      }
+
+      @SuppressWarnings("unchecked")
+      public void run()
+      {
+         if (needToSetClassLoader)
+         {
+            needToSetClassLoader = false;
+
+            AccessController.doPrivileged(new PrivilegedAction()
+            {
+               public Object run()
+               {
+                  Thread.currentThread().setContextClassLoader(tcl);
+                  return null;
+               }
+            });
+         }
+
+         for (;;)
+         {
+            final Runnable task;
+            synchronized (tasks)
+            {
+               task = tasks.poll();
+               if (task == null)
+               {
+                  runningChildren.remove(this);
+                  tasks.notify();
+                  return;
+               }
+            }
+            task.run();
+         }
+      }
+
+      public void clearAllExceptCurrentTask()
+      {
+         synchronized (tasks)
+         {
+            tasks.clear();
+         }
+      }
+
+      public void clearClassLoader()
+      {
+         execute(new Runnable()
+         {
+            @SuppressWarnings("unchecked")
+            public void run()
+            {
+               needToSetClassLoader = true;
+            }
+         });
+      }
+
+      // old behavior also terminates the thread. with a pool we shouldn't do
+      // this.
+      public void shutdownNow()
+      {
+         tasks.clear();
+      }
+
+      public void shutdownAfterProcessingCurrentlyQueuedTasks()
+      {
+         synchronized (tasks)
+         {
+            shutdown = true;
+            while (tasks.size() > 0)
+            {
+               try
+               {
+                  tasks.wait();
+               }
+               catch (InterruptedException e)
+               {
+               }
+            }
+         }
+      }
+
+      public String getName()
+      {
+         return name;
+      }
+
+   }
+}

Modified: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/build.properties
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/build.properties	2008-09-18 15:26:31 UTC (rev 4983)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/build.properties	2008-09-18 19:44:05 UTC (rev 4984)
@@ -2,5 +2,6 @@
 # Local overrides for the builds initiated by build.sh
 #
 
-#test.bind.address=192.168.1.2
+test.bind.address=localhost
+jgroups.bind_addr=localhost
 

Copied: branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/src/org/jboss/test/messaging/util/OrderedExecutorFactoryTest.java (from rev 4937, branches/Branch_JBossMessaging_1_4_0_SP3_CP/tests/src/org/jboss/test/messaging/util/OrderedExecutorFactoryTest.java)
===================================================================
--- branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/src/org/jboss/test/messaging/util/OrderedExecutorFactoryTest.java	                        (rev 0)
+++ branches/Branch_JBossMessaging_1_4_0_SP3_CP03_JBMESSAGING_1417/tests/src/org/jboss/test/messaging/util/OrderedExecutorFactoryTest.java	2008-09-18 19:44:05 UTC (rev 4984)
@@ -0,0 +1,93 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005, JBoss Inc., and
+ * individual contributors as indicated by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+package org.jboss.test.messaging.util;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+
+import org.jboss.messaging.util.CompatibleExecutor;
+import org.jboss.messaging.util.JBMThreadFactory;
+import org.jboss.messaging.util.OrderedExecutorFactory;
+import org.jboss.test.messaging.MessagingTestCase;
+
+public class OrderedExecutorFactoryTest extends MessagingTestCase
+{
+
+   private ArrayList<CounterRunnable> exeQueue = new ArrayList<CounterRunnable>();
+
+   public OrderedExecutorFactoryTest(String name)
+   {
+      super(name);
+   }
+
+   public void testExecutionOrder()
+   {
+      OrderedExecutorFactory factory = new OrderedExecutorFactory(Executors.newCachedThreadPool(new JBMThreadFactory("test-thread-factory")));
+      CompatibleExecutor executor = factory.getExecutor("test executor");
+
+      final int numTasks = 200000;
+
+      CounterRunnable[] tasks = new CounterRunnable[numTasks];
+
+      for (int i = 0; i < numTasks; ++i)
+      {
+         tasks[i] = new CounterRunnable(this);
+      }
+      exeQueue.clear();
+      for (int i = 0; i < numTasks; ++i)
+      {
+         executor.execute(tasks[i]);
+      }
+
+      executor.shutdownAfterProcessingCurrentlyQueuedTasks();
+      assertTrue(exeQueue.size() == numTasks);
+
+      for (int i = 0; i < numTasks; ++i)
+      {
+         CounterRunnable finTask = exeQueue.get(i);
+         assertTrue(finTask == tasks[i]);
+      }
+
+      exeQueue.clear();
+   }
+
+   class CounterRunnable implements Runnable
+   {
+      private OrderedExecutorFactoryTest myUser;
+
+      public CounterRunnable(OrderedExecutorFactoryTest user)
+      {
+         myUser = user;
+      }
+
+      public void run()
+      {
+         myUser.registerOrder(this);
+      }
+
+   }
+
+   public void registerOrder(CounterRunnable task)
+   {
+      exeQueue.add(task);
+   }
+}




More information about the jboss-cvs-commits mailing list