Author: clebert.suconic(a)jboss.com
Date: 2012-01-13 17:59:31 -0500 (Fri, 13 Jan 2012)
New Revision: 12027
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Merging changes from EAP branch
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2012-01-13
21:50:04 UTC (rev 12026)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2012-01-13
22:59:31 UTC (rev 12027)
@@ -13,13 +13,13 @@
package org.hornetq.core.journal.impl;
+import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.journal.IOCriticalErrorListener;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.logging.Logger;
/**
*
@@ -31,8 +31,6 @@
*/
public class NIOSequentialFileFactory extends AbstractSequentialFileFactory implements
SequentialFileFactory
{
- private static final Logger log = Logger.getLogger(NIOSequentialFileFactory.class);
-
public NIOSequentialFileFactory(final String journalDir)
{
this(journalDir, null);
@@ -103,7 +101,36 @@
public ByteBuffer allocateDirectBuffer(final int size)
{
- return ByteBuffer.allocateDirect(size);
+ // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
+ ByteBuffer buffer2 = null;
+ try
+ {
+ buffer2 = ByteBuffer.allocateDirect(size);
+ }
+ catch (OutOfMemoryError error)
+ {
+ // Workaround for the way the JDK deals with native buffers: the main portion
+ // is outside of the VM heap, so the JDK does not take it into account for GC.
+ // We force a GC, polling every 100 ms for up to 5 seconds until the weakly
+ // referenced probe object below has been collected, and then try again once.
+ WeakReference<Object> obj = new WeakReference<Object>(new Object());
+ try
+ {
+ long timeout = System.currentTimeMillis() + 5000;
+ while (System.currentTimeMillis() < timeout && obj.get() != null)
+ {
+ System.gc();
+ Thread.sleep(100);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ // Best effort only: stop waiting for the GC and retry the allocation right away.
+ }
+ buffer2 = ByteBuffer.allocateDirect(size);
+
+ }
+ return buffer2;
}
public void releaseDirectBuffer(ByteBuffer buffer)
Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PageImpl.java 2012-01-13
21:50:04 UTC (rev 12026)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PageImpl.java 2012-01-13
22:59:31 UTC (rev 12027)
@@ -107,68 +107,77 @@
public List<PagedMessage> read(StorageManager storage) throws Exception
{
- if (isDebug)
- {
- log.debug("reading page " + this.pageId + " on address = " +
storeName);
- }
-
+ if (isDebug)
+ {
+ log.debug("reading page " + this.pageId + " on address = " +
storeName);
+ }
+
ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
size.set((int)file.size());
// Using direct buffer, as described on
https://jira.jboss.org/browse/HORNETQ-467
- ByteBuffer buffer2 = ByteBuffer.allocateDirect(size.get());
-
- file.position(0);
- file.read(buffer2);
+ ByteBuffer directBuffer = storage.allocateDirectBuffer((int)file.size());
- buffer2.rewind();
+ try
+ {
- HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(buffer2);
- fileBuffer.writerIndex(fileBuffer.capacity());
+ file.position(0);
+ file.read(directBuffer);
- while (fileBuffer.readable())
- {
- final int position = fileBuffer.readerIndex();
+ directBuffer.rewind();
- byte byteRead = fileBuffer.readByte();
+ HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(directBuffer);
+ fileBuffer.writerIndex(fileBuffer.capacity());
- if (byteRead == PageImpl.START_BYTE)
+ while (fileBuffer.readable())
{
- if (fileBuffer.readerIndex() + DataConstants.SIZE_INT <
fileBuffer.capacity())
+ final int position = fileBuffer.readerIndex();
+
+ byte byteRead = fileBuffer.readByte();
+
+ if (byteRead == PageImpl.START_BYTE)
{
- int messageSize = fileBuffer.readInt();
- int oldPos = fileBuffer.readerIndex();
- if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity()
&& fileBuffer.getByte(oldPos + messageSize) == PageImpl.END_BYTE)
+ if (fileBuffer.readerIndex() + DataConstants.SIZE_INT <
fileBuffer.capacity())
{
- PagedMessage msg = new PagedMessageImpl();
- msg.decode(fileBuffer);
- byte b = fileBuffer.readByte();
- if (b != PageImpl.END_BYTE)
+ int messageSize = fileBuffer.readInt();
+ int oldPos = fileBuffer.readerIndex();
+ if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity()
&& fileBuffer.getByte(oldPos + messageSize) == PageImpl.END_BYTE)
{
- // Sanity Check: This would only happen if there is a bug on decode
or any internal code, as this
- // constraint was already checked
- throw new IllegalStateException("Internal error, it wasn't
possible to locate END_BYTE " + b);
+ PagedMessage msg = new PagedMessageImpl();
+ msg.decode(fileBuffer);
+ byte b = fileBuffer.readByte();
+ if (b != PageImpl.END_BYTE)
+ {
+ // Sanity Check: This would only happen if there is a bug on
decode or any internal code, as
+ // this
+ // constraint was already checked
+ throw new IllegalStateException("Internal error, it
wasn't possible to locate END_BYTE " + b);
+ }
+ msg.initMessage(storage);
+ if (isTrace)
+ {
+ log.trace("Reading message " + msg + " on
pageId=" + this.pageId + " for address=" + storeName);
+ }
+ messages.add(msg);
}
- msg.initMessage(storage);
- if (isTrace)
+ else
{
- log.trace("Reading message " + msg + " on
pageId=" + this.pageId + " for address=" + storeName);
+ markFileAsSuspect(position, messages.size());
+ break;
}
- messages.add(msg);
}
- else
- {
- markFileAsSuspect(position, messages.size());
- break;
- }
}
+ else
+ {
+ markFileAsSuspect(position, messages.size());
+ break;
+ }
}
- else
- {
- markFileAsSuspect(position, messages.size());
- break;
- }
}
+ finally
+ {
+ storage.freeDirectBuffer(directBuffer);
+ }
numberOfMessages.set(messages.size());
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java 2012-01-13
21:50:04 UTC (rev 12026)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java 2012-01-13
22:59:31 UTC (rev 12027)
@@ -123,7 +123,7 @@
/** AIO has an optimized buffer which has a method to release it
instead of the way NIO will release data based on GC.
These methods will use that buffer if the inner method supports it */
- void freeDirectuffer(ByteBuffer buffer);
+ void freeDirectBuffer(ByteBuffer buffer);
void clearContext();
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2012-01-13
21:50:04 UTC (rev 12026)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2012-01-13
22:59:31 UTC (rev 12027)
@@ -1648,7 +1648,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
*/
- public void freeDirectuffer(ByteBuffer buffer)
+ public void freeDirectBuffer(ByteBuffer buffer)
{
journalFF.releaseBuffer(buffer);
}
Modified:
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2012-01-13
21:50:04 UTC (rev 12026)
+++
branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2012-01-13
22:59:31 UTC (rev 12027)
@@ -617,7 +617,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
*/
- public void freeDirectuffer(ByteBuffer buffer)
+ public void freeDirectBuffer(ByteBuffer buffer)
{
// We can just have hope on GC here :-)
}
Modified:
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2012-01-13
21:50:04 UTC (rev 12026)
+++
branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2012-01-13
22:59:31 UTC (rev 12027)
@@ -1737,7 +1737,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
*/
- public void freeDirectuffer(ByteBuffer buffer)
+ public void freeDirectBuffer(ByteBuffer buffer)
{
// TODO Auto-generated method stub