[jboss-cvs] JBoss Messaging SVN: r7185 - in trunk: src/main/org/jboss/messaging/core/asyncio/impl and 10 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jun 3 16:36:07 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-03 16:36:06 -0400 (Wed, 03 Jun 2009)
New Revision: 7185

Added:
   trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java
Removed:
   trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
Modified:
   trunk/src/config/common/schema/jbm-configuration.xsd
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Fix on AIO: use only one TimedBuffer per AIOSequentialFileFactory

Modified: trunk/src/config/common/schema/jbm-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/jbm-configuration.xsd	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/config/common/schema/jbm-configuration.xsd	2009-06-03 20:36:06 UTC (rev 7185)
@@ -194,7 +194,10 @@
 				</xsd:element>
 				<xsd:element name="journal-aio-buffer-size"
 					type="xsd:long" maxOccurs="1" minOccurs="0">
-				</xsd:element>				
+				</xsd:element>
+				<xsd:element name="journal-aio-flush-on-sync"
+					type="xsd:boolean" maxOccurs="1" minOccurs="0">
+				</xsd:element>
 				<xsd:element name="journal-sync-transactional"
 					type="xsd:boolean" maxOccurs="1" minOccurs="0">
 				</xsd:element>

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -30,8 +30,6 @@
 
 import org.jboss.messaging.core.asyncio.AIOCallback;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.TokenBucketLimiter;
