[hornetq-commits] JBoss hornetq SVN: r8790 - in trunk: src/main/org/hornetq/core/config/impl and 14 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jan 11 13:07:21 EST 2010


Author: timfox
Date: 2010-01-11 13:07:20 -0500 (Mon, 11 Jan 2010)
New Revision: 8790

Modified:
   trunk/docs/user-manual/en/configuration-index.xml
   trunk/docs/user-manual/en/connection-ttl.xml
   trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/remoting/Packet.java
   trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
   trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java
   trunk/src/main/org/hornetq/core/server/Queue.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java
   trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java
   trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
   trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
Log:
some packets now processed async + queue has not executor to minimise remoting threads being held for too long, also some other tweaks

Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/docs/user-manual/en/configuration-index.xml	2010-01-11 18:07:20 UTC (rev 8790)
@@ -14,7 +14,7 @@
 <!--                                                                               -->
 <!-- Red Hat, as the licensor of this document, waives the right to enforce,       -->
 <!-- and agrees not to assert, Section 4d of CC-BY-SA to the fullest extent        -->
-<!-- permitted by applicable law.                                                  -->
+<!-- permitted by applicable law.                    a                              -->
 <!-- ============================================================================= -->
 <chapter id="configuration-index">
     <title>Configuration Reference</title>
@@ -352,7 +352,7 @@
                             <entry>Should incoming packets on the server be handed off to a thread
                                 from the thread pool for processing or should they be handled on the
                                 remoting thread?</entry>
-                            <entry>false</entry>
+                            <entry>true</entry>
                         </row>
                         <row>
                             <entry><link linkend="transaction-config"

Modified: trunk/docs/user-manual/en/connection-ttl.xml
===================================================================
--- trunk/docs/user-manual/en/connection-ttl.xml	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/docs/user-manual/en/connection-ttl.xml	2010-01-11 18:07:20 UTC (rev 8790)
@@ -160,10 +160,12 @@
         <title>Configuring Asynchronous Connection Execution</title>
         <para>By default, packets received on the server side are executed on the remoting
             thread.</para>
-        <para>It is possible instead to use a thread from a thread pool to handle the packents so
+        <para>It is possible instead to use a thread from a thread pool to handle some packets so
             that the remoting thread is not tied up for too long. However, please note that
-            processing operations asynchronously on another thread adds a little more latency. To
-            enable asynchronous connection execution, set the parameter <literal
+            processing operations asynchronously on another thread adds a little more latency.
+            Please note that most short running operations are always handled on the remoting thread for performance reasons.
+           
+            To enable asynchronous connection execution, set the parameter <literal
                 >async-connection-execution-enabled</literal> in <literal
                 >hornetq-configuration.xml</literal> to <literal>true</literal> (default value is
                 <literal>false</literal>).</para>

Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -72,7 +72,7 @@
 
    public static final long DEFAULT_CONNECTION_TTL_OVERRIDE = -1;
 
-   public static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = false;
+   public static final boolean DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED = true;
 
    public static final String DEFAULT_BINDINGS_DIRECTORY = "data/bindings";
 

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -111,8 +111,6 @@
 
    private final Object notificationLock = new Object();
 
-   private final org.hornetq.utils.ExecutorFactory redistributorExecutorFactory;
-
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
    private final HornetQServer server;
@@ -127,7 +125,6 @@
                          final boolean enableWildCardRouting,
                          final int idCacheSize,
                          final boolean persistIDCache,
-                         final ExecutorFactory orderedExecutorFactory,
                          final HierarchicalRepository<AddressSettings> addressSettingsRepository)
 
    {
@@ -156,8 +153,6 @@
 
       this.persistIDCache = persistIDCache;
 
-      redistributorExecutorFactory = orderedExecutorFactory;
-
       this.addressSettingsRepository = addressSettingsRepository;
 
       this.server = server;
@@ -350,7 +345,7 @@
 
                      if (redistributionDelay != -1)
                      {
-                        queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
+                        queue.addRedistributor(redistributionDelay);
                      }
                   }
                }
