[hornetq-commits] JBoss hornetq SVN: r8365 - in branches/ClebertCallback: src/main/org/hornetq/core/persistence and 9 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Nov 21 18:08:17 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-21 18:08:15 -0500 (Sat, 21 Nov 2009)
New Revision: 8365

Modified:
   branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
   branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
   branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Simplifying ordering and switch context

Modified: branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -744,7 +744,7 @@
       // TODO: Talk to Andy and Jeff about a better way to sync this...
       System.out.println("Waiting");
       final CountDownLatch latch = new CountDownLatch(1);
-      OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
+      OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
       {
 
          public void done()
@@ -759,7 +759,7 @@
          
       });
       
-      OperationContextImpl.getContext().complete();
+      OperationContextImpl.getInstance().complete();
       
       latch.await(5, TimeUnit.SECONDS);
       System.out.println("Done");

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.persistence;
 
+import java.util.concurrent.Executor;
+
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.journal.IOCompletion;
 
@@ -29,7 +31,16 @@
 {
    
    boolean hasReplication();
+   
+   /** Reattach the context to the current thread */
+   void reinstall();
+   
+   /** The executor used on responses.
+    *  If this is not set, it will use the current thread. */
+   void setExecutor(Executor executor);
 
+   /** Execute the task when all IO operations are complete,
+    *  Or execute it immediately if nothing is pending.  */
    void executeOnCompletion(IOAsyncTask runnable);
    
    void replicationLineUp();

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -15,6 +15,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 import javax.transaction.xa.Xid;
 
@@ -47,6 +48,13 @@
  */
 public interface StorageManager extends HornetQComponent
 {
+   
+   /** Get the context associated with the thread for later reuse */
+   OperationContext getContext();
+   
+   /** Creates a new OperationContext without associating it with any thread */
+   OperationContext newContext(Executor executor);
+   
    // Message related operations
 
    void pageClosed(SimpleString storeName, int pageNumber);

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -55,6 +55,7 @@
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
 import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
@@ -294,14 +295,7 @@
     */
    public void completeOperations()
    {
-      if (replicator != null)
-      {
-         replicator.closeContext();
-      }
-      else
-      {
-         OperationContextImpl.getContext().complete();
-      }
+      OperationContextImpl.getInstance().complete();
    }
 
    public boolean isReplicated()
@@ -373,9 +367,26 @@
 
    // TODO: shouldn't those page methods be on the PageManager? ^^^^
 
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#getContext()
+    */
+   public OperationContext getContext()
+   {
+      return OperationContextImpl.getInstance();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#newContext()
+    */
+   public OperationContext newContext(Executor executor)
+   {
+      return new OperationContextImpl(executor);
+   }
+
    public void afterCompleteOperations(IOAsyncTask run)
    {
-      OperationContextImpl.getContext().executeOnCompletion(run);
+      OperationContextImpl.getInstance().executeOnCompletion(run);
    }
 
    public UUID getPersistentID()
@@ -506,7 +517,7 @@
 
    public void sync()
    {
-      messageJournal.sync(OperationContextImpl.getContext());
+      messageJournal.sync(OperationContextImpl.getInstance());
    }
 
    // Transactional operations
@@ -1390,7 +1401,7 @@
    
    private IOCompletion getIOContext()
    {
-      return OperationContextImpl.getContext();
+      return OperationContextImpl.getInstance();
    }
 
    private void checkAndCreateDir(final String dir, final boolean create)
@@ -1917,7 +1928,6 @@
       }
 
    }
-   
 
 
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -16,21 +16,32 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.persistence.OperationContext;
 
+import sun.security.util.PendingException;
+
 /**
- * A ReplicationToken
- *
+ * 
+ * This class holds tasks while there are IO operations (storage or replication) pending,
+ * and it executes them, in order, once those operations are complete.
+ * 
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  */
 public class OperationContextImpl implements OperationContext
 {
    private static final ThreadLocal<OperationContext> tlContext = new ThreadLocal<OperationContext>();
 
-   public static OperationContext getContext()
+   public static void setInstance(OperationContext context)
    {
+      tlContext.set(context);
+   }
+   
+   public static OperationContext getInstance()
+   {
       OperationContext token = tlContext.get();
       if (token == null)
       {
@@ -53,6 +64,14 @@
    private int stored = 0;
 
    private int replicated = 0;
+   
+   private int errorCode = -1;
+   
+   private String errorMessage = null;
+   
+   private Executor executor;
+   
+   private final AtomicInteger executorsPending = new AtomicInteger(0);
 
    /**
     * @param executor
@@ -62,6 +81,21 @@
       super();
    }
 
+   public OperationContextImpl(final Executor executor)
+   {
+      super();
+      this.executor = executor;
+   }
+   
+   /*
+    * @see org.hornetq.core.persistence.OperationContext#reinstall()
+    */
+   public void reinstall()
+   {
+      setInstance(this);
+   }
+
+
    /** To be called by the replication manager, when new replication is added to the queue */
    public void lineUp()
    {
@@ -72,6 +106,12 @@
    {
       replicationLineUp++;
    }
+   
+   /** This method needs to be called before the executor becomes operational. */
+   public void setExecutor(Executor executor)
+   {
+      this.executor = executor;
+   }
 
    public synchronized void replicationDone()
    {
@@ -85,7 +125,7 @@
    }
 
    /** You may have several actions to be done after a replication operation is completed. */
-   public synchronized void executeOnCompletion(IOAsyncTask completion)
+   public synchronized void executeOnCompletion(final IOAsyncTask completion)
    {
       if (tasks == null)
       {
@@ -94,9 +134,30 @@
          minimalStore = storeLineUp;
       }
 
+      // In this case, we can just execute the context directly
       if (replicationLineUp == replicated && storeLineUp == stored)
       {
-         completion.done();
+         if (executor != null)
+         {
+            // We want to avoid the executor if everything is complete.
+            // However, we can't execute the task directly if there are executions pending;
+            // we need to use the executor in this case.
+            if (executorsPending.get() == 0)
+            {
+               // No need to use an executor here or a context switch
+               // there are no actions pending.. hence we can just execute the task directly on the same thread
+               completion.done();
+            }
+            else
+            {
+               execute(completion);
+            }
+         }
+         else
+         {
+            // Execute without an executor
+            completion.done();
+         }
       }
       else
       {
@@ -122,19 +183,64 @@
             if (!holder.executed && stored >= holder.storeLined && replicated >= holder.replicationLined)
             {
                holder.executed = true;
-               holder.task.done();
+                
+               if (executor != null)
+               {
+                  // If set, we use an executor to avoid the server being single threaded
+                  execute(holder.task);
+               }
+               else
+               {
+                  holder.task.done();
+               }
+               
                iter.remove();
             }
+            else
+            {
+               // The actions need to be done in order...
+               // so it must achieve both conditions before we can proceed to more tasks
+               break;
+            }
          }
       }
    }
 
+   /**
+    * @param task the task to be run on the executor
+    */
+   private void execute(final IOAsyncTask task)
+   {
+      executorsPending.incrementAndGet();
+      executor.execute(new Runnable()
+      {
+         public void run()
+         {
+            task.done();
+            executorsPending.decrementAndGet();
+         }
+      });
+   }
+
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationToken#complete()
     */
    public void complete()
    {
       tlContext.set(null);
+      
+      // TODO: test and fix exceptions on the Context
+      if (tasks != null && errorMessage != null)
+      {
+         for (TaskHolder run : tasks)
+         {
+            run.task.onError(errorCode, errorMessage);
+         }
+      }
+      
+      // We hold errors until complete() is called; otherwise the callbacks would never be informed
+      errorCode = -1;
+      errorMessage = null;
    }
    
    public boolean isSync()
@@ -147,13 +253,8 @@
     */
    public void onError(int errorCode, String errorMessage)
    {
-      if (tasks != null)
-      {
-         for (TaskHolder run : tasks)
-         {
-            run.task.onError(errorCode, errorMessage);
-         }
-      }
+      this.errorCode = errorCode;
+      this.errorMessage = errorMessage;
    }
 
    class TaskHolder
@@ -174,4 +275,5 @@
       }
    }
 
+
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.persistence.impl.journal;
 
+import java.util.concurrent.Executor;
+
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.persistence.OperationContext;
 
@@ -125,6 +127,22 @@
       ctx.replicationLineUp();
    }
 
+   /**
+    * @see org.hornetq.core.persistence.OperationContext#setExecutor(java.util.concurrent.Executor)
+    */
+   public void setExecutor(Executor executor)
+   {
+      ctx.setExecutor(executor);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.OperationContext#reinstall()
+    */
+   public void reinstall()
+   {
+      OperationContextImpl.setInstance(this);
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -15,6 +15,7 @@
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.transaction.xa.Xid;
@@ -26,6 +27,7 @@
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
@@ -356,4 +358,20 @@
    {
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#getContext()
+    */
+   public OperationContext getContext()
+   {
+      return null;
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#newContext()
+    */
+   public OperationContext newContext(Executor executor)
+   {
+      return null;
+   }
+
 }

Modified: branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -917,10 +917,6 @@
                }
             }
          }
-         else
-         {
-            storageManager.sync();
-         }
 
          message.incrementRefCount(reference);
       }
@@ -931,6 +927,8 @@
       }
       else
       {
+         // This will use the same thread if there are no pending operations
+         // avoiding a context switch on this case
          storageManager.afterCompleteOperations(new IOAsyncTask()
          {
             public void onError(int errorCode, String errorMessage)

Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -50,8 +50,6 @@
 
    void appendRollbackRecord(byte journalID, long txID) throws Exception;
    
-   void closeContext();
-   
    /** A list of tokens that are still waiting for replications to be completed */
    Set<OperationContext> getActiveTokens();
 

Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -392,25 +392,7 @@
       started = false;
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationManager#completeToken()
-    */
-   public void closeContext()
-   {
-      final OperationContext token = getContext();
 
-      if (token != null)
-      {
-         // Remove from pending tokens as soon as this is complete
-         if (!token.hasReplication())
-         {
-            sync(token);
-         }
-         token.complete();
-      }
-   }
-
-
    /* method for testcases only
     * @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
     */
@@ -513,7 +495,7 @@
    
    public OperationContext getContext()
    {
-      return OperationContextImpl.getContext();
+      return OperationContextImpl.getInstance();
    }
 
    /**

Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -22,6 +22,7 @@
 import java.util.Set;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -61,6 +62,7 @@
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
@@ -646,6 +648,10 @@
       }
 
       Channel channel = connection.getChannel(channelID, sendWindowSize);
+      
+      Executor sessionExecutor = executorFactory.getExecutor();
+      
+      OperationContext sessionContext = storageManager.newContext(sessionExecutor);
 
       final ServerSessionImpl session = new ServerSessionImpl(name,
                                                               username,
@@ -661,7 +667,8 @@
                                                               postOffice,
                                                               resourceManager,
                                                               securityStore,
-                                                              executorFactory.getExecutor(),
+                                                              sessionContext,
+                                                              sessionExecutor,
                                                               channel,
                                                               managementService,
                                                               // queueFactory,
@@ -670,7 +677,7 @@
 
       sessions.put(name, session);
 
-      ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session);
+      ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, sessionContext);
 
       session.setHandler(handler);
 

Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -39,6 +39,7 @@
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.management.Notification;
 import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.postoffice.Binding;
@@ -187,6 +188,10 @@
    private final HornetQServer server;
 
    private final SimpleString managementAddress;
+   
+   /** We always use the same operation context for the session.
+    *  With that we can perform extra checks */
+   private final OperationContext sessionOperationContext;
 
    // The current currentLargeMessage being processed
    private volatile LargeServerMessage currentLargeMessage;
@@ -213,6 +218,7 @@
                             final PostOffice postOffice,
                             final ResourceManager resourceManager,
                             final SecurityStore securityStore,
+                            final OperationContext sessionOperationContext,
                             final Executor executor,
                             final Channel channel,
                             final ManagementService managementService,
@@ -242,6 +248,8 @@
       this.resourceManager = resourceManager;
 
       this.securityStore = securityStore;
+      
+      this.sessionOperationContext = sessionOperationContext;
 
       this.executor = executor;
 
@@ -1739,8 +1747,6 @@
             doSendResponse(confirmPacket, response, flush, closeChannel);
          }
       });
