[hornetq-commits] JBoss hornetq SVN: r8369 - in branches/ClebertCallback: src/main/org/hornetq/core/management/impl and 17 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Nov 22 21:04:55 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-22 21:04:55 -0500 (Sun, 22 Nov 2009)
New Revision: 8369

Modified:
   branches/ClebertCallback/src/main/org/hornetq/core/management/ManagementService.java
   branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java
   branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java
   branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Reverting unnecessary executors.

Modified: branches/ClebertCallback/src/main/org/hornetq/core/management/ManagementService.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/management/ManagementService.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/management/ManagementService.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -71,6 +71,8 @@
    ObjectNameBuilder getObjectNameBuilder();
    
    // Resource Registration
+   
+   void setStorageManager(StorageManager storageManager);
 
    HornetQServerControlImpl registerServer(PostOffice postOffice,
                                          StorageManager storageManager,

Modified: branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -59,7 +59,6 @@
 import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.spi.Acceptor;
@@ -181,6 +180,11 @@
    {
       return messageCounterManager;
    }
+   
+   public void setStorageManager(StorageManager storageManager)
+   {
+      this.storageManager = storageManager;
+   }
 
    public HornetQServerControlImpl registerServer(final PostOffice postOffice,
                                                   final StorageManager storageManager,
@@ -741,28 +745,17 @@
          }
       }
       
-      // TODO: Talk to Andy and Jeff about a better way to sync this...
-      System.out.println("Waiting");
-      final CountDownLatch latch = new CountDownLatch(1);
-      OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
+      if (storageManager != null)
       {
-
-         public void done()
-         {
-            System.out.println("Done on management");
-            latch.countDown();
-         }
-
-         public void onError(int errorCode, String errorMessage)
-         {
-         }
-         
-      });
-      
-      OperationContextImpl.getInstance().complete();
-      
-      latch.await(5, TimeUnit.SECONDS);
-      System.out.println("Done");
+         System.out.println("Waiting on management...");
+         storageManager.waitOnOperations(managementRequestTimeout);
+         storageManager.clearContext();
+         System.out.println("Done");
+      }
+      else
+      {
+         new Exception("storagemanager is null, can't wait on operations").printStackTrace();
+      }
    }
 
    public void enableNotifications(boolean enabled)

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -30,11 +30,6 @@
 public interface OperationContext extends IOCompletion
 {
    
-   boolean hasReplication();
-   
-   /** Reattach the context to the current thread */
-   void reinstall();
-   
    /** The executor used on responses.
     *  If this is not set, it will use the current thread. */
    void setExecutor(Executor executor);
@@ -49,8 +44,5 @@
 
    /** To be called when there are no more operations pending */
    void complete();
-   
-   /** Is this a special operation to sync replication. */
-   boolean isSync();
 
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -52,9 +52,12 @@
    /** Get the context associated with the thread for later reuse */
    OperationContext getContext();
    
-   /** It just creates an OperationContext without associating it to any threads */
+   /** It just creates an OperationContext without associating it */
    OperationContext newContext(Executor executor);
    
+   /** Set the context back to the thread */
+   void setContext(OperationContext context);
+   
    // Message related operations
 
    void pageClosed(SimpleString storeName, int pageNumber);
@@ -77,6 +80,8 @@
 
    /** To close the OperationsContext */
    void completeOperations();
+   
+   void clearContext();
 
    UUID getPersistentID();
    

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -78,6 +78,7 @@
 import org.hornetq.core.transaction.Transaction.State;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.Pair;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.UUID;
@@ -93,6 +94,7 @@
  */
 public class JournalStorageManager implements StorageManager
 {
+   
    private static final Logger log = Logger.getLogger(JournalStorageManager.class);
 
    private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
@@ -144,6 +146,9 @@
    private final SequentialFileFactory largeMessagesFactory;
 
    private volatile boolean started;
+   
+   /** Used to create Operation Contexts */
+   private final ExecutorFactory executorFactory;
 
    private final Executor executor;
 
@@ -163,15 +168,17 @@
 
    private final String largeMessagesDirectory;
 
-   public JournalStorageManager(final Configuration config, final Executor executor)
+   public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory)
    {
-      this(config, executor, null);
+      this(config, executorFactory, null);
    }
 
-   public JournalStorageManager(final Configuration config, final Executor executor, final ReplicationManager replicator)
+   public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ReplicationManager replicator)
    {
-      this.executor = executor;
+      this.executorFactory = executorFactory;
 
+      this.executor = executorFactory.getExecutor();
+
       this.replicator = replicator;
 
       if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
@@ -295,8 +302,13 @@
     */
    public void completeOperations()
    {
-      OperationContextImpl.getInstance().complete();
+      getContext().complete();
    }
+   
+   public void clearContext()
+   {
+      OperationContextImpl.clearContext();
+   }
 
    public boolean isReplicated()
    {
@@ -373,8 +385,13 @@
     */
    public OperationContext getContext()
    {
-      return OperationContextImpl.getInstance();
+      return OperationContextImpl.getContext(executorFactory);
    }
+   
+   public void setContext(OperationContext context)
+   {
+      OperationContextImpl.setContext(context);
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#newContext()
@@ -386,7 +403,7 @@
 
    public void afterCompleteOperations(IOAsyncTask run)
    {
-      OperationContextImpl.getInstance().executeOnCompletion(run);
+      getContext().executeOnCompletion(run);
    }
 
    public UUID getPersistentID()
@@ -469,27 +486,27 @@
          messageJournal.appendAddRecord(message.getMessageID(),
                                         ADD_LARGE_MESSAGE,
                                         new LargeMessageEncoding((LargeServerMessage)message),
-                                        false, getIOContext());
+                                        false, getContext());
       }
       else
       {
-         messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message, false, getIOContext());
+         messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message, false, getContext());
       }
    }
 
    public void storeReference(final long queueID, final long messageID) throws Exception
    {
-      messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID), syncNonTransactional, getIOContext());
+      messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID), syncNonTransactional, getContext());
    }
 
    public void storeAcknowledge(final long queueID, final long messageID) throws Exception
    {
-      messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getIOContext());
+      messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getContext());
    }
 
    public void deleteMessage(final long messageID) throws Exception
    {
-      messageJournal.appendDeleteRecord(messageID, syncNonTransactional, getIOContext());
+      messageJournal.appendDeleteRecord(messageID, syncNonTransactional, getContext());
    }
 
    public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