@@ -420,7 +415,7 @@
 
                      if (redistributionDelay != -1)
                      {
-                        queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
+                        queue.addRedistributor(redistributionDelay);
                      }
                   }
                }

Modified: trunk/src/main/org/hornetq/core/remoting/Packet.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/Packet.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/Packet.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -80,4 +80,6 @@
     * @return true if confirmation is required
     */
    boolean isRequiresConfirmations();
+   
+   boolean isAsyncExec();
 }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -110,6 +110,7 @@
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCommitMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
@@ -221,7 +222,7 @@
          }
          case SESS_COMMIT:
          {
-            packet = new PacketImpl(PacketImpl.SESS_COMMIT);
+            packet = new SessionCommitMessage();
             break;
          }
          case SESS_ROLLBACK:

Modified: trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -334,12 +334,12 @@
          channels.clear();
       }
    }
-   
+
    public void flushConfirmations()
    {
       synchronized (transferLock)
       {
-         for (Channel channel: channels.values())
+         for (Channel channel : channels.values())
          {
             channel.flushConfirmations();
          }
@@ -349,18 +349,16 @@
    // Buffer Handler implementation
    // ----------------------------------------------------
 
+   private volatile boolean executing;
+
    public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
    {
       final Packet packet = decoder.decode(buffer);
 
-      if (executor == null || packet.getType() == PacketImpl.PING)
+      if (packet.isAsyncExec() && executor != null)
       {
-         // Pings must always be handled out of band so we can send pings back to the client quickly
-         // otherwise they would get in the queue with everything else which might give an intolerable delay
-         doBufferReceived(packet);
-      }
-      else
-      {
+         executing = true;
+
          executor.execute(new Runnable()
          {
             public void run()
@@ -373,10 +371,24 @@
                {
                   RemotingConnectionImpl.log.error("Unexpected error", t);
                }
+
+               executing = false;
             }
          });
       }
-
+      else
+      {
+         //To prevent out of order execution if interleaving sync and async operations on same connection
+         while (executing)
+         {
+            Thread.yield();
+         }
+         
+         // Pings must always be handled out of band so we can send pings back to the client quickly
+         // otherwise they would get in the queue with everything else which might give an intolerable delay
+         doBufferReceived(packet);
+      }
+     
       dataReceived = true;
    }
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -262,6 +262,11 @@
    {
       return true;
    }
+   
+   public boolean isAsyncExec()
+   {
+      return false;
+   }
 
    @Override
    public String toString()

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/RollbackMessage.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -72,6 +72,11 @@
    {
       considerLastMessageAsDelivered = buffer.readBoolean();
    }
+   
+   public boolean isAsyncExec()
+   {
+      return true;
+   }
 
    // Static --------------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionCloseMessage.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -44,6 +44,12 @@
 
       return super.equals(other);
    }
+   
+   @Override
+   public boolean isAsyncExec()
+   {
+      return true;
+   }
 
    // Package protected ---------------------------------------------
 

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXACommitMessage.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -62,6 +62,12 @@
    }
 
    @Override
+   public boolean isAsyncExec()
+   {
+      return true;
+   }
+   
+   @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
       XidCodecSupport.encodeXid(xid, buffer);

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXAPrepareMessage.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -64,6 +64,11 @@
    {
       xid = XidCodecSupport.decodeXid(buffer);
    }
+   
+   public boolean isAsyncExec()
+   {
+      return true;
+   }
 
    @Override
    public boolean equals(final Object other)

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionXARollbackMessage.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -52,6 +52,7 @@
    {
       return xid;
    }
+   
 
    @Override
    public void encodeRest(final HornetQBuffer buffer)
@@ -64,6 +65,11 @@
    {
       xid = XidCodecSupport.decodeXid(buffer);
    }
+   
+   public boolean isAsyncExec()
+   {
+      return true;
+   }
 
    @Override
    public boolean equals(final Object other)

Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/Queue.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -63,7 +63,7 @@
 
    void cancel(MessageReference reference) throws Exception;
 