-      
-      storageManager.completeOperations();
    }
 
    /**

Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -45,6 +45,7 @@
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
 
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
@@ -80,16 +81,20 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  */
 public class ServerSessionPacketHandler implements ChannelHandler
 {
    private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
 
    private final ServerSession session;
+   
+   private final OperationContext sessionContext;
 
-   public ServerSessionPacketHandler(final ServerSession session)
+   public ServerSessionPacketHandler(final ServerSession session, OperationContext sessionContext)
    {
       this.session = session;
+      this.sessionContext = sessionContext;
    }
 
    public long getID()
@@ -101,6 +106,11 @@
    {
       byte type = packet.getType();
 
+      if (sessionContext != null)
+      {
+         sessionContext.reinstall();
+      }
+      
       try
       {
          switch (type)
@@ -289,5 +299,12 @@
       {
          log.error("Caught unexpected exception", t);
       }
+      finally
+      {
+         if (sessionContext != null)
+         {
+            sessionContext.complete();
+         }
+      }
    }
 }

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -13,18 +13,11 @@
 
 package org.hornetq.tests.integration.client;
 
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 import org.hornetq.core.client.ClientConsumer;
 import org.hornetq.core.client.ClientMessage;
 import org.hornetq.core.client.ClientProducer;
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.client.SessionFailureListener;
-import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.tests.util.ServiceTestBase;
 
@@ -47,9 +40,6 @@
    protected void setUp() throws Exception
    {
       super.setUp();
-      server = createServer(true, true);
-      server.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
-      server.start();
    }
 
    protected void tearDown() throws Exception
@@ -64,8 +54,21 @@
 
    // Public --------------------------------------------------------
 
-   public void testSimpleOrder() throws Exception
+   public void testSimpleOrderNoStorage() throws Exception
    {
+      doTestSimpleOrder(false);
+   }
+
+   public void testSimpleOrderPersistence() throws Exception
+   {
+      doTestSimpleOrder(true);
+   }
+
+   public void doTestSimpleOrder(final boolean persistent) throws Exception
+   {
+      server = createServer(persistent, true);
+      server.start();
+
       ClientSessionFactory sf = createNettyFactory();
 
       sf.setBlockOnNonPersistentSend(false);
@@ -92,16 +95,16 @@
 
          boolean started = false;
 
-         for (int start = 0; start < 3; start++)
+         for (int start = 0; start < 2; start++)
          {
 
-            if (start == 2)
+            if (persistent && start == 1)
             {
                started = true;
                server.stop();
                server.start();
             }
-
+            
             session = sf.createSession(true, true);
 
             session.start();
@@ -142,40 +145,6 @@
 
    }
 
-   private void fail(ClientSession session) throws InterruptedException
-   {
-
-      final CountDownLatch latch = new CountDownLatch(1);
-
-      class MyListener implements SessionFailureListener
-      {
-         public void connectionFailed(HornetQException me)
-         {
-            latch.countDown();
-         }
-
-         public void beforeReconnect(HornetQException exception)
-         {
-         }
-      }
-
-      MyListener listener = new MyListener();
-      session.addFailureListener(listener);
-
-      RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
-
-      // Simulate failure on connection
-      conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
-      // Wait to be informed of failure
-
-      boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
-
-      assertTrue(ok);
-
-      session.removeFailureListener(listener);
-   }
-
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -52,17 +52,6 @@
 
    // Public --------------------------------------------------------
 
-   public void test() throws Exception
-   {
-      for (int i = 0; i < 100; i++)
-      {
-         System.out.println("<<<<<< " + i + " >>>>>>>");
-         testMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup();
-         tearDown();
-         setUp();
-      }
-   }
-
    public void testMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup() throws Exception
    {
       doTestMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup(false);

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -51,6 +51,7 @@
 import org.hornetq.core.paging.impl.PagedMessageImpl;
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.remoting.Interceptor;
@@ -368,7 +369,6 @@
 
          Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
 
-         Thread.sleep(100);
          TestInterceptor.value.set(false);
 
          for (int i = 0; i < 500; i++)
@@ -377,7 +377,7 @@
          }
 
          final CountDownLatch latch = new CountDownLatch(1);
-         OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
+         OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
          {
 
             public void onError(int errorCode, String errorMessage)
@@ -390,8 +390,6 @@
             }
          });
 
