[Jboss-cvs] JBoss Messaging SVN: r1205 - in trunk: src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core tests/src/org/jboss/test/messaging/jms/stress
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Aug 7 17:50:28 EDT 2006
Author: timfox
Date: 2006-08-07 17:50:20 -0400 (Mon, 07 Aug 2006)
New Revision: 1205
Modified:
trunk/src/main/org/jboss/jms/server/QueuedExecutorPool.java
trunk/src/main/org/jboss/jms/server/ServerPeer.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
trunk/tests/src/org/jboss/test/messaging/jms/stress/RelayStressTest.java
Log:
http://jira.jboss.com/jira/browse/JBMESSAGING-493
Modified: trunk/src/main/org/jboss/jms/server/QueuedExecutorPool.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/QueuedExecutorPool.java 2006-08-06 18:21:41 UTC (rev 1204)
+++ trunk/src/main/org/jboss/jms/server/QueuedExecutorPool.java 2006-08-07 21:50:20 UTC (rev 1205)
@@ -23,6 +23,7 @@
import org.jboss.messaging.util.RotatingPool;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
/**
@@ -43,7 +44,11 @@
protected Object createEntry()
{
- return new QueuedExecutor();
+ //We make sure the executor queue is not bounded
+ //otherwise we could end up with everything grinding to a halt if the
+ //same executor is shared by many consumers and it gets full
+ //The default bounded linked queue will block
+ return new QueuedExecutor(new LinkedQueue());
}
public void shutdown()
Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-08-06 18:21:41 UTC (rev 1204)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java 2006-08-07 21:50:20 UTC (rev 1205)
@@ -94,7 +94,7 @@
protected String defaultQueueJNDIContext;
protected String defaultTopicJNDIContext;
- protected int queuedExecutorPoolSize = 50;
+ protected int queuedExecutorPoolSize = 200;
protected boolean started;
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-08-06 18:21:41 UTC (rev 1204)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2006-08-07 21:50:20 UTC (rev 1205)
@@ -143,13 +143,13 @@
// the client before the client has fully finished creating the MessageCallbackHandler.
this.clientConsumerFull = true;
- // We allocate an executor for this consumer based on the destination name so that all
- // consumers for the same destination currently use the same executor (we can change this if
- // need be). Note that they do not use the same executor as the channel of the destination.
+ // We allocate an executor from the rotating pool for each consumer based on it's id
+ // This gives better latency than each consumer for the destination using the same
+ // executor
QueuedExecutorPool pool =
sessionEndpoint.getConnectionEndpoint().getServerPeer().getQueuedExecutorPool();
- this.executor = (QueuedExecutor)pool.get("consumer" + dest.getName());
+ this.executor = (QueuedExecutor)pool.get("consumer" + id);
// Note that using a PooledExecutor with a linked queue is not sufficient to ensure that
// deliveries for the same consumer happen serially, since even if they are queued serially
Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-08-06 18:21:41 UTC (rev 1204)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java 2006-08-07 21:50:20 UTC (rev 1205)
@@ -206,20 +206,27 @@
Future result = new Future();
- try
- {
- // Instead of executing directly, we add the handle request to the event queue.
- // Since remoting doesn't currently handle non blocking IO, we still have to wait for the
- // result, but when remoting does, we can use a full SEDA approach and get even better
- // throughput.
- this.executor.execute(new HandleRunnable(result, sender, r, tx));
+ if (tx == null)
+ {
+ try
+ {
+ // Instead of executing directly, we add the handle request to the event queue.
+ // Since remoting doesn't currently handle non blocking IO, we still have to wait for the
+ // result, but when remoting does, we can use a full SEDA approach and get even better
+ // throughput.
+ this.executor.execute(new HandleRunnable(result, sender, r));
+ }
+ catch (InterruptedException e)
+ {
+ log.warn("Thread interrupted", e);
+ }
+
+ return (Delivery)result.getResult();
}
- catch (InterruptedException e)
+ else
{
- log.warn("Thread interrupted", e);
+ return this.handleInternal(sender, r, tx);
}
-
- return (Delivery)result.getResult();
}
// DeliveryObserver implementation --------------------------
@@ -1728,19 +1735,16 @@
Routable routable;
- Transaction tx;
-
- HandleRunnable(Future result, DeliveryObserver sender, Routable routable, Transaction tx)
+ HandleRunnable(Future result, DeliveryObserver sender, Routable routable)
{
this.result = result;
this.sender = sender;
this.routable = routable;
- this.tx = tx;
}
public void run()
{
- Delivery d = handleInternal(sender, routable, tx);
+ Delivery d = handleInternal(sender, routable, null);
result.setResult(d);
}
}
Modified: trunk/tests/src/org/jboss/test/messaging/jms/stress/RelayStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/RelayStressTest.java 2006-08-06 18:21:41 UTC (rev 1204)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/RelayStressTest.java 2006-08-07 21:50:20 UTC (rev 1205)
@@ -80,11 +80,11 @@
Topic topic = (Topic)ic.lookup("/topic/StressTestTopic");
- final int numMessages = 50000;
+ final int numMessages = 20000;
- final int numRelayers = 10;
+ final int numRelayers = 5;
- final int numConsumers = 50;
+ final int numConsumers = 20;
Connection conn = cf.createConnection();
More information about the jboss-cvs-commits
mailing list