[jboss-cvs] JBoss Messaging SVN: r4014 - in trunk: src/main/org/jboss/messaging/core/remoting and 9 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Apr 7 08:28:48 EDT 2008


Author: jmesnil
Date: 2008-04-07 08:28:48 -0400 (Mon, 07 Apr 2008)
New Revision: 4014

Added:
   trunk/src/main/org/jboss/messaging/core/remoting/PacketHandlerRegistrationListener.java
   trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerOrderingTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java
   trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/AbstractPacket.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionResponse.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Packet.java
   trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java
   trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java
   trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/PacketDispatcherTest.java
Log:
* Replaced MINA's ExecutorFilter with a custom-made OrderedExecutorFactory to have more flexibility over thread execution order (packets are grouped by the new executorID property on the Packet interface, and each executorID gets its own ordered executor)

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -81,7 +81,7 @@
       
       try
       {
-         remotingConnection.send(id, new CloseMessage());
+         remotingConnection.send(id, session.getID(), new CloseMessage());
       }
       finally
       {
@@ -100,7 +100,7 @@
    {
       checkClosed();
       
-      remotingConnection.send(id, new SessionBrowserResetMessage());
+      remotingConnection.send(id, session.getID(), new SessionBrowserResetMessage());
    }
 
    public boolean hasNextMessage() throws MessagingException
@@ -108,7 +108,7 @@
       checkClosed();
       
       SessionBrowserHasNextMessageResponseMessage response =
-         (SessionBrowserHasNextMessageResponseMessage)remotingConnection.send(id, new SessionBrowserHasNextMessageMessage());
+         (SessionBrowserHasNextMessageResponseMessage)remotingConnection.send(id, session.getID(), new SessionBrowserHasNextMessageMessage());
       
       return response.hasNext();
    }
@@ -118,7 +118,7 @@
       checkClosed();
       
       SessionBrowserNextMessageResponseMessage response =
-         (SessionBrowserNextMessageResponseMessage)remotingConnection.send(id, new SessionBrowserNextMessageMessage());
+         (SessionBrowserNextMessageResponseMessage)remotingConnection.send(id, session.getID(), new SessionBrowserNextMessageMessage());
       
       return response.getMessage();
    }
@@ -128,7 +128,7 @@
       checkClosed();
       
       SessionBrowserNextMessageBlockResponseMessage response =
-         (SessionBrowserNextMessageBlockResponseMessage)remotingConnection.send(id, new SessionBrowserNextMessageBlockMessage(maxMessages));
+         (SessionBrowserNextMessageBlockResponseMessage)remotingConnection.send(id, session.getID(), new SessionBrowserNextMessageBlockMessage(maxMessages));
       return response.getMessages();
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -133,7 +133,7 @@
    {
       checkClosed();
        
-      remotingConnection.send(id, new ConnectionStartMessage(), true);
+      remotingConnection.send(id, id, new ConnectionStartMessage(), true);
    }
    
    public void stop() throws MessagingException

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -245,7 +245,7 @@
          
          receiverThread = null;
 
-         remotingConnection.send(id, new CloseMessage());
+         remotingConnection.send(id, session.getID(), new CloseMessage());
 
          remotingConnection.getPacketDispatcher().unregister(id);
       }
@@ -369,7 +369,7 @@
          {
             tokensToSend = 0;
             
-            remotingConnection.send(id, new ConsumerFlowTokenMessage(tokenBatchSize), true);                  
+            remotingConnection.send(id, session.getID(), new ConsumerFlowTokenMessage(tokenBatchSize), true);                  
          }
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -139,7 +139,7 @@
    		windowSize--;
    	}
    	
-   	remotingConnection.send(id, message, !msg.isDurable());
+   	remotingConnection.send(id, session.getID(), message, !msg.isDurable());
    	 	   	
    	if (rateLimiter != null)
    	{

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -283,7 +283,7 @@
       //Now we send window size tokens to start the consumption
       //We even send it if windowSize == -1, since we need to start the consumer
       
-      remotingConnection.send(response.getConsumerID(), new ConsumerFlowTokenMessage(response.getWindowSize()), true);
+      remotingConnection.send(response.getConsumerID(), id, new ConsumerFlowTokenMessage(response.getWindowSize()), true);
 
       return consumer;
    }
@@ -409,7 +409,7 @@
       
       if (deliveryExpired)
       {
-         remotingConnection.send(id, new SessionCancelMessage(lastID, true), true);
+         remotingConnection.send(id, id, new SessionCancelMessage(lastID, true), true);
          
          toAckCount = 0;
       }
