[jboss-cvs] JBoss Messaging SVN: r7137 - in branches/Branch_JBM2_Perf_Clebert: src/main/org/jboss/messaging/core/journal/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri May 29 23:42:18 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-05-29 23:42:18 -0400 (Fri, 29 May 2009)
New Revision: 7137

Removed:
   branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferController.java
Modified:
   branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java
   branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
Log:
Timers Implementation

Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java	2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java	2009-05-30 03:42:18 UTC (rev 7137)
@@ -25,8 +25,16 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.utils.JBMThreadFactory;
 
 /**
  * A TimedBuffer
@@ -44,32 +52,37 @@
 
    // Attributes ----------------------------------------------------
 
-   final TimedBufferObserver bufferObserver;
-   
-   final TimedBufferController controller;
+   private final TimedBufferObserver bufferObserver;
 
-   final long timeout;
+   private final CheckTimer timerRunnable = new CheckTimer();
 
-   final int bufferSize;
+   private volatile ScheduledFuture<?> futureTimerRunnable;
 
-   volatile ByteBuffer currentBuffer;
+   private final long timeout;
 
-   volatile List<AIOCallback> callbacks;
+   private final int bufferSize;
+
+   private volatile ByteBuffer currentBuffer;
+
+   private volatile List<AIOCallback> callbacks;
+
+   private volatile long timeLastWrite = 0;
    
-   volatile long writeLastTime = 0;
+   private final Lock lock = new ReentrantReadWriteLock().writeLock();
 
+   private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
 
-   public TimedBuffer(final TimedBufferController controller, final TimedBufferObserver bufferObserver, final int size, final long timeout)
+   public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
    {
-      this.bufferSize = size;
+      bufferSize = size;
       this.bufferObserver = bufferObserver;
       this.timeout = timeout;
-      this.controller = controller;
    }
 
    public int position()
@@ -84,44 +97,88 @@
       }
    }
 
-   public synchronized boolean checkSize(int sizeChecked)
+   public void checkTimer()
    {
+      if (System.currentTimeMillis() - timeLastWrite > timeout)
+      {
+         lock.lock();
+         try
+         {
+            flush();
+         }
+         finally
+         {
+            lock.unlock();
+         }
+      }
+
+   }
+
+   /**
+    * Verifies whether the size fits the buffer; if it fits, the buffer is locked to avoid a flush until addBytes is called
+    * @param sizeChecked
+    * @return
+    */
+   public synchronized boolean checkSize(final int sizeChecked)
+   {
+      final boolean fits; 
       if (sizeChecked > bufferSize)
       {
          flush();
 
          // We transfer the bytes, as the bufferObserver has special alignment restrictions on the buffer addressing
          currentBuffer = bufferObserver.newBuffer(sizeChecked, sizeChecked);
-
-         return currentBuffer != null;
+         
+         fits = currentBuffer != null; 
       }
       else
       {
          // We verify against the currentBuffer.capacity as the observer may return a smaller buffer
-         if (currentBuffer == null || (currentBuffer.position() + sizeChecked) > currentBuffer.limit())
+         if (currentBuffer == null || currentBuffer.position() + sizeChecked > currentBuffer.limit())
          {
             flush();
             newBuffer(sizeChecked);
          }
-         
-         return currentBuffer != null;
-         
+
+         fits = currentBuffer != null; 
       }
+      
+      // If the size fits, take the lock: a flush must not happen until addBytes has been called
+      if (fits)
+      {
+         lock.lock();
+      }
+      
+      return fits;
    }
 
    public synchronized void addBytes(final ByteBuffer bytes, final AIOCallback callback)
    {
-      if (currentBuffer == null)
+      try
       {
-         newBuffer(0);
+         if (currentBuffer == null)
+         {
+            newBuffer(0);
+         }
+   
+         currentBuffer.put(bytes);
+         callbacks.add(callback);
+   
+         if (futureTimerRunnable == null)
+         {
+            futureTimerRunnable = schedule.scheduleAtFixedRate(timerRunnable, timeout, timeout, TimeUnit.MILLISECONDS);
+         }
+   
+         timeLastWrite = System.currentTimeMillis();
+   
+         if (currentBuffer.position() == currentBuffer.capacity())
+         {
+            flush();
+         }
       }
-      
-      currentBuffer.put(bytes);
-      callbacks.add(callback);
-      
-      if (currentBuffer.position() == currentBuffer.capacity())
+      finally
       {
-         flush();
+         lock.unlock();
       }
    }
 