@@ -500,19 +517,19 @@
       messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
                                         SET_SCHEDULED_DELIVERY_TIME,
                                         encoding,
-                                        syncNonTransactional, getIOContext());
+                                        syncNonTransactional, getContext());
    }
 
    public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
-      messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional, getIOContext());
+      messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional, getContext());
    }
 
    public void deleteDuplicateID(long recordID) throws Exception
    {
-      messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getIOContext());
+      messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext());
    }
 
    // Transactional operations
@@ -568,13 +585,13 @@
    public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
    {
       long id = generateUniqueID();
-      messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true, getIOContext());
+      messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true, getContext());
       return id;
    }
 
    public void deleteHeuristicCompletion(long id) throws Exception
    {
-      messageJournal.appendDeleteRecord(id, true, getIOContext());
+      messageJournal.appendDeleteRecord(id, true, getContext());
    }
 
    public void deletePageTransactional(final long txID, final long recordID) throws Exception
@@ -600,17 +617,17 @@
 
    public void prepare(final long txID, final Xid xid) throws Exception
    {
-      messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getIOContext());
+      messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getContext());
    }
 
    public void commit(final long txID) throws Exception
    {
-      messageJournal.appendCommitRecord(txID, syncTransactional, getIOContext());
+      messageJournal.appendCommitRecord(txID, syncTransactional, getContext());
    }
 
    public void rollback(final long txID) throws Exception
    {
-      messageJournal.appendRollbackRecord(txID, syncTransactional, getIOContext());
+      messageJournal.appendRollbackRecord(txID, syncTransactional, getContext());
    }
 
    public void storeDuplicateIDTransactional(final long txID,
@@ -648,7 +665,7 @@
       messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
                                         UPDATE_DELIVERY_COUNT,
                                         updateInfo,
-                                        syncNonTransactional, getIOContext());
+                                        syncNonTransactional, getContext());
    }
 
    private static final class AddMessageRecord
@@ -1394,11 +1411,6 @@
 
    // Private ----------------------------------------------------------------------------------
    
-   private IOCompletion getIOContext()
-   {
-      return OperationContextImpl.getInstance();
-   }
-
    private void checkAndCreateDir(final String dir, final boolean create)
    {
       File f = new File(dir);

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -21,9 +21,8 @@
 
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.utils.ExecutorFactory;
 
-import sun.security.util.PendingException;
-
 /**
  * 
  * This class will hold operations when there are IO operations...
@@ -33,24 +32,31 @@
  */
 public class OperationContextImpl implements OperationContext
 {
-   private static final ThreadLocal<OperationContext> tlContext = new ThreadLocal<OperationContext>();
+   
+   private static final ThreadLocal<OperationContext> threadLocalContext = new ThreadLocal<OperationContext>();
 
-   public static void setInstance(OperationContext context)
+   public static void clearContext()
    {
-      tlContext.set(context);
+      threadLocalContext.set(null);
    }
    
-   public static OperationContext getInstance()
+   public static OperationContext getContext(final ExecutorFactory executorFactory)
    {
-      OperationContext token = tlContext.get();
+      OperationContext token = threadLocalContext.get();
       if (token == null)
       {
-         token = new OperationContextImpl();
-         tlContext.set(token);
+         token = new OperationContextImpl(executorFactory.getExecutor());
+         threadLocalContext.set(token);
       }
       return token;
    }
-
+   
+   public static void setContext(OperationContext context)
+   {
+      threadLocalContext.set(context);
+   }
+   
+   
    private List<TaskHolder> tasks;
 
    private volatile int storeLineUp = 0;
@@ -64,38 +70,21 @@
    private int stored = 0;
 
    private int replicated = 0;
-   
+
    private int errorCode = -1;
-   
+
    private String errorMessage = null;
-   
+
    private Executor executor;
-   
+
    private final AtomicInteger executorsPending = new AtomicInteger(0);
 
-   /**
-    * @param executor
-    */
-   public OperationContextImpl()
-   {
-      super();
-   }
-
    public OperationContextImpl(final Executor executor)
    {
       super();
       this.executor = executor;
    }
-   
-   /*
-    * @see org.hornetq.core.persistence.OperationContext#reinstall()
-    */
-   public void reinstall()
-   {
-      setInstance(this);
-   }
 
-
    /** To be called by the replication manager, when new replication is added to the queue */
    public void lineUp()
    {
@@ -106,7 +95,7 @@
    {
       replicationLineUp++;
    }
-   
+
    /** this method needs to be called before the executor became operational */
    public void setExecutor(Executor executor)
    {
@@ -119,50 +108,56 @@
       checkTasks();
    }
 
-   public boolean hasReplication()
-   {
-      return replicationLineUp > 0;
-   }
-
    /** You may have several actions to be done after a replication operation is completed. */
-   public synchronized void executeOnCompletion(final IOAsyncTask completion)
+   public void executeOnCompletion(final IOAsyncTask completion)
    {
-      if (tasks == null)
-      {
-         tasks = new LinkedList<TaskHolder>();
-         minimalReplicated = replicationLineUp;
-         minimalStore = storeLineUp;
-      }
+      boolean executeNow = false;
 
-      // On this case, we can just execute the context directly
-      if (replicationLineUp == replicated && storeLineUp == stored)
+      synchronized (this)
       {
-         if (executor != null)
+         if (tasks == null)
          {
-            // We want to avoid the executor if everything is complete...
-            // However, we can't execute the context if there are executions pending
-            // We need to use the executor on this case
-            if (executorsPending.get() == 0)
+            tasks = new LinkedList<TaskHolder>();
+            minimalReplicated = replicationLineUp;
+            minimalStore = storeLineUp;
+         }
+
+         // On this case, we can just execute the context directly
+         if (replicationLineUp == replicated && storeLineUp == stored)
+         {
+            if (executor != null)
             {
-               // No need to use an executor here or a context switch
-               // there are no actions pending.. hence we can just execute the task directly on the same thread
-               completion.done();
+               // We want to avoid the executor if everything is complete...
+               // However, we can't execute the context if there are executions pending
+               // We need to use the executor on this case
+               if (executorsPending.get() == 0)
+               {
+                  // No need to use an executor here or a context switch
+                  // there are no actions pending.. hence we can just execute the task directly on the same thread
+                  executeNow = true;
+               }
+               else
+               {
+                  execute(completion);
+               }
             }
             else
             {
-               execute(completion);
+               executeNow = true;
             }
          }
          else
          {
-            // Execute without an executor
-            completion.done();
+            tasks.add(new TaskHolder(completion));
          }
       }
-      else
+      
+      if (executeNow)
       {
-         tasks.add(new TaskHolder(completion));
+         // Executing outside of any locks
+         completion.done();
       }
+
    }
 
    /** To be called by the storage manager, when data is confirmed on the channel */
@@ -183,7 +178,7 @@
             if (!holder.executed && stored >= holder.storeLined && replicated >= holder.replicationLined)
             {
                holder.executed = true;
-                
+
                if (executor != null)
                {
                   // If set, we use an executor to avoid the server being single threaded
@@ -193,7 +188,7 @@
                {
                   holder.task.done();
                }
-               
+
                iter.remove();
             }
             else
@@ -227,8 +222,6 @@
     */
    public void complete()
    {
-      tlContext.set(null);
-      
       // TODO: test and fix exceptions on the Context
       if (tasks != null && errorMessage != null)
       {
@@ -237,16 +230,11 @@
             run.task.onError(errorCode, errorMessage);
          }
       }
-      
+
       // We hold errors until the complete is set, or the callbacks will never get informed
       errorCode = -1;
       errorMessage = null;
    }
-   
-   public boolean isSync()
-   {
-      return false;
-   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
@@ -275,5 +263,4 @@
       }
    }
 
-
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -374,4 +374,18 @@
       return null;
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#setContext(org.hornetq.core.persistence.OperationContext)
+    */
+   public void setContext(OperationContext context)
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#clearContext()
+    */
+   public void clearContext()
+   {
+   }
+
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -18,7 +18,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.Executor;
 
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.StorageManager;
@@ -59,14 +58,11 @@
    private final StorageManager storageManager;
 
    private final boolean persist;
-   
-   private final Executor executor;
 
    public DuplicateIDCacheImpl(final SimpleString address,
                                final int size,
                                final StorageManager storageManager,
-                               final boolean persist,
-                               final Executor executor)
+                               final boolean persist)
    {
       this.address = address;
 
@@ -77,8 +73,6 @@
       this.storageManager = storageManager;
 
       this.persist = persist;
-      
-      this.executor = executor;
    }
 
    public void load(final List<Pair<byte[], Long>> theIds) throws Exception
@@ -144,7 +138,7 @@
             storageManager.storeDuplicateID(address, duplID, recordID);
          }
 