@@ -801,7 +801,7 @@
       
       SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(lastID, !broken);
             
-      remotingConnection.send(id, message, !block);
+      remotingConnection.send(id, id, message, !block);
       
       acked = true;
    }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -20,9 +20,15 @@
    
    public String getSessionID();
  
-   AbstractPacket send(String id, AbstractPacket packet) throws MessagingException;
+   /**
+    * Use this method if the packet is to be executed in the context of the targetID (i.e. for
+    * sessions, connections and connection factories)
+    */
+   AbstractPacket send(String targetID, AbstractPacket packet) throws MessagingException;
+
+   AbstractPacket send(String targetID, String executorID, AbstractPacket packet) throws MessagingException;
    
-   AbstractPacket send(String id, AbstractPacket packet, boolean oneWay) throws MessagingException;
+   AbstractPacket send(String targetID, String executorID, AbstractPacket packet, boolean oneWay) throws MessagingException;
    
    void setFailureListener(FailureListener newListener);
    

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -126,19 +126,26 @@
       return session.getID();
    }
  
+   public AbstractPacket send(String targetID, AbstractPacket packet)
+         throws MessagingException
+   {
+      return send(targetID, targetID, packet);
+   }
+
    /**
     * send the packet and block until a response is received (<code>oneWay</code> is set to <code>false</code>)
     */
-   public AbstractPacket send(final String id, final AbstractPacket packet) throws MessagingException
+   public AbstractPacket send(final String targetID, final String executorID, final AbstractPacket packet) throws MessagingException
    {
-      return send(id, packet, false);
+      return send(targetID, executorID, packet, false);
    }
    
