[jboss-cvs] JBoss Messaging SVN: r4204 - in trunk: src/main/org/jboss/messaging/core/client/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu May 15 08:20:26 EDT 2008
Author: timfox
Date: 2008-05-15 08:20:26 -0400 (Thu, 15 May 2008)
New Revision: 4204
Modified:
trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
Log:
Don't order on messages received
Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java 2008-05-15 12:20:26 UTC (rev 4204)
@@ -35,7 +35,7 @@
SessionBindingQueryResponseMessage bindingQuery(SimpleString address) throws MessagingException;
ClientConsumer createConsumer(SimpleString queueName, SimpleString filterString, boolean noLocal,
- boolean autoDeleteQueue) throws MessagingException;
+ boolean autoDeleteQueue, boolean direct) throws MessagingException;
ClientConsumer createConsumer(SimpleString queueName) throws MessagingException;
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-15 12:20:26 UTC (rev 4204)
@@ -72,6 +72,8 @@
private final PriorityLinkedList<ClientMessage> buffer = new PriorityLinkedListImpl<ClientMessage>(10);
+ private final boolean direct;
+
private volatile Thread receiverThread;
private volatile Thread onMessageThread;
@@ -83,6 +85,8 @@
private volatile long ignoreDeliveryMark = -1;
private volatile int tokensToSend;
+
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -100,7 +104,8 @@
final long clientTargetID,
final ExecutorService sessionExecutor,
final RemotingConnection remotingConnection,
- final int tokenBatchSize)
+ final int tokenBatchSize,
+ final boolean direct)
{
this.targetID = targetID;
@@ -113,6 +118,8 @@
this.remotingConnection = remotingConnection;
this.tokenBatchSize = tokenBatchSize;
+
+ this.direct = direct;
}
// ClientConsumer implementation
@@ -309,8 +316,7 @@
if (handler != null)
{
- //TODO
- if (true)
+ if (direct)
{
//Dispatch it directly on remoting thread
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-05-15 12:20:26 UTC (rev 4204)
@@ -259,11 +259,11 @@
public ClientConsumer createConsumer(final SimpleString queueName) throws MessagingException
{
- return createConsumer(queueName, null, false, false);
+ return createConsumer(queueName, null, false, false, false);
}
public ClientConsumer createConsumer(final SimpleString queueName, final SimpleString filterString, final boolean noLocal,
- final boolean autoDeleteQueue) throws MessagingException
+ final boolean autoDeleteQueue, final boolean direct) throws MessagingException
{
checkClosed();
@@ -278,7 +278,7 @@
int tokenBatchSize = response.getWindowSize() == -1 ? 0 : 1;
ClientConsumerInternal consumer =
- new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, tokenBatchSize);
+ new ClientConsumerImpl(this, response.getConsumerTargetID(), clientTargetID, executor, remotingConnection, tokenBatchSize, direct);
consumers.put(response.getConsumerTargetID(), consumer);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaConnector.java 2008-05-15 12:20:26 UTC (rev 4204)
@@ -151,7 +151,9 @@
}
threadPool = Executors.newCachedThreadPool();
- handler = new MinaHandler(dispatcher, threadPool, this, false);
+ //We don't order executions in the handler for messages received - this is done in the ClientConsumeImpl
+ //since they are put on the queue in order
+ handler = new MinaHandler(dispatcher, threadPool, this, false, false);
connector.setHandler(handler);
InetSocketAddress address = new InetSocketAddress(location.getHost(), location.getPort());
ConnectFuture future = connector.connect(address);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaHandler.java 2008-05-15 12:20:26 UTC (rev 4204)
@@ -51,14 +51,15 @@
// Note! must use ConcurrentMap here to avoid race condition
private final ConcurrentMap<Long, Executor> executors = new ConcurrentHashMap<Long, Executor>();
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
public MinaHandler(final PacketDispatcher dispatcher,
- final ExecutorService executorService,
- final CleanUpNotifier failureNotifier,
- final boolean closeSessionOnExceptionCaught)
+ final ExecutorService executorService,
+ final CleanUpNotifier failureNotifier,
+ final boolean closeSessionOnExceptionCaught,
+ final boolean useExecutor)
{
assert dispatcher != null;
assert executorService != null;
@@ -66,8 +67,14 @@
this.dispatcher = dispatcher;
this.failureNotifier = failureNotifier;
this.closeSessionOnExceptionCaught = closeSessionOnExceptionCaught;
-
- this.executorFactory = new OrderedExecutorFactory(executorService);
+ if (useExecutor)
+ {
+ this.executorFactory = new OrderedExecutorFactory(executorService);
+ }
+ else
+ {
+ this.executorFactory = null;
+ }
this.dispatcher.setListener(this);
}
@@ -112,35 +119,44 @@
throws Exception
{
final Packet packet = (Packet) message;
- long executorID = packet.getExecutorID();
-
- Executor executor = executors.get(executorID);
- if (executor == null)
+
+ if (executorFactory != null)
{
- executor = executorFactory.getOrderedExecutor();
-
- Executor oldExecutor = executors.putIfAbsent(executorID, executor);
-
- if (oldExecutor != null)
+
+ long executorID = packet.getExecutorID();
+
+ Executor executor = executors.get(executorID);
+ if (executor == null)
{
- //Avoid race
- executor = oldExecutor;
+ executor = executorFactory.getOrderedExecutor();
+
+ Executor oldExecutor = executors.putIfAbsent(executorID, executor);
+
+ if (oldExecutor != null)
+ {
+ //Avoid race
+ executor = oldExecutor;
+ }
}
- }
-
- executor.execute(new Runnable()
- {
- public void run()
+
+ executor.execute(new Runnable()
{
- try
+ public void run()
{
- messageReceivedInternal(session, packet);
- } catch (Exception e)
- {
- log.error("unexpected error", e);
+ try
+ {
+ messageReceivedInternal(session, packet);
+ } catch (Exception e)
+ {
+ log.error("unexpected error", e);
+ }
}
- }
- });
+ });
+ }
+ else
+ {
+ messageReceivedInternal(session, packet);
+ }
}
private final int high = 2000;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/MinaService.java 2008-05-15 12:20:26 UTC (rev 4204)
@@ -161,7 +161,7 @@
acceptor.setCloseOnDeactivation(false);
threadPool = Executors.newCachedThreadPool();
- acceptor.setHandler(new MinaHandler(dispatcher, threadPool, this, true));
+ acceptor.setHandler(new MinaHandler(dispatcher, threadPool, this, true, true));
acceptor.bind();
acceptorListener = new MinaSessionListener();
acceptor.addListener(acceptorListener);
Modified: trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/src/main/org/jboss/messaging/jms/client/JBossSession.java 2008-05-15 12:20:26 UTC (rev 4204)
@@ -487,7 +487,7 @@
throw new InvalidDestinationException("Queue " + dest.getName() + " does not exist");
}
- consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, noLocal, false);
+ consumer = session.createConsumer(dest.getSimpleAddress(), coreFilterString, noLocal, false, false);
}
else
{
@@ -508,7 +508,7 @@
session.createQueue(dest.getSimpleAddress(), queueName, coreFilterString, false, false);
- consumer = session.createConsumer(queueName, null, noLocal, true);
+ consumer = session.createConsumer(queueName, null, noLocal, true, false);
}
else
{
@@ -570,7 +570,7 @@
}
}
- consumer = session.createConsumer(queueName, null, noLocal, false);
+ consumer = session.createConsumer(queueName, null, noLocal, false, false);
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java 2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerOrderingTest.java 2008-05-15 12:20:26 UTC (rev 4204)
@@ -102,7 +102,7 @@
{
clientDispatcher = new PacketDispatcherImpl(null);
threadPool = Executors.newCachedThreadPool();
- handler = new MinaHandler(clientDispatcher, threadPool, null, true);
+ handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
handler_1 = new TestPacketHandler(23);
clientDispatcher.register(handler_1);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java 2008-05-15 11:38:09 UTC (rev 4203)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/MinaHandlerTest.java 2008-05-15 12:20:26 UTC (rev 4204)
@@ -78,7 +78,7 @@
{
clientDispatcher = new PacketDispatcherImpl(null);
threadPool = Executors.newCachedThreadPool();
- handler = new MinaHandler(clientDispatcher, threadPool, null, true);
+ handler = new MinaHandler(clientDispatcher, threadPool, null, true, true);
packetHandler = new TestPacketHandler(23);
clientDispatcher.register(packetHandler);
More information about the jboss-cvs-commits
mailing list