@@ -133,6 +190,14 @@
          currentBuffer = null;
          callbacks = null;
       }
+      
+      if (futureTimerRunnable != null)
+      {
+         futureTimerRunnable.cancel(false);
+         futureTimerRunnable = null;
+      }
+
+      timeLastWrite = 0;
    }
 
    // Package protected ---------------------------------------------
@@ -141,7 +206,7 @@
 
    // Private -------------------------------------------------------
 
-   private void newBuffer(int minSize)
+   private void newBuffer(final int minSize)
    {
       currentBuffer = bufferObserver.newBuffer(minSize, bufferSize);
       callbacks = new ArrayList<AIOCallback>();
@@ -149,4 +214,30 @@
 
    // Inner classes -------------------------------------------------
 
+   class CheckTimer implements Runnable
+   {
+      public void run()
+      {
+         checkTimer();
+      }
+   }
+
+   // TODO: is there a better place to get this schedule service from?
+   static class ScheduledSingleton
+   {
+      private static ScheduledExecutorService scheduleService;
+
+      private static synchronized ScheduledExecutorService getScheduledService()
+      {
+         if (scheduleService == null)
+         {
+            ThreadFactory factory = new JBMThreadFactory("JBM-buffer-scheduled-control", true);
+
+            scheduleService = Executors.newScheduledThreadPool(2, factory);
+         }
+
+         return scheduleService;
+      }
+   }
+
 }

Deleted: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferController.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferController.java	2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferController.java	2009-05-30 03:42:18 UTC (rev 7137)
@@ -1,54 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.asyncio.timedbuffer;
-
-/**
- * A TimedBufferController
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class TimedBufferController
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-05-30 03:42:18 UTC (rev 7137)
@@ -101,7 +101,7 @@
       this.bufferCallback = bufferCallback;
       this.executor = executor;
       this.pollerExecutor = pollerExecutor;
-      this.timedBuffer = new TimedBuffer(null, new LocalBufferObserver(), bufferSize, bufferTimeoutMilliseconds);
+      this.timedBuffer = new TimedBuffer(new LocalBufferObserver(), bufferSize, bufferTimeoutMilliseconds);
    }
 
    public boolean isOpen()

Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-05-30 03:42:18 UTC (rev 7137)
@@ -77,7 +77,7 @@
 
    public AIOSequentialFileFactory(final String journalDir)
    {
-      this(journalDir, 1024 * 1024, 2);
+      this(journalDir, 1024 * 1024, 1);
    }
 
 

Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-05-30 03:42:18 UTC (rev 7137)
@@ -1868,6 +1868,12 @@
          if (!currentFile.getFile().fits(size))
          {
             moveNextFile();
+            
+            // Check again after moving to the next file, to guarantee locking on timed buffers on AIO
+            if (!currentFile.getFile().fits(size))
+            {
+               throw new IllegalStateException("Inconsistence on sizes on journal");
+            }
          }
 
          if (currentFile == null)
@@ -1933,9 +1939,13 @@
       bb.putInt(orderingID);
 
       bb.rewind();
+      
+      sequentialFile.setBuffering(false);
 
       sequentialFile.write(bb, true);
 
+      sequentialFile.setBuffering(true);
+
       JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
 
       if (!keepOpened)

Modified: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java	2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java	2009-05-30 03:42:18 UTC (rev 7137)
@@ -249,6 +249,33 @@
          testJournal(new NIOSequentialFileFactory(getTestDir()), NUM_RECORDS, SIZE_RECORD, 13, 10 * 1024 * 1024);
       }
    }
+   
+   
+   public void testTransactional() throws Exception
+   {
+      SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDir(), 1024 * 1024, 1);
+      
+      JournalImpl journal = new JournalImpl(1024 * 1024 * 10, // 10M.. we believe that's the usual cylinder
+                                            // size.. not an exact science here
+                                            10, // number of files pre-allocated
+                                            true, // sync on commit
+                                            false, // no sync on non transactional
+                                            factory, // AIO or NIO
+                                            "jbm", // file name
+                                            "jbm", // extension
+                                            500); // it's like a semaphore for callback on the AIO layer
+      
+      journal.start();
+      journal.load(dummyLoader);
+      
+      journal.appendAddRecordTransactional(1, 1, (byte)1, new byte[]{(byte)1});
+      journal.appendCommitRecord(1);
+      
+      journal.stop();
+      
+      
+      
+   }
 
    public void disabled_testDeleteme() throws Exception
    {




More information about the jboss-cvs-commits mailing list