-   public AbstractPacket send(final String id, final AbstractPacket packet, final boolean oneWay) throws MessagingException
+   public AbstractPacket send(final String targetID, final String executorID, final AbstractPacket packet, final boolean oneWay) throws MessagingException
    {
       assert packet != null;
 
-      packet.setTargetID(id);
+      packet.setTargetID(targetID);
+      packet.setExecutorID(executorID);
       
       AbstractPacket response;
       

Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -21,7 +21,9 @@
    void register(PacketHandler handler);
 
    void unregister(String handlerID);
-
+   
+   void setListener(PacketHandlerRegistrationListener listener);
+   
    void dispatch(Packet packet, PacketSender sender) throws Exception;
 
    /** Call filters on a package */

Added: trunk/src/main/org/jboss/messaging/core/remoting/PacketHandlerRegistrationListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketHandlerRegistrationListener.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketHandlerRegistrationListener.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -0,0 +1,19 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */ 
+package org.jboss.messaging.core.remoting;
+
+/**
+ * Callback notified when a packet handler is registered with or unregistered from a PacketDispatcher.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * @version <tt>$Revision$</tt>
+ */
+public interface PacketHandlerRegistrationListener
+{
+   void handlerRegistered(String handlerID); // called with the ID of the handler that was just registered
+   void handlerUnregistered(String handlerID); // called with the ID of the handler that was just unregistered
+}

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -15,9 +15,10 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.Interceptor;
 import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketHandlerRegistrationListener;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
 
@@ -28,14 +29,18 @@
  */
 public class PacketDispatcherImpl implements PacketDispatcher, Serializable
 {
+
    // Constants -----------------------------------------------------
 
+   private static final long serialVersionUID = -4626926952268528384L;
+
    public static final Logger log = Logger.getLogger(PacketDispatcherImpl.class);
 
    // Attributes ----------------------------------------------------
 
    private Map<String, PacketHandler> handlers;
    public List<Interceptor> filters;
+   private transient PacketHandlerRegistrationListener listener;
 
    // Static --------------------------------------------------------
 
@@ -55,8 +60,6 @@
    }
 
    // Public --------------------------------------------------------
-   
-   
 
    /* (non-Javadoc)
     * @see org.jboss.messaging.core.remoting.impl.IPacketDispatcher#register(org.jboss.messaging.core.remoting.PacketHandler)
@@ -72,6 +75,9 @@
       {
          log.debug("registered " + handler + " with ID " + handler.getID());
       }
+      
+      if (listener != null)
+         listener.handlerRegistered(handler.getID());
    }
 
    /* (non-Javadoc)
@@ -87,7 +93,15 @@
       {
          log.debug("unregistered handler for " + handlerID);
       }
+      
+      if (listener != null)
+         listener.handlerUnregistered(handlerID);
    }
+   
+   public void setListener(PacketHandlerRegistrationListener listener)
+   {
+      this.listener = listener;
+   }
 
    public PacketHandler getHandler(String handlerID)
    {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -77,13 +77,19 @@
       {
          callbackID = null;
       }
-      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID) + BOOLEAN_LENGTH;
+      String executorID = packet.getExecutorID();
+      if (NO_ID_SET.equals(executorID))
+      {
+         executorID = targetID;
+      }
+      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID) + sizeof(executorID) + BOOLEAN_LENGTH;
 
       buf.put(packet.getType().byteValue());
       buf.putInt(headerLength);
       buf.putLong(correlationID);
       buf.putNullableString(targetID);
       buf.putNullableString(callbackID);
+      buf.putNullableString(executorID);
       buf.putBoolean(packet.isOneWay());
 
       encodeBody(packet, buf);
@@ -139,6 +145,13 @@
       {
          return NOT_OK;
       }
+      try
+      {
+         buffer.getNullableString();
+      } catch (CharacterCodingException e)
+      {
+         return NOT_OK;
+      }
       buffer.getBoolean(); // oneWay boolean
       if (buffer.remaining() < INT_LENGTH)
       {
@@ -168,6 +181,7 @@
       long correlationID = wrapper.getLong();
       String targetID = wrapper.getNullableString();
       String callbackID = wrapper.getNullableString();
+      String executorID = wrapper.getNullableString();
       boolean oneWay = wrapper.getBoolean();
       
       P packet = decodeBody(wrapper);
@@ -179,10 +193,16 @@
       if (targetID == null)
          targetID = NO_ID_SET;
       packet.setTargetID(targetID);
-      packet.setCorrelationID(correlationID);
+      
       if (callbackID == null)
          callbackID = NO_ID_SET;
       packet.setCallbackID(callbackID);
+      
+      if (executorID == null)
+         executorID = targetID;
+      packet.setExecutorID(executorID);
+      
+      packet.setCorrelationID(correlationID);
       packet.setOneWay(oneWay);
 
       return packet;

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -17,7 +17,6 @@
 
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
 import org.apache.mina.filter.keepalive.KeepAliveFilter;
 import org.apache.mina.filter.logging.LoggingFilter;
 import org.apache.mina.filter.logging.MdcInjectionFilter;
@@ -120,12 +119,6 @@
       filterChain.addLast("logger", filter);
    }
 
-   static void addExecutorFilter(DefaultIoFilterChainBuilder filterChain)
-   {
-      ExecutorFilter executorFilter = new ExecutorFilter();
-      filterChain.addLast("executor", executorFilter);
-   }
-
    static ScheduledExecutorService addBlockingRequestResponseFilter(
          DefaultIoFilterChainBuilder filterChain)
    {

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -8,7 +8,6 @@
 
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addBlockingRequestResponseFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addExecutorFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addLoggingFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addMDCFilter;
@@ -18,6 +17,8 @@
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.mina.common.CloseFuture;
@@ -38,7 +39,7 @@
 import org.jboss.messaging.core.remoting.NIOSession;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
 import org.jboss.messaging.core.remoting.RemotingException;
-import org.jboss.messaging.core.remoting.impl.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
 
 /**
@@ -57,10 +58,14 @@
 
    private Configuration configuration;
 
-   private NioSocketConnector connector;
+   private transient NioSocketConnector connector;
 
+   private PacketDispatcher dispatcher;
+
    private ScheduledExecutorService blockingScheduler;
 
+   private ExecutorService threadPool;
+   
    private IoSession session;
 
    private List<FailureListener> listeners = new ArrayList<FailureListener>();
@@ -85,6 +90,7 @@
       assert keepAliveFactory != null;
 
       this.configuration = configuration;
+      this.dispatcher = dispatcher;
 
       this.connector = new NioSocketConnector();
       DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
@@ -107,9 +113,6 @@
       blockingScheduler = addBlockingRequestResponseFilter(filterChain);
       addKeepAliveFilter(filterChain, keepAliveFactory, configuration.getKeepAliveInterval(),
             configuration.getKeepAliveTimeout(), this);
-      addExecutorFilter(filterChain);
-
-      connector.setHandler(new MinaHandler(dispatcher, this, false));
       connector.getSessionConfig().setKeepAlive(true);
       connector.getSessionConfig().setReuseAddress(true);
    }
@@ -122,6 +125,9 @@
       {
          return new MinaSession(session);
       }
+      
+      threadPool = Executors.newCachedThreadPool();
+      connector.setHandler(new MinaHandler(dispatcher, threadPool, this, false));
       InetSocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
       ConnectFuture future = connector.connect(address);
       connector.setDefaultRemoteAddress(address);
@@ -134,7 +140,7 @@
          throw new IOException("Cannot connect to " + address.toString());
       }
       this.session = future.getSession();
-      AbstractPacket packet = new Ping(Long.toString(session.getId()));
+      Packet packet = new Ping(Long.toString(session.getId()));
       session.write(packet);
       
       return new MinaSession(session);
@@ -153,7 +159,8 @@
       blockingScheduler.shutdown();
       connector.removeListener(ioListener);
       connector.dispose();
-
+      threadPool.shutdown();
+      
       SslFilter sslFilter = (SslFilter) session.getFilterChain().get("ssl");
       // FIXME without this hack, exceptions are thrown:
       // "Unexpected exception from SSLEngine.closeInbound()." -> because the ssl session is not stopped

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -6,17 +6,24 @@
  */
 package org.jboss.messaging.core.remoting.impl.mina;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.filter.reqres.Response;
 import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandlerRegistrationListener;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.RemotingException;
 import org.jboss.messaging.core.remoting.impl.wireformat.AbstractPacket;
 import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
 import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.util.OrderedExecutorFactory;
 
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -24,7 +31,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class MinaHandler extends IoHandlerAdapter
+public class MinaHandler extends IoHandlerAdapter implements PacketHandlerRegistrationListener
 {
    // Constants -----------------------------------------------------
 
@@ -38,19 +45,40 @@
 
    private boolean closeSessionOnExceptionCaught;
 
+   private OrderedExecutorFactory executorFactory;
+
+   private Map<String, Executor> executors = new HashMap<String, Executor>();
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
-   
-   public MinaHandler(PacketDispatcher dispatcher, FailureNotifier failureNotifier, boolean closeSessionOnExceptionCaught)
+   public MinaHandler(PacketDispatcher dispatcher, ExecutorService executorService, FailureNotifier failureNotifier, boolean closeSessionOnExceptionCaught)
    {
+      assert dispatcher!= null;
+      assert executorService != null;
+      
       this.dispatcher = dispatcher;
       this.failureNotifier = failureNotifier;
       this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
+      
+      this.executorFactory = new OrderedExecutorFactory(executorService);
+      this.dispatcher.setListener(this);
    }
 
    // Public --------------------------------------------------------
 
+   // PacketHandlerRegistrationListener implementation --------------
+
+   public void handlerRegistered(String handlerID)
+   {
+      // do nothing on registration
+   }
+   
+   public void handlerUnregistered(String handlerID)
+   {
+      executors.remove(handlerID);
+   }
+
    // IoHandlerAdapter overrides ------------------------------------
 
    @Override
@@ -73,7 +101,7 @@
    }
    
    @Override
-   public void messageReceived(final IoSession session, Object message)
+   public void messageReceived(final IoSession session, final Object message)
          throws Exception
    {
       if (message instanceof Response)
@@ -92,17 +120,51 @@
          // response is handled by the keep-alive filter.
          // do nothing
          return;
-      }
-
-      if (!(message instanceof AbstractPacket))
+      } 
+      
+      if (!(message instanceof Packet))
       {
          throw new IllegalArgumentException("Unknown message type: " + message);
       }