-         addToCacheInMemory(duplID, recordID, null);
+         addToCacheInMemory(duplID, recordID);
       }
       else
       {
@@ -161,12 +155,11 @@
       }
    }
 
-   
-   private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID, final Executor journalExecutor) throws Exception
+   private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID)
    {
       cache.add(new ByteArrayHolder(duplID));
 
-      final Pair<ByteArrayHolder, Long> id;
+      Pair<ByteArrayHolder, Long> id;
 
       if (pos < ids.size())
       {
@@ -180,27 +173,13 @@
          // reclaimed
          id.a = new ByteArrayHolder(duplID);
 
-         if (journalExecutor != null)
+         try
          {
-            // We can't execute any IO inside the Journal callback, so taking it outside
-            journalExecutor.execute(new Runnable()
-            {
-               public void run()
-               {
-                  try
-                  {
-                     storageManager.deleteDuplicateID(id.b);
-                  }
-                  catch (Exception e)
-                  {
-                     log.warn("Error on deleting duplicate cache");
-                  }
-               }
-            });
+            storageManager.deleteDuplicateID(id.b);
          }
-         else
+         catch (Exception e)
          {
-            storageManager.deleteDuplicateID(id.b);
+            log.warn("Error on deleting duplicate cache", e);
          }
 
          id.b = recordID;
@@ -237,14 +216,8 @@
       {
          if (!done)
          {
-            try
-            {
-               addToCacheInMemory(duplID, recordID, executor);
-            }
-            catch (Exception shouldNotHappen)
-            {
-               // if you pass an executor to addtoCache, an exception will never happen here
-            }
+            addToCacheInMemory(duplID, recordID);
+
             done = true;
          }
       }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -743,9 +743,7 @@
 
       if (cache == null)
       {
-         // TODO: What's the right executor? 
-         //       Is there another way
-         cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache, redistributorExecutorFactory.getExecutor());
+         cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager, persistIDCache);
 
          DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);
 

Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -190,8 +190,10 @@
    {
       Configuration config = server.getConfiguration();
 
-      storage = new JournalStorageManager(config, server.getExecutorFactory().getExecutor());
+      storage = new JournalStorageManager(config, server.getExecutorFactory());
       storage.start();
+      
+      server.getManagementService().setStorageManager(storage);
 
       bindingsJournal = storage.getBindingsJournal();
       messagingJournal = storage.getMessageJournal();

Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -14,8 +14,6 @@
 package org.hornetq.core.replication.impl;
 
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -28,6 +26,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
@@ -49,6 +48,7 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -83,6 +83,8 @@
    private final Object replicationLock = new Object();
 
    private final Queue<OperationContext> pendingTokens = new ConcurrentLinkedQueue<OperationContext>();
+   
+   private final ExecutorFactory executorFactory;
 
    // Static --------------------------------------------------------
 
@@ -91,11 +93,12 @@
    /**
     * @param replicationConnectionManager
     */
-   public ReplicationManagerImpl(final FailoverManager failoverManager, final int backupWindowSize)
+   public ReplicationManagerImpl(final FailoverManager failoverManager, final ExecutorFactory executorFactory, final int backupWindowSize)
    {
       super();
       this.failoverManager = failoverManager;
       this.backupWindowSize = backupWindowSize;
+      this.executorFactory = executorFactory;
    }
 
    // Public --------------------------------------------------------
@@ -430,7 +433,7 @@
    {
       boolean runItNow = false;
 
-      OperationContext repliToken = getContext();
+      OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
       repliToken.replicationLineUp();
 
       synchronized (replicationLock)
@@ -459,67 +462,16 @@
 
    private void replicated()
    {
-      List<OperationContext> tokensToExecute = getTokens();
+      OperationContext ctx = pendingTokens.poll();
 
-      for (OperationContext ctx : tokensToExecute)
+      if (ctx == null)
       {
-         ctx.replicationDone();
+         throw new IllegalStateException("Missing replication token on the queue.");
       }
-   }
 
-   public OperationContext getContext()
-   {
-      return OperationContextImpl.getInstance();
+      ctx.replicationDone();
    }
 
-   /**
-    * This method will first get all the sync tokens (that won't go to the backup node)
-    * Then it will get the round trip tokens.
-    * At last, if the list is empty, it will verify if there are any future tokens that are sync tokens, to avoid a case where no more replication is done due to inactivity.
-    * @return
-    */
-   private List<OperationContext> getTokens()
-   {
-      List<OperationContext> retList = new LinkedList<OperationContext>();
-
-      OperationContext tokenPolled = null;
-
-      // First will get all the non replicated tokens up to the first one that is not replicated
-      do
-      {
-         tokenPolled = pendingTokens.poll();
-
-         if (tokenPolled == null)
-         {
-            throw new IllegalStateException("Missing replication token on the queue.");
-         }
-
-         retList.add(tokenPolled);
-
-      }
-      while (tokenPolled.isSync());
-
-      // This is to avoid a situation where we won't have more replicated packets
-      // We need to make sure we process any pending sync packet up to the next non empty packet
-      synchronized (replicationLock)
-      {
-         while (!pendingTokens.isEmpty() && pendingTokens.peek().isSync())
-         {
-            tokenPolled = pendingTokens.poll();
-            if (!tokenPolled.isSync())
-            {
-               throw new IllegalStateException("Replicatoin context is not a roundtrip token as expected");
-            }
-
-            retList.add(tokenPolled);
-
-         }
-      }
-
-      return retList;
-   }
-
-
    // Inner classes -------------------------------------------------
 
    protected class ResponseHandler implements ChannelHandler

Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -125,10 +125,6 @@
    
    Collection<Consumer> getConsumers();
    
-   /** We can't execute IO operation when inside the IOCallback / TransactionCallback.
-    *  This method will will perform IO operations in a second thread */
-   boolean checkDLQ(MessageReference ref, Executor ioExecutor) throws Exception;
-   
    boolean checkDLQ(MessageReference ref) throws Exception;
    
    void lockDelivery();

Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -651,7 +651,7 @@
       
       Executor sessionExecutor = executorFactory.getExecutor();
       
-      OperationContext sessionContext = storageManager.newContext(sessionExecutor);
+      storageManager.newContext(sessionExecutor);
 
       final ServerSessionImpl session = new ServerSessionImpl(name,
                                                               username,
@@ -667,7 +667,6 @@
                                                               postOffice,
                                                               resourceManager,
                                                               securityStore,
-                                                              sessionContext,
                                                               sessionExecutor,
                                                               channel,
                                                               managementService,
@@ -677,7 +676,8 @@
 
       sessions.put(name, session);
 
-      ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, sessionContext);
+      // The executor on the OperationContext here has to be the same as the session, or we would have ordering issues on messages
+      ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, storageManager.newContext(sessionExecutor), storageManager);
 
       session.setHandler(handler);
 
@@ -906,7 +906,7 @@
    {
       if (configuration.isPersistenceEnabled())
       {
-         return new JournalStorageManager(configuration, threadPool, replicationManager);
+         return new JournalStorageManager(configuration, this.executorFactory, replicationManager);
       }
       else
       {
@@ -935,6 +935,7 @@
             replicationFailoverManager = createBackupConnection(backupConnector, threadPool, scheduledPool);
 
             replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
+                                                            executorFactory,
                                                             configuration.getBackupWindowSize());
             replicationManager.start();
          }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -51,7 +51,6 @@
                          final Filter filter,
                          final boolean durable,
                          final boolean temporary,
-                         final Executor executor,
                          final ScheduledExecutorService scheduledExecutor,
                          final PostOffice postOffice,
                          final StorageManager storageManager,
@@ -63,7 +62,6 @@
             filter,
             durable,
             temporary,
-            executor,
             scheduledExecutor,
             postOffice,
             storageManager,

Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -83,7 +83,6 @@
                                     filter,
                                     durable,
                                     temporary,
-                                    executorFactory.getExecutor(),
                                     scheduledExecutor,
                                     postOffice,
                                     storageManager,
@@ -97,7 +96,6 @@
                                filter,
                                durable,
                                temporary,
-                               executorFactory.getExecutor(),
                                scheduledExecutor,
                                postOffice,
                                storageManager,

Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -115,9 +115,6 @@
 
    private final ScheduledExecutorService scheduledExecutor;
    
-   /** We can't perform any operation on the journal while inside the Transactional operations. */
-   private final Executor journalExecutor;
-
    private final SimpleString address;
 
    private Redistributor redistributor;
@@ -142,7 +139,6 @@
                     final Filter filter,
                     final boolean durable,
                     final boolean temporary,
-                    final Executor executor,
                     final ScheduledExecutorService scheduledExecutor,
                     final PostOffice postOffice,
                     final StorageManager storageManager,
@@ -167,8 +163,6 @@
       this.addressSettingsRepository = addressSettingsRepository;
 
       this.scheduledExecutor = scheduledExecutor;
-      
-      this.journalExecutor = executor;
 
       direct = true;
 
@@ -939,37 +933,12 @@
 
    public boolean checkDLQ(final MessageReference reference) throws Exception
    {
-      return checkDLQ(reference, null);
-   }
-   
-   public boolean checkDLQ(final MessageReference reference, Executor ioExecutor) throws Exception
-   {
       ServerMessage message = reference.getMessage();
 
       if (message.isDurable() && durable)
       {
-         if (ioExecutor != null)
-         {
-            ioExecutor.execute(new Runnable()
-            {
-               public void run()
-               {
-                  try
-                  {
-                     storageManager.updateDeliveryCount(reference);
-                     storageManager.completeOperations();
-                  }
-                  catch (Exception e)
-                  {
-                     log.warn("Can't update delivery count on checkDLQ", e);
-                  }
-               }
-            });
-         }
-         else
-         {
-            storageManager.updateDeliveryCount(reference);
-         }
+         storageManager.updateDeliveryCount(reference);
+         storageManager.waitOnOperations();
       }
 
       AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
@@ -978,28 +947,8 @@
 
       if (maxDeliveries > 0 && reference.getDeliveryCount() >= maxDeliveries)
       {
-         if (ioExecutor != null)
-         {
-            ioExecutor.execute(new Runnable()
-            {
-               public void run()
-               {
-                  try
-                  {
-                     sendToDeadLetterAddress(reference);
-                     storageManager.completeOperations();
-                  }
-                  catch (Exception e)
-                  {
-                     log.warn("Error on DLQ send", e);
-                  }
-               }
-            });
-         }
-         else
-         {
-            sendToDeadLetterAddress(reference);
-         }
+         sendToDeadLetterAddress(reference);
+         storageManager.waitOnOperations();
 
          return false;
       }
@@ -1011,28 +960,7 @@
          {
             reference.setScheduledDeliveryTime(System.currentTimeMillis() + redeliveryDelay);
 
-            if (ioExecutor != null)
-            {
-               ioExecutor.execute(new Runnable()
-               {
-                  public void run()
-                  {
-                     try
-                     {
-                        storageManager.updateScheduledDeliveryTime(reference);
-                        storageManager.completeOperations();
-                     }
-                     catch (Exception e)
-                     {
-                        log.warn("Error on DLQ send", e);
-                     }
-                  }
-               });
-            }
-            else
-            {
-               storageManager.updateScheduledDeliveryTime(reference);
-            }
+            storageManager.updateScheduledDeliveryTime(reference);
          }
 
          deliveringCount.decrementAndGet();
@@ -1495,23 +1423,14 @@
 
             // also note then when this happens as part of a trasaction its the tx commt of the ack that is important
             // not this
-            
-            // and this has to happen in a different thread
-            
-            journalExecutor.execute(new Runnable()
+            try
             {
-               public void run()
-               {
-                  try
-                  {
-                     storageManager.deleteMessage(message.getMessageID());
-                  }
-                  catch (Exception e)
-                  {
-                     log.warn("Unable to remove message id = " + message.getMessageID() + " please remove manually");
-                  }
-               }
-            });
+               storageManager.deleteMessage(message.getMessageID());
+            }
+            catch (Exception e)
+            {
+               log.warn("Unable to remove message id = " + message.getMessageID() + " please remove manually");
+            }
          }
       }
 
