[jboss-cvs] JBoss Messaging SVN: r7141 - in branches/Branch_JBM2_Perf_Clebert: tests/src/org/jboss/messaging/tests/performance/journal and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Sun May 31 00:00:53 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-05-31 00:00:52 -0400 (Sun, 31 May 2009)
New Revision: 7141

Added:
   branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/
   branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java
Modified:
   branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
Log:
Journal work

Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-05-30 22:49:25 UTC (rev 7140)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-05-31 04:00:52 UTC (rev 7141)
@@ -43,9 +43,10 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.jboss.messaging.core.buffers.ChannelBuffer;
 import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -194,11 +195,7 @@
 
    private ExecutorService filesExecutor = null;
 
-   /**
-    * Used to lock access while calculating the positioning of currentFile.
-    * That has to be done in single-thread, and it needs to be a very-fast operation
-    */
-   private final Semaphore lock = new Semaphore(1, true);
+   private final Lock lock = new ReentrantReadWriteLock().writeLock();
 
    private volatile JournalFile currentFile;
 
@@ -219,11 +216,11 @@
    {
       if (fileSize < MIN_FILE_SIZE)
       {
-         throw new IllegalArgumentException("File bufferSize cannot be less than " + MIN_FILE_SIZE + " bytes");
+         throw new IllegalArgumentException("File size cannot be less than " + MIN_FILE_SIZE + " bytes");
       }
       if (fileSize % fileFactory.getAlignment() != 0)
       {
-         throw new IllegalArgumentException("Invalid journal-file-bufferSize " + fileSize +
+         throw new IllegalArgumentException("Invalid journal-file-size " + fileSize +
                                             ", It should be multiple of " +
                                             fileFactory.getAlignment());
       }
@@ -299,9 +296,17 @@
       record.encode(bb);
       bb.writeInt(size);
 
-      JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, null);
+      lock.lock();
+      try
+      {
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, null);
 
-      posFilesMap.put(id, new PosFiles(usedFile));
+         posFilesMap.put(id, new PosFiles(usedFile));
+      }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
    public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
@@ -335,9 +340,17 @@
       record.encode(bb);
       bb.writeInt(size);
 
-      JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, null);
+      lock.lock();
+      try
+      {
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, null);
 
-      posFiles.addUpdateFile(usedFile);
+         posFiles.addUpdateFile(usedFile);
+      }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
    public void appendDeleteRecord(final long id) throws Exception
@@ -363,9 +376,17 @@
       bb.putLong(id);
       bb.putInt(size);
 
-      JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+      lock.lock();
+      try
+      {
+         JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
 
-      posFiles.addDelete(usedFile);
+         posFiles.addDelete(usedFile);
+      }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
    public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
@@ -399,11 +420,19 @@
       record.encode(bb);
       bb.writeInt(size);
 
