[hornetq-commits] JBoss hornetq SVN: r10231 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Feb 20 00:03:57 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-02-20 00:03:54 -0500 (Sun, 20 Feb 2011)
New Revision: 10231

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
https://issues.jboss.org/browse/JBPAPP-5949 - Improving thread usage

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java	2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java	2011-02-20 05:03:54 UTC (rev 10231)
@@ -35,6 +35,11 @@
    private volatile String errorMessage;
 
    private volatile int errorCode = 0;
+   
+   public String toString()
+   {
+      return "simpleWaitIOCallback";
+   }
 
    public void done()
    {

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-02-20 05:03:54 UTC (rev 10231)
@@ -233,7 +233,15 @@
       {
          public void run()
          {
-            cleanup();
+            storageManager.setContext(storageManager.newSingleThreadContext());
+            try
+            {
+               cleanup();
+            }
+            finally
+            {
+               storageManager.clearContext();
+            }
          }
       });
    }
@@ -285,24 +293,8 @@
 
                   Page currentPage = pagingStore.getCurrentPage();
 
-                  try
-                  {
-                     // First step: Move every cursor to the next bookmarked page (that was just created)
-                     for (PageSubscription cursor : cursorList)
-                     {
-                        cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
-                     }
+                  storePositions(cursorList, currentPage);
 
-                     storageManager.waitOnOperations();
-                  }
-                  finally
-                  {
-                     for (PageSubscription cursor : cursorList)
-                     {
-                        cursor.enableAutoCleanup();
-                     }
-                  }
-
                   pagingStore.stopPaging();
 
                   // This has to be called after we stopped paging
@@ -360,6 +352,35 @@
 
    }
 
+   /**
+    * @param cursorList
+    * @param currentPage
+    * @throws Exception
+    */
+   protected void storePositions(ArrayList<PageSubscription> cursorList, Page currentPage) throws Exception
+   {
+      try
+      {
+         // First step: Move every cursor to the next bookmarked page (that was just created)
+         for (PageSubscription cursor : cursorList)
+         {
+            cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
+         }
+
+         while (!storageManager.waitOnOperations(5000))
+         {
+            log.warn("Couldn't complete operations on IO context " + storageManager.getContext());
+         }
+      }
+      finally
+      {
+         for (PageSubscription cursor : cursorList)
+         {
+            cursor.enableAutoCleanup();
+         }
+      }
+   }
+
    public void printDebug()
    {
       System.out.println("Debug information for PageCursorProviderImpl:");

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java	2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java	2011-02-20 05:03:54 UTC (rev 10231)
@@ -57,6 +57,8 @@
 
    /** It just creates an OperationContext without associating it */
    OperationContext newContext(Executor executor);
+   
+   OperationContext newSingleThreadContext();
 
    /** Set the context back to the thread */
    void setContext(OperationContext context);
@@ -74,10 +76,14 @@
    void afterCompleteOperations(IOAsyncTask run);
 
    /** Block until the operations are done. 
+    *  Warning: Don't use it inside an ordered executor, otherwise the system may lock up
+    *           in case of the pools are full
     * @throws Exception */
-   void waitOnOperations(long timeout) throws Exception;
+   boolean waitOnOperations(long timeout) throws Exception;
 
    /** Block until the operations are done. 
+    *  Warning: Don't use it inside an ordered executor, otherwise the system may lock up
+    *           in case of the pools are full
     * @throws Exception */
    void waitOnOperations() throws Exception;
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-02-20 05:03:54 UTC (rev 10231)
@@ -15,6 +15,8 @@
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -23,6 +25,8 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import javax.transaction.xa.Xid;
 
@@ -52,10 +56,8 @@
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.cursor.PageCursorProvider;
 import org.hornetq.core.paging.cursor.PagePosition;
 import org.hornetq.core.paging.cursor.PageSubscription;
-import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
 import org.hornetq.core.paging.cursor.PagedReferenceImpl;
 import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
 import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
@@ -85,6 +87,7 @@
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.HornetQThreadFactory;
 import org.hornetq.utils.UUID;
 import org.hornetq.utils.XidCodecSupport;
 
@@ -165,6 +168,8 @@
    private final ExecutorFactory executorFactory;
 
    private final Executor executor;
+   
+   private ExecutorService singleThreadExecutor;
 
    private final boolean syncTransactional;
 
@@ -333,17 +338,14 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
     */
-   public void waitOnOperations(final long timeout) throws Exception
+   public boolean waitOnOperations(final long timeout) throws Exception
    {
       if (!started)
       {
          JournalStorageManager.log.warn("Server is stopped");
          throw new IllegalStateException("Server is stopped");
       }
-      if (!getContext().waitCompletion(timeout))
-      {
-         throw new HornetQException(HornetQException.IO_ERROR, "Timeout on waiting I/O completion");
-      }
+      return getContext().waitCompletion(timeout);
    }
 
    /*
@@ -393,6 +395,16 @@
    {
       OperationContextImpl.setContext(context);
    }
+   
+   public Executor getSingleThreadExecutor()
+   {
+      return singleThreadExecutor;
+   }
+   
+   public OperationContext newSingleThreadContext()
+   {
+      return newContext(singleThreadExecutor);
+   }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#newContext()
@@ -1390,6 +1402,8 @@
       checkAndCreateDir(largeMessagesDirectory, createJournalDir);
 
       cleanupIncompleteFiles();
+      
+      singleThreadExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-IO-SingleThread", true, getThisClassLoader()));
 
       bindingsJournal.start();
 
@@ -1398,6 +1412,7 @@
       started = true;
    }
 
+   
    public synchronized void stop() throws Exception
    {
       if (!started)
@@ -1414,6 +1429,8 @@
       bindingsJournal.stop();
 
       messageJournal.stop();
+      
+      singleThreadExecutor.shutdown();
 
       journalLoaded = false;
 
@@ -1834,6 +1851,19 @@
       }
    }
 
+
+   private static ClassLoader getThisClassLoader()
+   {
+      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+      {
+         public ClassLoader run()
+         {
+            return JournalStorageManager.class.getClassLoader();
+         }
+      });
+   }
+
+
    // Inner Classes
    // ----------------------------------------------------------------------------
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2011-02-20 05:03:54 UTC (rev 10231)
@@ -232,10 +232,16 @@
          {
             public void run()
             {
-               // If any IO is done inside the callback, it needs to be done on a new context
-               OperationContextImpl.clearContext();
-               task.done();
-               executorsPending.decrementAndGet();
+               try
+               {
+                  // If any IO is done inside the callback, it needs to be done on a new context
+                  OperationContextImpl.clearContext();
+                  task.done();
+               }
+               finally
+               {
+                  executorsPending.decrementAndGet();
+               }
             }
          });
       }
@@ -277,6 +283,25 @@
 
    class TaskHolder
    {
+      
+      
+      
+      /* (non-Javadoc)
+       * @see java.lang.Object#toString()
+       */
+      @Override
+      public String toString()
+      {
+         return "TaskHolder [storeLined=" + storeLined +
+                ", replicationLined=" +
+                replicationLined +
+                ", pageLined=" +
+                pageLined +
+                ", task=" +
+                task +
+                "]";
+      }
+
       int storeLined;
 
       int replicationLined;
@@ -327,19 +352,39 @@
    @Override
    public String toString()
    {
-      return "OperationContextImpl [storeLineUp=" + storeLineUp +
+      StringBuffer buffer = new StringBuffer();
+      for (TaskHolder hold : tasks)
+      {
+         buffer.append("Task = " + hold + "\n");
+      }
+      
+      return "OperationContextImpl [minimalStore=" + minimalStore +
+             ", storeLineUp=" +
+             storeLineUp +
              ", stored=" +
              stored +
+             ", minimalReplicated=" +
+             minimalReplicated +
              ", replicationLineUp=" +
              replicationLineUp +
              ", replicated=" +
              replicated +
+             ", paged=" +
+             paged +
+             ", minimalPage=" +
+             minimalPage +
              ", pageLineUp=" +
              pageLineUp +
-             ", paged=" +
-             paged +
-             "]";
+             ", errorCode=" +
+             errorCode +
+             ", errorMessage=" +
+             errorMessage +
+             ", executorsPending=" +
+             executorsPending +
+             ", executor=" + this.executor +
+             "]" + buffer.toString();
    }
+
    
 
 }

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-02-20 05:03:54 UTC (rev 10231)
@@ -364,8 +364,9 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
     */
-   public void waitOnOperations(final long timeout) throws Exception
+   public boolean waitOnOperations(final long timeout) throws Exception
    {
+      return true;
    }
 
    /* (non-Javadoc)
@@ -406,7 +407,14 @@
    {
       return dummyContext;
    }
+   
+   
+   public OperationContext newSingleThreadContext()
+   {
+      return dummyContext;
+   }
 
+
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#setContext(org.hornetq.core.persistence.OperationContext)
     */

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java	2011-02-20 05:03:54 UTC (rev 10231)
@@ -159,7 +159,7 @@
    
    void resetAllIterators();
 
-   void blockOnExecutorFuture();
+   boolean blockOnExecutorFuture();
    
    void close() throws Exception;
    

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java	2011-02-20 05:03:54 UTC (rev 10231)
@@ -16,10 +16,12 @@
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.management.ManagementHelper;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.BindingType;
 import org.hornetq.core.server.group.GroupingHandler;