@@ -1584,7 +1503,7 @@
          {
             try
             {
-               if (ref.getQueue().checkDLQ(ref, journalExecutor))
+               if (ref.getQueue().checkDLQ(ref))
                {
                   LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
    
@@ -1600,8 +1519,6 @@
             }
             catch (Exception e)
             {
-               // checkDLQ here will be using an executor, this shouldn't happen
-               // don't you just hate checked exceptions in java?
                log.warn("Error on checkDLQ", e);
             }
          }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -39,9 +39,7 @@
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.management.Notification;
 import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.BindingType;
 import org.hornetq.core.postoffice.Bindings;
@@ -189,10 +187,6 @@
 
    private final SimpleString managementAddress;
    
-   /** We always use the same operation context for the session.
-    *  With that we can perform extra checks */
-   private final OperationContext sessionOperationContext;
-
    // The current currentLargeMessage being processed
    private volatile LargeServerMessage currentLargeMessage;
 
@@ -218,7 +212,6 @@
                             final PostOffice postOffice,
                             final ResourceManager resourceManager,
                             final SecurityStore securityStore,
-                            final OperationContext sessionOperationContext,
                             final Executor executor,
                             final Channel channel,
                             final ManagementService managementService,
@@ -249,8 +242,6 @@
 
       this.securityStore = securityStore;
       
-      this.sessionOperationContext = sessionOperationContext;
-
       this.executor = executor;
 
       if (!xa)

Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -46,6 +46,7 @@
 
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
@@ -90,10 +91,16 @@
    private final ServerSession session;
    
    private final OperationContext sessionContext;
+   
+   // The StorageManager is used here to set the OperationContext and to complete its operations
+   private final StorageManager storageManager;
 
-   public ServerSessionPacketHandler(final ServerSession session, OperationContext sessionContext)
+   public ServerSessionPacketHandler(final ServerSession session, OperationContext sessionContext, StorageManager storageManager)
    {
       this.session = session;
+      
+      this.storageManager = storageManager;
+      
       this.sessionContext = sessionContext;
    }
 
@@ -108,7 +115,7 @@
 
       if (sessionContext != null)
       {
-         sessionContext.reinstall();
+         storageManager.setContext(sessionContext);
       }
       
       try
@@ -303,7 +310,7 @@
       {
          if (sessionContext != null)
          {
-            sessionContext.complete();
+            storageManager.completeOperations();
          }
       }
    }

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -13,6 +13,8 @@
 
 package org.hornetq.tests.integration.largemessage;
 
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.hornetq.core.config.Configuration;
@@ -21,6 +23,8 @@
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
 
 /**
  * A ServerLargeMessageTest
@@ -36,12 +40,32 @@
 
    // Attributes ----------------------------------------------------
 
+   ExecutorService executor;
+   
+   ExecutorFactory execFactory;
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
    
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      // Thread pool backing execFactory; it is shut down again in tearDown()
+      executor = Executors.newCachedThreadPool();
+      
+      execFactory = new OrderedExecutorFactory(executor);
+   }
+   
+   protected void tearDown() throws Exception
+   {
+      executor.shutdown();
+      
+      super.tearDown();
+   }
+   
    public void testLargeMessageCopy() throws Exception
    {
       clearData();
@@ -52,7 +76,7 @@
 
       configuration.setJournalType(JournalType.ASYNCIO);
 
-      final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
+      final JournalStorageManager journal = new JournalStorageManager(configuration, execFactory);
       journal.start();
 
       LargeServerMessage msg = journal.createLargeMessage();

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -18,6 +18,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.hornetq.core.config.Configuration;
@@ -30,6 +32,8 @@
 import org.hornetq.core.server.Queue;
 import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
 
 /**
  * A DeleteMessagesRestartTest
@@ -48,11 +52,32 @@
 
    // Attributes ----------------------------------------------------
 
+   ExecutorService executor;
+   
+   ExecutorFactory execFactory;
+   
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
+   
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      
+      executor = Executors.newCachedThreadPool();
+      
+      this.execFactory = new OrderedExecutorFactory(executor);
+   }
+   
+   protected void tearDown() throws Exception
+   {
+      executor.shutdown();
+      
+      super.tearDown();
+   }
+   
 
    public void testRestartStorageManager() throws Exception
    {
@@ -67,7 +92,8 @@
 
       PostOffice postOffice = new FakePostOffice();
 
-      final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
+      final JournalStorageManager journal = new JournalStorageManager(configuration, execFactory);
+      
       try
       {
 

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -53,7 +53,7 @@
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.remoting.Interceptor;
 import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.RemotingConnection;
@@ -70,6 +70,7 @@
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -89,6 +90,8 @@
    private ThreadFactory tFactory;
 
    private ExecutorService executor;
+   
+   private ExecutorFactory factory;
 
    private ScheduledExecutorService scheduledExecutor;
 
@@ -114,6 +117,7 @@
       try
       {
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     this.factory,
                                                                      ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
          manager.stop();
@@ -140,6 +144,7 @@
       try
       {
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     this.factory,
                                                                      ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
          try
@@ -182,6 +187,7 @@
       try
       {
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     this.factory,
                                                                      ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
 
          manager.start();
@@ -189,6 +195,7 @@
          try
          {
             ReplicationManagerImpl manager2 = new ReplicationManagerImpl(failoverManager,
+                                                                         this.factory,
                                                                          ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
 
             manager2.start();
@@ -223,6 +230,7 @@
       try
       {
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     this.factory,
                                                                      ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
 
          try
@@ -241,7 +249,7 @@
          server.stop();
       }
    }
-
+   
    public void testSendPackets() throws Exception
    {
 
@@ -257,7 +265,10 @@
 
       try
       {
+         StorageManager storage = getStorage();
+         
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     this.factory,
                                                                      ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
 
@@ -276,7 +287,7 @@
          replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
          replicatedJournal.appendRollbackRecord(3, false);
 
-         blockOnReplication(manager);
+         blockOnReplication(storage, manager);
 
          assertEquals(0, manager.getActiveTokens().size());
 
@@ -294,7 +305,7 @@
          manager.pageWrite(pgmsg, 3);
          manager.pageWrite(pgmsg, 4);
 
-         blockOnReplication(manager);
+         blockOnReplication(storage, manager);
 
          PagingManager pagingManager = createPageManager(server.getStorageManager(),
                                                          server.getConfiguration(),
@@ -313,7 +324,7 @@
          manager.pageDeleted(dummy, 5);
          manager.pageDeleted(dummy, 6);
 
-         blockOnReplication(manager);
+         blockOnReplication(storage, manager);
 
          ServerMessageImpl serverMsg = new ServerMessageImpl();
          serverMsg.setMessageID(500);
@@ -328,7 +339,7 @@
 
          manager.largeMessageDelete(500);
 
-         blockOnReplication(manager);
+         blockOnReplication(storage, manager);
 
          store.start();
 
@@ -363,7 +374,9 @@
 
       try
       {
+         StorageManager storage = getStorage();
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     this.factory,
                                                                      ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
 
@@ -377,7 +390,7 @@
          }
 
          final CountDownLatch latch = new CountDownLatch(1);
-         OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
+         storage.afterCompleteOperations(new IOAsyncTask()
          {
 
             public void onError(int errorCode, String errorMessage)
@@ -401,13 +414,21 @@
    }
 
    /**
+    * @return
+    */
+   private JournalStorageManager getStorage()
+   {
+      return new JournalStorageManager(createDefaultConfig(), factory);
+   }
+
+   /**
     * @param manager
     * @return
     */