-      JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+      lock.lock();
+      try
+      {
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
 
-      JournalTransaction tx = getTransactionInfo(txID);
+         JournalTransaction tx = getTransactionInfo(txID);
 
-      tx.addPositive(usedFile, id);
+         tx.addPositive(usedFile, id);
+      }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
    public void appendUpdateRecordTransactional(final long txID,
@@ -437,11 +466,19 @@
       record.encode(bb);
       bb.writeInt(size);
 
-      JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+      lock.lock();
+      try
+      {
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
 
-      JournalTransaction tx = getTransactionInfo(txID);
+         JournalTransaction tx = getTransactionInfo(txID);
 
-      tx.addPositive(usedFile, id);
+         tx.addPositive(usedFile, id);
+      }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
    public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
@@ -469,14 +506,21 @@
       {
          record.encode(bb);
       }
-
       bb.writeInt(size);
 
-      JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+      lock.lock();
+      try
+      {
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
 
-      JournalTransaction tx = getTransactionInfo(txID);
+         JournalTransaction tx = getTransactionInfo(txID);
 
-      tx.addNegative(usedFile, id);
+         tx.addNegative(usedFile, id);
+      }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
    public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
@@ -497,11 +541,19 @@
       bb.writeInt(0);
       bb.writeInt(size);
 
-      JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+      lock.lock();
+      try
+      {
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
 
-      JournalTransaction tx = getTransactionInfo(txID);
+         JournalTransaction tx = getTransactionInfo(txID);
 
-      tx.addNegative(usedFile, id);
+         tx.addNegative(usedFile, id);
+      }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
    /** 
@@ -530,9 +582,17 @@
 
       TransactionCallback callback = getTransactionCallback(txID);
 
-      JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+      lock.lock();
+      try
+      {
+         JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
 
-      tx.prepare(usedFile);
+         tx.prepare(usedFile);
+      }
+      finally
+      {
+         lock.unlock();
+      }
 
       // We should wait this outside of the lock, to increase throughput
       if (callback != null)
@@ -576,11 +636,19 @@
 
       TransactionCallback callback = getTransactionCallback(txID);
 
-      JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+      lock.lock();
+      try
+      {
+         JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
 
-      transactionCallbacks.remove(txID);
+         transactionCallbacks.remove(txID);
 
-      tx.commit(usedFile);
+         tx.commit(usedFile);
+      }
+      finally
+      {
+         lock.unlock();
+      }
 
       // We should wait this outside of the lock, to increase throuput
       if (callback != null)
@@ -615,11 +683,19 @@
 
       TransactionCallback callback = getTransactionCallback(txID);
 
-      JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+      lock.lock();
+      try
+      {
+         JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
 
-      transactionCallbacks.remove(txID);
+         transactionCallbacks.remove(txID);
 
-      tx.rollback(usedFile);
+         tx.rollback(usedFile);
+      }
+      finally
+      {
+         lock.unlock();
+      }
 
       // We should wait this outside of the lock, to increase throuput
       if (callback != null)
@@ -684,11 +760,11 @@
     *   <tr><td>RecordID</td><td>Long (8 bytes)</td></tr>
     *   <tr><td>BodySize(Add, update and delete)</td><td>Integer (4 bytes)</td></tr>
     *   <tr><td>UserDefinedRecordType (If add/update only)</td><td>Byte (1)</td</tr>
-    *   <tr><td>RecordBody</td><td>Byte Array (bufferSize=BodySize)</td></tr>
+    *   <tr><td>RecordBody</td><td>Byte Array (size=BodySize)</td></tr>
     *   <tr><td>Check Size</td><td>Integer (4 bytes)</td></tr>
     * </table>
     * 
-    * <p> The check-bufferSize is used to validate if the record is valid and complete </p>
+    * <p> The check-size is used to validate if the record is valid and complete </p>
     * 
     * <p>Commit/Prepare record layout:</p>
     * <table border=1>
@@ -736,10 +812,10 @@
          {
             // FIXME - We should extract everything we can from this file
             // and then we shouldn't ever reuse this file on reclaiming (instead
-            // reclaim on different bufferSize files would aways throw the file away)
+            // reclaim on different size files would aways throw the file away)
             // rather than throw ISE!
             // We don't want to leave the user with an unusable system
-            throw new IllegalStateException("File is wrong bufferSize " + bytesRead +
+            throw new IllegalStateException("File is wrong size " + bytesRead +
                                             " expected " +
                                             fileSize +
                                             " : " +
@@ -777,19 +853,6 @@
             // This is what supports us from not re-filling the whole file
             int readFileId = wholeFileBuffer.getInt();
 
-            // IV - This record is from a previous file-usage. The file was
-            // reused and we need to ignore this record
-            if (readFileId != file.getOrderingID())
-            {
-               // If a file has damaged records, we make it a dataFile, and the
-               // next reclaiming will fix it
-               hasData = true;
-
-               wholeFileBuffer.position(pos + 1);
-
-               continue;
-            }
-
             long transactionID = 0;
 
             if (isTransaction(recordType))
@@ -816,9 +879,9 @@
                maxID = Math.max(maxID, recordID);
             }
 
-            // We use the bufferSize of the record to validate the health of the
+            // We use the size of the record to validate the health of the
             // record.
-            // (V) We verify the bufferSize of the record
+            // (V) We verify the size of the record
 
             // The variable record portion used on Updates and Appends
             int variableSize = 0;
@@ -842,6 +905,8 @@
                if (wholeFileBuffer.position() + variableSize > fileSize)
                {
                   log.warn("Record at position " + pos +
+                           " type = " +
+                           recordType +
                            " file:" +
                            file.getFile().getFileName() +
                            " is corrupted and it is being ignored");
@@ -862,17 +927,17 @@
             {
                if (recordType == PREPARE_RECORD)
                {
-                  // Add the variable bufferSize required for preparedTransactions
+                  // Add the variable size required for preparedTransactions
                   preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
                }
                // Both commit and record contain the recordSummary, and this is
-               // used to calculate the record-bufferSize on both record-types
+               // used to calculate the record-size on both record-types
                variableSize += wholeFileBuffer.getInt() * SIZE_INT * 2;
             }
 
             int recordSize = getRecordSize(recordType);
 
-            // VI - this is completing V, We will validate the bufferSize at the end
+            // VI - this is completing V, We will validate the size at the end
             // of the record,
             // But we avoid buffer overflows by damaged data
             if (pos + recordSize + variableSize + preparedTransactionExtraDataSize > fileSize)
@@ -880,9 +945,11 @@
                // Avoid a buffer overflow caused by damaged data... continue
                // scanning for more records...
                log.warn("Record at position " + pos +
+                        " recordType = " +
+                        recordType +
                         " file:" +
                         file.getFile().getFileName() +
-                        " is corrupted and it is being ignored");
+                        " is corrupted and it is being ignored (II)");
                // If a file has damaged records, we make it a dataFile, and the
                // next reclaiming will fix it
                hasData = true;
@@ -896,16 +963,18 @@
 
             int checkSize = wholeFileBuffer.getInt();
 
-            // VII - The checkSize at the end has to match with the bufferSize
+            // VII - The checkSize at the end has to match with the size
             // informed at the beggining.
             // This is like testing a hash for the record. (We could replace the
             // checkSize by some sort of calculated hash)
             if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
             {
                log.warn("Record at position " + pos +
+                        " recordType = " +
+                        recordType +
                         " file:" +
                         file.getFile().getFileName() +
-                        " is corrupted and it is being ignored");
+                        " is corrupted and it is being ignored (III)");
 
                // If a file has damaged records, we make it a dataFile, and the
                // next reclaiming will fix it
@@ -916,6 +985,17 @@
                continue;
             }
 
+            // This record is from a previous file-usage. The file was
+            // reused and we need to ignore this record
+            if (readFileId != file.getOrderingID())
+            {
+               // If a file has damaged records, we make it a dataFile, and the
+               // next reclaiming will fix it
+               hasData = true;
+
+               continue;
+            }
+
             wholeFileBuffer.position(oldPos);
 
             // At this point everything is checked. So we relax and just load
@@ -1204,7 +1284,7 @@
 
       // Create any more files we need
 
-      // FIXME - bufferSize() involves a scan
+      // FIXME - size() involves a scan
       int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
 
       if (filesToCreate > 0)
@@ -1391,7 +1471,7 @@
 
             dataFiles.remove(file);
 
-            // FIXME - bufferSize() involves a scan!!!
+            // FIXME - size() involves a scan!!!
             if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
             {
                // Re-initialise it
@@ -1501,7 +1581,7 @@
          throw new IllegalStateException("Journal is already stopped");
       }
 
-      lock.acquire();
+      lock.lock();
 
       try
       {
@@ -1536,7 +1616,7 @@
       }
       finally
       {
-         lock.release();
+         lock.unlock();
       }
    }
 
@@ -1744,7 +1824,7 @@
 
    private int getRecordSize(final byte recordType)
    {
-      // The record bufferSize (without the variable portion)
+      // The record size (without the variable portion)
       int recordSize = 0;
       switch (recordType)
       {
@@ -1845,11 +1925,8 @@
     * */
    private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
    {
-      lock.acquire();
+      lock.lock();
 
-      // TOOD: when we add the timer on AIO, we need to make sure this routine locks buffered timer somehow, as the
-      // offSet verification can't happen in the middle of the buffered timer
-
       try
       {
          if (state != STATE_LOADED)
@@ -1908,8 +1985,7 @@
       finally
       {
          currentFile.getFile().unlockBuffer();
-
-         lock.release();
+         lock.unlock();
       }
 
    }
@@ -1974,9 +2050,17 @@
    // You need to guarantee lock.acquire() before calling this method
    private void moveNextFile() throws InterruptedException
    {
-      closeFile(currentFile);
+      lock.lock();
+      try
+      {
+         closeFile(currentFile);
 
-      currentFile = enqueueOpenFile();
+         currentFile = enqueueOpenFile();
+      }
+      finally
+      {
+         lock.unlock();
+      }
    }
 
    /** 

Modified: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java	2009-05-30 22:49:25 UTC (rev 7140)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java	2009-05-31 04:00:52 UTC (rev 7141)
@@ -289,7 +289,7 @@
       
    }
 
-   public void disabled_testDeleteme() throws Exception
+   public void testDeleteme() throws Exception
    {
 
       JournalImpl journal = new JournalImpl(1024 * 1024 * 10, // 10M.. we believe that's the usual cilinder

Added: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java	                        (rev 0)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java	2009-05-31 04:00:52 UTC (rev 7141)
@@ -0,0 +1,133 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.asyncio.timedbuffer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.timedbuffer.TimedBuffer;
+import org.jboss.messaging.core.asyncio.timedbuffer.TimedBufferObserver;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * A TimedBufferTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TimedBufferTest extends UnitTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   AIOCallback dummyCallback = new AIOCallback()
+   {
+
+      public void done()
+      {
+      }
+
+      public void onError(int errorCode, String errorMessage)
+      {
+      }
+   };
+
+   
+   public void testFillBuffer()
+   {
+      final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+      final AtomicInteger flushTimes = new AtomicInteger(0);
+      class TestObserver implements TimedBufferObserver
+      {
+         public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
+         {
+            buffers.add(buffer);
+            flushTimes.incrementAndGet();
+         }
+
+         /* (non-Javadoc)
+          * @see org.jboss.messaging.core.asyncio.timedbuffer.TimedBufferObserver#newBuffer(int, int)
+          */
+         public ByteBuffer newBuffer(int minSize, int maxSize)
+         {
+            return ByteBuffer.allocate(maxSize);
+         }
+      }
+      
+      TimedBuffer timedBuffer = new TimedBuffer(new TestObserver(), 100, 3600 * 1000); // Any big timeout
+      
+      int x = 0;
+      for (int i = 0 ; i < 10; i++)
+      {
+         ByteBuffer record = ByteBuffer.allocate(10);
+         for (int j = 0 ; j < 10; j++)
+         {
+            record.put((byte)getSamplebyte(x++));
+         }
+         
+         record.rewind();
+         timedBuffer.addBytes(record, dummyCallback);
+      }
+      
+      
+      assertEquals(1, flushTimes.get());
+      
+      ByteBuffer flushedBuffer = buffers.get(0);
+      
+      assertEquals(100, flushedBuffer.limit());
+      
+      assertEquals(100, flushedBuffer.capacity());
+      
+
+      flushedBuffer.rewind();
+      
+      for (int i = 0; i < 100; i++)
+      {
+         assertEquals(getSamplebyte(i), flushedBuffer.get());
+      }
+      
+      
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}




More information about the jboss-cvs-commits mailing list