@@ -70,30 +72,45 @@
 
    public Response propose(final Proposal proposal) throws Exception
    {
-      if (proposal.getClusterName() == null)
+      OperationContext originalCtx = storageManager.getContext();
+      
+      try
       {
-         GroupBinding original = map.get(proposal.getGroupId());
-         return original == null ? null : new Response(proposal.getGroupId(), original.getClusterName());
-      }
-      GroupBinding groupBinding = new GroupBinding(proposal.getGroupId(), proposal.getClusterName());
-      if (map.putIfAbsent(groupBinding.getGroupId(), groupBinding) == null)
-      {
-         groupBinding.setId(storageManager.generateUniqueID());
-         List<GroupBinding> newList = new ArrayList<GroupBinding>();
-         List<GroupBinding> oldList = groupMap.putIfAbsent(groupBinding.getClusterName(), newList);
-         if (oldList != null)
+         // the waitCompletion cannot be done inside an ordered executor or we would starve when the thread pool is full
+         storageManager.setContext(storageManager.newSingleThreadContext());
+      
+         if (proposal.getClusterName() == null)
          {
-            newList = oldList;
+            GroupBinding original = map.get(proposal.getGroupId());
+            return original == null ? null : new Response(proposal.getGroupId(), original.getClusterName());
          }
-         newList.add(groupBinding);
-         storageManager.addGrouping(groupBinding);
-         storageManager.waitOnOperations(timeout);
-         return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
+         GroupBinding groupBinding = new GroupBinding(proposal.getGroupId(), proposal.getClusterName());
+         if (map.putIfAbsent(groupBinding.getGroupId(), groupBinding) == null)
+         {
+            groupBinding.setId(storageManager.generateUniqueID());
+            List<GroupBinding> newList = new ArrayList<GroupBinding>();
+            List<GroupBinding> oldList = groupMap.putIfAbsent(groupBinding.getClusterName(), newList);
+            if (oldList != null)
+            {
+               newList = oldList;
+            }
+            newList.add(groupBinding);
+            storageManager.addGrouping(groupBinding);
+            if (!storageManager.waitOnOperations(timeout))
+            {
+               throw new HornetQException(HornetQException.IO_ERROR, "Timeout on waiting I/O completion");
+            }
+            return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
+         }
+         else
+         {
+            groupBinding = map.get(proposal.getGroupId());
+            return new Response(groupBinding.getGroupId(), proposal.getClusterName(), groupBinding.getClusterName());
+         }
       }
-      else
+      finally
       {
-         groupBinding = map.get(proposal.getGroupId());
-         return new Response(groupBinding.getGroupId(), proposal.getClusterName(), groupBinding.getClusterName());
+         storageManager.setContext(originalCtx);
       }
    }
 

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java	2011-02-20 05:03:54 UTC (rev 10231)
@@ -370,10 +370,11 @@
          {
             // We must block on the executor to ensure any async deliveries have completed or we might get out of order
             // deliveries
-            blockOnExecutorFuture();
-
-            // Go into direct delivery mode
-            directDeliver = true;
+            if (blockOnExecutorFuture())
+            {
+               // Go into direct delivery mode
+               directDeliver = true;
+            }
          }
          checkDirect = false;
       }
@@ -420,7 +421,7 @@
       blockOnExecutorFuture();
    }
 
-   public void blockOnExecutorFuture()
+   public boolean blockOnExecutorFuture()
    {
       Future future = new Future();
 
@@ -430,8 +431,10 @@
 
       if (!ok)
       {
-         throw new IllegalStateException("Timed out waiting for future to complete");
+         log.warn("Couldn't finish waiting executors. Try increasing the thread pool size");
       }
+      
+      return ok;
    }
 
    public synchronized void addConsumer(final Consumer consumer) throws Exception

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-02-20 05:03:54 UTC (rev 10231)
@@ -1461,8 +1461,9 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
        */
-      public void waitOnOperations(final long timeout) throws Exception
+      public boolean waitOnOperations(final long timeout) throws Exception
       {
+         return true;
       }
 
       /* (non-Javadoc)
@@ -1659,6 +1660,14 @@
          return 0;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#newSingleThreadContext()
+       */
+      public OperationContext newSingleThreadContext()
+      {
+         return getContext();
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java	2011-02-20 05:03:54 UTC (rev 10231)
@@ -64,10 +64,9 @@
       
    }
 
-   public void blockOnExecutorFuture()
+   public boolean blockOnExecutorFuture()
    {
-      // TODO Auto-generated method stub
-      
+      return true;
    }
 
    public void addHead(MessageReference ref)



More information about the hornetq-commits mailing list