-   void deliverAsync(Executor executor);
+   void deliverAsync();
 
    int getMessageCount();
 
@@ -116,7 +116,7 @@
 
    int moveReferences(Filter filter, SimpleString toAddress) throws Exception;
 
-   void addRedistributor(long delay, Executor executor);
+   void addRedistributor(long delay);
 
    void cancelRedistributor() throws Exception;
 
@@ -152,6 +152,8 @@
     * @return true if paused, false otherwise.
     */
    boolean isPaused();
+   
+   Executor getExecutor();
 
 
 }

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -68,8 +68,6 @@
 
    void close() throws Exception;
 
-   void promptDelivery(Queue queue);
-
    void handleAcknowledge(final SessionAcknowledgeMessage packet);
 
    void handleExpired(final SessionExpiredMessage packet);

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -540,7 +540,7 @@
 
                if (queue != null)
                {
-                  queue.deliverAsync(executor);
+                  queue.deliverAsync();
                }
             }
          }
@@ -683,7 +683,7 @@
 
             queue.addConsumer(BridgeImpl.this);
 
-            queue.deliverAsync(executor);
+            queue.deliverAsync();
 
             BridgeImpl.log.info("Bridge " + name + " is connected to its destination");
 
@@ -762,7 +762,7 @@
 
             if (queue != null)
             {
-               queue.deliverAsync(executor);
+               queue.deliverAsync();
             }
          }
          catch (Exception e)

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -189,7 +189,7 @@
          {
             active = true;
 
-            queue.deliverAsync(executor);
+            queue.deliverAsync();
          }
       }
    }

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -633,8 +633,6 @@
 
       Channel channel = connection.getChannel(channelID, sendWindowSize);
 
-      Executor sessionExecutor = executorFactory.getExecutor();
-
       final ServerSessionImpl session = new ServerSessionImpl(name,
                                                               username,
                                                               password,
@@ -649,7 +647,6 @@
                                                               postOffice,
                                                               resourceManager,
                                                               securityStore,
-                                                              sessionExecutor,
                                                               channel,
                                                               managementService,
                                                               this,
@@ -657,10 +654,8 @@
 
       sessions.put(name, session);
 
-      // The executor on the OperationContext here has to be the same as the session, or we would have ordering issues
-      // on messages
       ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
-                                                                          storageManager.newContext(sessionExecutor),
+                                                                          storageManager.newContext(executorFactory.getExecutor()),
                                                                           storageManager);
 
       session.setHandler(handler);
@@ -1030,8 +1025,9 @@
 
       if (ConfigurationImpl.DEFAULT_CLUSTER_USER.equals(configuration.getClusterUser()) && ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD.equals(configuration.getClusterPassword()))
       {
-         log.warn("It has been detected that the cluster admin user and password which are used to " + "replicate management operation from one node to the other have not been changed from the installation default. "
-                  + "Please see the HornetQ user guide for instructions on how to do this.");
+         log.warn("Security risk! It has been detected that the cluster admin user and password "
+                  + "have not been changed from the installation default. "
+                  + "Please see the HornetQ user guide, cluster chapter, for instructions on how to do this.");
       }
 
       securityStore = new SecurityStoreImpl(securityRepository,
@@ -1059,7 +1055,6 @@
                                       configuration.isWildcardRoutingEnabled(),
                                       configuration.getIDCacheSize(),
                                       configuration.isPersistIDCache(),
-                                      executorFactory,
                                       addressSettingsRepository);
 
       messagingServerControl = managementService.registerServer(postOffice,

Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -14,6 +14,7 @@
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.hornetq.api.core.Message;
@@ -53,7 +54,8 @@
                          final ScheduledExecutorService scheduledExecutor,
                          final PostOffice postOffice,
                          final StorageManager storageManager,
-                         final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+                         final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                         final Executor executor)
    {
       super(persistenceID,
             address,
@@ -64,7 +66,8 @@
             scheduledExecutor,
             postOffice,
             storageManager,
-            addressSettingsRepository);
+            addressSettingsRepository,
+            executor);
    }
 
    @Override

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -43,6 +43,8 @@
    private PostOffice postOffice;
 
    private final StorageManager storageManager;