-
       
+      final Packet packet = (Packet) message;
+      String executorID = packet.getExecutorID();
       
-      AbstractPacket packet = (AbstractPacket) message;
+      if (AbstractPacket.NO_ID_SET.equals(executorID)) 
+         throw new IllegalArgumentException("executor ID not set for " + packet);
+      
+      Executor executor = executors.get(executorID);
+      if (executor == null)
+      {
+         executor = this.executorFactory.getOrderedExecutor();
+         executors.put(executorID, executor);
+      }
+      
+      executor.execute(new Runnable() 
+      {
+         public void run()
+         {
+            try
+            {
+               messageReceived0(session, packet);
+            } catch (Exception e)
+            {
+               log.error("unexpected error", e);
+            }
+         }
+      });
+      
+   }
 
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private void messageReceived0(final IoSession session, Packet packet)
+         throws Exception
+   {
       PacketSender sender = new PacketSender()
       {
          public void send(Packet p) throws Exception
@@ -127,12 +189,6 @@
 
       dispatcher.dispatch(packet, sender);
    }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
+   
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -10,7 +10,6 @@
 import static org.jboss.messaging.core.remoting.TransportType.INVM;
 import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addExecutorFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addLoggingFilter;
 import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addMDCFilter;
@@ -20,6 +19,8 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.mina.common.DefaultIoFilterChainBuilder;
 import org.apache.mina.common.IdleStatus;
@@ -63,6 +64,8 @@
 
    private PacketDispatcher dispatcher;
 
+   private ExecutorService threadPool; 
+   
    private List<FailureListener> listeners = new ArrayList<FailureListener>();
 
    private ServerKeepAliveFactory factory;
@@ -142,7 +145,6 @@
          addLoggingFilter(filterChain);
          addKeepAliveFilter(filterChain, factory,
                config.getKeepAliveInterval(), config.getKeepAliveTimeout(), this);
-         addExecutorFilter(filterChain);
 
          // Bind
          acceptor.setDefaultLocalAddress(new InetSocketAddress(config.getHost(), config.getPort()));
@@ -151,7 +153,8 @@
          acceptor.getSessionConfig().setKeepAlive(true);
          acceptor.setCloseOnDeactivation(false);
 
-         acceptor.setHandler(new MinaHandler(dispatcher, this, true));
+         threadPool = Executors.newCachedThreadPool();
+         acceptor.setHandler(new MinaHandler(dispatcher, threadPool, this, true));
          acceptor.bind();
          acceptorListener = new MinaSessionListener();
          acceptor.addListener(acceptorListener);
@@ -176,6 +179,7 @@
          acceptor.unbind();
          acceptor.dispose();
          acceptor = null;
+         threadPool.shutdown();
       }
       
       REGISTRY.unregister(config);

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/AbstractPacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/AbstractPacket.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/AbstractPacket.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -31,6 +31,8 @@
 
    private String callbackID = NO_ID_SET;
 