-   private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
+   private void blockOnReplication(StorageManager storage, ReplicationManagerImpl manager) throws Exception
    {
       final CountDownLatch latch = new CountDownLatch(1);
-      OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
+      storage.afterCompleteOperations(new IOAsyncTask()
       {
 
          public void onError(int errorCode, String errorMessage)
@@ -430,6 +451,7 @@
       try
       {
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     this.factory,
                                                                      ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
          fail("Exception expected");
@@ -455,7 +477,9 @@
 
       try
       {
+         StorageManager storage = getStorage();
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     this.factory,
                                                                      ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
 
@@ -464,7 +488,7 @@
          replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
 
          final CountDownLatch latch = new CountDownLatch(1);
-         OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
+         storage.afterCompleteOperations(new IOAsyncTask()
          {
 
             public void onError(int errorCode, String errorMessage)
@@ -505,7 +529,9 @@
 
       try
       {
+         StorageManager storage = getStorage();
          ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     this.factory,
                                                                      ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
          manager.start();
 
@@ -515,7 +541,7 @@
 
          final CountDownLatch latch = new CountDownLatch(numberOfAdds);
 
-         OperationContext ctx = OperationContextImpl.getInstance();
+         OperationContext ctx = storage.getContext();
          
          for (int i = 0; i < numberOfAdds; i++)
          {
@@ -593,9 +619,26 @@
       executor = Executors.newCachedThreadPool(tFactory);
 
       scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
+      
+      factory = new OrderedExecutorFactory(executor);
+   }
 
+   protected void tearDown() throws Exception
+   {
+
+      executor.shutdown();
+
+      scheduledExecutor.shutdown();
+
+      tFactory = null;
+
+      scheduledExecutor = null;
+
+      super.tearDown();
+
    }
 
+
    private FailoverManagerImpl createFailoverManager()
    {
       return createFailoverManager(null);
@@ -622,22 +665,6 @@
                                      scheduledExecutor,
                                      interceptors);
    }
-
-   protected void tearDown() throws Exception
-   {
-
-      executor.shutdown();
-
-      scheduledExecutor.shutdown();
-
-      tFactory = null;
-
-      scheduledExecutor = null;
-
-      super.tearDown();
-
-   }
-
    protected PagingManager createPageManager(StorageManager storageManager,
                                              Configuration configuration,
                                              ExecutorFactory executorFactory,

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -15,6 +15,8 @@
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -29,6 +31,7 @@
 import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.SimpleString;
 
 /**
@@ -110,6 +113,17 @@
                                        final int transInterval,
                                        final int numberOfThreads) throws Exception
    {
+      
+      final ExecutorService executor = Executors.newCachedThreadPool();
+     
+      ExecutorFactory factory = new ExecutorFactory()
+      {
+         public Executor getExecutor()
+         {
+            return executor;
+         }
+      };
+      
       FileConfiguration configuration = new FileConfiguration();
 
       configuration.start();
@@ -122,7 +136,7 @@
       PostOffice postOffice = new FakePostOffice();
 
       final JournalStorageManager journal = new JournalStorageManager(configuration,
-                                                                      Executors.newCachedThreadPool());
+                                                                      factory);
       journal.start();
 
       HashMap<Long, Queue> queues = new HashMap<Long, Queue>();
@@ -187,6 +201,7 @@
                   if (transInterval > 0 && i % transInterval == 0)
                   {
                      journal.commit(trans);
+                     journal.waitOnOperations();
                      commits++;
                      trans = transactionGenerator.incrementAndGet();
                      commitPending = false;
@@ -194,7 +209,10 @@
                }
 
                if (commitPending)
+               {
                   journal.commit(trans);
+                  journal.waitOnOperations();
+               }
 
                long end = System.currentTimeMillis();
 

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -15,8 +15,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -44,22 +42,18 @@
 
    private ScheduledExecutorService scheduledExecutor;
    
-   private ExecutorService executor;
+   //private ExecutorService executor;
 
    public void setUp() throws Exception
    {
    	super.setUp();
 
    	scheduledExecutor = new ScheduledThreadPoolExecutor(1);
-   	
-   	executor = Executors.newSingleThreadExecutor();
    }
 
    public void tearDown() throws Exception
    {
    	scheduledExecutor.shutdownNow();
-   	
-   	executor.shutdown();
 
       super.tearDown();
    }
@@ -78,7 +72,7 @@
 
    public void testScheduledNoConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, new SimpleString("address1"), new SimpleString("queue1"), null, false, true, scheduledExecutor, null, null, null);
 
       //Send one scheduled
 
@@ -144,7 +138,7 @@
 
    private void testScheduled(boolean direct) throws Exception
    {
-      Queue queue = new QueueImpl(1,new SimpleString("address1"), new SimpleString("queue1"), null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1,new SimpleString("address1"), new SimpleString("queue1"), null, false, true, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = null;
 
@@ -251,7 +245,7 @@
             return HandleStatus.HANDLED;
          }
       };
-      Queue queue = new QueueImpl(1, new SimpleString("address1"), queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, new SimpleString("address1"), queue1, null, false, true, scheduledExecutor, null, null, null);
       MessageReference messageReference = generateReference(queue, 1);
       queue.addConsumer(consumer);
       messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -1264,6 +1264,20 @@
          return null;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#clearContext()
+       */
+      public void clearContext()
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#setContext(org.hornetq.core.persistence.OperationContext)
+       */
+      public void setContext(OperationContext context)
+      {
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -17,6 +17,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -38,6 +39,8 @@
 import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
 import org.hornetq.tests.util.RandomUtil;
 import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
 import org.hornetq.utils.Pair;
 import org.hornetq.utils.SimpleString;
 
@@ -60,6 +63,8 @@
 
    ExecutorService executor;
    
+   ExecutorFactory factory;
+   
    @Override
    protected void tearDown() throws Exception
    {
@@ -71,6 +76,7 @@
    {
       super.setUp();
       executor = Executors.newSingleThreadExecutor();
+      factory = new OrderedExecutorFactory(executor);
    }
 
    // Public --------------------------------------------------------
@@ -96,7 +102,7 @@
 
          ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE);
 
-         journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
+         journal = new JournalStorageManager(configuration, factory);
 
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
@@ -111,7 +117,7 @@
 
          assertEquals(0, mapDups.size());
 
-         DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true, executor);
+         DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
 
          for (int i = 0; i < 100; i++)
          {
@@ -120,7 +126,7 @@
 
          journal.stop();
 
-         journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
+         journal = new JournalStorageManager(configuration, factory);
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
 
@@ -136,7 +142,7 @@
 
          assertEquals(10, values.size());
 
-         cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true, executor);
+         cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
          cacheID.load(values);
 
          for (int i = 0; i < 100; i++)
@@ -148,7 +154,7 @@
 
          mapDups.clear();
 
-         journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
+         journal = new JournalStorageManager(configuration, factory);
          journal.start();
          journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new ArrayList<GroupingInfo>());
 

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -17,7 +17,6 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -46,19 +45,15 @@
 
    private ScheduledExecutorService scheduledExecutor;
    
-   private ExecutorService executor;
-   
    protected void setUp() throws Exception
    {
       super.setUp();
       scheduledExecutor =  Executors.newSingleThreadScheduledExecutor();
-      executor = Executors.newSingleThreadExecutor();      
    }
    
    protected void tearDown() throws Exception
    {
       scheduledExecutor.shutdown();
-      executor.shutdown();
       super.tearDown();
    }
 
@@ -70,18 +65,18 @@
    {
       final SimpleString name = new SimpleString("oobblle");
 
-      Queue queue = new QueueImpl(1, address1, name, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, name, null, false, true, scheduledExecutor, null, null, null);
 
       assertEquals(name, queue.getName());
    }
 
    public void testDurable()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, false, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, false, scheduledExecutor, null, null, null);
 
       assertFalse(queue.isDurable());
 
-      queue = new QueueImpl(1, address1, queue1, null, true, false, executor, scheduledExecutor, null, null, null);
+      queue = new QueueImpl(1, address1, queue1, null, true, false, scheduledExecutor, null, null, null);
 
       assertTrue(queue.isDurable());
    }
@@ -94,7 +89,7 @@
 
       Consumer cons3 = new FakeConsumer();
 
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       assertEquals(0, queue.getConsumerCount());
 
@@ -135,7 +130,7 @@
 
    public void testGetFilter()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       assertNull(queue.getFilter());
 
@@ -152,7 +147,7 @@
          }
       };
 
-      queue = new QueueImpl(1, address1, queue1, filter, false, true, executor, scheduledExecutor, null, null, null);
+      queue = new QueueImpl(1, address1, queue1, filter, false, true, scheduledExecutor, null, null, null);
 
       assertEquals(filter, queue.getFilter());
 
@@ -160,7 +155,7 @@
 
    public void testSimpleadd()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -179,7 +174,7 @@
 
    public void testSimpleDirectDelivery() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -207,7 +202,7 @@
 
    public void testSimpleNonDirectDelivery() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -245,7 +240,7 @@
 
    public void testBusyConsumer() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -289,7 +284,7 @@
 
    public void testBusyConsumerThenAddMoreMessages() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = new FakeConsumer();
 
@@ -356,7 +351,7 @@
 
    public void testAddFirstadd() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -416,7 +411,7 @@
                                   null,
                                   false,
                                   true,
-                                  executor, scheduledExecutor,
+                                  scheduledExecutor,
                                   new FakePostOffice(),
                                   null,
                                   null);
@@ -573,7 +568,7 @@
 
    public void testConsumerReturningNull() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       class NullConsumer implements Consumer
       {
@@ -606,7 +601,7 @@
 
    public void testRoundRobinWithQueueing() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -649,7 +644,7 @@
 
    public void testRoundRobinDirect() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -690,7 +685,7 @@
 
    public void testWithPriorities() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
 
@@ -757,7 +752,7 @@
 
    public void testConsumerWithFilterAddAndRemove()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       Filter filter = new FakeFilter("fruit", "orange");
 
@@ -766,7 +761,7 @@
 
    public void testList()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       final int numMessages = 20;
 
@@ -790,7 +785,7 @@
 
    public void testListWithFilter()
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       final int numMessages = 20;
 
@@ -832,7 +827,7 @@
                                   null,
                                   false,
                                   true,
-                                  executor, scheduledExecutor,
+                                  scheduledExecutor,
                                   new FakePostOffice(),
                                   null,
                                   null);
@@ -904,7 +899,7 @@
 
    public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
 
@@ -945,7 +940,7 @@
 
    public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color = 'green'"));
 
@@ -1019,7 +1014,7 @@
 
    public void testConsumerWithFilterThenAddMoreMessages() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       final int numMessages = 10;
       List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -1089,7 +1084,7 @@
                                   null,
                                   false,
                                   true,
-                                  executor, scheduledExecutor,
+                                  scheduledExecutor,
                                   new FakePostOffice(),
                                   null,
                                   null);
@@ -1181,7 +1176,7 @@
    public void testMessageOrder() throws Exception
    {
       FakeConsumer consumer = new FakeConsumer();
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1201,7 +1196,7 @@
 
    public void testMessagesAdded() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1213,7 +1208,7 @@
 
    public void testGetReference() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1226,7 +1221,7 @@
 
    public void testGetNonExistentReference() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
       MessageReference messageReference = generateReference(queue, 1);
       MessageReference messageReference2 = generateReference(queue, 2);
       MessageReference messageReference3 = generateReference(queue, 3);
@@ -1243,7 +1238,7 @@
     */
    public void testPauseAndResumeWithAsync() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       // pauses the queue
       queue.pause();
@@ -1298,7 +1293,7 @@
 
    public void testPauseAndResumeWithDirect() throws Exception
    {
-      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor, scheduledExecutor, null, null, null);
+      Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
 
       // Now add a consumer
       FakeConsumer consumer = new FakeConsumer();

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -35,14 +35,12 @@
 {
    private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
    
-   private final ExecutorService executor = Executors.newSingleThreadExecutor();
-   
 	private PostOffice postOffice;
 
 	public Queue createQueue(long persistenceID, final SimpleString address, SimpleString name, Filter filter,
 			                   boolean durable, boolean temporary)
 	{
-		return new QueueImpl(persistenceID, address, name, filter, durable, temporary, executor, scheduledExecutor, postOffice, null, null);
+		return new QueueImpl(persistenceID, address, name, filter, durable, temporary, scheduledExecutor, postOffice, null, null);
 	}
 	
    public void setPostOffice(PostOffice postOffice)
@@ -54,8 +52,6 @@
    public void stop() throws Exception
    {
       scheduledExecutor.shutdown();
-      
-      executor.shutdown();
    }
 
 }

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/util/UnitTestCase.java	2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/util/UnitTestCase.java	2009-11-23 02:04:55 UTC (rev 8369)
@@ -50,6 +50,8 @@
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
@@ -647,6 +649,8 @@
    @Override
    protected void tearDown() throws Exception
    {
+      OperationContextImpl.clearContext();
+
       deleteDirectory(new File(getTestDir()));
 
       assertEquals(0, InVMRegistry.instance.size());



More information about the hornetq-commits mailing list