+   
+   private final ExecutorFactory executorFactory;
 
    public QueueFactoryImpl(final ExecutorFactory executorFactory,
                            final ScheduledExecutorService scheduledExecutor,
@@ -54,6 +56,8 @@
       this.scheduledExecutor = scheduledExecutor;
 
       this.storageManager = storageManager;
+      
+      this.executorFactory = executorFactory;
    }
 
    public void setPostOffice(final PostOffice postOffice)
@@ -82,7 +86,8 @@
                                     scheduledExecutor,
                                     postOffice,
                                     storageManager,
-                                    addressSettingsRepository);
+                                    addressSettingsRepository,
+                                    executorFactory.getExecutor());
       }
       else
       {
@@ -95,7 +100,8 @@
                                scheduledExecutor,
                                postOffice,
                                storageManager,
-                               addressSettingsRepository);
+                               addressSettingsRepository,
+                               executorFactory.getExecutor());
       }
 
       return queue;

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -127,7 +127,7 @@
 
    private int pos;
 
-   private final boolean dontAdd;
+   private final Executor executor;
 
    public QueueImpl(final long id,
                     final SimpleString address,
@@ -138,7 +138,8 @@
                     final ScheduledExecutorService scheduledExecutor,
                     final PostOffice postOffice,
                     final StorageManager storageManager,
-                    final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+                    final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                    final Executor executor)
    {
       this.id = id;
 
@@ -173,7 +174,7 @@
          expiryAddress = null;
       }
 
-      dontAdd = System.getProperty("org.hornetq.opt.dontadd") != null;
+      this.executor = executor;
    }
 
    // Bindable implementation -------------------------------------------------------------------------------------
@@ -235,7 +236,7 @@
       add(ref, true);
    }
 
-   public void deliverAsync(final Executor executor)
+   public void deliverAsync()
    {
       // Prevent too many executors running at once
 
@@ -244,7 +245,12 @@
          executor.execute(deliverRunner);
       }
    }
-
+   
+   public Executor getExecutor()
+   {
+      return executor;
+   }
+   
    public synchronized void deliverNow()
    {
       deliverRunner.run();
@@ -295,7 +301,7 @@
       return removed;
    }
 
-   public synchronized void addRedistributor(final long delay, final Executor executor)
+   public synchronized void addRedistributor(final long delay)
    {
       if (future != null)
       {
@@ -307,7 +313,7 @@
       if (redistributor != null)
       {
          // Just prompt delivery
-         deliverAsync(executor);
+         deliverAsync();
       }
 
       if (delay > 0)
@@ -919,7 +925,7 @@
 
          redistributor.start();
 
-         deliverAsync(executor);
+         deliverAsync();
       }
    }
 
@@ -1284,11 +1290,6 @@
 
    protected synchronized void add(final MessageReference ref, final boolean first)
    {
-      if (dontAdd)
-      {
-         return;
-      }
-
       if (!first)
       {
          messagesAdded.incrementAndGet();
@@ -1335,7 +1336,7 @@
             // We have consumers with filters which don't match, so we need
             // to prompt delivery every time
             // a new message arrives
-            deliver();
+            deliverAsync();
          }
       }
    }
@@ -1424,7 +1425,7 @@
             add(ref, true);
          }
 
-         deliver();
+         deliverAsync();
       }
    }
 
@@ -1549,7 +1550,7 @@
    {
       paused = false;
 
-      deliver();
+      deliverAsync();
    }
 
    public synchronized boolean isPaused()

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -136,7 +136,6 @@
                              final Channel channel,
                              final boolean preAcknowledge,
                              final boolean strictUpdateDeliveryCount,
