[jboss-cvs] JBoss Messaging SVN: r4014 - in trunk: src/main/org/jboss/messaging/core/remoting and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Apr 7 08:28:48 EDT 2008
Author: jmesnil
Date: 2008-04-07 08:28:48 -0400 (Mon, 07 Apr 2008)
New Revision: 4014
Added:
trunk/src/main/org/jboss/messaging/core/remoting/PacketHandlerRegistrationListener.java
trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerOrderingTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/AbstractPacket.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionResponse.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Packet.java
trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java
trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/PacketDispatcherTest.java
Log:
* replaced MINA's executor filter by a custom-made OrderedExecutorFactory to have more flexibility on thread execution order (correlated by an executorID on the Packet interface)
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientBrowserImpl.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -81,7 +81,7 @@
try
{
- remotingConnection.send(id, new CloseMessage());
+ remotingConnection.send(id, session.getID(), new CloseMessage());
}
finally
{
@@ -100,7 +100,7 @@
{
checkClosed();
- remotingConnection.send(id, new SessionBrowserResetMessage());
+ remotingConnection.send(id, session.getID(), new SessionBrowserResetMessage());
}
public boolean hasNextMessage() throws MessagingException
@@ -108,7 +108,7 @@
checkClosed();
SessionBrowserHasNextMessageResponseMessage response =
- (SessionBrowserHasNextMessageResponseMessage)remotingConnection.send(id, new SessionBrowserHasNextMessageMessage());
+ (SessionBrowserHasNextMessageResponseMessage)remotingConnection.send(id, session.getID(), new SessionBrowserHasNextMessageMessage());
return response.hasNext();
}
@@ -118,7 +118,7 @@
checkClosed();
SessionBrowserNextMessageResponseMessage response =
- (SessionBrowserNextMessageResponseMessage)remotingConnection.send(id, new SessionBrowserNextMessageMessage());
+ (SessionBrowserNextMessageResponseMessage)remotingConnection.send(id, session.getID(), new SessionBrowserNextMessageMessage());
return response.getMessage();
}
@@ -128,7 +128,7 @@
checkClosed();
SessionBrowserNextMessageBlockResponseMessage response =
- (SessionBrowserNextMessageBlockResponseMessage)remotingConnection.send(id, new SessionBrowserNextMessageBlockMessage(maxMessages));
+ (SessionBrowserNextMessageBlockResponseMessage)remotingConnection.send(id, session.getID(), new SessionBrowserNextMessageBlockMessage(maxMessages));
return response.getMessages();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConnectionImpl.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -133,7 +133,7 @@
{
checkClosed();
- remotingConnection.send(id, new ConnectionStartMessage(), true);
+ remotingConnection.send(id, id, new ConnectionStartMessage(), true);
}
public void stop() throws MessagingException
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -245,7 +245,7 @@
receiverThread = null;
- remotingConnection.send(id, new CloseMessage());
+ remotingConnection.send(id, session.getID(), new CloseMessage());
remotingConnection.getPacketDispatcher().unregister(id);
}
@@ -369,7 +369,7 @@
{
tokensToSend = 0;
- remotingConnection.send(id, new ConsumerFlowTokenMessage(tokenBatchSize), true);
+ remotingConnection.send(id, session.getID(), new ConsumerFlowTokenMessage(tokenBatchSize), true);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -139,7 +139,7 @@
windowSize--;
}
- remotingConnection.send(id, message, !msg.isDurable());
+ remotingConnection.send(id, session.getID(), message, !msg.isDurable());
if (rateLimiter != null)
{
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -283,7 +283,7 @@
//Now we send window size tokens to start the consumption
//We even send it if windowSize == -1, since we need to start the consumer
- remotingConnection.send(response.getConsumerID(), new ConsumerFlowTokenMessage(response.getWindowSize()), true);
+ remotingConnection.send(response.getConsumerID(), id, new ConsumerFlowTokenMessage(response.getWindowSize()), true);
return consumer;
}
@@ -409,7 +409,7 @@
if (deliveryExpired)
{
- remotingConnection.send(id, new SessionCancelMessage(lastID, true), true);
+ remotingConnection.send(id, id, new SessionCancelMessage(lastID, true), true);
toAckCount = 0;
}
@@ -801,7 +801,7 @@
SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(lastID, !broken);
- remotingConnection.send(id, message, !block);
+ remotingConnection.send(id, id, message, !block);
acked = true;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnection.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -20,9 +20,15 @@
public String getSessionID();
- AbstractPacket send(String id, AbstractPacket packet) throws MessagingException;
+ /**
+ * Use this method if the packet is to be executed in the context of the targetID (i.e. for
+ * sessions, connections & connections factories)
+ */
+ AbstractPacket send(String targetID, AbstractPacket packet) throws MessagingException;
+
+ AbstractPacket send(String targetID, String executorID, AbstractPacket packet) throws MessagingException;
- AbstractPacket send(String id, AbstractPacket packet, boolean oneWay) throws MessagingException;
+ AbstractPacket send(String targetID, String executorID, AbstractPacket packet, boolean oneWay) throws MessagingException;
void setFailureListener(FailureListener newListener);
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/RemotingConnectionImpl.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -126,19 +126,26 @@
return session.getID();
}
+ public AbstractPacket send(String targetID, AbstractPacket packet)
+ throws MessagingException
+ {
+ return send(targetID, targetID, packet);
+ }
+
/**
* send the packet and block until a response is received (<code>oneWay</code> is set to <code>false</code>)
*/
- public AbstractPacket send(final String id, final AbstractPacket packet) throws MessagingException
+ public AbstractPacket send(final String targetID, final String executorID, final AbstractPacket packet) throws MessagingException
{
- return send(id, packet, false);
+ return send(targetID, executorID, packet, false);
}
- public AbstractPacket send(final String id, final AbstractPacket packet, final boolean oneWay) throws MessagingException
+ public AbstractPacket send(final String targetID, final String executorID, final AbstractPacket packet, final boolean oneWay) throws MessagingException
{
assert packet != null;
- packet.setTargetID(id);
+ packet.setTargetID(targetID);
+ packet.setExecutorID(executorID);
AbstractPacket response;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketDispatcher.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -21,7 +21,9 @@
void register(PacketHandler handler);
void unregister(String handlerID);
-
+
+ void setListener(PacketHandlerRegistrationListener listener);
+
void dispatch(Packet packet, PacketSender sender) throws Exception;
/** Call filters on a package */
Added: trunk/src/main/org/jboss/messaging/core/remoting/PacketHandlerRegistrationListener.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/PacketHandlerRegistrationListener.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/remoting/PacketHandlerRegistrationListener.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -0,0 +1,19 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public interface PacketHandlerRegistrationListener
+{
+ void handlerRegistered(String handlerID);
+ void handlerUnregistered(String handlerID);
+}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/PacketDispatcherImpl.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -15,9 +15,10 @@
import java.util.concurrent.ConcurrentHashMap;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketHandlerRegistrationListener;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
@@ -28,14 +29,18 @@
*/
public class PacketDispatcherImpl implements PacketDispatcher, Serializable
{
+
// Constants -----------------------------------------------------
+ private static final long serialVersionUID = -4626926952268528384L;
+
public static final Logger log = Logger.getLogger(PacketDispatcherImpl.class);
// Attributes ----------------------------------------------------
private Map<String, PacketHandler> handlers;
public List<Interceptor> filters;
+ private transient PacketHandlerRegistrationListener listener;
// Static --------------------------------------------------------
@@ -55,8 +60,6 @@
}
// Public --------------------------------------------------------
-
-
/* (non-Javadoc)
* @see org.jboss.messaging.core.remoting.impl.IPacketDispatcher#register(org.jboss.messaging.core.remoting.PacketHandler)
@@ -72,6 +75,9 @@
{
log.debug("registered " + handler + " with ID " + handler.getID());
}
+
+ if (listener != null)
+ listener.handlerRegistered(handler.getID());
}
/* (non-Javadoc)
@@ -87,7 +93,15 @@
{
log.debug("unregistered handler for " + handlerID);
}
+
+ if (listener != null)
+ listener.handlerUnregistered(handlerID);
}
+
+ public void setListener(PacketHandlerRegistrationListener listener)
+ {
+ this.listener = listener;
+ }
public PacketHandler getHandler(String handlerID)
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/AbstractPacketCodec.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -77,13 +77,19 @@
{
callbackID = null;
}
- int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID) + BOOLEAN_LENGTH;
+ String executorID = packet.getExecutorID();
+ if (NO_ID_SET.equals(executorID))
+ {
+ executorID = targetID;
+ }
+ int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID) + sizeof(executorID) + BOOLEAN_LENGTH;
buf.put(packet.getType().byteValue());
buf.putInt(headerLength);
buf.putLong(correlationID);
buf.putNullableString(targetID);
buf.putNullableString(callbackID);
+ buf.putNullableString(executorID);
buf.putBoolean(packet.isOneWay());
encodeBody(packet, buf);
@@ -139,6 +145,13 @@
{
return NOT_OK;
}
+ try
+ {
+ buffer.getNullableString();
+ } catch (CharacterCodingException e)
+ {
+ return NOT_OK;
+ }
buffer.getBoolean(); // oneWay boolean
if (buffer.remaining() < INT_LENGTH)
{
@@ -168,6 +181,7 @@
long correlationID = wrapper.getLong();
String targetID = wrapper.getNullableString();
String callbackID = wrapper.getNullableString();
+ String executorID = wrapper.getNullableString();
boolean oneWay = wrapper.getBoolean();
P packet = decodeBody(wrapper);
@@ -179,10 +193,16 @@
if (targetID == null)
targetID = NO_ID_SET;
packet.setTargetID(targetID);
- packet.setCorrelationID(correlationID);
+
if (callbackID == null)
callbackID = NO_ID_SET;
packet.setCallbackID(callbackID);
+
+ if (executorID == null)
+ executorID = targetID;
+ packet.setExecutorID(executorID);
+
+ packet.setCorrelationID(correlationID);
packet.setOneWay(oneWay);
return packet;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/FilterChainSupport.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -17,7 +17,6 @@
import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.keepalive.KeepAliveFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.filter.logging.MdcInjectionFilter;
@@ -120,12 +119,6 @@
filterChain.addLast("logger", filter);
}
- static void addExecutorFilter(DefaultIoFilterChainBuilder filterChain)
- {
- ExecutorFilter executorFilter = new ExecutorFilter();
- filterChain.addLast("executor", executorFilter);
- }
-
static ScheduledExecutorService addBlockingRequestResponseFilter(
DefaultIoFilterChainBuilder filterChain)
{
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -8,7 +8,6 @@
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addBlockingRequestResponseFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addExecutorFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addLoggingFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addMDCFilter;
@@ -18,6 +17,8 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.mina.common.CloseFuture;
@@ -38,7 +39,7 @@
import org.jboss.messaging.core.remoting.NIOSession;
import org.jboss.messaging.core.remoting.PacketDispatcher;
import org.jboss.messaging.core.remoting.RemotingException;
-import org.jboss.messaging.core.remoting.impl.wireformat.AbstractPacket;
+import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
/**
@@ -57,10 +58,14 @@
private Configuration configuration;
- private NioSocketConnector connector;
+ private transient NioSocketConnector connector;
+ private PacketDispatcher dispatcher;
+
private ScheduledExecutorService blockingScheduler;
+ private ExecutorService threadPool;
+
private IoSession session;
private List<FailureListener> listeners = new ArrayList<FailureListener>();
@@ -85,6 +90,7 @@
assert keepAliveFactory != null;
this.configuration = configuration;
+ this.dispatcher = dispatcher;
this.connector = new NioSocketConnector();
DefaultIoFilterChainBuilder filterChain = connector.getFilterChain();
@@ -107,9 +113,6 @@
blockingScheduler = addBlockingRequestResponseFilter(filterChain);
addKeepAliveFilter(filterChain, keepAliveFactory, configuration.getKeepAliveInterval(),
configuration.getKeepAliveTimeout(), this);
- addExecutorFilter(filterChain);
-
- connector.setHandler(new MinaHandler(dispatcher, this, false));
connector.getSessionConfig().setKeepAlive(true);
connector.getSessionConfig().setReuseAddress(true);
}
@@ -122,6 +125,9 @@
{
return new MinaSession(session);
}
+
+ threadPool = Executors.newCachedThreadPool();
+ connector.setHandler(new MinaHandler(dispatcher, threadPool, this, false));
InetSocketAddress address = new InetSocketAddress(configuration.getHost(), configuration.getPort());
ConnectFuture future = connector.connect(address);
connector.setDefaultRemoteAddress(address);
@@ -134,7 +140,7 @@
throw new IOException("Cannot connect to " + address.toString());
}
this.session = future.getSession();
- AbstractPacket packet = new Ping(Long.toString(session.getId()));
+ Packet packet = new Ping(Long.toString(session.getId()));
session.write(packet);
return new MinaSession(session);
@@ -153,7 +159,8 @@
blockingScheduler.shutdown();
connector.removeListener(ioListener);
connector.dispose();
-
+ threadPool.shutdown();
+
SslFilter sslFilter = (SslFilter) session.getFilterChain().get("ssl");
// FIXME without this hack, exceptions are thrown:
// "Unexpected exception from SSLEngine.closeInbound()." -> because the ssl session is not stopped
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -6,17 +6,24 @@
*/
package org.jboss.messaging.core.remoting.impl.mina;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.reqres.Response;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandlerRegistrationListener;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.RemotingException;
import org.jboss.messaging.core.remoting.impl.wireformat.AbstractPacket;
import org.jboss.messaging.core.remoting.impl.wireformat.Packet;
import org.jboss.messaging.core.remoting.impl.wireformat.Ping;
+import org.jboss.messaging.util.OrderedExecutorFactory;
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
@@ -24,7 +31,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public class MinaHandler extends IoHandlerAdapter
+public class MinaHandler extends IoHandlerAdapter implements PacketHandlerRegistrationListener
{
// Constants -----------------------------------------------------
@@ -38,19 +45,40 @@
private boolean closeSessionOnExceptionCaught;
+ private OrderedExecutorFactory executorFactory;
+
+ private Map<String, Executor> executors = new HashMap<String, Executor>();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
- public MinaHandler(PacketDispatcher dispatcher, FailureNotifier failureNotifier, boolean closeSessionOnExceptionCaught)
+ public MinaHandler(PacketDispatcher dispatcher, ExecutorService executorService, FailureNotifier failureNotifier, boolean closeSessionOnExceptionCaught)
{
+ assert dispatcher!= null;
+ assert executorService != null;
+
this.dispatcher = dispatcher;
this.failureNotifier = failureNotifier;
this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
+
+ this.executorFactory = new OrderedExecutorFactory(executorService);
+ this.dispatcher.setListener(this);
}
// Public --------------------------------------------------------
+ // PacketHandlerRegistrationListener implementation --------------
+
+ public void handlerRegistered(String handlerID)
+ {
+ // do nothing on registration
+ }
+
+ public void handlerUnregistered(String handlerID)
+ {
+ executors.remove(handlerID);
+ }
+
// IoHandlerAdapter overrides ------------------------------------
@Override
@@ -73,7 +101,7 @@
}
@Override
- public void messageReceived(final IoSession session, Object message)
+ public void messageReceived(final IoSession session, final Object message)
throws Exception
{
if (message instanceof Response)
@@ -92,17 +120,51 @@
// response is handled by the keep-alive filter.
// do nothing
return;
- }
-
- if (!(message instanceof AbstractPacket))
+ }
+
+ if (!(message instanceof Packet))
{
throw new IllegalArgumentException("Unknown message type: " + message);
}
-
+ final Packet packet = (Packet) message;
+ String executorID = packet.getExecutorID();
- AbstractPacket packet = (AbstractPacket) message;
+ if (AbstractPacket.NO_ID_SET.equals(executorID))
+ throw new IllegalArgumentException("executor ID not set for " + packet);
+
+ Executor executor = executors.get(executorID);
+ if (executor == null)
+ {
+ executor = this.executorFactory.getOrderedExecutor();
+ executors.put(executorID, executor);
+ }
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ messageReceived0(session, packet);
+ } catch (Exception e)
+ {
+ log.error("unexpected error", e);
+ }
+ }
+ });
+
+ }
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void messageReceived0(final IoSession session, Packet packet)
+ throws Exception
+ {
PacketSender sender = new PacketSender()
{
public void send(Packet p) throws Exception
@@ -127,12 +189,6 @@
dispatcher.dispatch(packet, sender);
}
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -10,7 +10,6 @@
import static org.jboss.messaging.core.remoting.TransportType.INVM;
import static org.jboss.messaging.core.remoting.impl.RemotingConfigurationValidator.validate;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addCodecFilter;
-import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addExecutorFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addKeepAliveFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addLoggingFilter;
import static org.jboss.messaging.core.remoting.impl.mina.FilterChainSupport.addMDCFilter;
@@ -20,6 +19,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.mina.common.DefaultIoFilterChainBuilder;
import org.apache.mina.common.IdleStatus;
@@ -63,6 +64,8 @@
private PacketDispatcher dispatcher;
+ private ExecutorService threadPool;
+
private List<FailureListener> listeners = new ArrayList<FailureListener>();
private ServerKeepAliveFactory factory;
@@ -142,7 +145,6 @@
addLoggingFilter(filterChain);
addKeepAliveFilter(filterChain, factory,
config.getKeepAliveInterval(), config.getKeepAliveTimeout(), this);
- addExecutorFilter(filterChain);
// Bind
acceptor.setDefaultLocalAddress(new InetSocketAddress(config.getHost(), config.getPort()));
@@ -151,7 +153,8 @@
acceptor.getSessionConfig().setKeepAlive(true);
acceptor.setCloseOnDeactivation(false);
- acceptor.setHandler(new MinaHandler(dispatcher, this, true));
+ threadPool = Executors.newCachedThreadPool();
+ acceptor.setHandler(new MinaHandler(dispatcher, threadPool, this, true));
acceptor.bind();
acceptorListener = new MinaSessionListener();
acceptor.addListener(acceptorListener);
@@ -176,6 +179,7 @@
acceptor.unbind();
acceptor.dispose();
acceptor = null;
+ threadPool.shutdown();
}
REGISTRY.unregister(config);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/AbstractPacket.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/AbstractPacket.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/AbstractPacket.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -31,6 +31,8 @@
private String callbackID = NO_ID_SET;
+ String executorID = NO_ID_SET;
+
private final PacketType type;
/**
@@ -92,6 +94,18 @@
{
return callbackID;
}
+
+ public String getExecutorID()
+ {
+ return executorID;
+ }
+
+ public void setExecutorID(String executorID)
+ {
+ assertValidID(executorID);
+
+ this.executorID = executorID;
+ }
public void setOneWay(boolean oneWay)
{
@@ -131,7 +145,7 @@
{
return "PACKET[type=" + type
+ ", correlationID=" + correlationID + ", targetID=" + targetID
- + ", callbackID=" + callbackID + ", oneWay=" + oneWay;
+ + ", callbackID=" + callbackID + ", executorID=" + executorID + ", oneWay=" + oneWay;
}
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionResponse.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionResponse.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateConnectionResponse.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -43,6 +43,12 @@
return connectionID;
}
+ @Override
+ public String toString()
+ {
+ return getParentString() + ", connectionID" + connectionID + "]";
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Packet.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Packet.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/Packet.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -7,6 +7,7 @@
package org.jboss.messaging.core.remoting.impl.wireformat;
+
public interface Packet
{
// Public --------------------------------------------------------
@@ -21,10 +22,14 @@
void setTargetID(String targetID);
+ String getCallbackID();
+
void setCallbackID(String callbackID);
- String getCallbackID();
+ String getExecutorID();
+ void setExecutorID(String executorID);
+
void setOneWay(boolean oneWay);
boolean isOneWay();
@@ -35,6 +40,4 @@
* An AbstractPacket is a request if it has a target ID and a correlation ID
*/
public boolean isRequest();
-
-
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -42,16 +42,20 @@
private final MessageReference reference;
+ private final String sessionID;
+
private final String consumerID;
private final long deliveryID;
private final PacketSender sender;
- public DeliveryImpl(final MessageReference reference, final String consumerID,
+ public DeliveryImpl(final MessageReference reference,
+ final String sessionID, final String consumerID,
final long deliveryID, final PacketSender sender)
{
this.reference = reference;
+ this.sessionID = sessionID;
this.consumerID = consumerID;
this.deliveryID = deliveryID;
this.sender = sender;
@@ -83,6 +87,7 @@
ConsumerDeliverMessage message = new ConsumerDeliverMessage(copy, deliveryID);
message.setTargetID(consumerID);
+ message.setExecutorID(sessionID);
sender.send(message);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerProducerImpl.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -108,7 +108,7 @@
Packet packet = new ProducerReceiveTokensMessage(1);
packet.setTargetID(id);
-
+ packet.setExecutorID(session.getID());
sender.send(packet);
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -224,7 +224,7 @@
public synchronized void handleDelivery(final MessageReference ref, final ServerConsumer consumer) throws Exception
{
- Delivery delivery = new DeliveryImpl(ref, consumer.getID(), deliveryIDSequence++, sender);
+ Delivery delivery = new DeliveryImpl(ref, id, consumer.getID(), deliveryIDSequence++, sender);
deliveries.add(delivery);
Added: trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/util/OrderedExecutorFactory.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -0,0 +1,75 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.util;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+/**
+ * This factory creates a hierarchy of Executor which shares the threads of the
+ * parent Executor (typically, the root parent is a Thread pool).
+ *
+ * @author <a href="david.lloyd at jboss.com">David Lloyd</a>
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public final class OrderedExecutorFactory
+{
+ private final Executor parent;
+ private final Set<ChildExecutor> runningChildren = Collections
+ .synchronizedSet(new HashSet<ChildExecutor>());
+
+ public OrderedExecutorFactory(final Executor parent)
+ {
+ this.parent = parent;
+ }
+
+ public Executor getOrderedExecutor()
+ {
+ return new ChildExecutor();
+ }
+
+ private final class ChildExecutor implements Executor, Runnable
+ {
+ private final LinkedList<Runnable> tasks = new LinkedList<Runnable>();
+
+ public void execute(Runnable command)
+ {
+ synchronized (tasks)
+ {
+ tasks.add(command);
+ if (tasks.size() == 1 && runningChildren.add(this))
+ {
+ parent.execute(this);
+ }
+ }
+ }
+
+ public void run()
+ {
+ for (;;)
+ {
+ final Runnable task;
+ synchronized (tasks)
+ {
+ task = tasks.poll();
+ if (task == null)
+ {
+ runningChildren.remove(this);
+ return;
+ }
+ }
+ task.run();
+ }
+ }
+ }
+}
Added: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerOrderingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerOrderingTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerOrderingTest.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -0,0 +1,130 @@
+/*
+ * JBoss, Home of Professional Open Source
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+package org.jboss.messaging.core.remoting.impl.mina.integration.test;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
+import org.jboss.messaging.core.remoting.impl.mina.MinaHandler;
+import org.jboss.messaging.core.remoting.impl.wireformat.TextPacket;
+import org.jboss.messaging.core.remoting.test.unit.TestPacketHandler;
+
+/**
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public class MinaHandlerOrderingTest extends TestCase
+{
+
+ private MinaHandler handler;
+ private ExecutorService threadPool;
+
+ private TestPacketHandler handler_1;
+ private TestPacketHandler handler_2;
+ private PacketDispatcher clientDispatcher;
+
+ // Constants -----------------------------------------------------
+
+ private static final int MANY_MESSAGES = 10000;
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * Test that when messages are sent to 2 different targetIDs,
+ * the messages are handled concurrently by the 2 PacketHandlers
+ */
+ public void testSerializationOrder() throws Exception
+ {
+ handler_1.expectMessage(2);
+ handler_2.expectMessage(MANY_MESSAGES);
+
+ TextPacket packet_1 = new TextPacket("testSerializationOrder handled by handle_1");
+ packet_1.setTargetID(handler_1.getID());
+ packet_1.setExecutorID(handler_1.getID());
+
+ // we send 1 packet to handler_1
+ // then many packets to handler_2
+ // and again 1 packet to handler_1
+ handler.messageReceived(null, packet_1);
+ for (int i = 0; i < MANY_MESSAGES; i++)
+ {
+ TextPacket packet_2 = new TextPacket(Integer.toString(i));
+ packet_2.setTargetID(handler_2.getID());
+ packet_2.setExecutorID(handler_2.getID());
+ handler.messageReceived(null, packet_2);
+ }
+ handler.messageReceived(null, packet_1);
+
+ // we expect to receive the 2 packets on handler_1
+ // *before* handler_2 received all its packets
+ assertTrue(handler_1.await(50, MILLISECONDS));
+ int size = handler_2.getPackets().size();
+ assertTrue("handler_2 should not have received all its message (size:" + size + ")", size < MANY_MESSAGES);
+
+ assertTrue(handler_2.await(5, SECONDS));
+ List<TextPacket> packetsReceivedByHandler_2 = handler_2.getPackets();
+ assertEquals(MANY_MESSAGES, packetsReceivedByHandler_2.size());
+ // we check that handler_2 receives all its messages in order:
+ for (int i = 0; i < MANY_MESSAGES; i++)
+ {
+ TextPacket p = packetsReceivedByHandler_2.get(i);
+ assertEquals(Integer.toString(i), p.getText());
+ }
+ }
+
+ // TestCase overrides --------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ clientDispatcher = new PacketDispatcherImpl();
+ threadPool = Executors.newCachedThreadPool();
+ handler = new MinaHandler(clientDispatcher, threadPool, null, true);
+
+ handler_1 = new TestPacketHandler();
+ clientDispatcher.register(handler_1);
+ handler_2 = new TestPacketHandler();
+ clientDispatcher.register(handler_2);
+ }
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ clientDispatcher.unregister(handler_1.getID());
+ clientDispatcher.unregister(handler_2.getID());
+ threadPool.shutdown();
+ handler_1 = null;
+ handler_2 = null;
+ clientDispatcher = null;
+ handler = null;
+ threadPool = null;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/mina/integration/test/MinaHandlerTest.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -6,6 +6,11 @@
*/
package org.jboss.messaging.core.remoting.impl.mina.integration.test;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import junit.framework.TestCase;
import org.jboss.messaging.core.remoting.PacketDispatcher;
@@ -24,6 +29,7 @@
{
private MinaHandler handler;
+ private ExecutorService threadPool;
private TestPacketHandler packetHandler;
private PacketDispatcher clientDispatcher;
@@ -51,7 +57,8 @@
public void testReceiveUnhandledAbstractPacket() throws Exception
{
TextPacket packet = new TextPacket("testReceiveUnhandledAbstractPacket");
-
+ packet.setExecutorID(packetHandler.getID());
+
handler.messageReceived(null, packet);
assertEquals(0, packetHandler.getPackets().size());
@@ -59,12 +66,15 @@
public void testReceiveHandledAbstractPacket() throws Exception
{
+ packetHandler.expectMessage(1);
TextPacket packet = new TextPacket("testReceiveHandledAbstractPacket");
packet.setTargetID(packetHandler.getID());
+ packet.setExecutorID(packetHandler.getID());
handler.messageReceived(null, packet);
+ assertTrue(packetHandler.await(500, MILLISECONDS));
assertEquals(1, packetHandler.getPackets().size());
assertEquals(packet.getText(), packetHandler.getPackets().get(0)
.getText());
@@ -76,7 +86,8 @@
protected void setUp() throws Exception
{
clientDispatcher = new PacketDispatcherImpl();
- handler = new MinaHandler(clientDispatcher, null, true);
+ threadPool = Executors.newCachedThreadPool();
+ handler = new MinaHandler(clientDispatcher, threadPool, null, true);
packetHandler = new TestPacketHandler();
clientDispatcher.register(packetHandler);
@@ -86,9 +97,11 @@
protected void tearDown() throws Exception
{
clientDispatcher.unregister(packetHandler.getID());
+ threadPool.shutdown();
packetHandler = null;
clientDispatcher = null;
handler = null;
+ threadPool = null;
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/impl/wireformat/test/unit/PacketTypeTest.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -287,14 +287,14 @@
assertEquals(buffer.get(), packet.getType().byteValue());
- String targetID = packet.getTargetID();
- if (NO_ID_SET.equals(packet.getTargetID()))
- targetID = null;
- String callbackID = packet.getCallbackID();
- if (NO_ID_SET.equals(packet.getCallbackID()))
- callbackID = null;
+ String targetID = (packet.getTargetID().equals(NO_ID_SET) ? null : packet
+ .getTargetID());
+ String callbackID = (packet.getCallbackID().equals(NO_ID_SET) ? null
+ : packet.getCallbackID());
+ String executorID = (packet.getExecutorID().equals(NO_ID_SET) ? null
+ : packet.getExecutorID());
- int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID)
+ int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID) + sizeof(executorID)
+ BOOLEAN_LENGTH;
assertEquals(buffer.getInt(), headerLength);
assertEquals(buffer.getLong(), packet.getCorrelationID());
@@ -305,10 +305,14 @@
String bufferCallbackID = buffer.getNullableString();
if (bufferCallbackID == null)
bufferCallbackID = NO_ID_SET;
+ String bufferExecutorID = buffer.getNullableString();
+ if (bufferExecutorID == null)
+ bufferExecutorID = NO_ID_SET;
boolean oneWay = buffer.getBoolean();
assertEquals(bufferTargetID, packet.getTargetID());
assertEquals(bufferCallbackID, packet.getCallbackID());
+ assertEquals(bufferExecutorID, packet.getExecutorID());
assertEquals(oneWay, packet.isOneWay());
}
@@ -318,8 +322,10 @@
.getTargetID());
String callbackID = (packet.getCallbackID().equals(NO_ID_SET) ? null
: packet.getCallbackID());
+ String executorID = (packet.getExecutorID().equals(NO_ID_SET) ? null
+ : packet.getExecutorID());
- int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID)
+ int headerLength = LONG_LENGTH + sizeof(targetID) + sizeof(callbackID) + sizeof(executorID)
+ BOOLEAN_LENGTH;
ByteBuffer expected = ByteBuffer.allocate(1 + 1 + INT_LENGTH
+ headerLength);
@@ -329,6 +335,7 @@
expected.putLong(packet.getCorrelationID());
putNullableString(targetID, expected);
putNullableString(callbackID, expected);
+ putNullableString(executorID, expected);
expected.put(packet.isOneWay() ? TRUE : FALSE);
expected.flip();
@@ -363,6 +370,7 @@
packet.setCallbackID(randomString());
packet.setCorrelationID(randomLong());
packet.setTargetID(randomString());
+ packet.setExecutorID(randomString());
AbstractPacketCodec<AbstractPacket> codec = PacketCodecFactory
.createCodecForEmptyPacket(NULL, NullPacket.class);
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/PacketDispatcherTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/PacketDispatcherTest.java 2008-04-04 16:34:55 UTC (rev 4013)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/test/unit/PacketDispatcherTest.java 2008-04-07 12:28:48 UTC (rev 4014)
@@ -15,6 +15,7 @@
import junit.framework.TestCase;
import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketHandlerRegistrationListener;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.impl.PacketDispatcherImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.TextPacket;
@@ -113,6 +114,26 @@
verify(handler, sender);
}
+ public void testRegistrationListener() throws Exception
+ {
+ PacketHandlerRegistrationListener listener = createMock(PacketHandlerRegistrationListener.class);
+ PacketHandler handler = createMock(PacketHandler.class);
+
+ String id = randomUUID().toString();
+ expect(handler.getID()).andStubReturn(id);
+ listener.handlerRegistered(id);
+ expectLastCall().once();
+ listener.handlerUnregistered(id);
+ expectLastCall().once();
+
+ replay(handler, listener);
+
+ dispatcher.setListener(listener);
+ dispatcher.register(handler);
+ dispatcher.unregister(id);
+
+ verify(handler, listener);
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list