[hornetq-commits] JBoss hornetq SVN: r12027 - in branches/Branch_2_2_AS7: src/main/org/hornetq/core/paging/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jan 13 17:59:32 EST 2012


Author: clebert.suconic at jboss.com
Date: 2012-01-13 17:59:31 -0500 (Fri, 13 Jan 2012)
New Revision: 12027

Modified:
   branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
   branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PageImpl.java
   branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Merging changes from EAP branch

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2012-01-13 21:50:04 UTC (rev 12026)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2012-01-13 22:59:31 UTC (rev 12027)
@@ -13,13 +13,13 @@
 
 package org.hornetq.core.journal.impl;
 
+import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.journal.IOCriticalErrorListener;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.logging.Logger;
 
 /**
  * 
@@ -31,8 +31,6 @@
  */
 public class NIOSequentialFileFactory extends AbstractSequentialFileFactory implements SequentialFileFactory
 {
-   private static final Logger log = Logger.getLogger(NIOSequentialFileFactory.class);
-
    public NIOSequentialFileFactory(final String journalDir)
    {
       this(journalDir, null);
@@ -103,7 +101,36 @@
    
    public ByteBuffer allocateDirectBuffer(final int size)
    {
-      return ByteBuffer.allocateDirect(size);
+      // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
+      ByteBuffer buffer2 = null;
+      try
+      {
+         buffer2 = ByteBuffer.allocateDirect(size);
+      }
+      catch (OutOfMemoryError error)
+      {
+         // This is a workaround for the way the JDK will deal with native buffers.
+         // the main portion is outside of the VM heap
+         // and the JDK will not have any reference about it to take GC into account
+         // so we force a GC and try again.
+         WeakReference<Object> obj = new WeakReference<Object>(new Object());
+         try
+         {
+            long timeout = System.currentTimeMillis() + 5000;
+            while (System.currentTimeMillis() > timeout && obj.get() != null)
+            {
+               System.gc();
+               Thread.sleep(100);
+            }
+         }
+         catch (InterruptedException e)
+         {
+         }
+         
+         buffer2 = ByteBuffer.allocateDirect(size);
+         
+      }
+      return buffer2;
    }
    
    public void releaseDirectBuffer(ByteBuffer buffer)

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PageImpl.java	2012-01-13 21:50:04 UTC (rev 12026)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/paging/impl/PageImpl.java	2012-01-13 22:59:31 UTC (rev 12027)
@@ -107,68 +107,77 @@
 
    public List<PagedMessage> read(StorageManager storage) throws Exception
    {
-	  if (isDebug)
-	  {
-	     log.debug("reading page " + this.pageId + " on address = " + storeName);
-	  }
-      
+      if (isDebug)
+      {
+         log.debug("reading page " + this.pageId + " on address = " + storeName);
+      }
+
       ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
 
       size.set((int)file.size());
       // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
-      ByteBuffer buffer2 = ByteBuffer.allocateDirect(size.get());
-      
-      file.position(0);
-      file.read(buffer2);
+      ByteBuffer directBuffer = storage.allocateDirectBuffer((int)file.size());
 
-      buffer2.rewind();
+      try
+      {
 
-      HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(buffer2);
-      fileBuffer.writerIndex(fileBuffer.capacity());
+         file.position(0);
+         file.read(directBuffer);
 
-      while (fileBuffer.readable())
-      {
-         final int position = fileBuffer.readerIndex();
+         directBuffer.rewind();
 
-         byte byteRead = fileBuffer.readByte();
+         HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(directBuffer);
+         fileBuffer.writerIndex(fileBuffer.capacity());
 
-         if (byteRead == PageImpl.START_BYTE)
+         while (fileBuffer.readable())
          {
-            if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < fileBuffer.capacity())
+            final int position = fileBuffer.readerIndex();
+
+            byte byteRead = fileBuffer.readByte();
+
+            if (byteRead == PageImpl.START_BYTE)
             {
-               int messageSize = fileBuffer.readInt();
-               int oldPos = fileBuffer.readerIndex();
-               if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == PageImpl.END_BYTE)
+               if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < fileBuffer.capacity())
                {
-                  PagedMessage msg = new PagedMessageImpl();
-                  msg.decode(fileBuffer);
-                  byte b = fileBuffer.readByte();
-                  if (b != PageImpl.END_BYTE)
+                  int messageSize = fileBuffer.readInt();
+                  int oldPos = fileBuffer.readerIndex();
+                  if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == PageImpl.END_BYTE)
                   {
-                     // Sanity Check: This would only happen if there is a bug on decode or any internal code, as this
-                     // constraint was already checked
-                     throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
+                     PagedMessage msg = new PagedMessageImpl();
+                     msg.decode(fileBuffer);
+                     byte b = fileBuffer.readByte();
+                     if (b != PageImpl.END_BYTE)
+                     {
+                        // Sanity Check: This would only happen if there is a bug on decode or any internal code, as
+                        // this
+                        // constraint was already checked
+                        throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
+                     }
+                     msg.initMessage(storage);
+                     if (isTrace)
+                     {
+                        log.trace("Reading message " + msg + " on pageId=" + this.pageId + " for address=" + storeName);
+                     }
+                     messages.add(msg);
                   }
-                  msg.initMessage(storage);
-                  if (isTrace)
+                  else
                   {
-                     log.trace("Reading message " + msg + " on pageId=" + this.pageId + " for address=" + storeName);
+                     markFileAsSuspect(position, messages.size());
+                     break;
                   }
-                  messages.add(msg);
                }
-               else
-               {
-                  markFileAsSuspect(position, messages.size());
-                  break;
-               }
             }
+            else
+            {
+               markFileAsSuspect(position, messages.size());
+               break;
+            }
          }
-         else
-         {
-            markFileAsSuspect(position, messages.size());
-            break;
-         }
       }
+      finally
+      {
+         storage.freeDirectBuffer(directBuffer);
+      }
 
       numberOfMessages.set(messages.size());
 

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java	2012-01-13 21:50:04 UTC (rev 12026)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/StorageManager.java	2012-01-13 22:59:31 UTC (rev 12027)
@@ -123,7 +123,7 @@
    /** AIO has an optimized buffer which has a method to release it
        instead of the way NIO will release data based on GC.
        These methods will use that buffer if the inner method supports it */
-   void freeDirectuffer(ByteBuffer buffer);
+   void freeDirectBuffer(ByteBuffer buffer);
    
 
    void clearContext();

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2012-01-13 21:50:04 UTC (rev 12026)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2012-01-13 22:59:31 UTC (rev 12027)
@@ -1648,7 +1648,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
     */
-   public void freeDirectuffer(ByteBuffer buffer)
+   public void freeDirectBuffer(ByteBuffer buffer)
    {
       journalFF.releaseBuffer(buffer);
    }

Modified: branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2012-01-13 21:50:04 UTC (rev 12026)
+++ branches/Branch_2_2_AS7/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2012-01-13 22:59:31 UTC (rev 12027)
@@ -617,7 +617,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
     */
-   public void freeDirectuffer(ByteBuffer buffer)
+   public void freeDirectBuffer(ByteBuffer buffer)
    {
       // We can just have hope on GC here :-)
    }

Modified: branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2012-01-13 21:50:04 UTC (rev 12026)
+++ branches/Branch_2_2_AS7/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2012-01-13 22:59:31 UTC (rev 12027)
@@ -1737,7 +1737,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
        */
-      public void freeDirectuffer(ByteBuffer buffer)
+      public void freeDirectBuffer(ByteBuffer buffer)
       {
          // TODO Auto-generated method stub
          



More information about the hornetq-commits mailing list