-                             final Executor executor,
                              final ManagementService managementService) throws Exception
    {
 
@@ -150,7 +149,7 @@
 
       messageQueue = binding.getQueue();
 
-      this.executor = executor;
+      this.executor = messageQueue.getExecutor();
 
       this.started = browseOnly || started;
 
@@ -598,7 +597,7 @@
             }
             else
             {
-               session.promptDelivery(messageQueue);
+               messageQueue.deliverAsync();
             }
          }
       }
@@ -660,7 +659,8 @@
                else
                {
                   // prompt Delivery only if chunk was finished
-                  session.promptDelivery(messageQueue);
+         
+                  messageQueue.deliverAsync();
                }
             }
          }

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -22,7 +22,6 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
 
 import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
@@ -137,8 +136,6 @@
 
    private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
 
-   private final Executor executor;
-
    private Transaction tx;
 
    private final StorageManager storageManager;
@@ -190,7 +187,6 @@
                             final PostOffice postOffice,
                             final ResourceManager resourceManager,
                             final SecurityStore securityStore,
-                            final Executor executor,
                             final Channel channel,
                             final ManagementService managementService,
                             final HornetQServer server,
@@ -220,8 +216,6 @@
 
       this.securityStore = securityStore;
 
-      this.executor = executor;
-
       if (!xa)
       {
          tx = new TransactionImpl(storageManager);
@@ -338,11 +332,6 @@
       }
    }
 
-   public void promptDelivery(final Queue queue)
-   {
-      queue.deliverAsync(executor);
-   }
-
    public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
    {
       SimpleString name = packet.getQueueName();
@@ -376,7 +365,6 @@
                                                           channel,
                                                           preAcknowledge,
                                                           strictUpdateDeliveryCount,
-                                                          executor,
                                                           managementService);
 
          consumers.put(consumer.getID(), consumer);

Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/BrowserTest.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -47,6 +47,7 @@
 
    // Public ---------------------------------------------------------------------------------------
 
+   
    public void testCreateBrowserOnNullDestination() throws Exception
    {
       Connection conn = null;

Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/MiscellaneousTest.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -55,6 +55,7 @@
 
    // Public --------------------------------------------------------
 
+  
    public void testBrowser() throws Exception
    {
       Connection conn = null;

Modified: trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ReceiveImmediateTest.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -65,7 +65,7 @@
    {
       doConsumerReceiveImmediateWithNoMessages(false);
    }
-
+   
    public void testConsumerReceiveImmediate() throws Exception
    {
       doConsumerReceiveImmediate(false);

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedLargeMessageFailoverTest.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -13,6 +13,8 @@
 
 package org.hornetq.tests.integration.cluster.failover;
 
+import org.hornetq.core.logging.Logger;
+
 /**
  * A ReplicatedLargeMessageFailoverTest
  *
@@ -22,6 +24,7 @@
  */
 public class ReplicatedLargeMessageFailoverTest extends LargeMessageFailoverTest
 {
+   private static final Logger log = Logger.getLogger(ReplicatedLargeMessageFailoverTest.class);
 
    // Constants -----------------------------------------------------
 
@@ -31,6 +34,7 @@
 
    // Constructors --------------------------------------------------
 
+  
    public ReplicatedLargeMessageFailoverTest()
    {
       super();

Modified: trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -15,6 +15,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -84,7 +85,8 @@
                                   scheduledExecutor,
                                   null,
                                   null,
-                                  null);
+                                  null,
+                                  Executors.newSingleThreadExecutor());
 
       // Send one scheduled
 
@@ -158,7 +160,8 @@
                                   scheduledExecutor,
                                   null,
                                   null,
-                                  null);
+                                  null,
+                                  Executors.newSingleThreadExecutor());
 
       FakeConsumer consumer = null;
 
@@ -273,7 +276,8 @@
                                   scheduledExecutor,
                                   null,
                                   null,
-                                  null);
+                                  null,
+                                  Executors.newSingleThreadExecutor());
       MessageReference messageReference = generateReference(queue, 1);
       queue.addConsumer(consumer);
       messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -82,9 +82,9 @@
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#addRedistributor(long, java.util.concurrent.Executor)
+    * @see org.hornetq.core.server.Queue#addRedistributor(long)
     */
