[hornetq-commits] JBoss hornetq SVN: r11898 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710: src/main/org/hornetq/core/paging/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Dec 12 16:36:13 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-12-12 16:36:13 -0500 (Mon, 12 Dec 2011)
New Revision: 11898

Modified:
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
JBPAPP-7655 - The customer is facing an issue that is directly related to the way concurrent paging happens, with multiple pages being accessed at the same time. We are adding a direct buffer allocation on Paging (and releasing it manually, since the GC-based release used with NIO is what has been causing issues), and we are also dealing with some max-IO limits on paging.

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java	2011-12-12 21:36:13 UTC (rev 11898)
@@ -183,6 +183,7 @@
             {
                page = pagingStore.createPage((int)pageId);
 
+               storageManager.beforePageRead();
                page.open();
 
                List<PagedMessage> pgdMessages = page.read(storageManager);
@@ -200,6 +201,7 @@
                catch (Throwable ignored)
                {
                }
+               storageManager.afterPageRead();
                cache.unlock();
             }
          }
@@ -451,8 +453,26 @@
                // The page is not on cache any more
                // We need to read the page-file before deleting it
                // to make sure we remove any large-messages pending
-               depagedPage.open();
-               List<PagedMessage> pgdMessagesList = depagedPage.read(storageManager);
+               storageManager.beforePageRead();
+               
+               List<PagedMessage> pgdMessagesList = null;
+               try
+               {
+                  depagedPage.open();
+                  pgdMessagesList = depagedPage.read(storageManager);
+               }
+               finally
+               {
+                  try
+                  {
+                     depagedPage.close();
+                  }
+                  catch (Exception e)
+                  {
+                  }
+                  
+                  storageManager.afterPageRead();
+               }
                depagedPage.close();
                pgdMessages = pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]);
             }

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java	2011-12-12 21:36:13 UTC (rev 11898)
@@ -1290,7 +1290,11 @@
       public void remove()
       {
          deliveredCount.incrementAndGet();
-         PageSubscriptionImpl.this.getPageInfo(position).remove(position);
+         PageCursorInfo info =  PageSubscriptionImpl.this.getPageInfo(position);
+         if (info != null)
+         {
+            info.remove(position);
+         }
       }
 
       /* (non-Javadoc)

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java	2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/impl/PageImpl.java	2011-12-12 21:36:13 UTC (rev 11898)
@@ -107,68 +107,77 @@
 
    public List<PagedMessage> read(StorageManager storage) throws Exception
    {
-     if (isDebug)
-     {
-        log.debug("reading page " + this.pageId + " on address = " + storeName);
-     }
-      
+      if (isDebug)
+      {
+         log.debug("reading page " + this.pageId + " on address = " + storeName);
+      }
+
       ArrayList<PagedMessage> messages = new ArrayList<PagedMessage>();
 
       size.set((int)file.size());
       // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
-      ByteBuffer buffer2 = ByteBuffer.allocateDirect(size.get());
-      
-      file.position(0);
-      file.read(buffer2);
+      ByteBuffer directBuffer = storage.allocateDirectBuffer((int)file.size());
 
-      buffer2.rewind();
+      try
+      {
 
-      HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(buffer2);
-      fileBuffer.writerIndex(fileBuffer.capacity());
+         file.position(0);
+         file.read(directBuffer);
 
-      while (fileBuffer.readable())
-      {
-         final int position = fileBuffer.readerIndex();
+         directBuffer.rewind();
 
-         byte byteRead = fileBuffer.readByte();
+         HornetQBuffer fileBuffer = HornetQBuffers.wrappedBuffer(directBuffer);
+         fileBuffer.writerIndex(fileBuffer.capacity());
 
-         if (byteRead == PageImpl.START_BYTE)
+         while (fileBuffer.readable())
          {
-            if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < fileBuffer.capacity())
+            final int position = fileBuffer.readerIndex();
+
+            byte byteRead = fileBuffer.readByte();
+
+            if (byteRead == PageImpl.START_BYTE)
             {
-               int messageSize = fileBuffer.readInt();
-               int oldPos = fileBuffer.readerIndex();
-               if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == PageImpl.END_BYTE)
+               if (fileBuffer.readerIndex() + DataConstants.SIZE_INT < fileBuffer.capacity())
                {
-                  PagedMessage msg = new PagedMessageImpl();
-                  msg.decode(fileBuffer);
-                  byte b = fileBuffer.readByte();
-                  if (b != PageImpl.END_BYTE)
+                  int messageSize = fileBuffer.readInt();
+                  int oldPos = fileBuffer.readerIndex();
+                  if (fileBuffer.readerIndex() + messageSize < fileBuffer.capacity() && fileBuffer.getByte(oldPos + messageSize) == PageImpl.END_BYTE)
                   {
-                     // Sanity Check: This would only happen if there is a bug on decode or any internal code, as this
-                     // constraint was already checked
-                     throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
+                     PagedMessage msg = new PagedMessageImpl();
+                     msg.decode(fileBuffer);
+                     byte b = fileBuffer.readByte();
+                     if (b != PageImpl.END_BYTE)
+                     {
+                        // Sanity Check: This would only happen if there is a bug on decode or any internal code, as
+                        // this
+                        // constraint was already checked
+                        throw new IllegalStateException("Internal error, it wasn't possible to locate END_BYTE " + b);
+                     }
+                     msg.initMessage(storage);
+                     if (isTrace)
+                     {
+                        log.trace("Reading message " + msg + " on pageId=" + this.pageId + " for address=" + storeName);
+                     }
+                     messages.add(msg);
                   }
-                  msg.initMessage(storage);
-                  if (isTrace)
+                  else
                   {
-                     log.trace("Reading message " + msg + " on pageId=" + this.pageId + " for address=" + storeName);
+                     markFileAsSuspect(position, messages.size());
+                     break;
                   }
-                  messages.add(msg);
                }
-               else
-               {
-                  markFileAsSuspect(position, messages.size());
-                  break;
-               }
             }
+            else
+            {
+               markFileAsSuspect(position, messages.size());
+               break;
+            }
          }
-         else
-         {
-            markFileAsSuspect(position, messages.size());
-            break;
-         }
       }
+      finally
+      {
+         storage.freeDirectuffer(directBuffer);
+      }
 
       numberOfMessages.set(messages.size());
 

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java	2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/StorageManager.java	2011-12-12 21:36:13 UTC (rev 11898)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.persistence;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
@@ -88,6 +89,34 @@
     *           in case of the pools are full
     * @throws Exception */
    void waitOnOperations() throws Exception;