-import org.jboss.messaging.utils.TokenBucketLimiterImpl;
 
 /**
  * A TimedBuffer
@@ -48,7 +46,7 @@
 
    // Attributes ----------------------------------------------------
 
-   private final TimedBufferObserver bufferObserver;
+   private TimedBufferObserver bufferObserver;
 
    private CheckTimer timerRunnable = new CheckTimer();
 
@@ -71,6 +69,8 @@
    private Thread timerThread;
    
    private boolean started;
+   
+   private final boolean flushOnSync;
 
    // Static --------------------------------------------------------
 
@@ -78,14 +78,14 @@
 
    // Public --------------------------------------------------------
 
-   public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
+   public TimedBuffer(final int size, final long timeout, final boolean flushOnSync)
    {
       bufferSize = size;
-      this.bufferObserver = bufferObserver;
       this.timeout = timeout;      
       currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
       currentBuffer.limit(0);
-      callbacks = new ArrayList<AIOCallback>();      
+      callbacks = new ArrayList<AIOCallback>();
+      this.flushOnSync = flushOnSync;
    }
    
    public synchronized void start()
@@ -127,6 +127,16 @@
       started = false;
    }
 
+   public synchronized void setObserver(TimedBufferObserver observer)
+   {
+      if (this.bufferObserver != null)
+      {
+         flush();
+      }
+      
+      this.bufferObserver = observer;
+   }
+
    public void lock()
    {
       lock.lock();
@@ -177,20 +187,27 @@
    {
       long now = System.nanoTime();
 
+      currentBuffer.put(bytes);
+      callbacks.add(callback);
+
       timeLastAdd = now;
 
       if (sync)
       {
-         // We should flush on the next timeout, no matter what other activity happens on the buffer
-         if (timeLastSync == 0)
+         if (flushOnSync)
          {
-            timeLastSync = now;
+            flush();
          }
+         else
+         {
+            // We should flush on the next timeout, no matter what other activity happens on the buffer
+            if (timeLastSync == 0)
+            {
+               timeLastSync = now;
+            }
+         }
       }
-
-      currentBuffer.put(bytes);
-      callbacks.add(callback);
-
+      
       if (currentBuffer.position() == currentBuffer.capacity())
       {
          flush();
@@ -235,7 +252,10 @@
          lock.lock();
          try
          {            
-            flush();
+            if (bufferObserver != null)
+            {
+               flush();
+            }
          }
          finally
          {

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -200,10 +200,14 @@
    
    int getAIOBufferSize();
    
-   void setAIOBufferTimeout(long timeout);
+   void setAIOBufferTimeout(int timeout);
    
-   long getAIOBufferTimeout();
+   int getAIOBufferTimeout();
    
+   void setAIOFlushOnSync(boolean flush);
+   
+   boolean isAIOFlushOnSync();
+
    boolean isCreateBindingsDir();
 
    void setCreateBindingsDir(boolean create);

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -97,8 +97,10 @@
 
    public static final int DEFAULT_JOURNAL_MAX_AIO = 500;
    
-   public static final long DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT = 500;
+   public static final boolean DEFAULT_JOURNAL_AIO_FLUSH_SYNC = false;
    
+   public static final int DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT = 1;
+   
    public static final int DEFAULT_JOURNAL_AIO_BUFFER_SIZE = 128 * 1024;
 
    public static final boolean DEFAULT_WILDCARD_ROUTING_ENABLED = true;
@@ -236,9 +238,11 @@
    protected int journalMinFiles = DEFAULT_JOURNAL_MIN_FILES;
 
    protected int journalMaxAIO = DEFAULT_JOURNAL_MAX_AIO;
-     
-   protected long journalAIOBufferTimeout = DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT;
    
+   protected boolean journalAIOFlushSync = DEFAULT_JOURNAL_AIO_FLUSH_SYNC;
+   
+   protected int journalAIOBufferTimeout = DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT;
+   
    protected int journalAIOBufferSize = DEFAULT_JOURNAL_AIO_BUFFER_SIZE;
 
    protected boolean wildcardRoutingEnabled = DEFAULT_WILDCARD_ROUTING_ENABLED;
@@ -704,16 +708,27 @@
       jmxManagementEnabled = enabled;
    }
 
-   public void setAIOBufferTimeout(long timeout)
+
+   public void setAIOBufferTimeout(int timeout)
    {
       this.journalAIOBufferTimeout = timeout;
    }
    
-   public long getAIOBufferTimeout()
+   public int getAIOBufferTimeout()
    {
       return journalAIOBufferTimeout;
    }
 
+   public void setAIOFlushOnSync(boolean flush)
+   {
+      journalAIOFlushSync = flush;
+   }
+
+   public boolean isAIOFlushOnSync()
+   {
+      return journalAIOFlushSync;
+   }
+
    public int getAIOBufferSize()
    {
       return journalAIOBufferSize;

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -297,9 +297,11 @@
       journalSyncNonTransactional = getBoolean(e, "journal-sync-non-transactional", journalSyncNonTransactional);
 
       journalFileSize = getInteger(e, "journal-file-size", journalFileSize);
-
-      journalAIOBufferTimeout = getLong(e, "journal-aio-buffer-timeout", DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT);
       
+      journalAIOFlushSync = getBoolean(e, "journal-aio-flush-on-sync", DEFAULT_JOURNAL_AIO_FLUSH_SYNC);
+      
+      journalAIOBufferTimeout = getInteger(e, "journal-aio-buffer-timeout", DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT);
+      
       journalAIOBufferSize = getInteger(e, "journal-aio-buffer-size", DEFAULT_JOURNAL_AIO_BUFFER_SIZE);
 
       journalMinFiles = getInteger(e, "journal-min-files", journalMinFiles);

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -40,8 +40,6 @@
    void open() throws Exception;
    
    boolean isOpen();
-   
-   void setBuffering(boolean buffering);
 
    /**
     * For certain operations (like loading) we don't need open the file with full maxIO

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -46,6 +46,13 @@
    void releaseBuffer(ByteBuffer buffer);
    
    void controlBuffersLifeCycle(boolean value);
+   
+   /** The factory may need to do some initialization before the file is activated.
+    *  This was added as a hook for AIO to initialize the Observer on TimedBuffer.
+    *  The same could eventually be done for NIO if we implement TimedBuffer on NIO. */
+   void activate(SequentialFile file);
+   
+   void deactivate(SequentialFile file);
 
    // To be used in tests only
    ByteBuffer wrapBuffer(byte[] bytes);
@@ -56,6 +63,8 @@
 
    void clearBuffer(ByteBuffer buffer);
    
+   void start();
+   
    void stop();
    
    /** 

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -61,19 +61,21 @@
    private final int maxIO;
 
    private AsynchronousFile aioFile;
-   
+
    private final SequentialFileFactory factory;
-   
-   private long fileSize  = 0;
 
+   private long fileSize = 0;
+
    private final AtomicLong position = new AtomicLong(0);
 
-   private final TimedBuffer timedBuffer;
+   private TimedBuffer timedBuffer;
 
    private final BufferCallback bufferCallback;
-   
-   private boolean buffering = true;
 
+   /** Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class.
+    *  This is the class returned to the factory when the file is being activated. */
+   private final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
+
    /** A context switch on AIO would make it to synchronize the disk before
        switching to the new thread, what would cause
        serious performance problems. Because of that we make all the writes on
@@ -100,7 +102,6 @@
       this.bufferCallback = bufferCallback;
       this.executor = executor;
       this.pollerExecutor = pollerExecutor;
-      this.timedBuffer = new TimedBuffer(new LocalBufferObserver(), bufferSize, bufferTimeoutMilliseconds);
    }
 
    public boolean isOpen()
@@ -123,12 +124,12 @@
 
       return pos;
    }
-   
+
    public boolean fits(int size)
    {
       return timedBuffer.checkSize(size);
    }
-   
+
    public void lockBuffer()
    {
       timedBuffer.lock();
@@ -138,14 +139,19 @@
    {
       timedBuffer.unlock();
    }
-   
+
    public synchronized void close() throws Exception
    {
       checkOpened();
       opened = false;
-            
-      timedBuffer.flush();
-      timedBuffer.stop();
+
+//      if (timedBuffer != null)
+//      {
+//         timedBuffer.flush();
+//         timedBuffer.setObserver(null);
+//      } -- remove this
+//      
+      timedBuffer = null;
       
       final CountDownLatch donelatch = new CountDownLatch(1);
 
@@ -164,7 +170,7 @@
       }
 
       aioFile.close();
-      aioFile = null;           
+      aioFile = null;
    }
 
    public void delete() throws Exception
@@ -223,7 +229,7 @@
       }
 
       aioFile.fill(filePosition, blocks, blockSize, fillCharacter);
-      
+
       this.fileSize = aioFile.size();
    }
 
@@ -248,7 +254,6 @@
 
    public synchronized void open(final int currentMaxIO) throws Exception
    {
-      timedBuffer.start();
       opened = true;
       aioFile = newFile();
       aioFile.open(journalDir + "/" + fileName, currentMaxIO);
@@ -298,7 +303,7 @@
 
    public void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
    {
-      if (buffering)
+      if (timedBuffer != null)
       {
          timedBuffer.addBytes(bytes, sync, callback);
       }
@@ -315,7 +320,7 @@
          IOCallback completion = SimpleWaitIOCallback.getInstance();
 
          write(bytes, true, completion);
-         
+
          completion.waitCompletion();
       }
       else
@@ -324,19 +329,6 @@
       }
    }
 
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.core.journal.SequentialFile#setBuffering(boolean)
-    */
-   public void setBuffering(boolean buffering)
-   {
-      this.buffering = buffering;
-      if (!buffering)
-      {
-         timedBuffer.flush();
-      }
-   };
-
-
    public void sync() throws Exception
    {
       throw new IllegalArgumentException("This method is not supported on AIO");
@@ -353,6 +345,25 @@
       return "AIOSequentialFile:" + journalDir + "/" + fileName;
    }
 
+   // Public methods
+   // -----------------------------------------------------------------------------------------------------
+
+   public void setTimedBuffer(TimedBuffer buffer)
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.setObserver(null);
+      }
+      
+      this.timedBuffer = buffer;
+
+      if (buffer != null)
+      {
+         buffer.setObserver(this.timedBufferObserver);
+      }
+
+   }
+
    // Protected methods
    // -----------------------------------------------------------------------------------------------------
 
@@ -383,8 +394,7 @@
          throw new IllegalStateException("File not opened");
       }
    }
-   
-   
+
    private static class DelegateCallback implements IOCallback
    {
       final List<AIOCallback> delegates;
@@ -435,7 +445,7 @@
       public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
       {
          buffer.flip();
-         
+
          if (buffer.limit() == 0)
          {
             factory.releaseBuffer(buffer);
@@ -467,6 +477,11 @@
             return (int)(fileSize - position.get());
          }
       }
+      
+      public String toString()
+      {
+         return "TimedBufferObserver on file (" + AIOSequentialFile.this.fileName + ")";
+      }
 
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -28,6 +28,7 @@
 import java.util.concurrent.Executors;
 
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.asyncio.impl.TimedBuffer;
 import org.jboss.messaging.core.config.impl.ConfigurationImpl;
 import org.jboss.messaging.core.journal.BufferCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
@@ -42,11 +43,11 @@
  *
  */
 public class AIOSequentialFileFactory extends AbstractSequentialFactory
-{     
+{
    private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
 
    private static final boolean trace = log.isTraceEnabled();
-      
+
    private final ReuseBuffersController buffersControl = new ReuseBuffersController();
 
    // This method exists just to make debug easier.
@@ -60,30 +61,68 @@
    /** A single AIO write executor for every AIO File.
     *  This is used only for AIO & instant operations. We only need one executor-thread for the entire journal as we always have only one active file.
     *  And even if we had multiple files at a given moment, this should still be ok, as we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
-   private final Executor writeExecutor = Executors.newSingleThreadExecutor(new JBMThreadFactory("JBM-AIO-writer-pool" + System.identityHashCode(this), true));
-   
+   private final Executor writeExecutor = Executors.newSingleThreadExecutor(new JBMThreadFactory("JBM-AIO-writer-pool" + System.identityHashCode(this),
+                                                                                                 true));
 
-   private final Executor pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), true));
-   
+   private final Executor pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this),
+                                                                                              true));
+
    private final int bufferSize;
-   
+
    private final long bufferTimeout;
-   
+
+   private final TimedBuffer timedBuffer;
+
    public AIOSequentialFileFactory(final String journalDir)
    {
-      this(journalDir, ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE, ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT);
+      this(journalDir,
+           ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+           ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT,
+           ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC);
    }
 
-   public AIOSequentialFileFactory(final String journalDir, int bufferSize, long bufferTimeout)
+   public AIOSequentialFileFactory(final String journalDir, int bufferSize, long bufferTimeout, boolean flushOnSync)
    {
       super(journalDir);
       this.bufferSize = bufferSize;
       this.bufferTimeout = bufferTimeout;
+      this.timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync);
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFileFactory#activate(org.jboss.messaging.core.journal.SequentialFile)
+    */
+   public void activate(SequentialFile file)
+   {
+      final AIOSequentialFile sequentialFile = (AIOSequentialFile)file;
+      timedBuffer.lock();
+      try
+      {
+         sequentialFile.setTimedBuffer(timedBuffer);
+      }
+      finally
+      {
+         timedBuffer.unlock();
+      }
+   }
+
+   public void deactivate(SequentialFile file)
+   {
+      timedBuffer.flush();
+      timedBuffer.setObserver(null);
+   }
+
    public SequentialFile createSequentialFile(final String fileName, final int maxIO)
    {
-      return new AIOSequentialFile(this, bufferSize, bufferTimeout, journalDir, fileName, maxIO, buffersControl.callback, writeExecutor, pollerExecutor);
+      return new AIOSequentialFile(this,
+                                   bufferSize,
+                                   bufferTimeout,
+                                   journalDir,
+                                   fileName,
+                                   maxIO,
+                                   buffersControl.callback,
+                                   writeExecutor,
+                                   pollerExecutor);
    }
 
    public boolean isSupportsCallbacks()
@@ -95,7 +134,7 @@
    {
       return AsynchronousFileImpl.isLoaded();
    }
-   
+
    public void controlBuffersLifeCycle(boolean value)
    {
       if (value)
@@ -114,7 +153,7 @@
       {
          size = (size / 512 + 1) * 512;
       }
-      
+
       return buffersControl.newBuffer(size);
    }
 
@@ -152,13 +191,18 @@
    {
       AsynchronousFileImpl.destroyBuffer(buffer);
    }
-   
+
+   public void start()
+   {
+      timedBuffer.start();
+   }
+
    public void stop()
    {
       buffersControl.clearPoll();
+      timedBuffer.stop();
    }
-   
-   
+
    /** Class that will control buffer-reuse */
    private class ReuseBuffersController
    {
@@ -168,17 +212,17 @@
        * On the case of the AIO this is almost called by the native layer as soon as the buffer is not being used any more
        * and ready to be reused or GCed */
       private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
-      
+
       /** During reload we may disable/enable buffer reuse */
       private boolean enabled = true;
 
       final BufferCallback callback = new LocalBufferCallback();
-      
+
       public void enable()
       {
          this.enabled = true;
       }
-      
+
       public void disable()
       {
          this.enabled = false;
@@ -191,7 +235,8 @@
          // just to cleanup this
          if (bufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
          {
-            if (trace) trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + " elements");
+            if (trace)
+               trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + " elements");
 
             bufferReuseLastTime = System.currentTimeMillis();
 
@@ -201,7 +246,7 @@
          // if a buffer is bigger than the configured-bufferSize, we just create a new
          // buffer.
          if (size > bufferSize)
-         {           
+         {
             return AsynchronousFileImpl.newBuffer(size);
          }
          else
@@ -218,16 +263,16 @@
                // if empty create a new one.
                buffer = AsynchronousFileImpl.newBuffer(bufferSize);
 
-               buffer.limit(alignedSize);               
+               buffer.limit(alignedSize);
             }
             else
-            {               
+            {
                clearBuffer(buffer);
 
                // set the limit of the buffer to the bufferSize being required
                buffer.limit(alignedSize);
             }
-            
+
             buffer.rewind();
 
             return buffer;
@@ -237,7 +282,7 @@
       public void clearPoll()
       {
          ByteBuffer reusedBuffer;
-         
+
          while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
          {
             releaseBuffer(reusedBuffer);
@@ -251,7 +296,7 @@
             if (enabled)
             {
                bufferReuseLastTime = System.currentTimeMillis();
-   
+
                // If a buffer has any other than the configured bufferSize, the buffer
                // will be just sent to GC
                if (buffer.capacity() == bufferSize)
@@ -267,5 +312,4 @@
       }
    }
 
-   
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -25,10 +25,12 @@
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 
 import org.jboss.messaging.core.journal.BufferCallback;
+import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
 
@@ -60,6 +62,22 @@
    {
    }
    
+   public void start()
+   {
+   }
+   
+   public void activate(SequentialFile file)
+   {
+   }
+   
+   public void releaseBuffer(ByteBuffer buffer)
+   {
+   }
+   
+   public void deactivate(SequentialFile file)
+   {
+   }
+
    /** 
     * Create the directory if it doesn't exist yet
     */

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -1355,6 +1355,8 @@
 
          openFile(currentFile);
       }
+      
+      fileFactory.activate(currentFile.getFile());
 
       pushOpenedFile();
 
@@ -1591,6 +1593,8 @@
       }
 
       filesExecutor = Executors.newSingleThreadExecutor();
+      
+      fileFactory.start();
 
       state = STATE_STARTED;
    }
@@ -1663,8 +1667,6 @@
 
       SequentialFile sf = file.getFile();
 
-      sf.setBuffering(false);
-
       sf.open(1);
       
       sf.position(0);
@@ -1677,8 +1679,6 @@
 
       sf.write(bb, true);
 
-      sf.setBuffering(true);
-
       JournalFile jf = new JournalFileImpl(sf, newOrderingID);
 
       sf.position(bb.limit());
@@ -2045,12 +2045,8 @@
 
       bb.rewind();
 
-      sequentialFile.setBuffering(false);
-
       sequentialFile.write(bb, true);
 
-      sequentialFile.setBuffering(true);
-
       JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
 
       if (!keepOpened)
@@ -2082,6 +2078,9 @@
          closeFile(currentFile);
 
          currentFile = enqueueOpenFile();
+         
+         fileFactory.activate(currentFile.getFile());
+        
       }
       finally
       {
@@ -2177,6 +2176,7 @@
 
    private void closeFile(final JournalFile file)
    {
+      fileFactory.deactivate(file.getFile());
       filesExecutor.execute(new Runnable()
       {
          public void run()

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -55,7 +55,7 @@
    {
       return new NIOSequentialFile(journalDir, fileName);
    }
-
+   
    public boolean isSupportsCallbacks()
    {
       return false;
@@ -94,12 +94,4 @@
       return bytes;
    }
 
-   /* (non-Javadoc)
-    * @see org.jboss.messaging.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
-    */
-   public void releaseBuffer(ByteBuffer buffer)
-   {
-      // nothing to be done here
-   }
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -191,7 +191,8 @@
          {
             journalFF = new AIOSequentialFileFactory(journalDir,
                                                      config.getAIOBufferSize(),
-                                                     config.getAIOBufferTimeout());
+                                                     config.getAIOBufferTimeout(),
+                                                     config.isAIOFlushOnSync());
             log.info("AIO loaded successfully");
          }
       }

Copied: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java (from rev 7184, trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -0,0 +1,141 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.asyncio;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.impl.TimedBuffer;
+import org.jboss.messaging.core.asyncio.impl.TimedBufferObserver;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * A TimedBufferTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TimedBufferTest extends UnitTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   AIOCallback dummyCallback = new AIOCallback()
+   {
+
+      public void done()
+      {
+      }
+
+      public void onError(int errorCode, String errorMessage)
+      {
+      }
+   };
+
+   
+   public void testFillBuffer()
+   {
+      final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+      final AtomicInteger flushTimes = new AtomicInteger(0);
+      class TestObserver implements TimedBufferObserver
+      {
+         public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
+         {
+            buffers.add(buffer);
+            flushTimes.incrementAndGet();
+         }
+
+         /* (non-Javadoc)
+          * @see org.jboss.messaging.core.asyncio.impl.TimedBufferObserver#newBuffer(int, int)
+          */
+         public ByteBuffer newBuffer(int minSize, int maxSize)
+         {
+            return ByteBuffer.allocate(maxSize);
+         }
+
+         public int getRemainingBytes()
+         {
+            return 1024*1024;
+         }
+      }
+      
+      TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false); // Any big timeout
+      
+      timedBuffer.setObserver(new TestObserver());
+      
+      int x = 0;
+      for (int i = 0 ; i < 10; i++)
+      {
+         ByteBuffer record = ByteBuffer.allocate(10);
+         for (int j = 0 ; j < 10; j++)
+         {
+            record.put((byte)getSamplebyte(x++));
+         }
+         
+         timedBuffer.checkSize(10);
+         record.rewind();
+         timedBuffer.addBytes(record, false, dummyCallback);
+      }
+      
+      
+      assertEquals(1, flushTimes.get());
+      
+      ByteBuffer flushedBuffer = buffers.get(0);
+      
+      assertEquals(100, flushedBuffer.limit());
+      
+      assertEquals(100, flushedBuffer.capacity());
+      
+
+      flushedBuffer.rewind();
+      
+      for (int i = 0; i < 100; i++)
+      {
+         assertEquals(getSamplebyte(i), flushedBuffer.get());
+      }
+      
+      
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -79,7 +79,8 @@
       assertEquals("somedir2", conf.getJournalDirectory());
       assertEquals(false, conf.isCreateJournalDir());
       assertEquals(JournalType.NIO, conf.getJournalType());
-      assertEquals(10000, conf.getAIOBufferSize());        
+      assertEquals(10000, conf.getAIOBufferSize());
+      assertEquals(true, conf.isAIOFlushOnSync());      
       assertEquals(1000, conf.getAIOBufferTimeout());      
       assertEquals(false, conf.isJournalSyncTransactional());
       assertEquals(true, conf.isJournalSyncNonTransactional());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -147,10 +147,10 @@
             {
                for (int i = 0; i < 10; i++)
                {
-                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), false);
+                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), true);
                }
 
-               journalImpl.appendRollbackRecord(1l, false);
+               journalImpl.appendRollbackRecord(1l, true);
             }
             catch (Exception e)
             {
@@ -211,10 +211,10 @@
             {
                for (int i = 0; i < 10; i++)
                {
-                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), false);
+                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), true);
                }
 