-   public void addRedistributor(final long delay, final Executor executor)
+   public void addRedistributor(final long delay)
    {
       // TODO Auto-generated method stub
 
@@ -172,9 +172,9 @@
    }
 
    /* (non-Javadoc)
-    * @see org.hornetq.core.server.Queue#deliverAsync(java.util.concurrent.Executor)
+    * @see org.hornetq.core.server.Queue#deliverAsync()
     */
-   public void deliverAsync(final Executor executor)
+   public void deliverAsync()
    {
       // TODO Auto-generated method stub
 
@@ -520,4 +520,10 @@
       return false;
    }
 
+   public Executor getExecutor()
+   {
+      // TODO Auto-generated method stub
+      return null;
+   }
+
 }
\ No newline at end of file

Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -18,6 +18,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -35,6 +36,7 @@
 import org.hornetq.tests.unit.core.server.impl.fakes.FakeFilter;
 import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.Future;
 
 /**
  * A QueueTest
@@ -46,18 +48,22 @@
    // The tests ----------------------------------------------------------------
 
    private ScheduledExecutorService scheduledExecutor;
+   
+   private ExecutorService executor;
 
    @Override
    protected void setUp() throws Exception
    {
       super.setUp();
       scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+      executor = Executors.newSingleThreadExecutor();
    }
 
    @Override
    protected void tearDown() throws Exception
    {
-      scheduledExecutor.shutdown();
+      scheduledExecutor.shutdownNow();
+      executor.shutdownNow();
       super.tearDown();
    }
 
@@ -70,15 +76,16 @@
       final SimpleString name = new SimpleString("oobblle");
 
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  name,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      name,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       Assert.assertEquals(name, queue.getName());
    }
@@ -86,15 +93,16 @@
    public void testDurable()
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  false,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      false,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       Assert.assertFalse(queue.isDurable());
 
@@ -107,7 +115,8 @@
                             scheduledExecutor,
                             null,
                             null,
-                            null);
+                            null,
+                            executor);
 
       Assert.assertTrue(queue.isDurable());
    }
@@ -121,15 +130,16 @@
       Consumer cons3 = new FakeConsumer();
 
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       Assert.assertEquals(0, queue.getConsumerCount());
 
@@ -171,15 +181,16 @@
    public void testGetFilter()
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       Assert.assertNull(queue.getFilter());
 
@@ -205,7 +216,8 @@
                             scheduledExecutor,
                             null,
                             null,
-                            null);
+                            null,
+                            executor);
 
       Assert.assertEquals(filter, queue.getFilter());
 
@@ -214,15 +226,16 @@
    public void testSimpleadd()
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       final int numMessages = 10;
 
@@ -242,15 +255,16 @@
    public void testSimpleDirectDelivery() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -279,15 +293,16 @@
    public void testSimpleNonDirectDelivery() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       final int numMessages = 10;
 
@@ -326,15 +341,16 @@
    public void testBusyConsumer() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -379,15 +395,16 @@
    public void testBusyConsumerThenAddMoreMessages() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -455,15 +472,16 @@
    public void testAddFirstadd() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       final int numMessages = 10;
 
@@ -518,15 +536,16 @@
    public void testChangeConsumersAndDeliver() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  new FakePostOffice(),
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      new FakePostOffice(),
+                                      null,
+                                      null,
+                                      executor);
 
       final int numMessages = 10;
 
@@ -681,15 +700,16 @@
    public void testConsumerReturningNull() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       class NullConsumer implements Consumer
       {
@@ -723,15 +743,16 @@
    public void testRoundRobinWithQueueing() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       final int numMessages = 10;
 
@@ -775,15 +796,16 @@
    public void testRoundRobinDirect() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       final int numMessages = 10;
 
@@ -825,15 +847,16 @@
    public void testWithPriorities() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       final int numMessages = 10;
 
@@ -901,15 +924,16 @@
    public void testConsumerWithFilterAddAndRemove()
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       Filter filter = new FakeFilter("fruit", "orange");
 
@@ -919,15 +943,16 @@
    public void testIterator()
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       final int numMessages = 20;
 
@@ -952,19 +977,30 @@
       }
       assertRefListsIdenticalRefs(refs, list);
    }
+   
+   private void awaitExecution()
+   {
+      Future future = new Future();
+      
+      executor.execute(future);
+      
+      future.await(10000);
+   }
 
    public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
    {
+    
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  new FakePostOffice(),
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      new FakePostOffice(),
+                                      null,
+                                      null,
+                                      executor);
 
       Filter filter = new FakeFilter("fruit", "orange");
 
@@ -989,6 +1025,8 @@
       refs.add(ref2);
 
       Assert.assertEquals(2, queue.getMessageCount());
+      
+      awaitExecution();;
 
       Assert.assertEquals(1, consumer.getReferences().size());
 
@@ -1023,6 +1061,8 @@
       refs.add(ref4);
 
       Assert.assertEquals(3, queue.getMessageCount());
+      
+      awaitExecution();;
 
       Assert.assertEquals(1, consumer.getReferences().size());
 
@@ -1034,15 +1074,16 @@
    public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
 
@@ -1084,15 +1125,16 @@
    public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
 
@@ -1167,15 +1209,16 @@
    public void testConsumerWithFilterThenAddMoreMessages() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       final int numMessages = 10;
       List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -1240,15 +1283,16 @@
    private void testConsumerWithFilters(final boolean direct) throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  new FakePostOffice(),
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      new FakePostOffice(),
+                                      null,
+                                      null,
+                                      executor);
 
       Filter filter = new FakeFilter("fruit", "orange");
 
@@ -1309,6 +1353,8 @@
       }
 
       Assert.assertEquals(6, queue.getMessageCount());
+      
+      awaitExecution();;
 
       Assert.assertEquals(2, consumer.getReferences().size());
 
@@ -1338,15 +1384,16 @@
    {
       FakeConsumer consumer = new FakeConsumer();
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1367,15 +1414,16 @@
    public void testMessagesAdded() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1388,15 +1436,16 @@
    public void testGetReference() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1410,15 +1459,16 @@
    public void testGetNonExistentReference() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1436,15 +1486,16 @@
    public void testPauseAndResumeWithAsync() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       // pauses the queue
       queue.pause();
@@ -1483,6 +1534,8 @@
       Assert.assertEquals(0, queue.getDeliveringCount());
       // resuming work
       queue.resume();
+      
+      awaitExecution();;
 
       // after resuming the delivery begins.
       assertRefListsIdenticalRefs(refs, consumer.getReferences());
@@ -1500,15 +1553,16 @@
    public void testPauseAndResumeWithDirect() throws Exception
    {
       QueueImpl queue = new QueueImpl(1,
-                                  QueueImplTest.address1,
-                                  QueueImplTest.queue1,
-                                  null,
-                                  false,
-                                  true,
-                                  scheduledExecutor,
-                                  null,
-                                  null,
-                                  null);
+                                      QueueImplTest.address1,
+                                      QueueImplTest.queue1,
+                                      null,
+                                      false,
+                                      true,
+                                      scheduledExecutor,
+                                      null,
+                                      null,
+                                      null,
+                                      executor);
 
       // Now add a consumer
       FakeConsumer consumer = new FakeConsumer();
@@ -1538,6 +1592,10 @@
 
       // brings the queue to resumed state.
       queue.resume();
+      
+      
+      awaitExecution();;
+      
       // resuming delivery of messages
       assertRefListsIdenticalRefs(refs, consumer.getReferences());
       Assert.assertEquals(numMessages, queue.getMessageCount());

Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2010-01-11 16:04:54 UTC (rev 8789)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2010-01-11 18:07:20 UTC (rev 8790)
@@ -52,6 +52,7 @@
                            scheduledExecutor,
                            postOffice,
                            null,
+                           null,
                            null);
    }
 



More information about the hornetq-commits mailing list