+   String executorID = NO_ID_SET;
+
    private final PacketType type;
 
    /**
@@ -92,6 +94,18 @@
    {
       return callbackID;
    }
+   
+  public String getExecutorID()
+  {
+     return executorID;
+  }
+  
+  public void setExecutorID(String executorID)
+  {
+     assertValidID(executorID);
+     
+     this.executorID = executorID;
+  }
 
    public void setOneWay(boolean oneWay)
    {
@@ -131,7 +145,7 @@
    {
       return "PACKET[type=" + type
             + ", correlationID=" + correlationID + ", targetID=" + targetID
-            + ", callbackID=" + callbackID + ", oneWay=" + oneWay;
+            + ", callbackID=" + callbackID + ", executorID=" + executorID + ", oneWay=" + oneWay;
    }
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionResponse.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionResponse.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionResponse.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -43,6 +43,12 @@
       return connectionID;
    }
    
+   @Override
+   public String toString()
+   {
+      return getParentString() + ", connectionID=" + connectionID + "]";
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Packet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Packet.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Packet.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -7,6 +7,7 @@
 
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
+
 public interface Packet
 {
    // Public --------------------------------------------------------
@@ -21,10 +22,14 @@
 
    void setTargetID(String targetID);
 
+   String getCallbackID();
+
    void setCallbackID(String callbackID);
 
-   String getCallbackID();
+   String getExecutorID();
 
+   void setExecutorID(String executorID);
+
    void setOneWay(boolean oneWay);
 
    boolean isOneWay();
@@ -35,6 +40,4 @@
     * An AbstractPacket is a request if it has a target ID and a correlation ID
     */
    public boolean isRequest();
-
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -42,16 +42,20 @@
    
    private final MessageReference reference;
    
+   private final String sessionID;
+
    private final String consumerID;
    
    private final long deliveryID;
    
    private final PacketSender sender;
 