-         manager.closeContext();
-
          server.stop();
 
          assertTrue(latch.await(50, TimeUnit.SECONDS));
@@ -409,7 +407,7 @@
    private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
    {
       final CountDownLatch latch = new CountDownLatch(1);
-      OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
+      OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
       {
 
          public void onError(int errorCode, String errorMessage)
@@ -422,8 +420,6 @@
          }
       });
 
-      manager.closeContext();
-
       assertTrue(latch.await(30, TimeUnit.SECONDS));
    }
 
@@ -468,7 +464,7 @@
          replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
 
          final CountDownLatch latch = new CountDownLatch(1);
-         OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
+         OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
          {
 
             public void onError(int errorCode, String errorMessage)
@@ -481,8 +477,6 @@
             }
          });
 
-         manager.closeContext();
-
          assertTrue(latch.await(1, TimeUnit.SECONDS));
 
          assertEquals(0, manager.getActiveTokens().size());
@@ -521,6 +515,8 @@
 
          final CountDownLatch latch = new CountDownLatch(numberOfAdds);
 
+         OperationContext ctx = OperationContextImpl.getInstance();
+         
          for (int i = 0; i < numberOfAdds; i++)
          {
             final int nAdd = i;
@@ -529,12 +525,8 @@
             {
                replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
             }
-            else
-            {
-               manager.sync(OperationContextImpl.getContext());
-            }
 
-            OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
+            ctx.executeOnCompletion(new IOAsyncTask()
             {
 
                public void onError(int errorCode, String errorMessage)
@@ -543,12 +535,11 @@
 
                public void done()
                {
+                  System.out.println("Add " + nAdd);
                   executions.add(nAdd);
                   latch.countDown();
                }
             });
-
-            manager.closeContext();
          }
 
          assertTrue(latch.await(10, TimeUnit.SECONDS));

Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-21 23:08:15 UTC (rev 8365)
@@ -44,6 +44,7 @@
 import org.hornetq.core.paging.impl.PagingStoreImpl;
 import org.hornetq.core.paging.impl.TestSupportPageStore;
 import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
@@ -1247,6 +1248,22 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#getContext()
+       */
+      public OperationContext getContext()
+      {
+         return null;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#newContext(java.util.concurrent.Executor)
+       */
+      public OperationContext newContext(Executor executor)
+      {
+         return null;
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory



More information about the hornetq-commits mailing list