[hornetq-commits] JBoss hornetq SVN: r10231 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 8 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Sun Feb 20 00:03:57 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-02-20 00:03:54 -0500 (Sun, 20 Feb 2011)
New Revision: 10231
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
https://issues.jboss.org/browse/JBPAPP-5949 - Improving thread usage
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2011-02-20 05:03:54 UTC (rev 10231)
@@ -35,6 +35,11 @@
private volatile String errorMessage;
private volatile int errorCode = 0;
+
+ public String toString()
+ {
+ return "simpleWaitIOCallback";
+ }
public void done()
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-20 05:03:54 UTC (rev 10231)
@@ -233,7 +233,15 @@
{
public void run()
{
- cleanup();
+ storageManager.setContext(storageManager.newSingleThreadContext());
+ try
+ {
+ cleanup();
+ }
+ finally
+ {
+ storageManager.clearContext();
+ }
}
});
}
@@ -285,24 +293,8 @@
Page currentPage = pagingStore.getCurrentPage();
- try
- {
- // First step: Move every cursor to the next bookmarked page (that was just created)
- for (PageSubscription cursor : cursorList)
- {
- cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
- }
+ storePositions(cursorList, currentPage);
- storageManager.waitOnOperations();
- }
- finally
- {
- for (PageSubscription cursor : cursorList)
- {
- cursor.enableAutoCleanup();
- }
- }
-
pagingStore.stopPaging();
// This has to be called after we stopped paging
@@ -360,6 +352,35 @@
}
+ /**
+ * @param cursorList
+ * @param currentPage
+ * @throws Exception
+ */
+ protected void storePositions(ArrayList<PageSubscription> cursorList, Page currentPage) throws Exception
+ {
+ try
+ {
+ // First step: Move every cursor to the next bookmarked page (that was just created)
+ for (PageSubscription cursor : cursorList)
+ {
+ cursor.confirmPosition(new PagePositionImpl(currentPage.getPageId(), -1));
+ }
+
+ while (!storageManager.waitOnOperations(5000))
+ {
+ log.warn("Couldn't complete operations on IO context " + storageManager.getContext());
+ }
+ }
+ finally
+ {
+ for (PageSubscription cursor : cursorList)
+ {
+ cursor.enableAutoCleanup();
+ }
+ }
+ }
+
public void printDebug()
{
System.out.println("Debug information for PageCursorProviderImpl:");
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/StorageManager.java 2011-02-20 05:03:54 UTC (rev 10231)
@@ -57,6 +57,8 @@
/** It just creates an OperationContext without associating it */
OperationContext newContext(Executor executor);
+
+ OperationContext newSingleThreadContext();
/** Set the context back to the thread */
void setContext(OperationContext context);
@@ -74,10 +76,14 @@
void afterCompleteOperations(IOAsyncTask run);
/** Block until the operations are done.
+ * Warning: Don't use it inside an ordered executor, otherwise the system may lock up
+ * in case the pools are full
* @throws Exception */
- void waitOnOperations(long timeout) throws Exception;
+ boolean waitOnOperations(long timeout) throws Exception;
/** Block until the operations are done.
+ * Warning: Don't use it inside an ordered executor, otherwise the system may lock up
+ * in case the pools are full
* @throws Exception */
void waitOnOperations() throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-02-20 05:03:54 UTC (rev 10231)
@@ -15,6 +15,8 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -23,6 +25,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.transaction.xa.Xid;
@@ -52,10 +56,8 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.paging.cursor.PageCursorProvider;
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.cursor.PageSubscription;
-import org.hornetq.core.paging.cursor.PageSubscriptionCounter;
import org.hornetq.core.paging.cursor.PagedReferenceImpl;
import org.hornetq.core.paging.cursor.impl.PagePositionImpl;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
@@ -85,6 +87,7 @@
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.UUID;
import org.hornetq.utils.XidCodecSupport;
@@ -165,6 +168,8 @@
private final ExecutorFactory executorFactory;
private final Executor executor;
+
+ private ExecutorService singleThreadExecutor;
private final boolean syncTransactional;
@@ -333,17 +338,14 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication()
*/
- public void waitOnOperations(final long timeout) throws Exception
+ public boolean waitOnOperations(final long timeout) throws Exception
{
if (!started)
{
JournalStorageManager.log.warn("Server is stopped");
throw new IllegalStateException("Server is stopped");
}
- if (!getContext().waitCompletion(timeout))
- {
- throw new HornetQException(HornetQException.IO_ERROR, "Timeout on waiting I/O completion");
- }
+ return getContext().waitCompletion(timeout);
}
/*
@@ -393,6 +395,16 @@
{
OperationContextImpl.setContext(context);
}
+
+ public Executor getSingleThreadExecutor()
+ {
+ return singleThreadExecutor;
+ }
+
+ public OperationContext newSingleThreadContext()
+ {
+ return newContext(singleThreadExecutor);
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#newContext()
@@ -1390,6 +1402,8 @@
checkAndCreateDir(largeMessagesDirectory, createJournalDir);
cleanupIncompleteFiles();
+
+ singleThreadExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-IO-SingleThread", true, getThisClassLoader()));
bindingsJournal.start();
@@ -1398,6 +1412,7 @@
started = true;
}
+
public synchronized void stop() throws Exception
{
if (!started)
@@ -1414,6 +1429,8 @@
bindingsJournal.stop();
messageJournal.stop();
+
+ singleThreadExecutor.shutdown();
journalLoaded = false;
@@ -1834,6 +1851,19 @@
}
}
+
+ private static ClassLoader getThisClassLoader()
+ {
+ return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
+ {
+ public ClassLoader run()
+ {
+ return JournalStorageManager.class.getClassLoader();
+ }
+ });
+ }
+
+
// Inner Classes
// ----------------------------------------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2011-02-20 05:03:54 UTC (rev 10231)
@@ -232,10 +232,16 @@
{
public void run()
{
- // If any IO is done inside the callback, it needs to be done on a new context
- OperationContextImpl.clearContext();
- task.done();
- executorsPending.decrementAndGet();
+ try
+ {
+ // If any IO is done inside the callback, it needs to be done on a new context
+ OperationContextImpl.clearContext();
+ task.done();
+ }
+ finally
+ {
+ executorsPending.decrementAndGet();
+ }
}
});
}
@@ -277,6 +283,25 @@
class TaskHolder
{
+
+
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString()
+ {
+ return "TaskHolder [storeLined=" + storeLined +
+ ", replicationLined=" +
+ replicationLined +
+ ", pageLined=" +
+ pageLined +
+ ", task=" +
+ task +
+ "]";
+ }
+
int storeLined;
int replicationLined;
@@ -327,19 +352,39 @@
@Override
public String toString()
{
- return "OperationContextImpl [storeLineUp=" + storeLineUp +
+ StringBuffer buffer = new StringBuffer();
+ for (TaskHolder hold : tasks)
+ {
+ buffer.append("Task = " + hold + "\n");
+ }
+
+ return "OperationContextImpl [minimalStore=" + minimalStore +
+ ", storeLineUp=" +
+ storeLineUp +
", stored=" +
stored +
+ ", minimalReplicated=" +
+ minimalReplicated +
", replicationLineUp=" +
replicationLineUp +
", replicated=" +
replicated +
+ ", paged=" +
+ paged +
+ ", minimalPage=" +
+ minimalPage +
", pageLineUp=" +
pageLineUp +
- ", paged=" +
- paged +
- "]";
+ ", errorCode=" +
+ errorCode +
+ ", errorMessage=" +
+ errorMessage +
+ ", executorsPending=" +
+ executorsPending +
+ ", executor=" + this.executor +
+ "]" + buffer.toString();
}
+
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-02-20 05:03:54 UTC (rev 10231)
@@ -364,8 +364,9 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
*/
- public void waitOnOperations(final long timeout) throws Exception
+ public boolean waitOnOperations(final long timeout) throws Exception
{
+ return true;
}
/* (non-Javadoc)
@@ -406,7 +407,14 @@
{
return dummyContext;
}
+
+
+ public OperationContext newSingleThreadContext()
+ {
+ return dummyContext;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#setContext(org.hornetq.core.persistence.OperationContext)
*/
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-02-20 05:03:54 UTC (rev 10231)
@@ -159,7 +159,7 @@
void resetAllIterators();
- void blockOnExecutorFuture();
+ boolean blockOnExecutorFuture();
void close() throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2011-02-20 05:03:54 UTC (rev 10231)
@@ -16,10 +16,12 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.server.group.GroupingHandler;
@@ -70,30 +72,45 @@
public Response propose(final Proposal proposal) throws Exception
{
- if (proposal.getClusterName() == null)
+ OperationContext originalCtx = storageManager.getContext();
+
+ try
{
- GroupBinding original = map.get(proposal.getGroupId());
- return original == null ? null : new Response(proposal.getGroupId(), original.getClusterName());
- }
- GroupBinding groupBinding = new GroupBinding(proposal.getGroupId(), proposal.getClusterName());
- if (map.putIfAbsent(groupBinding.getGroupId(), groupBinding) == null)
- {
- groupBinding.setId(storageManager.generateUniqueID());
- List<GroupBinding> newList = new ArrayList<GroupBinding>();
- List<GroupBinding> oldList = groupMap.putIfAbsent(groupBinding.getClusterName(), newList);
- if (oldList != null)
+ // the waitCompletion cannot be done inside an ordered executor or we would starve when the thread pool is full
+ storageManager.setContext(storageManager.newSingleThreadContext());
+
+ if (proposal.getClusterName() == null)
{
- newList = oldList;
+ GroupBinding original = map.get(proposal.getGroupId());
+ return original == null ? null : new Response(proposal.getGroupId(), original.getClusterName());
}
- newList.add(groupBinding);
- storageManager.addGrouping(groupBinding);
- storageManager.waitOnOperations(timeout);
- return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
+ GroupBinding groupBinding = new GroupBinding(proposal.getGroupId(), proposal.getClusterName());
+ if (map.putIfAbsent(groupBinding.getGroupId(), groupBinding) == null)
+ {
+ groupBinding.setId(storageManager.generateUniqueID());
+ List<GroupBinding> newList = new ArrayList<GroupBinding>();
+ List<GroupBinding> oldList = groupMap.putIfAbsent(groupBinding.getClusterName(), newList);
+ if (oldList != null)
+ {
+ newList = oldList;
+ }
+ newList.add(groupBinding);
+ storageManager.addGrouping(groupBinding);
+ if (!storageManager.waitOnOperations(timeout))
+ {
+ throw new HornetQException(HornetQException.IO_ERROR, "Timeout on waiting I/O completion");
+ }
+ return new Response(groupBinding.getGroupId(), groupBinding.getClusterName());
+ }
+ else
+ {
+ groupBinding = map.get(proposal.getGroupId());
+ return new Response(groupBinding.getGroupId(), proposal.getClusterName(), groupBinding.getClusterName());
+ }
}
- else
+ finally
{
- groupBinding = map.get(proposal.getGroupId());
- return new Response(groupBinding.getGroupId(), proposal.getClusterName(), groupBinding.getClusterName());
+ storageManager.setContext(originalCtx);
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-20 05:03:54 UTC (rev 10231)
@@ -370,10 +370,11 @@
{
// We must block on the executor to ensure any async deliveries have completed or we might get out of order
// deliveries
- blockOnExecutorFuture();
-
- // Go into direct delivery mode
- directDeliver = true;
+ if (blockOnExecutorFuture())
+ {
+ // Go into direct delivery mode
+ directDeliver = true;
+ }
}
checkDirect = false;
}
@@ -420,7 +421,7 @@
blockOnExecutorFuture();
}
- public void blockOnExecutorFuture()
+ public boolean blockOnExecutorFuture()
{
Future future = new Future();
@@ -430,8 +431,10 @@
if (!ok)
{
- throw new IllegalStateException("Timed out waiting for future to complete");
+ log.warn("Couldn't finish waiting executors. Try increasing the thread pool size");
}
+
+ return ok;
}
public synchronized void addConsumer(final Consumer consumer) throws Exception
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-02-20 05:03:54 UTC (rev 10231)
@@ -1461,8 +1461,9 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
*/
- public void waitOnOperations(final long timeout) throws Exception
+ public boolean waitOnOperations(final long timeout) throws Exception
{
+ return true;
}
/* (non-Javadoc)
@@ -1659,6 +1660,14 @@
return 0;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#newSingleThreadContext()
+ */
+ public OperationContext newSingleThreadContext()
+ {
+ return getContext();
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-02-17 19:25:42 UTC (rev 10230)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-02-20 05:03:54 UTC (rev 10231)
@@ -64,10 +64,9 @@
}
- public void blockOnExecutorFuture()
+ public boolean blockOnExecutorFuture()
{
- // TODO Auto-generated method stub
-
+ return true;
}
public void addHead(MessageReference ref)
More information about the hornetq-commits
mailing list