Author: borges
Date: 2011-09-12 09:33:49 -0400 (Mon, 12 Sep 2011)
New Revision: 11326
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/OrderedExecutorFactory.java
Log:
HORNETQ-720 Fix dead-lock on PagingStore shutdown, remove dead code, reduce visibility
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java 2011-09-12
13:33:04 UTC (rev 11325)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/PagingStore.java 2011-09-12
13:33:49 UTC (rev 11326)
@@ -95,12 +95,10 @@
*/
Page depage() throws Exception;
-
void forceAnotherPage() throws Exception;
Page getCurrentPage();
-
/** @return true if paging was started, or false if paging was already started before
this call */
boolean startPaging() throws Exception;
@@ -110,20 +108,19 @@
void executeRunnableWhenMemoryAvailable(Runnable runnable);
- /** This method will hold and producer, but it wait operations to finish before
locking (write lock) */
- void lock();
+ /**
+ * Write lock the PagingStore.
+ * @param timeout milliseconds to wait for the lock. If value is {@literal -1} then
wait
+ * indefinitely.
+ * @return {@code true} if the lock was obtained, {@code false} otherwise
+ */
+ boolean lock(long timeout);
/**
- *
- * Call this method using the same thread used by the last call of {@link
PagingStore#lock()}
- *
+ * Call this method using the same thread used by the last call of {@link
PagingStore#lock(long)}.
*/
void unlock();
- /** This is used mostly by tests.
- * We will wait any pending runnable to finish its execution */
- void flushExecutors();
-
/**
* Files to synchronize with backup.
* @return
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-12
13:33:04 UTC (rev 11325)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-09-12
13:33:49 UTC (rev 11326)
@@ -36,7 +36,7 @@
/**
* A PageProviderIMpl
- *
+ *
* TODO: this may be moved entirely into PagingStore as there's an one-to-one
relationship here
* However I want to keep this isolated as much as possible during development
*
@@ -302,6 +302,12 @@
storageManager.clearContext();
}
}
+
+ @Override
+ public String toString()
+ {
+ return "PageCursorProvider:scheduleCleanup()";
+ }
});
}
@@ -309,8 +315,15 @@
{
ArrayList<Page> depagedPages = new ArrayList<Page>();
- pagingStore.lock();
-
+ while (true)
+ {
+ if (pagingStore.lock(100))
+ {
+ break;
+ }
+ if (!pagingStore.isStarted())
+ return;
+ }
synchronized (this)
{
try
@@ -342,7 +355,6 @@
break;
}
}
-
if (complete)
{
@@ -378,6 +390,7 @@
if (pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() ==
1 &&
pagingStore.getCurrentPage().getNumberOfMessages() == 0)
{
+
pagingStore.stopPaging();
}
else
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-09-12
13:33:04 UTC (rev 11325)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-09-12
13:33:49 UTC (rev 11326)
@@ -57,12 +57,12 @@
/**
* A PageCursorImpl
*
- * A page cursor will always store its
+ * A page cursor will always store its
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
*
- *
+ *
*/
-public class PageSubscriptionImpl implements PageSubscription
+class PageSubscriptionImpl implements PageSubscription
{
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(PageSubscriptionImpl.class);
@@ -99,7 +99,7 @@
private final SortedMap<Long, PageCursorInfo> consumedPages =
Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
private final PageSubscriptionCounter counter;
-
+
private final Executor executor;
private final AtomicLong deliveredCount = new AtomicLong(0);
@@ -210,7 +210,7 @@
}
}
- /**
+ /**
* It will cleanup all the records for completed pages
* */
public void cleanupEntries() throws Exception
@@ -224,7 +224,7 @@
// First get the completed pages using a lock
synchronized (this)
{
- for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
+ for (Entry<Long, PageCursorInfo> entry : consumedPages.entrySet())
{
PageCursorInfo info = entry.getValue();
if (info.isDone() && !info.isPendingDelete() &&
lastAckedPosition != null)
@@ -363,7 +363,7 @@
}
/**
- *
+ *
*/
private synchronized PagePosition getStartPosition()
{
@@ -395,7 +395,7 @@
}
}
- if (isTrace)
+ if (isTrace)
{
trace("Returning initial position " + retValue);
}
@@ -520,7 +520,7 @@
}
}
- /**
+ /**
* Theres no need to synchronize this method as it's only called from journal load
on startup
*/
public void reloadACK(final PagePosition position)
@@ -550,7 +550,7 @@
processACK(position);
}
-
+
public void lateDeliveryRollback(PagePosition position)
{
PageCursorInfo cursorInfo = processACK(position);
@@ -574,9 +574,9 @@
final long tx = store.generateUniqueID();
try
{
-
+
boolean isPersistent = false;
-
+
synchronized (PageSubscriptionImpl.this)
{
for (PageCursorInfo cursor : consumedPages.values())
@@ -591,12 +591,12 @@
}
}
}
-
+
if (isPersistent)
{
store.commit(tx);
}
-
+
cursorProvider.close(this);
}
catch (Exception e)
@@ -678,7 +678,7 @@
printDebug(toString());
}
- public void printDebug(final String msg)
+ private void printDebug(final String msg)
{
System.out.println("Debug information on PageCurorImpl- " + msg);
for (PageCursorInfo info : consumedPages.values())
@@ -752,7 +752,7 @@
{
log.trace("Scheduling cleanup on pageSubscription for address =
" + pageStore.getAddress() + " queue = " + this.getQueue().getName());
}
-
+
// there's a different page being acked, we will do the check right away
if (autoCleanup)
{
@@ -764,7 +764,7 @@
PageCursorInfo info = getPageInfo(pos);
info.addACK(pos);
-
+
return info;
}
@@ -821,7 +821,7 @@
// Inner classes -------------------------------------------------
- /**
+ /**
* This will hold information about the pending ACKs towards a page.
* This instance will be released as soon as the entire page is consumed, releasing
the memory at that point
* The ref counts are increased also when a message is ignored for any reason.
@@ -839,13 +839,13 @@
private WeakReference<PageCache> cache;
- private Set<PagePosition> removedReferences = new
ConcurrentHashSet<PagePosition>();
+ private final Set<PagePosition> removedReferences = new
ConcurrentHashSet<PagePosition>();
// The page was live at the time of the creation
private final boolean wasLive;
// There's a pending TX to add elements on this page
- private AtomicInteger pendingTX = new AtomicInteger(0);
+ private final AtomicInteger pendingTX = new AtomicInteger(0);
// There's a pending delete on the async IO pipe
// We're holding this object to avoid delete the pages before the IO is
complete,
@@ -946,7 +946,7 @@
}
/**
- *
+ *
*/
protected void checkDone()
{
@@ -1021,6 +1021,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
*/
+ @Override
public List<MessageReference> getRelatedMessageReferences()
{
return Collections.emptyList();
@@ -1153,9 +1154,9 @@
{
ignored = true;
}
-
+
PageCursorInfo info = getPageInfo(message.getPosition(), false);
-
+
if (info != null && info.isRemoved(message.getPosition()))
{
continue;
@@ -1191,7 +1192,7 @@
// Say you have a Browser that will only read the files... there's
no need to control PageCursors is
// nothing
// is being changed. That's why the false is passed as a parameter
here
-
+
if (info != null && info.isRemoved(message.getPosition()))
{
valid = false;
@@ -1228,7 +1229,7 @@
}
}
- /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be
using next and hasNext as well.
+ /** QueueImpl::deliver could be calling hasNext while QueueImpl.depage could be
using next and hasNext as well.
* It would be a rare race condition but I would prefer avoiding that scenario */
public synchronized boolean hasNext()
{
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-09-12
13:33:04 UTC (rev 11325)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingManagerImpl.java 2011-09-12
13:33:49 UTC (rev 11326)
@@ -302,7 +302,7 @@
syncLock.writeLock().lock();
for (PagingStore store : stores.values())
{
- store.lock();
+ store.lock(-1);
}
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-12
13:33:04 UTC (rev 11325)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-12
13:33:49 UTC (rev 11326)
@@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@@ -224,9 +225,21 @@
// PagingStore implementation ------------------------------------
- public void lock()
+ public boolean lock(long timeout)
{
- lock.writeLock().lock();
+ if (timeout == -1)
+ {
+ lock.writeLock().lock();
+ return true;
+ }
+ try
+ {
+ return lock.writeLock().tryLock(timeout, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ return false;
+ }
}
public void unlock()
@@ -373,7 +386,7 @@
public synchronized void stop() throws Exception
{
- lock();
+ lock(-1);
try
{
if (running)
@@ -397,6 +410,7 @@
}
}
+ /** Wait for any pending runnable to finish its execution. */
public void flushExecutors()
{
cursorProvider.flushExecutors();
@@ -571,7 +585,7 @@
public Page createPage(final int pageNumber) throws Exception
{
- lock();
+ lock(-1);
try
{
String fileName = createFileName(pageNumber);
@@ -1131,7 +1145,7 @@
@Override
public void sendPages(ReplicationManager replicator, Collection<Integer>
pageIds) throws Exception
{
- lock();
+ lock(-1);
try
{
for (Integer id : pageIds)
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/OrderedExecutorFactory.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/OrderedExecutorFactory.java 2011-09-12
13:33:04 UTC (rev 11325)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/utils/OrderedExecutorFactory.java 2011-09-12
13:33:49 UTC (rev 11326)
@@ -23,9 +23,9 @@
*
* @author <a href="david.lloyd(a)jboss.com">David Lloyd</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* @version <tt>$Revision$</tt>
- *
+ *
*/
public final class OrderedExecutorFactory implements ExecutorFactory
{
@@ -125,5 +125,10 @@
}
}
}
+
+ public String toString()
+ {
+ return "OrderedExecutor(running=" + running + ", tasks=" +
tasks + ")";
+ }
}
}