Author: clebert.suconic(a)jboss.com
Date: 2011-12-12 16:36:13 -0500 (Mon, 12 Dec 2011)
New Revision: 11898
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
JBPAPP-7655 - The customer is facing an issue that is directly related to the way
concurrent paging happens when multiple pages are accessed at the same time. We are adding a
direct buffer allocation on Paging and releasing it manually (relying on NIO to release the
buffer through GC is what was causing issues), and we are also adding a safeguard to limit the maximum concurrent IO on paging.
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-12-11
05:23:46 UTC (rev 11897)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-12-12
21:36:13 UTC (rev 11898)
@@ -183,6 +183,7 @@
{
page = pagingStore.createPage((int)pageId);
+ storageManager.beforePageRead();
page.open();
List<PagedMessage> pgdMessages = page.read(storageManager);
@@ -200,6 +201,7 @@
catch (Throwable ignored)
{
}
+ storageManager.afterPageRead();
cache.unlock();
}
}
@@ -451,8 +453,26 @@
// The page is not on cache any more
// We need to read the page-file before deleting it
// to make sure we remove any large-messages pending
- depagedPage.open();
- List<PagedMessage> pgdMessagesList =
depagedPage.read(storageManager);
+ storageManager.beforePageRead();
+
+ List<PagedMessage> pgdMessagesList = null;
+ try
+ {
+ depagedPage.open();
+ pgdMessagesList = depagedPage.read(storageManager);
+ }
+ finally
+ {
+ try
+ {
+ depagedPage.close();
+ }
+ catch (Exception e)
+ {
+ }
+
+ storageManager.afterPageRead();
+ }
depagedPage.close();
pgdMessages = pgdMessagesList.toArray(new
PagedMessage[pgdMessagesList.size()]);
}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-11
05:23:46 UTC (rev 11897)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-12
21:36:13 UTC (rev 11898)
@@ -1290,7 +1290,11 @@
public void remove()
{
deliveredCount.incrementAndGet();
- PageSubscriptionImpl.this.getPageInfo(position).remove(position);
+ PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(position);
+ if (info != null)
+ {
+ info.remove(position);
+ }
}
/* (non-Javadoc)
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-12-11
05:23:46 UTC (rev 11897)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java 2011-12-12
21:36:13 UTC (rev 11898)
@@ -107,68 +107,77 @@
public List<PagedMessage> read(StorageManager storage) throws Exception
{
- if (isDebug)
- {
- log.debug("reading page " + this.pageId + " on address = " +
storeName);
- }
-
+ if (isDebug)
+ {
+ log.debug("reading page " + this.pageId + " on address = " +
storeName);
+ }
+
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
size.set((int)file.size());
// Using direct buffer, as described on
https://jira.jboss.org/browse/HORNETQ-467
- ByteBuffer buffer2 = ByteBuffer.allocateDirect(size.get());
-
- file.position(0);
- file.read(buffer2);
+ ByteBuffer directBuffer = storage.allocateDirectBuffer((int)file.size());
- buffer2.rewind();
+ try
+ {
- HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(buffer2);
- fileBuffer.writerIndex(fileBuffer.capacity());
+ file.position(0);
+ file.read(directBuffer);
- while (fileBuffer.readable())
- {
- final int position = fileBuffer.readerIndex();
+ directBuffer.rewind();
- byte byteRead = fileBuffer.readByte();
+ HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(directBuffer);
+ fileBuffer.writerIndex(fileBuffer.capacity());
- if (byteRead == PageImpl.START_BYTE)
+ while (fileBuffer.readable())
{
- if (fileBuffer.readerIndex() + DataConstants.SIZE_INT <
fileBuffer.capacity())
+ final int position = fileBuffer.readerIndex();
+
+ byte byteRead = fileBuffer.readByte();
+
+ if (byteRead == PageImpl.START_BYTE)
{
- int messageSize = fileBuffer.readInt();
- int oldPos = fileBuffer.readerIndex();
- if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity()
&& fileBuffer.getByte(oldPos + messageSize) == PageImpl.END_BYTE)
+ if (fileBuffer.readerIndex() + DataConstants.SIZE_INT <
fileBuffer.capacity())
{
- PagedMessage msg = new PagedMessageImpl();
- msg.decode(fileBuffer);
- byte b = fileBuffer.readByte();
- if (b != PageImpl.END_BYTE)
+ int messageSize = fileBuffer.readInt();
+ int oldPos = fileBuffer.readerIndex();
+ if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity()
&& fileBuffer.getByte(oldPos + messageSize) == PageImpl.END_BYTE)
{
- // Sanity Check: This would only happen if there is a bug on decode
or any internal code, as this
- // constraint was already checked
- throw new IllegalStateException("Internal error, it wasn't
possible to locate END_BYTE " + b);
+ PagedMessage msg = new PagedMessageImpl();
+ msg.decode(fileBuffer);
+ byte b = fileBuffer.readByte();
+ if (b != PageImpl.END_BYTE)
+ {
+ // Sanity Check: This would only happen if there is a bug on
decode or any internal code, as
+ // this
+ // constraint was already checked
+ throw new IllegalStateException("Internal error, it
wasn't possible to locate END_BYTE " + b);
+ }
+ msg.initMessage(storage);
+ if (isTrace)
+ {
+ log.trace("Reading message " + msg + " on
pageId=" + this.pageId + " for address=" + storeName);
+ }
+ messages.add(msg);
}
- msg.initMessage(storage);
- if (isTrace)
+ else
{
- log.trace("Reading message " + msg + " on
pageId=" + this.pageId + " for address=" + storeName);
+ markFileAsSuspect(position, messages.size());
+ break;
}
- messages.add(msg);
}
- else
- {
- markFileAsSuspect(position, messages.size());
- break;
- }
}
+ else
+ {
+ markFileAsSuspect(position, messages.size());
+ break;
+ }
}
- else
- {
- markFileAsSuspect(position, messages.size());
- break;
- }
}
+ finally
+ {
+ storage.freeDirectuffer(directBuffer);
+ }
numberOfMessages.set(messages.size());
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java 2011-12-11
05:23:46 UTC (rev 11897)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java 2011-12-12
21:36:13 UTC (rev 11898)
@@ -13,6 +13,7 @@
package org.hornetq.core.persistence;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -88,6 +89,34 @@
* in case of the pools are full
* @throws Exception */
void waitOnOperations() throws Exception;
+
+ /**
+ * We need a safeguard in place to avoid too much concurrent IO happening on Paging,
+ * otherwise the system may become irrensponsive if too many destinations are reading
all the same time.
+ * This is called before we read, so we can limit concurrent reads
+ * @throws Exception
+ */
+ void beforePageRead() throws Exception;
+
+ /**
+ * We need a safeguard in place to avoid too much concurrent IO happening on Paging,
+ * otherwise the system may become irrensponsive if too many destinations are reading
all the same time.
+ * This is called after we read, so we can limit concurrent reads
+ * @throws Exception
+ */
+ void afterPageRead() throws Exception;
+
+
+ /** AIO has an optimized buffer which has a method to release it
+ instead of the way NIO will release data based on GC.
+ These methods will use that buffer if the inner method supports it */
+ ByteBuffer allocateDirectBuffer(int size);
+
+ /** AIO has an optimized buffer which has a method to release it
+ instead of the way NIO will release data based on GC.
+ These methods will use that buffer if the inner method supports it */
+ void freeDirectuffer(ByteBuffer buffer);
+
void clearContext();
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-11
05:23:46 UTC (rev 11897)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-12
21:36:13 UTC (rev 11898)
@@ -166,6 +166,8 @@
private final Journal bindingsJournal;
private final SequentialFileFactory largeMessagesFactory;
+
+ private SequentialFileFactory journalFF = null;
private volatile boolean started;
@@ -261,8 +263,6 @@
syncTransactional = config.isJournalSyncTransactional();
- SequentialFileFactory journalFF = null;
-
if (config.getJournalType() == JournalType.ASYNCIO)
{
JournalStorageManager.log.info("Using AIO Journal");
@@ -1532,6 +1532,41 @@
return info;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#startPageRead()
+ */
+ public void beforePageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#finishPageRead()
+ */
+ public void afterPageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(long)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ return journalFF.newBuffer(size);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+ */
+ public void freeDirectuffer(ByteBuffer buffer)
+ {
+ journalFF.releaseBuffer(buffer);
+ }
+
// Public
-----------------------------------------------------------------------------------
public Journal getMessageJournal()
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-12-11
05:23:46 UTC (rev 11897)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-12-12
21:36:13 UTC (rev 11898)
@@ -13,6 +13,7 @@
package org.hornetq.core.persistence.impl.nullpm;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -569,4 +570,36 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#startPageRead()
+ */
+ public void beforePageRead() throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#finishPageRead()
+ */
+ public void afterPageRead() throws Exception
+ {
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(int)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+ */
+ public void freeDirectuffer(ByteBuffer buffer)
+ {
+ // nothing to be done.. just wait for GC
+ }
+
}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-12-11
05:23:46 UTC (rev 11897)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-12-12
21:36:13 UTC (rev 11898)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.paging.impl;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -1680,6 +1681,41 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#startPageRead()
+ */
+ public void beforePageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#finishPageRead()
+ */
+ public void afterPageRead() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(int)
+ */
+ public ByteBuffer allocateDirectBuffer(int size)
+ {
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+ */
+ public void freeDirectuffer(ByteBuffer buffer)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory