Author: borges
Date: 2012-03-16 11:23:41 -0400 (Fri, 16 Mar 2012)
New Revision: 12309
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
Log:
HORNETQ-720 Avoid deadlocks on isPaging(), and make it faster
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-03-16 15:23:31 UTC (rev 12308)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2012-03-16 15:23:41 UTC (rev 12309)
@@ -54,15 +54,14 @@
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
import org.hornetq.utils.Future;
/**
- *
+ *
* @see PagingStore
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
@@ -116,6 +115,7 @@
private volatile Page currentPage;
+ private final Object pagingGuard = new Object();
private volatile boolean paging = false;
private final PageCursorProvider cursorProvider;
@@ -208,7 +208,7 @@
pageSize = addressSettings.getPageSizeBytes();
addressFullMessagePolicy = addressSettings.getAddressFullMessagePolicy();
-
+
if (cursorProvider != null)
{
cursorProvider.setCacheMaxSize(addressSettings.getPageCacheMaxSize());
@@ -286,7 +286,7 @@
{
return pageSize;
}
-
+
public String getFolder()
{
SequentialFileFactory factoryUsed = this.fileFactory;
@@ -302,28 +302,19 @@
public boolean isPaging()
{
- lock.readLock().lock();
-
- try
+ synchronized (pagingGuard)
{
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK)
{
return false;
}
- else if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP)
+ if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP)
{
return isFull();
}
- else
- {
- return paging;
- }
- }
- finally
- {
- lock.readLock().unlock();
- }
- }
+ return paging;
+ }
+ }
public int getNumberOfPages()
{
@@ -527,7 +518,10 @@
lock.writeLock().lock();
try
{
- paging = false;
+ synchronized (pagingGuard)
+ {
+ paging = false;
+ }
}
finally
{
@@ -542,18 +536,13 @@
return false;
}
- lock.readLock().lock();
- try
+ synchronized (pagingGuard)
{
if (paging)
{
return false;
}
}
- finally
- {
- lock.readLock().unlock();
- }
// if the first check failed, we do it again under a global currentPageLock
// (writeLock) this time
@@ -561,6 +550,8 @@
try
{
+ synchronized (pagingGuard)
+ {
if (paging)
{
return false;
@@ -584,6 +575,7 @@
paging = true;
return true;
+ }
}
finally
{
@@ -595,7 +587,7 @@
{
return currentPage;
}
-
+
public boolean checkPageFileExists(final int pageNumber)
{
String fileName = createFileName(pageNumber);
@@ -827,9 +819,7 @@
}
- private
- boolean
- page(ServerMessage message, final RoutingContext ctx, RouteContextList
listCtx, final boolean sync)
+ private boolean page(ServerMessage message, final RoutingContext ctx,
RouteContextList listCtx, final boolean sync)
throws Exception
{
if (!running)
@@ -849,7 +839,7 @@
PagingStoreImpl.log.warn("Messages are being dropped on address
" + getStoreName());
}
-
+
if (log.isDebugEnabled())
{
log.debug("Message " + message + " beig dropped for
fullAddressPolicy==DROP");
@@ -869,20 +859,13 @@
}
// We need to ensure a read lock, as depage could change the paging state
- lock.readLock().lock();
+ synchronized (pagingGuard) {
- try
- {
// First check done concurrently, to avoid synchronization and increase
throughput
- if (!paging)
- {
+ if (!paging) {
return false;
}
- }
- finally
- {
- lock.readLock().unlock();
- }
+ }
Transaction tx = ctx.getTransaction();
@@ -890,9 +873,12 @@
try
{
- if (!paging)
+ synchronized (pagingGuard)
{
- return false;
+ if (!paging)
+ {
+ return false;
+ }
}
if (!message.isDurable())
@@ -904,7 +890,7 @@
PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx,
listCtx), tx == null ? -1 : tx.getID());
-
+
if (message.isLargeMessage())
{
((LargeServerMessage)message).setPaged();
@@ -918,16 +904,16 @@
openNewPage();
currentPageSize.addAndGet(bytesToWrite);
}
-
+
currentPage.write(pagedMessage);
if (isTrace)
{
- log.trace("Paging message " + pagedMessage + " on pageStore
" + this.getStoreName() +
+ log.trace("Paging message " + pagedMessage + " on pageStore
" + this.getStoreName() +
" pageId=" + currentPage.getPageId());
}
-
-
+
+
if (tx != null)
{
installPageTransaction(tx, listCtx);
@@ -990,15 +976,15 @@
private static class FinishPageMessageOperation implements TransactionOperation
{
public final PageTransactionInfo pageTransaction;
-
+
private final StorageManager storageManager;
-
+
private final PagingManager pagingManager;
-
+
private final Set<PagingStore> usedStores = new
HashSet<PagingStore>();
private boolean stored = false;
-
+
public void addStore(PagingStore store)
{
this.usedStores.add(store);
@@ -1123,9 +1109,9 @@
}
/**
- *
+ *
* Note: Decimalformat is not thread safe, Use synchronization before calling this
method
- *
+ *
* @param pageID
* @return
*/
@@ -1144,6 +1130,7 @@
{
return maxSize > 0 && getAddressSize() > maxSize;
}
+
@Override
public Collection<Integer> getCurrentIds() throws Exception
{