-               journalImpl.appendCommitRecord(1l, false);
+               journalImpl.appendCommitRecord(1l, true);
             }
             catch (Exception e)
             {
@@ -268,7 +268,7 @@
       factory.setHoldCallbacks(true, null);
       factory.setGenerateErrors(true);
 
-      journalImpl.appendAddRecordTransactional(1l, 1, (byte)1, new SimpleEncoding(1, (byte)0), false);
+      journalImpl.appendAddRecordTransactional(1l, 1, (byte)1, new SimpleEncoding(1, (byte)0), true);
 
       factory.flushAllCallbacks();
 
@@ -277,7 +277,7 @@
 
       try
       {
-         journalImpl.appendAddRecordTransactional(1l, 2, (byte)1, new SimpleEncoding(1, (byte)0), false);
+         journalImpl.appendAddRecordTransactional(1l, 2, (byte)1, new SimpleEncoding(1, (byte)0), true);
          fail("Exception expected"); // An exception already happened in one
          // of the elements on this transaction.
          // We can't accept any more elements on
@@ -298,7 +298,7 @@
 
       try
       {
-         journalImpl.appendAddRecord(1l, (byte)0, new SimpleEncoding(1, (byte)0), false);
+         journalImpl.appendAddRecord(1l, (byte)0, new SimpleEncoding(1, (byte)0), true);
          fail("Exception expected");
       }
       catch (Exception ignored)

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -119,7 +119,6 @@
       // exceptions)
       for (int i = 0; i < 100; i++)
       {
-         System.out.println("i = " + i);
          journal.appendAddRecord(1, (byte)1, new SimpleEncoding(2, (byte)'a'), false);
       }
       stopJournal();
@@ -1666,6 +1665,7 @@
       assertEquals(1, journal.getIDMapSize());
    }
 
+   
    public void testPrepareNoReclaim() throws Exception
    {
       setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) + calculateRecordSize(recordLength,

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -641,4 +641,25 @@
    {
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFileFactory#activate(org.jboss.messaging.core.journal.SequentialFile)
+    */
+   public void activate(SequentialFile file)
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFileFactory#start()
+    */
+   public void start()
+   {
+   }
+
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFileFactory#deactivate(org.jboss.messaging.core.journal.SequentialFile)
+    */
+   public void deactivate(SequentialFile file)
+   {
+   }
+
 }

Deleted: trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java	2009-06-03 15:57:12 UTC (rev 7184)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java	2009-06-03 20:36:06 UTC (rev 7185)
@@ -1,140 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.tests.unit.util.timedbuffer;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.asyncio.impl.TimedBuffer;
-import org.jboss.messaging.core.asyncio.impl.TimedBufferObserver;
-import org.jboss.messaging.tests.util.UnitTestCase;
-
-/**
- * A TimedBufferTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class TimedBufferTest extends UnitTestCase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-   
-   AIOCallback dummyCallback = new AIOCallback()
-   {
-
-      public void done()
-      {
-      }
-
-      public void onError(int errorCode, String errorMessage)
-      {
-      }
-   };
-
-   
-   public void testFillBuffer()
-   {
-      final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
-      final AtomicInteger flushTimes = new AtomicInteger(0);
-      class TestObserver implements TimedBufferObserver
-      {
-         //TODO: fix the test
-         public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
-         {
-            buffers.add(buffer);
-            flushTimes.incrementAndGet();
-         }
-
-         /* (non-Javadoc)
-          * @see org.jboss.messaging.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
-          */
-         public ByteBuffer newBuffer(int minSize, int maxSize)
-         {
-            return ByteBuffer.allocate(maxSize);
-         }
-
-         public int getRemainingBytes()
-         {
-            return 1024*1024;
-         }
-      }
-      
-      TimedBuffer timedBuffer = new TimedBuffer(new TestObserver(), 100, 3600 * 1000); // Any big timeout
-      
-      int x = 0;
-      for (int i = 0 ; i < 10; i++)
-      {
-         ByteBuffer record = ByteBuffer.allocate(10);
-         for (int j = 0 ; j < 10; j++)
-         {
-            record.put((byte)getSamplebyte(x++));
-         }
-         
-         timedBuffer.checkSize(10);
-         record.rewind();
-         timedBuffer.addBytes(record, false, dummyCallback);
-      }
-      
-      
-      assertEquals(1, flushTimes.get());
-      
-      ByteBuffer flushedBuffer = buffers.get(0);
-      
-      assertEquals(100, flushedBuffer.limit());
-      
-      assertEquals(100, flushedBuffer.capacity());
-      
-
-      flushedBuffer.rewind();
-      
-      for (int i = 0; i < 100; i++)
-      {
-         assertEquals(getSamplebyte(i), flushedBuffer.get());
-      }
-      
-      
-   }
-   
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}




More information about the jboss-cvs-commits mailing list