+   
+   /**
+    * We need a safeguard in place to avoid too much concurrent IO happening on Paging,
+    * otherwise the system may become unresponsive if too many destinations are reading at the same time.
+    * This is called before we read, so we can limit concurrent reads
+    * @throws Exception
+    */
+   void beforePageRead() throws Exception;
+   
+   /**
+    * We need a safeguard in place to avoid too much concurrent IO happening on Paging,
+    * otherwise the system may become unresponsive if too many destinations are reading at the same time.
+    * This is called after we read, so we can limit concurrent reads
+    * @throws Exception
+    */
+   void afterPageRead() throws Exception;
+   
+   
+   /** AIO has an optimized buffer which has a method to release it
+       instead of the way NIO will release data based on GC.
+       These methods will use that buffer if the inner method supports it */
+   ByteBuffer allocateDirectBuffer(int size);
+   
+   /** AIO has an optimized buffer which has a method to release it
+       instead of the way NIO will release data based on GC.
+       These methods will use that buffer if the inner method supports it */
+   void freeDirectuffer(ByteBuffer buffer);
+   
 
    void clearContext();
    

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-12-12 21:36:13 UTC (rev 11898)
@@ -166,6 +166,8 @@
    private final Journal bindingsJournal;
 
    private final SequentialFileFactory largeMessagesFactory;
+   
+   private SequentialFileFactory journalFF = null;
 
    private volatile boolean started;
 
@@ -261,8 +263,6 @@
 
       syncTransactional = config.isJournalSyncTransactional();
 
-      SequentialFileFactory journalFF = null;
-
       if (config.getJournalType() == JournalType.ASYNCIO)
       {
          JournalStorageManager.log.info("Using AIO Journal");
@@ -1532,6 +1532,41 @@
       return info;
    }
 
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#beforePageRead()
+    */
+   public void beforePageRead() throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#afterPageRead()
+    */
+   public void afterPageRead() throws Exception
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(int)
+    */
+   public ByteBuffer allocateDirectBuffer(int size)
+   {
+      return journalFF.newBuffer(size);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+    */
+   public void freeDirectuffer(ByteBuffer buffer)
+   {
+      journalFF.releaseBuffer(buffer);
+   }
+
    // Public -----------------------------------------------------------------------------------
 
    public Journal getMessageJournal()

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-12-12 21:36:13 UTC (rev 11898)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.persistence.impl.nullpm;
 
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -569,4 +570,36 @@
       
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#beforePageRead()
+    */
+   public void beforePageRead() throws Exception
+   {
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#afterPageRead()
+    */
+   public void afterPageRead() throws Exception
+   {
+      
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(int)
+    */
+   public ByteBuffer allocateDirectBuffer(int size)
+   {
+      return ByteBuffer.allocateDirect(size);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+    */
+   public void freeDirectuffer(ByteBuffer buffer)
+   {
+      // nothing to be done.. just wait for GC
+   }
+
 }

Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-12-11 05:23:46 UTC (rev 11897)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2011-12-12 21:36:13 UTC (rev 11898)
@@ -13,6 +13,7 @@
 
 package org.hornetq.tests.unit.core.paging.impl;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -1680,6 +1681,41 @@
          
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#beforePageRead()
+       */
+      public void beforePageRead() throws Exception
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#afterPageRead()
+       */
+      public void afterPageRead() throws Exception
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#allocateDirectBuffer(int)
+       */
+      public ByteBuffer allocateDirectBuffer(int size)
+      {
+         return ByteBuffer.allocateDirect(size);
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#freeDirectuffer(java.nio.ByteBuffer)
+       */
+      public void freeDirectuffer(ByteBuffer buffer)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory



More information about the hornetq-commits mailing list