[Jboss-cvs] JBoss Messaging SVN: r1205 - in trunk: src/main/org/jboss/jms/server src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core tests/src/org/jboss/test/messaging/jms/stress

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Aug 7 17:50:28 EDT 2006


Author: timfox
Date: 2006-08-07 17:50:20 -0400 (Mon, 07 Aug 2006)
New Revision: 1205

Modified:
   trunk/src/main/org/jboss/jms/server/QueuedExecutorPool.java
   trunk/src/main/org/jboss/jms/server/ServerPeer.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
   trunk/tests/src/org/jboss/test/messaging/jms/stress/RelayStressTest.java
Log:

http://jira.jboss.com/jira/browse/JBMESSAGING-493



Modified: trunk/src/main/org/jboss/jms/server/QueuedExecutorPool.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/QueuedExecutorPool.java	2006-08-06 18:21:41 UTC (rev 1204)
+++ trunk/src/main/org/jboss/jms/server/QueuedExecutorPool.java	2006-08-07 21:50:20 UTC (rev 1205)
@@ -23,6 +23,7 @@
 
 import org.jboss.messaging.util.RotatingPool;
 
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
 
 /**
@@ -43,7 +44,11 @@
    
    protected Object createEntry()
    {
-      return new QueuedExecutor();
+      //We make sure the executor queue is not bounded
+      //otherwise we could end up with everything grinding to a halt if the
+      //same executor  is shared by many consumers and it gets full
+      //The default bounded linked queue will block
+      return new QueuedExecutor(new LinkedQueue());
    }
    
    public void shutdown()

Modified: trunk/src/main/org/jboss/jms/server/ServerPeer.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-08-06 18:21:41 UTC (rev 1204)
+++ trunk/src/main/org/jboss/jms/server/ServerPeer.java	2006-08-07 21:50:20 UTC (rev 1205)
@@ -94,7 +94,7 @@
    protected String defaultQueueJNDIContext;
    protected String defaultTopicJNDIContext;
    
-   protected int queuedExecutorPoolSize = 50;
+   protected int queuedExecutorPoolSize = 200;
 
    protected boolean started;
 

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-08-06 18:21:41 UTC (rev 1204)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2006-08-07 21:50:20 UTC (rev 1205)
@@ -143,13 +143,13 @@
       // the client before the client has fully finished creating the MessageCallbackHandler.
       this.clientConsumerFull = true;
             
-      // We allocate an executor for this consumer based on the destination name so that all
-      // consumers for the same destination currently use the same executor (we can change this if
-      // need be). Note that they do not use the same executor as the channel of the destination.
+      // We allocate an executor from the rotating pool for each consumer based on it's id
+      // This gives better latency than each consumer for the destination using the same
+      // executor
       QueuedExecutorPool pool =
          sessionEndpoint.getConnectionEndpoint().getServerPeer().getQueuedExecutorPool();
       
-      this.executor = (QueuedExecutor)pool.get("consumer" + dest.getName());
+      this.executor = (QueuedExecutor)pool.get("consumer" + id);
              
       // Note that using a PooledExecutor with a linked queue is not sufficient to ensure that
       // deliveries for the same consumer happen serially, since even if they are queued serially

Modified: trunk/src/main/org/jboss/messaging/core/ChannelSupport.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-08-06 18:21:41 UTC (rev 1204)
+++ trunk/src/main/org/jboss/messaging/core/ChannelSupport.java	2006-08-07 21:50:20 UTC (rev 1205)
@@ -206,20 +206,27 @@
       
       Future result = new Future();
 
-      try
-      {
-         // Instead of executing directly, we add the handle request to the event queue.
-         // Since remoting doesn't currently handle non blocking IO, we still have to wait for the
-         // result, but when remoting does, we can use a full SEDA approach and get even better
-         // throughput.
-         this.executor.execute(new HandleRunnable(result, sender, r, tx));
+      if (tx == null)
+      {         
+         try
+         {
+            // Instead of executing directly, we add the handle request to the event queue.
+            // Since remoting doesn't currently handle non blocking IO, we still have to wait for the
+            // result, but when remoting does, we can use a full SEDA approach and get even better
+            // throughput.
+            this.executor.execute(new HandleRunnable(result, sender, r));
+         }
+         catch (InterruptedException e)
+         {
+            log.warn("Thread interrupted", e);
+         }
+   
+         return (Delivery)result.getResult();
       }
-      catch (InterruptedException e)
+      else
       {
-         log.warn("Thread interrupted", e);
+         return this.handleInternal(sender, r, tx);
       }
-
-      return (Delivery)result.getResult();
    }
 
    // DeliveryObserver implementation --------------------------
@@ -1728,19 +1735,16 @@
 
       Routable routable;
 
-      Transaction tx;
-
-      HandleRunnable(Future result, DeliveryObserver sender, Routable routable, Transaction tx)
+      HandleRunnable(Future result, DeliveryObserver sender, Routable routable)
       {
          this.result = result;
          this.sender = sender;
          this.routable = routable;
-         this.tx = tx;
       }
 
       public void run()
       {
-         Delivery d = handleInternal(sender, routable, tx);
+         Delivery d = handleInternal(sender, routable, null);
          result.setResult(d);
       }
    }

Modified: trunk/tests/src/org/jboss/test/messaging/jms/stress/RelayStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/jms/stress/RelayStressTest.java	2006-08-06 18:21:41 UTC (rev 1204)
+++ trunk/tests/src/org/jboss/test/messaging/jms/stress/RelayStressTest.java	2006-08-07 21:50:20 UTC (rev 1205)
@@ -80,11 +80,11 @@
       
       Topic topic = (Topic)ic.lookup("/topic/StressTestTopic");
       
-      final int numMessages = 50000;
+      final int numMessages = 20000;
       
-      final int numRelayers = 10;
+      final int numRelayers = 5;
       
-      final int numConsumers = 50;
+      final int numConsumers = 20;
       
       Connection conn = cf.createConnection();
       




More information about the jboss-cvs-commits mailing list