-   public DeliveryImpl(final MessageReference reference, final String consumerID,
+   public DeliveryImpl(final MessageReference reference, 
+                       final String sessionID, final String consumerID,
                        final long deliveryID, final PacketSender sender)
    {      
       this.reference = reference;
+      this.sessionID = sessionID;
       this.consumerID = consumerID;
       this.deliveryID = deliveryID;
       this.sender = sender;
@@ -83,6 +87,7 @@
       ConsumerDeliverMessage message = new ConsumerDeliverMessage(copy, deliveryID);
       
       message.setTargetID(consumerID);
+      message.setExecutorID(sessionID);
       
       sender.send(message);
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -108,7 +108,7 @@
 		Packet packet = new ProducerReceiveTokensMessage(1);
 		
 		packet.setTargetID(id);
-			
+		packet.setExecutorID(session.getID());
 		sender.send(packet);		
 	}
 	

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -224,7 +224,7 @@
    
    public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
    {
-      Delivery delivery = new DeliveryImpl(ref, consumer.getID(), deliveryIDSequence++, sender);
+      Delivery delivery = new DeliveryImpl(ref, id, consumer.getID(), deliveryIDSequence++, sender);
 
       deliveries.add(delivery);
 

Added: trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -0,0 +1,132 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.util;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+/**
+ * This factory creates a hierarchy of Executor which shares the threads of the
+ * parent Executor (typically, the root parent is a Thread pool).
+ * <p>
+ * Each Executor returned by {@link #getOrderedExecutor()} runs the tasks
+ * submitted to it one at a time, in submission order. Tasks submitted to
+ * different child Executors are not ordered relative to each other.
+ * 
+ * @author <a href="david.lloyd at jboss.com">David Lloyd</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public final class OrderedExecutorFactory
+{
+   /** Executor providing the threads on which the child queues are drained. */
+   private final Executor parent;
+
+   /** Children which currently have a drain scheduled or running on the parent. */
+   private final Set<ChildExecutor> runningChildren = Collections
+         .synchronizedSet(new HashSet<ChildExecutor>());
+
+   /**
+    * @param parent the Executor whose threads are shared by every ordered
+    *           Executor created by this factory
+    */
+   public OrderedExecutorFactory(final Executor parent)
+   {
+      this.parent = parent;
+   }
+
+   /**
+    * Creates a new Executor with its own task queue. Its tasks are run
+    * sequentially, in submission order, by the parent Executor.
+    * 
+    * @return a new ordered Executor
+    */
+   public Executor getOrderedExecutor()
+   {
+      return new ChildExecutor();
+   }
+
+   /**
+    * FIFO task queue which is drained by at most one parent task at a time.
+    */
+   private final class ChildExecutor implements Executor, Runnable
+   {
+      private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
+
+      /**
+       * Queues the command and, when the queue was empty and no drain is
+       * pending for this child, asks the parent to run one.
+       */
+      public void execute(Runnable command)
+      {
+         synchronized (tasks)
+         {
+            tasks.add(command);
+            if (tasks.size() == 1 && runningChildren.add(this))
+            {
+               parent.execute(this);
+            }
+         }
+      }
+
+      /**
+       * Runs the queued tasks one after the other until the queue is empty.
+       */
+      public void run()
+      {
+         for (;;)
+         {
+            final Runnable task;
+            synchronized (tasks)
+            {
+               task = tasks.poll();
+               if (task == null)
+               {
+                  runningChildren.remove(this);
+                  return;
+               }
+            }
+            boolean completed = false;
+            try
+            {
+               task.run();
+               completed = true;
+            }
+            finally
+            {
+               if (!completed)
+               {
+                  // the task threw: this drain is about to die with the exception,
+                  // so release the child instead of leaving it marked as running
+                  recoverFromFailedTask();
+               }
+            }
+         }
+      }
+
+      /**
+       * Unmarks this child as running and, if tasks are still queued,
+       * schedules a new drain so that they are not stranded.
+       */
+      private void recoverFromFailedTask()
+      {
+         synchronized (tasks)
+         {
+            runningChildren.remove(this);
+            if (!tasks.isEmpty() && runningChildren.add(this))
+            {
+               parent.execute(this);
+            }
+         }
+      }
+   }
+}

Added: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerOrderingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerOrderingTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerOrderingTest.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -0,0 +1,130 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina.integration.test;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
+import org.jboss.messaging.core.remoting.impl.mina.MinaHandler;
+import org.jboss.messaging.core.remoting.impl.wireformat.TextPacket;
+import org.jboss.messaging.core.remoting.test.unit.TestPacketHandler;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ * 
+ * @version <tt>$Revision$</tt>
+ * 
+ */
+public class MinaHandlerOrderingTest extends TestCase
+{
+
+   private MinaHandler handler;
+   private ExecutorService threadPool;
+   
+   private TestPacketHandler handler_1;
+   private TestPacketHandler handler_2;
+   private PacketDispatcher clientDispatcher;
+
+   // Constants -----------------------------------------------------
+
+   private static final int MANY_MESSAGES = 10000;
+   
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   /**
+    * Packets for 2 different handlers (distinct target/executor IDs) are handled concurrently:
+    * handler_1 completes before handler_2 does, and handler_2 still sees its packets in send order.
+    */
+   public void testSerializationOrder() throws Exception
+   {
+      handler_1.expectMessage(2);
+      handler_2.expectMessage(MANY_MESSAGES);
+
+      TextPacket packet_1 = new TextPacket("testSerializationOrder handled by handle_1");
+      packet_1.setTargetID(handler_1.getID());
+      packet_1.setExecutorID(handler_1.getID());
+
+      // we send 1 packet to handler_1
+      // then many packets to handler_2
+      // and again 1 packet to handler_1
+      handler.messageReceived(null, packet_1);
+      for (int i = 0; i < MANY_MESSAGES; i++)
+      {
+         TextPacket packet_2 = new TextPacket(Integer.toString(i));
+         packet_2.setTargetID(handler_2.getID());
+         packet_2.setExecutorID(handler_2.getID());
+         handler.messageReceived(null, packet_2);
+      }
+      handler.messageReceived(null, packet_1);
+
+      // handler_1 must have both its packets within 50 ms, *before* handler_2 has received
+      // all of its own (timing-sensitive: relies on 10000 packets not being handled that fast)
+      assertTrue(handler_1.await(50, MILLISECONDS));
+      int size = handler_2.getPackets().size();
+      assertTrue("handler_2 should not have received all its message (size:" + size + ")", size < MANY_MESSAGES);
+
+      assertTrue(handler_2.await(5, SECONDS));
+      List<TextPacket> packetsReceivedByHandler_2 = handler_2.getPackets();
+      assertEquals(MANY_MESSAGES, packetsReceivedByHandler_2.size());      
+      // handler_2 must have received every packet, in the order they were sent:
+      for (int i = 0; i < MANY_MESSAGES; i++)
+      {
+         TextPacket p = packetsReceivedByHandler_2.get(i);
+         assertEquals(Integer.toString(i), p.getText());
+      }      
+   }
+
+   // TestCase overrides --------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      clientDispatcher = new PacketDispatcherImpl();
+      threadPool = Executors.newCachedThreadPool();
+      handler = new MinaHandler(clientDispatcher, threadPool, null, true);
+
+      handler_1 = new TestPacketHandler();
+      clientDispatcher.register(handler_1);
+      handler_2 = new TestPacketHandler();
+      clientDispatcher.register(handler_2);
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      clientDispatcher.unregister(handler_1.getID());
+      clientDispatcher.unregister(handler_2.getID());
+      threadPool.shutdown();
+      handler_1 = null;
+      handler_2 = null;
+      clientDispatcher = null;
+      handler = null;
+      threadPool = null;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -6,6 +6,11 @@
  */
 package org.jboss.messaging.core.remoting.impl.mina.integration.test;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import junit.framework.TestCase;
 
 import org.jboss.messaging.core.remoting.PacketDispatcher;
@@ -24,6 +29,7 @@
 {
 
    private MinaHandler handler;
+   private ExecutorService threadPool;
    private TestPacketHandler packetHandler;
    private PacketDispatcher clientDispatcher;
 
@@ -51,7 +57,8 @@
    public void testReceiveUnhandledAbstractPacket() throws Exception
    {
       TextPacket packet = new TextPacket("testReceiveUnhandledAbstractPacket");
-
+      packet.setExecutorID(packetHandler.getID());
+      
       handler.messageReceived(null, packet);
 
       assertEquals(0, packetHandler.getPackets().size());
@@ -59,12 +66,15 @@
 
    public void testReceiveHandledAbstractPacket() throws Exception
    {
+      packetHandler.expectMessage(1);
 
       TextPacket packet = new TextPacket("testReceiveHandledAbstractPacket");
       packet.setTargetID(packetHandler.getID());
+      packet.setExecutorID(packetHandler.getID());
 
       handler.messageReceived(null, packet);
 
+      assertTrue(packetHandler.await(500, MILLISECONDS));
       assertEquals(1, packetHandler.getPackets().size());
       assertEquals(packet.getText(), packetHandler.getPackets().get(0)
             .getText());
@@ -76,7 +86,8 @@
    protected void setUp() throws Exception
    {
       clientDispatcher = new PacketDispatcherImpl();
-      handler = new MinaHandler(clientDispatcher, null, true);
+      threadPool = Executors.newCachedThreadPool();
+      handler = new MinaHandler(clientDispatcher, threadPool, null, true);
 
       packetHandler = new TestPacketHandler();
       clientDispatcher.register(packetHandler);
@@ -86,9 +97,11 @@
    protected void tearDown() throws Exception
    {
       clientDispatcher.unregister(packetHandler.getID());
+      threadPool.shutdown();
       packetHandler = null;
       clientDispatcher = null;
       handler = null;
+      threadPool = null;
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -287,14 +287,14 @@
 
       assertEquals(buffer.get(), packet.getType().byteValue());
 
-      String targetID = packet.getTargetID();
-      if (NO_ID_SET.equals(packet.getTargetID()))
-         targetID = null;
-      String callbackID = packet.getCallbackID();
-      if (NO_ID_SET.equals(packet.getCallbackID()))
-         callbackID = null;
+      String targetID = (packet.getTargetID().equals(NO_ID_SET) ? null : packet
+            .getTargetID());
+      String callbackID = (packet.getCallbackID().equals(NO_ID_SET) ? null
+            : packet.getCallbackID());
+      String executorID = (packet.getExecutorID().equals(NO_ID_SET) ? null
+            : packet.getExecutorID());
 
-      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID)
+      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID) + sizeof(executorID)
             + BOOLEAN_LENGTH;
       assertEquals(buffer.getInt(), headerLength);
       assertEquals(buffer.getLong(), packet.getCorrelationID());
@@ -305,10 +305,14 @@
       String bufferCallbackID = buffer.getNullableString();
       if (bufferCallbackID == null)
          bufferCallbackID = NO_ID_SET;
+      String bufferExecutorID = buffer.getNullableString();
+      if (bufferExecutorID == null)
+         bufferExecutorID = NO_ID_SET;
       boolean oneWay = buffer.getBoolean();
 
       assertEquals(bufferTargetID, packet.getTargetID());
       assertEquals(bufferCallbackID, packet.getCallbackID());
+      assertEquals(bufferExecutorID, packet.getExecutorID());
       assertEquals(oneWay, packet.isOneWay());
    }
 
@@ -318,8 +322,10 @@
             .getTargetID());
       String callbackID = (packet.getCallbackID().equals(NO_ID_SET) ? null
             : packet.getCallbackID());
+      String executorID = (packet.getExecutorID().equals(NO_ID_SET) ? null
+            : packet.getExecutorID());
 
-      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID)
+      int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID) + sizeof(executorID)
             + BOOLEAN_LENGTH;
       ByteBuffer expected = ByteBuffer.allocate(1 + 1 + INT_LENGTH
             + headerLength);
@@ -329,6 +335,7 @@
       expected.putLong(packet.getCorrelationID());
       putNullableString(targetID, expected);
       putNullableString(callbackID, expected);
+      putNullableString(executorID, expected);
       expected.put(packet.isOneWay() ? TRUE : FALSE);
       expected.flip();
 
@@ -363,6 +370,7 @@
       packet.setCallbackID(randomString());
       packet.setCorrelationID(randomLong());
       packet.setTargetID(randomString());
+      packet.setExecutorID(randomString());
 
       AbstractPacketCodec<AbstractPacket> codec = PacketCodecFactory
             .createCodecForEmptyPacket(NULL, NullPacket.class);

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/PacketDispatcherTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/PacketDispatcherTest.java	2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/PacketDispatcherTest.java	2008-04-07 12:28:48 UTC (rev 4014)
@@ -15,6 +15,7 @@
 import junit.framework.TestCase;
 
 import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketHandlerRegistrationListener;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
 import org.jboss.messaging.core.remoting.impl.wireformat.TextPacket;
@@ -113,6 +114,26 @@
       verify(handler, sender);
    }
 
+   public void testRegistrationListener() throws Exception
+   {
+      PacketHandlerRegistrationListener listener = createMock(PacketHandlerRegistrationListener.class);
+      PacketHandler handler = createMock(PacketHandler.class);
+      // record: the listener is notified once of the registration and once of the unregistration
+      String id = randomUUID().toString();
+      expect(handler.getID()).andStubReturn(id);
+      listener.handlerRegistered(id);
+      expectLastCall().once();
+      listener.handlerUnregistered(id);
+      expectLastCall().once();
+
+      replay(handler, listener);
+      // exercise: register then unregister the handler while the listener is set on the dispatcher
+      dispatcher.setListener(listener);
+      dispatcher.register(handler);
+      dispatcher.unregister(id);
+      // check that the recorded notifications were received
+      verify(handler, listener);
+   }
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list