[hornetq-commits] JBoss hornetq SVN: r8267 - in trunk: src/main/org/hornetq/core/journal and 11 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 11 23:39:28 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-11 23:39:27 -0500 (Wed, 11 Nov 2009)
New Revision: 8267

Added:
   trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java
   trunk/tests/src/org/hornetq/tests/integration/journal/NIONoBufferJournalImplTest.java
   trunk/tests/src/org/hornetq/tests/stress/journal/AIOMultiThreadCompactorStressTest.java
   trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
Removed:
   trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java
   trunk/src/main/org/hornetq/core/asyncio/impl/TimedBufferObserver.java
   trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
   trunk/tests/src/org/hornetq/tests/stress/journal/remote/
   trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java
Modified:
   trunk/src/main/org/hornetq/core/journal/SequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
   trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
   trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
   trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
   trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java
   trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
   trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
   trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-214 - Implementing TimedBuffer into NIO

Deleted: trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -1,400 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.asyncio.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.hornetq.core.asyncio.AIOCallback;
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.utils.VariableLatch;
-
-/**
- * A TimedBuffer
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class TimedBuffer
-{
-   // Constants -----------------------------------------------------
-
-   private static final Logger log = Logger.getLogger(TimedBuffer.class);
-
-   // Attributes ----------------------------------------------------
-
-   private TimedBufferObserver bufferObserver;
-
-   // This is used to pause and resume the timer
-   // This is a reusable Latch, that uses java.util.concurrent base classes
-   private final VariableLatch latchTimer = new VariableLatch();
-
-   private CheckTimer timerRunnable = new CheckTimer();
-
-   private final int bufferSize;
-
-   private final HornetQBuffer buffer;
-
-   private int bufferLimit = 0;
-
-   private List<AIOCallback> callbacks;
-
-   private final Lock lock = new ReentrantReadWriteLock().writeLock();
-
-   // used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
-   private volatile boolean active = false;
-
-   private final long timeout;
-
-   // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
-   private volatile boolean pendingSync = false;
-
-   private Thread timerThread;
-
-   private volatile boolean started;
-
-   private final boolean flushOnSync;
-
-   // for logging write rates
-
-   private final boolean logRates;
-
-   private volatile long bytesFlushed;
-
-   private Timer logRatesTimer;
-
-   private TimerTask logRatesTimerTask;
-
-   private long lastExecution;
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   public TimedBuffer(final int size, final long timeout, final boolean flushOnSync, final boolean logRates)
-   {
-      bufferSize = size;
-      this.logRates = logRates;
-      if (logRates)
-      {
-         this.logRatesTimer = new Timer(true);
-      }
-      // Setting the interval for nano-sleeps
-
-      buffer = ChannelBuffers.buffer(bufferSize);
-      buffer.clear();
-      bufferLimit = 0;
-
-      callbacks = new ArrayList<AIOCallback>();
-      this.flushOnSync = flushOnSync;
-      latchTimer.up();
-      this.timeout = timeout;
-   }
-
-   public synchronized void start()
-   {
-      if (started)
-      {
-         return;
-      }
-
-      timerRunnable = new CheckTimer();
-
-      timerThread = new Thread(timerRunnable, "hornetq-aio-timer");
-
-      timerThread.start();
-
-      if (logRates)
-      {
-         logRatesTimerTask = new LogRatesTimerTask();
-
-         logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
-      }
-
-      started = true;
-   }
-
-   public void stop()
-   {
-      if (!started)
-      {
-         return;
-      }
-
-      this.flush();
-
-      this.bufferObserver = null;
-
-      latchTimer.down();
-
-      timerRunnable.close();
-
-      if (logRates)
-      {
-         logRatesTimerTask.cancel();
-      }
-
-      while (timerThread.isAlive())
-      {
-         try
-         {
-            timerThread.join();
-         }
-         catch (InterruptedException e)
-         {
-         }
-      }
-
-      started = false;
-   }
-
-   public synchronized void setObserver(TimedBufferObserver observer)
-   {
-      if (this.bufferObserver != null)
-      {
-         flush();
-      }
-
-      this.bufferObserver = observer;
-   }
-
-   public void disableAutoFlush()
-   {
-      lock.lock();
-   }
-
-   public void enableAutoFlush()
-   {
-      lock.unlock();
-   }
-
-   /**
-    * Verify if the size fits the buffer
-    * @param sizeChecked
-    * @return
-    */
-   public synchronized boolean checkSize(final int sizeChecked)
-   {
-      if (sizeChecked > bufferSize)
-      {
-         throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
-                                         ") on the journal");
-      }
-
-      if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit)
-      {
-         flush();
-
-         final int remaining = bufferObserver.getRemainingBytes();
-
-         if (sizeChecked > remaining)
-         {
-            return false;
-         }
-         else
-         {
-            buffer.clear();
-            bufferLimit = Math.min(remaining, bufferSize);
-            return true;
-         }
-      }
-      else
-      {
-         return true;
-      }
-   }
-
-   public synchronized void addBytes(final byte[] bytes, final boolean sync, final AIOCallback callback)
-   {
-      if (buffer.writerIndex() == 0)
-      {
-         // Resume latch
-         latchTimer.down();
-      }
-
-      buffer.writeBytes(bytes);
-
-      callbacks.add(callback);
-
-      active = true;
-
-      if (sync)
-      {
-         if (flushOnSync)
-         {
-            flush();
-         }
-         else
-         {
-            // We should flush on the next timeout, no matter what other activity happens on the buffer
-            if (!pendingSync)
-            {
-               pendingSync = true;
-            }
-         }
-      }
-
-      if (buffer.writerIndex() == bufferLimit)
-      {
-         flush();
-      }
-   }
-
-   public synchronized void flush()
-   {
-      if (buffer.writerIndex() > 0)
-      {
-         latchTimer.up();
-
-         int pos = buffer.writerIndex();
-
-         if (logRates)
-         {
-            bytesFlushed += pos;
-         }
-
-         ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
-
-         // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
-         // Using directBuffer.put(buffer) would make several append calls for each byte
-
-         directBuffer.put(buffer.array(), 0, pos);
-
-         bufferObserver.flushBuffer(directBuffer, callbacks);
-
-         callbacks = new ArrayList<AIOCallback>();
-
-         active = false;
-         pendingSync = false;
-
-         buffer.clear();
-         bufferLimit = 0;
-      }
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   private void checkTimer()
-   {
-      // if inactive for more than the timeout
-      // of if a sync happened at more than the the timeout ago
-      if (!active || pendingSync)
-      {
-         lock.lock();
-         try
-         {
-            if (bufferObserver != null)
-            {
-               flush();
-            }
-         }
-         finally
-         {
-            lock.unlock();
-         }
-      }
-
-      // Set the buffer as inactive.. we will flush the buffer next tick if nothing change this
-      active = false;
-   }
-
-   // Inner classes -------------------------------------------------
-
-   private class LogRatesTimerTask extends TimerTask
-   {
-      private boolean closed;
-
-      @Override
-      public synchronized void run()
-      {
-         if (!closed)
-         {
-            long now = System.currentTimeMillis();
-
-            if (lastExecution != 0)
-            {
-               double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
-               log.info("Write rate = " + rate + " bytes / sec or " + (long)(rate / (1024 * 1024)) + " MiB / sec");
-            }
-
-            lastExecution = now;
-
-            bytesFlushed = 0;
-         }
-      }
-
-      public synchronized boolean cancel()
-      {
-         closed = true;
-
-         return super.cancel();
-      }
-   }
-
-   private class CheckTimer implements Runnable
-   {
-      private volatile boolean closed = false;
-
-      public void run()
-      {
-         while (!closed)
-         {
-            try
-            {
-               latchTimer.waitCompletion();
-            }
-            catch (InterruptedException ignored)
-            {
-            }
-
-            sleep();
-
-            checkTimer();
-
-         }
-      }
-
-      /**
-       * 
-       */
-      private void sleep()
-      {
-         long time = System.nanoTime() + timeout;
-         while (time > System.nanoTime())
-         {
-            Thread.yield();
-         }
-      }
-
-      public void close()
-      {
-         closed = true;
-      }
-   }
-
-}

Deleted: trunk/src/main/org/hornetq/core/asyncio/impl/TimedBufferObserver.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/TimedBufferObserver.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/TimedBufferObserver.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -1,60 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-
-package org.hornetq.core.asyncio.impl;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.hornetq.core.asyncio.AIOCallback;
-
-/**
- * A TimedBufferObserver
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface TimedBufferObserver
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-   
-   public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks);
-   
-   
-   /** Return the number of remaining bytes that still fit on the observer (file) */
-   public int getRemainingBytes();
-   
-   
-   public ByteBuffer newBuffer(int size, int limit);
-   
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -15,6 +15,7 @@
 
 import java.nio.ByteBuffer;
 
+import org.hornetq.core.journal.impl.TimedBuffer;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 
 /**
@@ -59,9 +60,11 @@
 
    void write(HornetQBuffer bytes, boolean sync) throws Exception;
 
-   void write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
+   /** Write directly to the file without using any buffer */
+   void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback);
 
-   void write(ByteBuffer bytes, boolean sync) throws Exception;
+   /** Write directly to the file without using any buffer */
+   void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
 
    int read(ByteBuffer bytes, IOCallback callback) throws Exception;
 
@@ -86,4 +89,6 @@
    void enableAutoFlush();
 
    SequentialFile copy();
+   
+   void setTimedBuffer(TimedBuffer buffer);
 }

Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -25,8 +25,6 @@
 import org.hornetq.core.asyncio.AsynchronousFile;
 import org.hornetq.core.asyncio.BufferCallback;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.asyncio.impl.TimedBuffer;
-import org.hornetq.core.asyncio.impl.TimedBufferObserver;
 import org.hornetq.core.journal.IOCallback;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.SequentialFile;
@@ -50,20 +48,8 @@
 
    private AsynchronousFile aioFile;
 
-   private final SequentialFileFactory factory;
-
-   private long fileSize = 0;
-
-   private final AtomicLong position = new AtomicLong(0);
-
-   private TimedBuffer timedBuffer;
-
    private final BufferCallback bufferCallback;
 
-   /** Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class.
-    *  This is the class returned to the factory when the file is being activated. */
-   private final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
-
    /** A context switch on AIO would make it to synchronize the disk before
        switching to the new thread, what would cause
        serious performance problems. Because of that we make all the writes on
@@ -83,8 +69,7 @@
                             final Executor executor,
                             final Executor pollerExecutor)
    {
-      super(directory, new File(directory + "/" + fileName));
-      this.factory = factory;
+      super(directory, new File(directory + "/" + fileName), factory);
       this.maxIO = maxIO;
       this.bufferCallback = bufferCallback;
       this.executor = executor;
@@ -112,21 +97,6 @@
       return pos;
    }
 
-   public boolean fits(int size)
-   {
-      return timedBuffer.checkSize(size);
-   }
-
-   public void disableAutoFlush()
-   {
-      timedBuffer.disableAutoFlush();
-   }
-
-   public void enableAutoFlush()
-   {
-      timedBuffer.enableAutoFlush();
-   }
-
    public SequentialFile copy()
    {
       return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(), maxIO, bufferCallback, executor, pollerExecutor);
@@ -244,16 +214,6 @@
       aioFile.setBufferCallback(callback);
    }
 
-   public void position(final long pos) throws Exception
-   {
-      position.set(pos);
-   }
-
-   public long position() throws Exception
-   {
-      return position.get();
-   }
-
    public int read(final ByteBuffer bytes, final IOCallback callback) throws Exception
    {
       int bytesToRead = bytes.limit();
@@ -277,64 +237,8 @@
 
       return bytesRead;
    }
+   
 
-   public void write(final HornetQBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
-   {
-      if (timedBuffer != null)
-      {
-         timedBuffer.addBytes(bytes.array(), sync, callback);
-      }
-      else
-      {
-         ByteBuffer buffer = factory.newBuffer(bytes.capacity());
-         buffer.put(bytes.array());
-         doWrite(buffer, callback);
-      }
-   }
-
-   public void write(final HornetQBuffer bytes, final boolean sync) throws Exception
-   {
-      if (sync)
-      {
-         IOCallback completion = SimpleWaitIOCallback.getInstance();
-
-         write(bytes, true, completion);
-
-         completion.waitCompletion();
-      }
-      else
-      {
-         write(bytes, false, DummyCallback.getInstance());
-      }
-   }
-
-   public void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
-   {
-      if (timedBuffer != null)
-      {
-         // sanity check.. it shouldn't happen
-         log.warn("Illegal buffered usage. Can't use ByteBuffer write while buffer SequentialFile");
-      }
-
-      doWrite(bytes, callback);
-   }
-
-   public void write(final ByteBuffer bytes, final boolean sync) throws Exception
-   {
-      if (sync)
-      {
-         IOCallback completion = SimpleWaitIOCallback.getInstance();
-
-         write(bytes, true, completion);
-
-         completion.waitCompletion();
-      }
-      else
-      {
-         write(bytes, false, DummyCallback.getInstance());
-      }
-   }
-
    public void sync() throws Exception
    {
       throw new IllegalArgumentException("This method is not supported on AIO");
@@ -361,22 +265,6 @@
    // Public methods
    // -----------------------------------------------------------------------------------------------------
 
-   public void setTimedBuffer(TimedBuffer buffer)
-   {
-      if (timedBuffer != null)
-      {
-         timedBuffer.setObserver(null);
-      }
-
-      this.timedBuffer = buffer;
-
-      if (buffer != null)
-      {
-         buffer.setObserver(this.timedBufferObserver);
-      }
-
-   }
-
    // Protected methods
    // -----------------------------------------------------------------------------------------------------
 
@@ -388,10 +276,29 @@
       return new AsynchronousFileImpl(executor, pollerExecutor);
    }
 
-   // Private methods
-   // -----------------------------------------------------------------------------------------------------
+   
+   public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
+   {
+      if (sync)
+      {
+         IOCallback completion = SimpleWaitIOCallback.getInstance();
+  
+         writeDirect(bytes, true, completion);
+  
+         completion.waitCompletion();
+      }
+      else
+      {
+         writeDirect(bytes, false, DummyCallback.getInstance());
+      }
+   }
 
-   private void doWrite(final ByteBuffer bytes, final IOCallback callback)
+   
+   /**
+    * 
+    * @param sync Not used on AIO
+    *  */
+   public void writeDirect(final ByteBuffer bytes, final boolean sync, IOCallback callback)
    {
       final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
 
@@ -400,6 +307,10 @@
       aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
    }
 
+
+   // Private methods
+   // -----------------------------------------------------------------------------------------------------
+
    private void checkOpened() throws Exception
    {
       if (aioFile == null || !opened)
@@ -407,93 +318,4 @@
          throw new IllegalStateException("File not opened");
       }
    }
-
-   private static class DelegateCallback implements IOCallback
-   {
-      final List<AIOCallback> delegates;
-
-      DelegateCallback(List<AIOCallback> delegates)
-      {
-         this.delegates = delegates;
-      }
-
-      public void done()
-      {
-         for (AIOCallback callback : delegates)
-         {
-            try
-            {
-               callback.done();
-            }
-            catch (Throwable e)
-            {
-               log.warn(e.getMessage(), e);
-            }
-         }
-      }
-
-      public void onError(int errorCode, String errorMessage)
-      {
-         for (AIOCallback callback : delegates)
-         {
-            try
-            {
-               callback.onError(errorCode, errorMessage);
-            }
-            catch (Throwable e)
-            {
-               log.warn(e.getMessage(), e);
-            }
-         }
-      }
-
-      public void waitCompletion() throws Exception
-      {
-      }
-   }
-
-   private class LocalBufferObserver implements TimedBufferObserver
-   {
-      public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
-      {
-         buffer.flip();
-
-         if (buffer.limit() == 0)
-         {
-            factory.releaseBuffer(buffer);
-         }
-         else
-         {
-            doWrite(buffer, new DelegateCallback(callbacks));
-         }
-      }
-
-      public ByteBuffer newBuffer(int size, int limit)
-      {
-         size = factory.calculateBlockSize(size);
-         limit = factory.calculateBlockSize(limit);
-
-         ByteBuffer buffer = factory.newBuffer(size);
-         buffer.limit(limit);
-         return buffer;
-      }
-
-      public int getRemainingBytes()
-      {
-         if (fileSize - position.get() > Integer.MAX_VALUE)
-         {
-            return Integer.MAX_VALUE;
-         }
-         else
-         {
-            return (int)(fileSize - position.get());
-         }
-      }
-
-      public String toString()
-      {
-         return "TimedBufferObserver on file (" + getFile().getName() + ")";
-      }
-
-   }
 }

Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -21,7 +21,6 @@
 
 import org.hornetq.core.asyncio.BufferCallback;
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.asyncio.impl.TimedBuffer;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.logging.Logger;
@@ -61,12 +60,6 @@
 
    private ExecutorService pollerExecutor;
 
-   private final int bufferSize;
-
-   private final long bufferTimeout;
-
-   private final TimedBuffer timedBuffer;
-
    public AIOSequentialFileFactory(final String journalDir)
    {
       this(journalDir,
@@ -82,43 +75,9 @@
                                    final boolean flushOnSync,
                                    final boolean logRates)
    {
-      super(journalDir);
-      this.bufferSize = bufferSize;
-      this.bufferTimeout = bufferTimeout;
-      timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync, logRates);
+      super(journalDir, true, bufferSize, bufferTimeout, flushOnSync, logRates);
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
-    */
-   @Override
-   public void activateBuffer(final SequentialFile file)
-   {
-      final AIOSequentialFile sequentialFile = (AIOSequentialFile)file;
-      timedBuffer.disableAutoFlush();
-      try
-      {
-         sequentialFile.setTimedBuffer(timedBuffer);
-      }
-      finally
-      {
-         timedBuffer.enableAutoFlush();
-      }
-   }
-
-   @Override
-   public void flush()
-   {
-      timedBuffer.flush();
-   }
-
-   @Override
-   public void deactivateBuffer()
-   {
-      timedBuffer.flush();
-      timedBuffer.setObserver(null);
-   }
-
    public SequentialFile createSequentialFile(final String fileName, final int maxIO)
    {
       return new AIOSequentialFile(this,
@@ -191,7 +150,7 @@
    @Override
    public void start()
    {
-      timedBuffer.start();
+      super.start();
 
       writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-AIO-writer-pool" + System.identityHashCode(this),
                                                                                  true));
@@ -204,10 +163,10 @@
    @Override
    public void stop()
    {
+      super.stop();
+      
       buffersControl.stop();
 
-      timedBuffer.stop();
-
       writeExecutor.shutdown();
 
       try

Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -165,7 +165,7 @@
 
          writeBuffer.rewind();
 
-         controlFile.write(writeBuffer, true);
+         controlFile.writeDirect(writeBuffer, true);
 
          return controlFile;
       }
@@ -181,7 +181,7 @@
       if (writingChannel != null)
       {
          sequentialFile.position(0);
-         sequentialFile.write(writingChannel.toByteBuffer(), true);
+         sequentialFile.writeDirect(writingChannel.toByteBuffer(), true);
          sequentialFile.close();
          newDataFiles.add(currentFile);
       }

Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -38,33 +38,87 @@
 
    protected final String journalDir;
 
-   public AbstractSequentialFactory(final String journalDir)
+   protected final TimedBuffer timedBuffer;
+   
+   protected final int bufferSize;
+
+   protected final long bufferTimeout;
+
+
+   public AbstractSequentialFactory(final String journalDir,
+                                    final boolean buffered,
+                                    final int bufferSize,
+                                    final long bufferTimeout,
+                                    final boolean flushOnSync,
+                                    final boolean logRates)
    {
       this.journalDir = journalDir;
+      if (buffered)
+      {
+         timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync, logRates);
+      }
+      else
+      {
+         timedBuffer = null;
+      }
+      this.bufferSize = bufferSize;
+      this.bufferTimeout = bufferTimeout;
    }
 
-   
    public void stop()
    {
+      if (timedBuffer != null)
+      {
+         timedBuffer.stop();
+      }
    }
-   
+
    public void start()
    {
+      if (timedBuffer != null)
+      {
+         timedBuffer.start();
+      }
    }
-   
-   public void activateBuffer(SequentialFile file)
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
+    */
+   public void activateBuffer(final SequentialFile file)
    {
+      if (timedBuffer != null)
+      {
+         timedBuffer.disableAutoFlush();
+         try
+         {
+            file.setTimedBuffer(timedBuffer);
+         }
+         finally
+         {
+            file.enableAutoFlush();
+         }
+      }
    }
    
-   public void releaseBuffer(ByteBuffer buffer)
+   public void flush()
    {
+      if (timedBuffer != null)
+      {
+         timedBuffer.flush();
+      }
    }
-   
+
    public void deactivateBuffer()
    {
+      if (timedBuffer != null)
+      {
+         timedBuffer.flush();
+         timedBuffer.setObserver(null);
+      }
    }
-   
-   public void flush()
+
+
+   public void releaseBuffer(ByteBuffer buffer)
    {
    }
 

Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -14,8 +14,16 @@
 package org.hornetq.core.journal.impl;
 
 import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.hornetq.core.asyncio.AIOCallback;
+import org.hornetq.core.journal.IOCallback;
 import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
 
 /**
  * A AbstractSequentialFile
@@ -28,6 +36,7 @@
 {
 
    // Constants -----------------------------------------------------
+   private static final Logger log = Logger.getLogger(AbstractSequentialFile.class);
 
    // Attributes ----------------------------------------------------
 
@@ -35,6 +44,21 @@
 
    private final String directory;
 
+   protected final SequentialFileFactory factory;
+
+   protected long fileSize = 0;
+
+   protected final AtomicLong position = new AtomicLong(0);
+
+   protected TimedBuffer timedBuffer;
+
+   /** Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class.
+    *  This is the class returned to the factory when the file is being activated. */
+   protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
+
+
+
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -43,11 +67,12 @@
     * @param file
     * @param directory
     */
-   public AbstractSequentialFile(String directory, File file)
+   public AbstractSequentialFile(String directory, File file, SequentialFileFactory factory)
    {
       super();
       this.file = file;
       this.directory = directory;
+      this.factory = factory;
    }
 
    // Public --------------------------------------------------------
@@ -73,7 +98,17 @@
       file.delete();
    }
 
+   public void position(final long pos) throws Exception
+   {
+      position.set(pos);
+   }
 
+   public long position() throws Exception
+   {
+      return position.get();
+   }
+
+
    public final void renameTo(final String newFileName) throws Exception
    {
       close();
@@ -87,6 +122,84 @@
       }
    }
    
+
+   public final boolean fits(int size)
+   {
+      if (timedBuffer == null)
+      {
+         return this.position.get() + size <= fileSize;
+      }
+      else
+      {
+         return timedBuffer.checkSize(size);
+      }
+   }
+
+   public final void disableAutoFlush()
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.disableAutoFlush();
+      }
+   }
+
+   public final void enableAutoFlush()
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.enableAutoFlush();
+      }
+   }
+
+   public void setTimedBuffer(TimedBuffer buffer)
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.setObserver(null);
+      }
+
+      this.timedBuffer = buffer;
+
+      if (buffer != null)
+      {
+         buffer.setObserver(this.timedBufferObserver);
+      }
+
+   }
+   
+   public void write(final HornetQBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+   {
+      if (timedBuffer != null)
+      {
+         timedBuffer.addBytes(bytes.array(), sync, callback);
+      }
+      else
+      {
+         ByteBuffer buffer = factory.newBuffer(bytes.capacity());
+         buffer.put(bytes.array());
+         buffer.rewind();
+         writeDirect(buffer, sync, callback);
+      }
+   }
+
+   public void write(final HornetQBuffer bytes, final boolean sync) throws Exception
+   {
+      if (sync)
+      {
+         IOCallback completion = SimpleWaitIOCallback.getInstance();
+
+         write(bytes, true, completion);
+
+         completion.waitCompletion();
+      }
+      else
+      {
+         write(bytes, false, DummyCallback.getInstance());
+      }
+   }
+   
+   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -96,8 +209,98 @@
       return file;
    }
 
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
 
+   protected static class DelegateCallback implements IOCallback
+   {
+      final List<IOCallback> delegates;
+
+      DelegateCallback(List<IOCallback> delegates)
+      {
+         this.delegates = delegates;
+      }
+
+      public void done()
+      {
+         for (IOCallback callback : delegates)
+         {
+            try
+            {
+               callback.done();
+            }
+            catch (Throwable e)
+            {
+               log.warn(e.getMessage(), e);
+            }
+         }
+      }
+
+      public void onError(int errorCode, String errorMessage)
+      {
+         for (AIOCallback callback : delegates)
+         {
+            try
+            {
+               callback.onError(errorCode, errorMessage);
+            }
+            catch (Throwable e)
+            {
+               log.warn(e.getMessage(), e);
+            }
+         }
+      }
+
+      public void waitCompletion() throws Exception
+      {
+      }
+   }
+
+   protected class LocalBufferObserver implements TimedBufferObserver
+   {
+      public void flushBuffer(ByteBuffer buffer, List<IOCallback> callbacks)
+      {
+         buffer.flip();
+
+         if (buffer.limit() == 0)
+         {
+            factory.releaseBuffer(buffer);
+         }
+         else
+         {
+            writeDirect(buffer, true, new DelegateCallback(callbacks));
+         }
+      }
+
+      public ByteBuffer newBuffer(int size, int limit)
+      {
+         size = factory.calculateBlockSize(size);
+         limit = factory.calculateBlockSize(limit);
+
+         ByteBuffer buffer = factory.newBuffer(size);
+         buffer.limit(limit);
+         return buffer;
+      }
+
+      public int getRemainingBytes()
+      {
+         if (fileSize - position.get() > Integer.MAX_VALUE)
+         {
+            return Integer.MAX_VALUE;
+         }
+         else
+         {
+            return (int)(fileSize - position.get());
+         }
+      }
+
+      public String toString()
+      {
+         return "TimedBufferObserver on file (" + getFile().getName() + ")";
+      }
+
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -2685,7 +2685,7 @@
 
       bb.rewind();
 
-      sf.write(bb, true);
+      sf.writeDirect(bb, true);
 
       JournalFile jf = new JournalFileImpl(sf, newFileID);
 
@@ -2993,7 +2993,7 @@
 
          bb.rewind();
 
-         sequentialFile.write(bb, true);
+         sequentialFile.writeDirect(bb, true);
       }
 
       JournalFile info = new JournalFileImpl(sequentialFile, fileID);

Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -14,15 +14,15 @@
 package org.hornetq.core.journal.impl;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.journal.IOCallback;
 import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.spi.HornetQBuffer;
 
 /**
  * 
@@ -36,22 +36,18 @@
 {
    private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
 
-   private long fileSize = 0;
-
    private FileChannel channel;
 
    private RandomAccessFile rfile;
 
-   private final AtomicLong position = new AtomicLong(0);
-
-   public NIOSequentialFile(final String directory, final String fileName)
+   public NIOSequentialFile(final SequentialFileFactory factory, final String directory, final String fileName)
    {
-      super(directory, new File(directory + "/" + fileName));
+      super(directory, new File(directory + "/" + fileName), factory);
    }
 
-   public NIOSequentialFile(File file)
+   public NIOSequentialFile(final SequentialFileFactory factory, final File file)
    {
-      super(file.getParent(), new File(file.getPath()));
+      super(file.getParent(), new File(file.getPath()), factory);
    }
 
    public int getAlignment()
@@ -64,11 +60,6 @@
       return position;
    }
 
-   public boolean fits(final int size)
-   {
-      return this.position.get() + size <= fileSize;
-   }
-
    public synchronized boolean isOpen()
    {
       return channel != null;
@@ -136,7 +127,7 @@
 
       notifyAll();
    }
-   
+
    public int read(final ByteBuffer bytes) throws Exception
    {
       return read(bytes, null);
@@ -147,11 +138,14 @@
       try
       {
          int bytesRead = channel.read(bytes);
+         
          if (callback != null)
          {
             callback.done();
          }
+         
          bytes.flip();
+         
          return bytesRead;
       }
       catch (Exception e)
@@ -166,53 +160,6 @@
 
    }
 
-   public void write(final HornetQBuffer bytes, final boolean sync) throws Exception
-   {
-      write(ByteBuffer.wrap(bytes.array()), sync);
-   }
-
-   public void write(final HornetQBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
-   {
-      write(ByteBuffer.wrap(bytes.array()), sync, callback);
-   }
-
-   public void write(final ByteBuffer bytes, final boolean sync) throws Exception
-   {
-      position.addAndGet(bytes.limit());
-
-      channel.write(bytes);
-
-      if (sync)
-      {
-         sync();
-      }
-   }
-
-   public void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
-   {
-      try
-      {
-         position.addAndGet(bytes.limit());
-
-         channel.write(bytes);
-
-         if (sync)
-         {
-            sync();
-         }
-
-         if (callback != null)
-         {
-            callback.done();
-         }
-      }
-      catch (Exception e)
-      {
-         callback.onError(-1, e.getMessage());
-         throw e;
-      }
-   }
-
    public void sync() throws Exception
    {
       if (channel != null)
@@ -235,44 +182,64 @@
 
    public void position(final long pos) throws Exception
    {
+      super.position(pos);
       channel.position(pos);
-      position.set(pos);
    }
 
-   public long position() throws Exception
-   {
-      return position.get();
-   }
-
    @Override
    public String toString()
    {
       return "NIOSequentialFile " + getFile();
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.SequentialFile#setBuffering(boolean)
-    */
-   public void setBuffering(boolean buffering)
+   public SequentialFile copy()
    {
+      return new NIOSequentialFile(factory, getFile());
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.SequentialFile#lockBuffer()
-    */
-   public void disableAutoFlush()
+   public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback)
    {
+      if (callback == null)
+      {
+         throw new NullPointerException("callback parameter need to be set");
+      }
+      
+      try
+      {
+         internalWrite(bytes, sync, callback);
+      }
+      catch (Exception e)
+      {
+         callback.onError(-1, e.getMessage());
+      }
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.journal.SequentialFile#unlockBuffer()
-    */
-   public void enableAutoFlush()
+   public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
    {
+      internalWrite(bytes, sync, null);
    }
 
-   public SequentialFile copy()
+   /**
+    * @param bytes
+    * @param sync
+    * @param callback
+    * @throws IOException
+    * @throws Exception
+    */
+   private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
    {
-      return new NIOSequentialFile(getFile());
+      position.addAndGet(bytes.limit());
+
+      channel.write(bytes);
+
+      if (sync)
+      {
+         sync();
+      }
+
+      if (callback != null)
+      {
+         callback.done();
+      }
    }
 }

Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -15,6 +15,7 @@
 
 import java.nio.ByteBuffer;
 
+import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.logging.Logger;
@@ -31,25 +32,46 @@
 {
    private static final Logger log = Logger.getLogger(NIOSequentialFileFactory.class);
 
+
    public NIOSequentialFileFactory(final String journalDir)
    {
-      super(journalDir);
+      this(journalDir,
+           false,
+           ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+           ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT,
+           ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC,
+           false);
+   }
 
-      if (journalDir == null)
-      {
-         new Exception("journalDir is null").printStackTrace();
-      }
+   public NIOSequentialFileFactory(final String journalDir, boolean buffered)
+   {
+      this(journalDir,
+           buffered,
+           ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+           ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT,
+           ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC,
+           false);
    }
 
+   public NIOSequentialFileFactory(final String journalDir,
+                                   final boolean buffered,
+                                   final int bufferSize,
+                                   final long bufferTimeout,
+                                   final boolean flushOnSync,
+                                   final boolean logRates)
+   {
+      super(journalDir, buffered, bufferSize, bufferTimeout, flushOnSync, logRates);
+   }
+
    // maxIO is ignored on NIO
    public SequentialFile createSequentialFile(final String fileName, final int maxIO)
    {
-      return new NIOSequentialFile(journalDir, fileName);
+      return new NIOSequentialFile(this, journalDir, fileName);
    }
 
    public boolean isSupportsCallbacks()
    {
-      return false;
+      return timedBuffer != null;
    }
 
    public ByteBuffer newBuffer(final int size)

Copied: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java (from rev 8261, trunk/src/main/org/hornetq/core/asyncio/impl/TimedBuffer.java)
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,400 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.VariableLatch;
+
+/**
+ * A TimedBuffer
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class TimedBuffer
+{
+   // Constants -----------------------------------------------------
+
+   private static final Logger log = Logger.getLogger(TimedBuffer.class);
+
+   // Attributes ----------------------------------------------------
+
+   private TimedBufferObserver bufferObserver;
+
+   // This is used to pause and resume the timer
+   // This is a reusable Latch, that uses java.util.concurrent base classes
+   private final VariableLatch latchTimer = new VariableLatch();
+
+   private CheckTimer timerRunnable = new CheckTimer();
+
+   private final int bufferSize;
+
+   private final HornetQBuffer buffer;
+
+   private int bufferLimit = 0;
+
+   private List<IOCallback> callbacks;
+
+   private final Lock lock = new ReentrantReadWriteLock().writeLock();
+
+   // used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
+   private volatile boolean active = false;
+
+   private final long timeout;
+
+   // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
+   private volatile boolean pendingSync = false;
+
+   private Thread timerThread;
+
+   private volatile boolean started;
+
+   private final boolean flushOnSync;
+
+   // for logging write rates
+
+   private final boolean logRates;
+
+   private volatile long bytesFlushed;
+
+   private Timer logRatesTimer;
+
+   private TimerTask logRatesTimerTask;
+
+   private long lastExecution;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public TimedBuffer(final int size, final long timeout, final boolean flushOnSync, final boolean logRates)
+   {
+      bufferSize = size;
+      this.logRates = logRates;
+      if (logRates)
+      {
+         this.logRatesTimer = new Timer(true);
+      }
+      // Setting the interval for nano-sleeps
+
+      buffer = ChannelBuffers.buffer(bufferSize);
+      buffer.clear();
+      bufferLimit = 0;
+
+      callbacks = new ArrayList<IOCallback>();
+      this.flushOnSync = flushOnSync;
+      latchTimer.up();
+      this.timeout = timeout;
+   }
+
+   public synchronized void start()
+   {
+      if (started)
+      {
+         return;
+      }
+
+      timerRunnable = new CheckTimer();
+
+      timerThread = new Thread(timerRunnable, "hornetq-aio-timer");
+
+      timerThread.start();
+
+      if (logRates)
+      {
+         logRatesTimerTask = new LogRatesTimerTask();
+
+         logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
+      }
+
+      started = true;
+   }
+
+   public void stop()
+   {
+      if (!started)
+      {
+         return;
+      }
+
+      this.flush();
+
+      this.bufferObserver = null;
+
+      latchTimer.down();
+
+      timerRunnable.close();
+
+      if (logRates)
+      {
+         logRatesTimerTask.cancel();
+      }
+
+      while (timerThread.isAlive())
+      {
+         try
+         {
+            timerThread.join();
+         }
+         catch (InterruptedException e)
+         {
+         }
+      }
+
+      started = false;
+   }
+
+   public synchronized void setObserver(TimedBufferObserver observer)
+   {
+      if (this.bufferObserver != null)
+      {
+         flush();
+      }
+
+      this.bufferObserver = observer;
+   }
+
+   public void disableAutoFlush()
+   {
+      lock.lock();
+   }
+
+   public void enableAutoFlush()
+   {
+      lock.unlock();
+   }
+
+   /**
+    * Verify if the size fits the buffer
+    * @param sizeChecked
+    * @return
+    */
+   public synchronized boolean checkSize(final int sizeChecked)
+   {
+      if (sizeChecked > bufferSize)
+      {
+         throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
+                                         ") on the journal");
+      }
+
+      if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit)
+      {
+         flush();
+
+         final int remaining = bufferObserver.getRemainingBytes();
+
+         if (sizeChecked > remaining)
+         {
+            return false;
+         }
+         else
+         {
+            buffer.clear();
+            bufferLimit = Math.min(remaining, bufferSize);
+            return true;
+         }
+      }
+      else
+      {
+         return true;
+      }
+   }
+
+   public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOCallback callback)
+   {
+      if (buffer.writerIndex() == 0)
+      {
+         // Resume latch
+         latchTimer.down();
+      }
+
+      buffer.writeBytes(bytes);
+
+      callbacks.add(callback);
+
+      active = true;
+
+      if (sync)
+      {
+         if (flushOnSync)
+         {
+            flush();
+         }
+         else
+         {
+            // We should flush on the next timeout, no matter what other activity happens on the buffer
+            if (!pendingSync)
+            {
+               pendingSync = true;
+            }
+         }
+      }
+
+      if (buffer.writerIndex() == bufferLimit)
+      {
+         flush();
+      }
+   }
+
+   public synchronized void flush()
+   {
+      if (buffer.writerIndex() > 0)
+      {
+         latchTimer.up();
+
+         int pos = buffer.writerIndex();
+
+         if (logRates)
+         {
+            bytesFlushed += pos;
+         }
+
+         ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
+
+         // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
+         // Using directBuffer.put(buffer) would make several append calls for each byte
+
+         directBuffer.put(buffer.array(), 0, pos);
+
+         bufferObserver.flushBuffer(directBuffer, callbacks);
+
+         callbacks = new ArrayList<IOCallback>();
+
+         active = false;
+         pendingSync = false;
+
+         buffer.clear();
+         bufferLimit = 0;
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   private void checkTimer()
+   {
+      // if inactive for more than the timeout
+      // of if a sync happened at more than the the timeout ago
+      if (!active || pendingSync)
+      {
+         lock.lock();
+         try
+         {
+            if (bufferObserver != null)
+            {
+               flush();
+            }
+         }
+         finally
+         {
+            lock.unlock();
+         }
+      }
+
+      // Set the buffer as inactive.. we will flush the buffer next tick if nothing change this
+      active = false;
+   }
+
+   // Inner classes -------------------------------------------------
+
+   private class LogRatesTimerTask extends TimerTask
+   {
+      private boolean closed;
+
+      @Override
+      public synchronized void run()
+      {
+         if (!closed)
+         {
+            long now = System.currentTimeMillis();
+
+            if (lastExecution != 0)
+            {
+               double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
+               log.info("Write rate = " + rate + " bytes / sec or " + (long)(rate / (1024 * 1024)) + " MiB / sec");
+            }
+
+            lastExecution = now;
+
+            bytesFlushed = 0;
+         }
+      }
+
+      public synchronized boolean cancel()
+      {
+         closed = true;
+
+         return super.cancel();
+      }
+   }
+
+   private class CheckTimer implements Runnable
+   {
+      private volatile boolean closed = false;
+
+      public void run()
+      {
+         while (!closed)
+         {
+            try
+            {
+               latchTimer.waitCompletion();
+            }
+            catch (InterruptedException ignored)
+            {
+            }
+
+            sleep();
+
+            checkTimer();
+
+         }
+      }
+
+      /**
+       * 
+       */
+      private void sleep()
+      {
+         long time = System.nanoTime() + timeout;
+         while (time > System.nanoTime())
+         {
+            Thread.yield();
+         }
+      }
+
+      public void close()
+      {
+         closed = true;
+      }
+   }
+
+}

Copied: trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java (from rev 8261, trunk/src/main/org/hornetq/core/asyncio/impl/TimedBufferObserver.java)
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBufferObserver.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+
+package org.hornetq.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.hornetq.core.journal.IOCallback;
+
+/**
+ * A TimedBufferObserver
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface TimedBufferObserver
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   public void flushBuffer(ByteBuffer buffer, List<IOCallback> callbacks);
+   
+   
+   /** Return the number of remaining bytes that still fit on the observer (file) */
+   public int getRemainingBytes();
+   
+   
+   public ByteBuffer newBuffer(int size, int limit);
+   
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -157,18 +157,12 @@
 
       buffer.rewind();
 
-      file.write(buffer, false);
+      file.writeDirect(buffer, false);
 
       numberOfMessages.incrementAndGet();
       size.addAndGet(buffer.limit());
       
       storageManager.pageWrite(message, pageId);
-      
-      if (message.getMessage(null).isLargeMessage())
-      {
-         // If we don't sync on large messages we could have the risk of unattended files on disk
-         sync();
-      }
    }
 
    public void sync() throws Exception

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -216,7 +216,7 @@
 
    protected SequentialFileFactory newFileFactory(final String directoryName)
    {
-      return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName);
+      return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName, false);
    }
 
    // Private -------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -541,13 +541,6 @@
 
       file.open();
 
-      long size = file.size();
-
-      if (fileFactory.isSupportsCallbacks() && size < pageSize)
-      {
-         file.fill((int)size, (int)(pageSize - size), (byte)0);
-      }
-
       file.position(0);
 
       file.close();

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -227,7 +227,12 @@
          if (!AIOSequentialFileFactory.isSupported())
          {
             log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. " + "If your platform is Linux, install LibAIO to enable the AIO journal");
-            journalFF = new NIOSequentialFileFactory(journalDir);
+            journalFF = new NIOSequentialFileFactory(journalDir,
+                                                     true,
+                                                     config.getAIOBufferSize(),
+                                                     config.getAIOBufferTimeout(),
+                                                     config.isAIOFlushOnSync(),
+                                                     config.isLogJournalWriteRate());
          }
          else
          {
@@ -278,7 +283,7 @@
 
       largeMessagesDirectory = config.getLargeMessagesDirectory();
 
-      largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory);
+      largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false);
 
       perfBlastPages = config.getJournalPerfBlastPages();
    }
@@ -405,7 +410,7 @@
    {
       file.position(file.size());
 
-      file.write(ByteBuffer.wrap(bytes), false);
+      file.writeDirect(ByteBuffer.wrap(bytes), false);
 
       if (isReplicated())
       {
@@ -679,10 +684,10 @@
    }
 
    public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
-                                  final PagingManager pagingManager,
-                                  final ResourceManager resourceManager,
-                                  final Map<Long, Queue> queues,
-                                  final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
+                                                    final PagingManager pagingManager,
+                                                    final ResourceManager resourceManager,
+                                                    final Map<Long, Queue> queues,
+                                                    final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
    {
       List<RecordInfo> records = new ArrayList<RecordInfo>();
 
@@ -690,8 +695,10 @@
 
       Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
 
-      JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(messages));
-      
+      JournalLoadInformation info = messageJournal.load(records,
+                                                        preparedTransactions,
+                                                        new LargeMessageTXFailureCallback(messages));
+
       ArrayList<LargeServerMessage> largeMessages = new ArrayList<LargeServerMessage>();
 
       Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
@@ -919,7 +926,7 @@
       {
          messageJournal.perfBlast(perfBlastPages);
       }
-      
+
       return info;
    }
 
@@ -981,7 +988,7 @@
 
          // Use same method as load message journal to prune out acks, so they don't get added.
          // Then have reacknowledge(tx) methods on queue, which needs to add the page size
-  
+
          // first get any sent messages for this tx and recreate
          for (RecordInfo record : preparedTransaction.records)
          {
@@ -990,11 +997,11 @@
             HornetQBuffer buff = ChannelBuffers.wrappedBuffer(data);
 
             byte recordType = record.getUserRecordType();
-            
+
             switch (recordType)
             {
                case ADD_LARGE_MESSAGE:
-               {                 
+               {
                   messages.put(record.id, parseLargeMessage(messages, buff));
 
                   break;
@@ -1011,7 +1018,7 @@
                }
                case ADD_REF:
                {
-                
+
                   long messageID = record.id;
 
                   RefEncoding encoding = new RefEncoding();
@@ -1190,7 +1197,8 @@
       bindingsJournal.appendDeleteRecord(queueBindingID, true);
    }
 
-   public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<GroupingInfo> groupingInfos) throws Exception
+   public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
+                                                    final List<GroupingInfo> groupingInfos) throws Exception
    {
       List<RecordInfo> records = new ArrayList<RecordInfo>();
 
@@ -1240,7 +1248,7 @@
             throw new IllegalStateException("Invalid record type " + rec);
          }
       }
-      
+
       return bindingsInfo;
    }
 
@@ -1304,7 +1312,7 @@
       JournalLoadInformation[] info = new JournalLoadInformation[2];
       info[0] = bindingsJournal.loadInternalOnly();
       info[1] = messageJournal.loadInternalOnly();
-      
+
       return info;
    }
 

Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -226,7 +226,7 @@
                      catch (Exception e)
                      {
                         // https://jira.jboss.org/jira/browse/HORNETQ-188
-                        // After commit shouldn't thow an exception
+                        // After commit shouldn't throw an exception
                         log.warn(e.getMessage(), e);
                      }
                   }

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/JMSServerStartStopTest.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -59,7 +59,7 @@
    
    public void testStopStart1() throws Exception
    {
-      final int numMessages = 5;
+      final int numMessages = 5 ;
       
       for (int j = 0; j < numMessages; j++)
       {

Deleted: trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -1,552 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.journal;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import javax.transaction.xa.XAException;
-import javax.transaction.xa.XAResource;
-import javax.transaction.xa.Xid;
-
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.client.ClientConsumer;
-import org.hornetq.core.client.ClientMessage;
-import org.hornetq.core.client.ClientProducer;
-import org.hornetq.core.client.ClientSession;
-import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.journal.PreparedTransactionInfo;
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.tests.util.ServiceTestBase;
-import org.hornetq.utils.SimpleString;
-
-/**
- * A MultiThreadConsumerStressTest
- *
- * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class MultiThreadCompactorTest extends ServiceTestBase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   final SimpleString ADDRESS = new SimpleString("SomeAddress");
-
-   final SimpleString QUEUE = new SimpleString("SomeQueue");
-
-   private HornetQServer server;
-
-   private ClientSessionFactory sf;
-
-   protected int getNumberOfIterations()
-   {
-      return 3;
-   }
-
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-   }
-
-   protected void tearDown() throws Exception
-   {
-      stopServer();
-      super.tearDown();
-   }
-
-   public void testMultiThreadCompact() throws Throwable
-   {
-      setupServer(JournalType.ASYNCIO);
-      for (int i = 0; i < getNumberOfIterations(); i++)
-      {
-         System.out.println("######################################");
-         System.out.println("test # " + i);
-
-         internalTestProduceAndConsume();
-         stopServer();
-
-         NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getJournalDir());
-         JournalImpl journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
-                                               2,
-                                               0,
-                                               0,
-                                               factory,
-                                               "hornetq-data",
-                                               "hq",
-                                               100);
-
-         List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
-         List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
-
-         journal.start();
-         journal.load(committedRecords, preparedTransactions, null);
-
-         assertEquals(0, committedRecords.size());
-         assertEquals(0, preparedTransactions.size());
-
-         System.out.println("DataFiles = " + journal.getDataFilesCount());
-
-         if (i % 2 == 0 && i > 0)
-         {
-            System.out.println("DataFiles = " + journal.getDataFilesCount());
-            journal.forceMoveNextFile();
-            assertEquals(0, journal.getDataFilesCount());
-         }
-
-         journal.stop();
-         journal = null;
-
-         setupServer(JournalType.ASYNCIO);
-
-      }
-   }
-
-   /**
-    * @param xid
-    * @throws HornetQException
-    * @throws XAException
-    */
-   private void addEmptyTransaction(Xid xid) throws HornetQException, XAException
-   {
-      ClientSessionFactory sf = createInVMFactory();
-      sf.setBlockOnNonPersistentSend(false);
-      sf.setBlockOnAcknowledge(false);
-      ClientSession session = sf.createSession(true, false, false);
-      session.start(xid, XAResource.TMNOFLAGS);
-      session.end(xid, XAResource.TMSUCCESS);
-      session.prepare(xid);
-      session.close();
-      sf.close();
-   }
-
-   private void checkEmptyXID(Xid xid) throws HornetQException, XAException
-   {
-      ClientSessionFactory sf = createInVMFactory();
-      sf.setBlockOnNonPersistentSend(false);
-      sf.setBlockOnAcknowledge(false);
-      ClientSession session = sf.createSession(true, false, false);
-
-      Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
-      assertEquals(1, xids.length);
-      assertEquals(xid, xids[0]);
-
-      session.rollback(xid);
-
-      session.close();
-      sf.close();
-   }
-
-   public void internalTestProduceAndConsume() throws Throwable
-   {
-
-      addBogusData(100, "LAZY-QUEUE");
-
-      Xid xid = null;
-      xid = newXID();
-      addEmptyTransaction(xid);
-
-      System.out.println(getTemporaryDir());
-      boolean transactionalOnConsume = true;
-      boolean transactionalOnProduce = true;
-      int numberOfConsumers = 30;
-      // this test assumes numberOfConsumers == numberOfProducers
-      int numberOfProducers = numberOfConsumers;
-      int produceMessage = 5000;
-      int commitIntervalProduce = 100;
-      int consumeMessage = (int)(produceMessage * 0.9);
-      int commitIntervalConsume = 100;
-
-      System.out.println("ConsumeMessages = " + consumeMessage + " produceMessage = " + produceMessage);
-
-      // Number of messages expected to be received after restart
-      int numberOfMessagesExpected = (produceMessage - consumeMessage) * numberOfConsumers;
-
-      CountDownLatch latchReady = new CountDownLatch(numberOfConsumers + numberOfProducers);
-
-      CountDownLatch latchStart = new CountDownLatch(1);
-
-      ArrayList<BaseThread> threads = new ArrayList<BaseThread>();
-
-      ProducerThread[] prod = new ProducerThread[numberOfProducers];
-      for (int i = 0; i < numberOfProducers; i++)
-      {
-         prod[i] = new ProducerThread(i,
-                                      latchReady,
-                                      latchStart,
-                                      transactionalOnConsume,
-                                      produceMessage,
-                                      commitIntervalProduce);
-         prod[i].start();
-         threads.add(prod[i]);
-      }
-
-      ConsumerThread[] cons = new ConsumerThread[numberOfConsumers];
-
-      for (int i = 0; i < numberOfConsumers; i++)
-      {
-         cons[i] = new ConsumerThread(i,
-                                      latchReady,
-                                      latchStart,
-                                      transactionalOnProduce,
-                                      consumeMessage,
-                                      commitIntervalConsume);
-         cons[i].start();
-         threads.add(cons[i]);
-      }
-
-      latchReady.await();
-      latchStart.countDown();
-
-      for (BaseThread t : threads)
-      {
-         t.join();
-         if (t.e != null)
-         {
-            throw t.e;
-         }
-      }
-
-      server.stop();
-
-      setupServer(JournalType.ASYNCIO);
-
-      drainQueue(numberOfMessagesExpected, QUEUE);
-      drainQueue(100, new SimpleString("LAZY-QUEUE"));
-
-      server.stop();
-
-      setupServer(JournalType.ASYNCIO);
-      drainQueue(0, QUEUE);
-      drainQueue(0, new SimpleString("LAZY-QUEUE"));
-
-      checkEmptyXID(xid);
-
-   }
-
-   /**
-    * @param numberOfMessagesExpected
-    * @param queue
-    * @throws HornetQException
-    */
-   private void drainQueue(int numberOfMessagesExpected, SimpleString queue) throws HornetQException
-   {
-      ClientSession sess = sf.createSession(true, true);
-
-      ClientConsumer consumer = sess.createConsumer(queue);
-
-      sess.start();
-
-      for (int i = 0; i < numberOfMessagesExpected; i++)
-      {
-         ClientMessage msg = consumer.receive(5000);
-         assertNotNull(msg);
-
-         if (i % 100 == 0)
-         {
-            // System.out.println("Received #" + i + "  on thread after start");
-         }
-         msg.acknowledge();
-      }
-
-      assertNull(consumer.receiveImmediate());
-
-      sess.close();
-   }
-
-   /**
-    * @throws HornetQException
-    */
-   private void addBogusData(int nmessages, String queue) throws HornetQException
-   {
-      ClientSession session = sf.createSession(false, false);
-      try
-      {
-         session.createQueue(queue, queue, true);
-      }
-      catch (Exception ignored)
-      {
-      }
-
-      ClientProducer prod = session.createProducer(queue);
-      for (int i = 0; i < nmessages; i++)
-      {
-         ClientMessage msg = session.createClientMessage(true);
-         msg.getBody().writeBytes(new byte[1024]);
-         prod.send(msg);
-      }
-      session.commit();
-
-      session.start();
-
-      ClientConsumer cons = session.createConsumer(queue);
-      assertNotNull(cons.receive(1000));
-      session.rollback();
-      session.close();
-   }
-
-   protected void stopServer() throws Exception
-   {
-      try
-      {
-         if (server != null && server.isStarted())
-         {
-            server.stop();
-         }
-      }
-      catch (Throwable e)
-      {
-         e.printStackTrace(System.out); // System.out => junit reports
-      }
-
-      sf = null;
-   }
-
-   private void setupServer(JournalType journalType) throws Exception, HornetQException
-   {
-      if (!AsynchronousFileImpl.isLoaded())
-      {
-         journalType = JournalType.NIO;
-      }
-      if (server == null)
-      {
-         Configuration config = createDefaultConfig(true);
-         config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
-
-         config.setJournalType(journalType);
-         config.setJMXManagementEnabled(false);
-
-         config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
-         config.setJournalMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES);
-
-         config.setJournalCompactMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES);
-         config.setJournalCompactPercentage(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE);
-
-         // This test is supposed to not sync.. All the ACKs are async, and it was supposed to not sync
-         config.setJournalSyncNonTransactional(false);
-
-         // config.setJournalCompactMinFiles(0);
-         // config.setJournalCompactPercentage(0);
-
-         server = createServer(true, config);
-      }
-
-      server.start();
-
-      sf = createNettyFactory();
-      sf.setBlockOnPersistentSend(false);
-      sf.setBlockOnAcknowledge(false);
-
-      ClientSession sess = sf.createSession();
-
-      try
-      {
-         sess.createQueue(ADDRESS, QUEUE, true);
-      }
-      catch (Exception ignored)
-      {
-      }
-
-      sess.close();
-
-      sf = createInVMFactory();
-   }
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   class BaseThread extends Thread
-   {
-      Throwable e;
-
-      final CountDownLatch latchReady;
-
-      final CountDownLatch latchStart;
-
-      final int numberOfMessages;
-
-      final int commitInterval;
-
-      final boolean transactional;
-
-      BaseThread(String name,
-                 CountDownLatch latchReady,
-                 CountDownLatch latchStart,
-                 boolean transactional,
-                 int numberOfMessages,
-                 int commitInterval)
-      {
-         super(name);
-         this.transactional = transactional;
-         this.latchReady = latchReady;
-         this.latchStart = latchStart;
-         this.commitInterval = commitInterval;
-         this.numberOfMessages = numberOfMessages;
-      }
-
-   }
-
-   class ProducerThread extends BaseThread
-   {
-      ProducerThread(int id,
-                     CountDownLatch latchReady,
-                     CountDownLatch latchStart,
-                     boolean transactional,
-                     int numberOfMessages,
-                     int commitInterval)
-      {
-         super("ClientProducer:" + id, latchReady, latchStart, transactional, numberOfMessages, commitInterval);
-      }
-
-      public void run()
-      {
-         ClientSession session = null;
-         latchReady.countDown();
-         try
-         {
-            latchStart.await();
-            session = sf.createSession(!transactional, !transactional);
-            ClientProducer prod = session.createProducer(ADDRESS);
-            for (int i = 0; i < numberOfMessages; i++)
-            {
-               if (transactional)
-               {
-                  if (i % commitInterval == 0)
-                  {
-                     session.commit();
-                  }
-               }
-               if (i % 100 == 0)
-               {
-                  // System.out.println(Thread.currentThread().getName() + "::sent #" + i);
-               }
-               ClientMessage msg = session.createClientMessage(true);
-               msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
-               prod.send(msg);
-            }
-
-            if (transactional)
-            {
-               session.commit();
-            }
-
-            System.out.println("Thread " + Thread.currentThread().getName() +
-                               " sent " +
-                               numberOfMessages +
-                               "  messages");
-         }
-         catch (Throwable e)
-         {
-            e.printStackTrace();
-            this.e = e;
-         }
-         finally
-         {
-            try
-            {
-               session.close();
-            }
-            catch (Throwable e)
-            {
-               this.e = e;
-            }
-         }
-      }
-   }
-
-   class ConsumerThread extends BaseThread
-   {
-      ConsumerThread(int id,
-                     CountDownLatch latchReady,
-                     CountDownLatch latchStart,
-                     boolean transactional,
-                     int numberOfMessages,
-                     int commitInterval)
-      {
-         super("ClientConsumer:" + id, latchReady, latchStart, transactional, numberOfMessages, commitInterval);
-      }
-
-      public void run()
-      {
-         ClientSession session = null;
-         latchReady.countDown();
-         try
-         {
-            latchStart.await();
-            session = sf.createSession(!transactional, !transactional);
-            session.start();
-            ClientConsumer cons = session.createConsumer(QUEUE);
-            for (int i = 0; i < numberOfMessages; i++)
-            {
-               ClientMessage msg = cons.receive(60 * 1000);
-               msg.acknowledge();
-               if (i % commitInterval == 0)
-               {
-                  session.commit();
-               }
-               if (i % 100 == 0)
-               {
-                  // System.out.println(Thread.currentThread().getName() + "::received #" + i);
-               }
-            }
-
-            System.out.println("Thread " + Thread.currentThread().getName() +
-                               " received " +
-                               numberOfMessages +
-                               " messages");
-
-            session.commit();
-         }
-         catch (Throwable e)
-         {
-            this.e = e;
-         }
-         finally
-         {
-            try
-            {
-               session.close();
-            }
-            catch (Throwable e)
-            {
-               this.e = e;
-            }
-         }
-      }
-   }
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Added: trunk/tests/src/org/hornetq/tests/integration/journal/NIONoBufferJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIONoBufferJournalImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIONoBufferJournalImplTest.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.journal;
+
+import java.io.File;
+
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.unit.core.journal.impl.JournalImplTestUnit;
+
+/**
+ * 
+ * A RealJournalImplTest
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NIONoBufferJournalImplTest extends JournalImplTestUnit
+{
+   private static final Logger log = Logger.getLogger(NIONoBufferJournalImplTest.class);
+
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      File file = new File(getTestDir());
+
+      log.debug("deleting directory " + getTestDir());
+
+      deleteDirectory(file);
+
+      file.mkdir();
+
+      return new NIOSequentialFileFactory(getTestDir(), false);
+   }
+
+   @Override
+   protected int getAlignment()
+   {
+      return 1;
+   }
+
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -16,13 +16,17 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.journal.LoaderCallback;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalImpl;
-import org.hornetq.tests.stress.journal.remote.RemoteJournalAppender;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
 import org.hornetq.tests.util.SpawnedVMSupport;
 import org.hornetq.tests.util.UnitTestCase;
 
@@ -40,6 +44,8 @@
 
    // Attributes ----------------------------------------------------
 
+   private static final int OK = 10;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -85,7 +91,31 @@
    {
       internalTest("nio", getTestDir(), 10000, 0, true, true, 1);
    }
+   
+   
 
+   public void testNIO2() throws Exception
+   {
+      internalTest("nio2", getTestDir(), 10000, 100, true, true, 1);
+   }
+
+   public void testNIO2HugeTransaction() throws Exception
+   {
+      internalTest("nio2", getTestDir(), 10000, 10000, true, true, 1);
+   }
+
+   public void testNIO2MultiThread() throws Exception
+   {
+      internalTest("nio2", getTestDir(), 1000, 100, true, true, 10);
+   }
+
+   public void testNIO2NonTransactional() throws Exception
+   {
+      internalTest("nio2", getTestDir(), 10000, 0, true, true, 1);
+   }
+
+
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -124,18 +154,18 @@
          {
             if (externalProcess)
             {
-               Process process = SpawnedVMSupport.spawnVM(RemoteJournalAppender.class.getCanonicalName(),
+               Process process = SpawnedVMSupport.spawnVM(ValidateTransactionHealthTest.class.getCanonicalName(),
                                                           type,
                                                           journalDir,
                                                           Long.toString(numberOfRecords),
                                                           Integer.toString(transactionSize),
                                                           Integer.toString(numberOfThreads));
                process.waitFor();
-               assertEquals(RemoteJournalAppender.OK, process.exitValue());
+               assertEquals(ValidateTransactionHealthTest.OK, process.exitValue());
             }
             else
             {
-               JournalImpl journal = RemoteJournalAppender.appendData(type,
+               JournalImpl journal = ValidateTransactionHealthTest.appendData(type,
                                                                       journalDir,
                                                                       numberOfRecords,
                                                                       transactionSize,
@@ -155,7 +185,7 @@
 
    private void reload(final String type, final String journalDir, final long numberOfRecords, final int numberOfThreads) throws Exception
    {
-      JournalImpl journal = RemoteJournalAppender.createJournal(type, journalDir);
+      JournalImpl journal = ValidateTransactionHealthTest.createJournal(type, journalDir);
 
       journal.start();
       Loader loadTest = new Loader(numberOfRecords);
@@ -243,5 +273,210 @@
       }
 
    }
+   
+   
+   // Remote part of the test =================================================================
+   
 
+
+   public static void main(String args[]) throws Exception
+   {
+
+      if (args.length != 5)
+      {
+         System.err.println("Use: java -cp <classpath> " + ValidateTransactionHealthTest.class.getCanonicalName() +
+                            " aio|nio <journalDirectory> <NumberOfElements> <TransactionSize> <NumberOfThreads>");
+         System.exit(-1);
+      }
+      System.out.println("Running");
+      String journalType = args[0];
+      String journalDir = args[1];
+      long numberOfElements = Long.parseLong(args[2]);
+      int transactionSize = Integer.parseInt(args[3]);
+      int numberOfThreads = Integer.parseInt(args[4]);
+
+      try
+      {
+         appendData(journalType, journalDir, numberOfElements, transactionSize, numberOfThreads);
+
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace(System.out);
+         System.exit(-1);
+      }
+
+      System.exit(OK);
+   }
+
+   public static JournalImpl appendData(String journalType,
+                                        String journalDir,
+                                        long numberOfElements,
+                                        int transactionSize,
+                                        int numberOfThreads) throws Exception
+   {
+      final JournalImpl journal = createJournal(journalType, journalDir);
+
+      journal.start();
+      journal.load(new LoaderCallback()
+      {
+
+         public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+         {
+         }
+
+         public void addRecord(RecordInfo info)
+         {
+         }
+
+         public void deleteRecord(long id)
+         {
+         }
+
+         public void updateRecord(RecordInfo info)
+         {
+         }
+
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+         {
+         }
+      });
+
+      LocalThreads threads[] = new LocalThreads[numberOfThreads];
+      final AtomicLong sequenceTransaction = new AtomicLong();
+
+      for (int i = 0; i < numberOfThreads; i++)
+      {
+         threads[i] = new LocalThreads(journal, numberOfElements, transactionSize, sequenceTransaction);
+         threads[i].start();
+      }
+
+      Exception e = null;
+      for (LocalThreads t : threads)
+      {
+         t.join();
+
+         if (t.e != null)
+         {
+            e = t.e;
+         }
+      }
+
+      if (e != null)
+      {
+         throw e;
+      }
+
+      return journal;
+   }
+
+   public static JournalImpl createJournal(String journalType, String journalDir)
+   {
+      JournalImpl journal = new JournalImpl(10485760,
+                                            2,
+                                            0,
+                                            0,
+                                            getFactory(journalType, journalDir),
+                                            "journaltst",
+                                            "tst",
+                                            500);
+      return journal;
+   }
+
+   public static SequentialFileFactory getFactory(String factoryType, String directory)
+   {
+      if (factoryType.equals("aio"))
+      {
+         return new AIOSequentialFileFactory(directory,
+                                             ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+                                             ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT,
+                                             ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC,
+                                             false);
+      }
+      else
+      if (factoryType.equals("nio2"))
+      {
+         return new NIOSequentialFileFactory(directory, false);
+      }
+      else
+      {
+         return new NIOSequentialFileFactory(directory);
+      }
+   }
+
+   static class LocalThreads extends Thread
+   {
+      final JournalImpl journal;
+
+      final long numberOfElements;
+
+      final int transactionSize;
+
+      final AtomicLong nextID;
+
+      Exception e;
+
+      public LocalThreads(JournalImpl journal, long numberOfElements, int transactionSize, AtomicLong nextID)
+      {
+         super();
+         this.journal = journal;
+         this.numberOfElements = numberOfElements;
+         this.transactionSize = transactionSize;
+         this.nextID = nextID;
+      }
+
+      public void run()
+      {
+         try
+         {
+            int transactionCounter = 0;
+
+            long transactionId = nextID.incrementAndGet();
+
+            for (long i = 0; i < numberOfElements; i++)
+            {
+
+               long id = nextID.incrementAndGet();
+
+               ByteBuffer buffer = ByteBuffer.allocate(512 * 3);
+               buffer.putLong(id);
+
+               if (transactionSize != 0)
+               {
+                  journal.appendAddRecordTransactional(transactionId, id, (byte)99, buffer.array());
+
+                  if (++transactionCounter == transactionSize)
+                  {
+                     System.out.println("Commit transaction " + transactionId);
+                     journal.appendCommitRecord(transactionId, true);
+                     transactionCounter = 0;
+                     transactionId = nextID.incrementAndGet();
+                  }
+               }
+               else
+               {
+                  journal.appendAddRecord(id, (byte)99, buffer.array(), false);
+               }
+            }
+
+            if (transactionCounter != 0)
+            {
+               journal.appendCommitRecord(transactionId, true);
+            }
+
+            if (transactionSize == 0)
+            {
+               journal.debugWait();
+            }
+         }
+         catch (Exception e)
+         {
+            this.e = e;
+         }
+
+      }
+   }
+
+   
+
 }

Added: trunk/tests/src/org/hornetq/tests/stress/journal/AIOMultiThreadCompactorStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/AIOMultiThreadCompactorStressTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/AIOMultiThreadCompactorStressTest.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import junit.framework.TestSuite;
+
+import org.hornetq.core.server.JournalType;
+
+/**
+ * A AIOMultiThreadCompactorStressTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class AIOMultiThreadCompactorStressTest extends NIOMultiThreadCompactorStressTest
+{
+
+   
+   public static TestSuite suite()
+   {
+      return createAIOTestSuite(AIOMultiThreadCompactorStressTest.class);
+   }
+   
+
+   /**
+    * @return
+    */
+   protected JournalType getJournalType()
+   {
+      return JournalType.ASYNCIO;
+   }
+
+}

Modified: trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -262,6 +262,7 @@
    private void setupServer(JournalType journalType) throws Exception, HornetQException
    {
       Configuration config = createDefaultConfig();
+      config.setJournalSyncNonTransactional(false);
       config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
 
       config.setJournalType(journalType);
@@ -274,6 +275,10 @@
       server.start();
 
       sf = createInVMFactory();
+      sf.setBlockOnAcknowledge(false);
+      sf.setBlockOnNonPersistentSend(false);
+      sf.setBlockOnPersistentSend(false);
+      
 
       ClientSession sess = sf.createSession();
 

Copied: trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java (from rev 8264, trunk/tests/src/org/hornetq/tests/integration/journal/MultiThreadCompactorTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/NIOMultiThreadCompactorStressTest.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,560 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.stress.journal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
+import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A MultiThreadConsumerStressTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class NIOMultiThreadCompactorStressTest extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   final SimpleString ADDRESS = new SimpleString("SomeAddress");
+
+   final SimpleString QUEUE = new SimpleString("SomeQueue");
+
+   private HornetQServer server;
+
+   private ClientSessionFactory sf;
+
+   protected int getNumberOfIterations()
+   {
+      return 3;
+   }
+
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      stopServer();
+      super.tearDown();
+   }
+
+   public void testMultiThreadCompact() throws Throwable
+   {
+      setupServer(getJournalType());
+      for (int i = 0; i < getNumberOfIterations(); i++)
+      {
+         System.out.println("######################################");
+         System.out.println("test # " + i);
+
+         internalTestProduceAndConsume();
+         stopServer();
+
+         NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getJournalDir());
+         JournalImpl journal = new JournalImpl(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE,
+                                               2,
+                                               0,
+                                               0,
+                                               factory,
+                                               "hornetq-data",
+                                               "hq",
+                                               100);
+
+         List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
+         List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+
+         journal.start();
+         journal.load(committedRecords, preparedTransactions, null);
+
+         assertEquals(0, committedRecords.size());
+         assertEquals(0, preparedTransactions.size());
+
+         System.out.println("DataFiles = " + journal.getDataFilesCount());
+
+         if (i % 2 == 0 && i > 0)
+         {
+            System.out.println("DataFiles = " + journal.getDataFilesCount());
+            journal.forceMoveNextFile();
+            assertEquals(0, journal.getDataFilesCount());
+         }
+
+         journal.stop();
+         journal = null;
+
+         setupServer(getJournalType());
+
+      }
+   }
+
+   /**
+    * @return
+    */
+   protected JournalType getJournalType()
+   {
+      return JournalType.NIO;
+   }
+
+   /**
+    * @param xid
+    * @throws HornetQException
+    * @throws XAException
+    */
+   private void addEmptyTransaction(Xid xid) throws HornetQException, XAException
+   {
+      ClientSessionFactory sf = createInVMFactory();
+      sf.setBlockOnNonPersistentSend(false);
+      sf.setBlockOnAcknowledge(false);
+      ClientSession session = sf.createSession(true, false, false);
+      session.start(xid, XAResource.TMNOFLAGS);
+      session.end(xid, XAResource.TMSUCCESS);
+      session.prepare(xid);
+      session.close();
+      sf.close();
+   }
+
+   private void checkEmptyXID(Xid xid) throws HornetQException, XAException
+   {
+      ClientSessionFactory sf = createInVMFactory();
+      sf.setBlockOnNonPersistentSend(false);
+      sf.setBlockOnAcknowledge(false);
+      ClientSession session = sf.createSession(true, false, false);
+
+      Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
+      assertEquals(1, xids.length);
+      assertEquals(xid, xids[0]);
+
+      session.rollback(xid);
+
+      session.close();
+      sf.close();
+   }
+
+   public void internalTestProduceAndConsume() throws Throwable
+   {
+
+      addBogusData(100, "LAZY-QUEUE");
+
+      Xid xid = null;
+      xid = newXID();
+      addEmptyTransaction(xid);
+
+      System.out.println(getTemporaryDir());
+      boolean transactionalOnConsume = true;
+      boolean transactionalOnProduce = true;
+      int numberOfConsumers = 30;
+      // this test assumes numberOfConsumers == numberOfProducers
+      int numberOfProducers = numberOfConsumers;
+      int produceMessage = 5000;
+      int commitIntervalProduce = 100;
+      int consumeMessage = (int)(produceMessage * 0.9);
+      int commitIntervalConsume = 100;
+
+      System.out.println("ConsumeMessages = " + consumeMessage + " produceMessage = " + produceMessage);
+
+      // Number of messages expected to be received after restart
+      int numberOfMessagesExpected = (produceMessage - consumeMessage) * numberOfConsumers;
+
+      CountDownLatch latchReady = new CountDownLatch(numberOfConsumers + numberOfProducers);
+
+      CountDownLatch latchStart = new CountDownLatch(1);
+
+      ArrayList<BaseThread> threads = new ArrayList<BaseThread>();
+
+      ProducerThread[] prod = new ProducerThread[numberOfProducers];
+      for (int i = 0; i < numberOfProducers; i++)
+      {
+         prod[i] = new ProducerThread(i,
+                                      latchReady,
+                                      latchStart,
+                                      transactionalOnConsume,
+                                      produceMessage,
+                                      commitIntervalProduce);
+         prod[i].start();
+         threads.add(prod[i]);
+      }
+
+      ConsumerThread[] cons = new ConsumerThread[numberOfConsumers];
+
+      for (int i = 0; i < numberOfConsumers; i++)
+      {
+         cons[i] = new ConsumerThread(i,
+                                      latchReady,
+                                      latchStart,
+                                      transactionalOnProduce,
+                                      consumeMessage,
+                                      commitIntervalConsume);
+         cons[i].start();
+         threads.add(cons[i]);
+      }
+
+      latchReady.await();
+      latchStart.countDown();
+
+      for (BaseThread t : threads)
+      {
+         t.join();
+         if (t.e != null)
+         {
+            throw t.e;
+         }
+      }
+
+      server.stop();
+
+      setupServer(getJournalType());
+
+      drainQueue(numberOfMessagesExpected, QUEUE);
+      drainQueue(100, new SimpleString("LAZY-QUEUE"));
+
+      server.stop();
+
+      setupServer(getJournalType());
+      drainQueue(0, QUEUE);
+      drainQueue(0, new SimpleString("LAZY-QUEUE"));
+
+      checkEmptyXID(xid);
+
+   }
+
+   /**
+    * @param numberOfMessagesExpected
+    * @param queue
+    * @throws HornetQException
+    */
+   private void drainQueue(int numberOfMessagesExpected, SimpleString queue) throws HornetQException
+   {
+      ClientSession sess = sf.createSession(true, true);
+
+      ClientConsumer consumer = sess.createConsumer(queue);
+
+      sess.start();
+
+      for (int i = 0; i < numberOfMessagesExpected; i++)
+      {
+         ClientMessage msg = consumer.receive(5000);
+         assertNotNull(msg);
+
+         if (i % 100 == 0)
+         {
+            // System.out.println("Received #" + i + "  on thread after start");
+         }
+         msg.acknowledge();
+      }
+
+      assertNull(consumer.receiveImmediate());
+
+      sess.close();
+   }
+
+   /**
+    * @throws HornetQException
+    */
+   private void addBogusData(int nmessages, String queue) throws HornetQException
+   {
+      ClientSession session = sf.createSession(false, false);
+      try
+      {
+         session.createQueue(queue, queue, true);
+      }
+      catch (Exception ignored)
+      {
+      }
+
+      ClientProducer prod = session.createProducer(queue);
+      for (int i = 0; i < nmessages; i++)
+      {
+         ClientMessage msg = session.createClientMessage(true);
+         msg.getBody().writeBytes(new byte[1024]);
+         prod.send(msg);
+      }
+      session.commit();
+
+      session.start();
+
+      ClientConsumer cons = session.createConsumer(queue);
+      assertNotNull(cons.receive(1000));
+      session.rollback();
+      session.close();
+   }
+
+   protected void stopServer() throws Exception
+   {
+      try
+      {
+         if (server != null && server.isStarted())
+         {
+            server.stop();
+         }
+      }
+      catch (Throwable e)
+      {
+         e.printStackTrace(System.out); // System.out => junit reports
+      }
+
+      sf = null;
+   }
+
+   private void setupServer(JournalType journalType) throws Exception, HornetQException
+   {
+      if (!AsynchronousFileImpl.isLoaded())
+      {
+         journalType = JournalType.NIO;
+      }
+      if (server == null)
+      {
+         Configuration config = createDefaultConfig(true);
+         config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+
+         config.setJournalType(journalType);
+         config.setJMXManagementEnabled(false);
+
+         config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
+         config.setJournalMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES);
+
+         config.setJournalCompactMinFiles(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_MIN_FILES);
+         config.setJournalCompactPercentage(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE);
+
+         // This test is supposed to not sync.. All the ACKs are async, and it was supposed to not sync
+         config.setJournalSyncNonTransactional(false);
+
+         // config.setJournalCompactMinFiles(0);
+         // config.setJournalCompactPercentage(0);
+
+         server = createServer(true, config);
+      }
+
+      server.start();
+
+      sf = createNettyFactory();
+      sf.setBlockOnPersistentSend(false);
+      sf.setBlockOnAcknowledge(false);
+
+      ClientSession sess = sf.createSession();
+
+      try
+      {
+         sess.createQueue(ADDRESS, QUEUE, true);
+      }
+      catch (Exception ignored)
+      {
+      }
+
+      sess.close();
+
+      sf = createInVMFactory();
+   }
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   class BaseThread extends Thread
+   {
+      Throwable e;
+
+      final CountDownLatch latchReady;
+
+      final CountDownLatch latchStart;
+
+      final int numberOfMessages;
+
+      final int commitInterval;
+
+      final boolean transactional;
+
+      BaseThread(String name,
+                 CountDownLatch latchReady,
+                 CountDownLatch latchStart,
+                 boolean transactional,
+                 int numberOfMessages,
+                 int commitInterval)
+      {
+         super(name);
+         this.transactional = transactional;
+         this.latchReady = latchReady;
+         this.latchStart = latchStart;
+         this.commitInterval = commitInterval;
+         this.numberOfMessages = numberOfMessages;
+      }
+
+   }
+
+   class ProducerThread extends BaseThread
+   {
+      ProducerThread(int id,
+                     CountDownLatch latchReady,
+                     CountDownLatch latchStart,
+                     boolean transactional,
+                     int numberOfMessages,
+                     int commitInterval)
+      {
+         super("ClientProducer:" + id, latchReady, latchStart, transactional, numberOfMessages, commitInterval);
+      }
+
+      public void run()
+      {
+         ClientSession session = null;
+         latchReady.countDown();
+         try
+         {
+            latchStart.await();
+            session = sf.createSession(!transactional, !transactional);
+            ClientProducer prod = session.createProducer(ADDRESS);
+            for (int i = 0; i < numberOfMessages; i++)
+            {
+               if (transactional)
+               {
+                  if (i % commitInterval == 0)
+                  {
+                     session.commit();
+                  }
+               }
+               if (i % 100 == 0)
+               {
+                  // System.out.println(Thread.currentThread().getName() + "::sent #" + i);
+               }
+               ClientMessage msg = session.createClientMessage(true);
+               msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+               prod.send(msg);
+            }
+
+            if (transactional)
+            {
+               session.commit();
+            }
+
+            System.out.println("Thread " + Thread.currentThread().getName() +
+                               " sent " +
+                               numberOfMessages +
+                               "  messages");
+         }
+         catch (Throwable e)
+         {
+            e.printStackTrace();
+            this.e = e;
+         }
+         finally
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Throwable e)
+            {
+               this.e = e;
+            }
+         }
+      }
+   }
+
+   class ConsumerThread extends BaseThread
+   {
+      ConsumerThread(int id,
+                     CountDownLatch latchReady,
+                     CountDownLatch latchStart,
+                     boolean transactional,
+                     int numberOfMessages,
+                     int commitInterval)
+      {
+         super("ClientConsumer:" + id, latchReady, latchStart, transactional, numberOfMessages, commitInterval);
+      }
+
+      public void run()
+      {
+         ClientSession session = null;
+         latchReady.countDown();
+         try
+         {
+            latchStart.await();
+            session = sf.createSession(!transactional, !transactional);
+            session.start();
+            ClientConsumer cons = session.createConsumer(QUEUE);
+            for (int i = 0; i < numberOfMessages; i++)
+            {
+               ClientMessage msg = cons.receive(60 * 1000);
+               msg.acknowledge();
+               if (i % commitInterval == 0)
+               {
+                  session.commit();
+               }
+               if (i % 100 == 0)
+               {
+                  // System.out.println(Thread.currentThread().getName() + "::received #" + i);
+               }
+            }
+
+            System.out.println("Thread " + Thread.currentThread().getName() +
+                               " received " +
+                               numberOfMessages +
+                               " messages");
+
+            session.commit();
+         }
+         catch (Throwable e)
+         {
+            this.e = e;
+         }
+         finally
+         {
+            try
+            {
+               session.close();
+            }
+            catch (Throwable e)
+            {
+               this.e = e;
+            }
+         }
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Deleted: trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -1,153 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-
-package org.hornetq.tests.unit.core.asyncio;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import junit.framework.TestSuite;
-
-import org.hornetq.core.asyncio.AIOCallback;
-import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
-import org.hornetq.core.asyncio.impl.TimedBuffer;
-import org.hornetq.core.asyncio.impl.TimedBufferObserver;
-import org.hornetq.tests.util.UnitTestCase;
-
-/**
- * A TimedBufferTest
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class TimedBufferTest extends UnitTestCase
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-   
-   public static TestSuite suite()
-   {
-      return createAIOTestSuite(TimedBufferTest.class);
-   }
-
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-   
-   AIOCallback dummyCallback = new AIOCallback()
-   {
-
-      public void done()
-      {
-      }
-
-      public void onError(int errorCode, String errorMessage)
-      {
-      }
-   };
-
-   
-   public void testFillBuffer()
-   {
-      final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
-      final AtomicInteger flushTimes = new AtomicInteger(0);
-      class TestObserver implements TimedBufferObserver
-      {
-         public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
-         {
-            buffers.add(buffer);
-            flushTimes.incrementAndGet();
-         }
-
-         /* (non-Javadoc)
-          * @see org.hornetq.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
-          */
-         public ByteBuffer newBuffer(int minSize, int maxSize)
-         {
-            return ByteBuffer.allocate(maxSize);
-         }
-
-         public int getRemainingBytes()
-         {
-            return 1024*1024;
-         }
-      }
-      
-      TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false, false); // Any big timeout
-      
-      timedBuffer.setObserver(new TestObserver());
-      
-      int x = 0;
-      for (int i = 0 ; i < 10; i++)
-      {
-         byte[] bytes = new byte[10];
-         for (int j = 0 ; j < 10; j++)
-         {
-            bytes[j] = getSamplebyte(x++);
-         }
-         
-         timedBuffer.checkSize(10);
-         timedBuffer.addBytes(bytes, false, dummyCallback);
-      }
-            
-      assertEquals(1, flushTimes.get());
-      
-      ByteBuffer flushedBuffer = buffers.get(0);
-      
-      assertEquals(100, flushedBuffer.limit());
-      
-      assertEquals(100, flushedBuffer.capacity());
-      
-
-      flushedBuffer.rewind();
-      
-      for (int i = 0; i < 100; i++)
-      {
-         assertEquals(getSamplebyte(i), flushedBuffer.get());
-      }
-      
-      
-   }
-   
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   @Override
-   protected void setUp() throws Exception
-   {
-      super.setUp();
-
-      if (!AsynchronousFileImpl.isLoaded())
-      {
-         fail(String.format("libAIO is not loaded on %s %s %s",
-                            System.getProperty("os.name"),
-                            System.getProperty("os.arch"),
-                            System.getProperty("os.version")));
-      }
-   }
-   
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -107,7 +107,7 @@
             buffer.put(i, (byte)1);
          }
 
-         file.write(buffer, true);
+         file.writeDirect(buffer, true);
 
          buffer = ByteBuffer.allocate(400);
          for (int i = 0; i < 400; i++)
@@ -115,7 +115,7 @@
             buffer.put(i, (byte)2);
          }
 
-         file.write(buffer, true);
+         file.writeDirect(buffer, true);
 
          buffer = ByteBuffer.allocate(600);
 
@@ -598,7 +598,7 @@
       // Changing the check bufferSize, so reload will ignore this record
       file.position(100);
 
-      file.write(buffer, true);
+      file.writeDirect(buffer, true);
 
       file.close();
 
@@ -663,7 +663,7 @@
       // Changing the check bufferSize, so reload will ignore this record
       file.position(100);
 
-      file.write(buffer, true);
+      file.writeDirect(buffer, true);
 
       file.close();
 
@@ -759,7 +759,7 @@
       // reload will think the record came from a different journal usage)
       file.position(100);
 
-      file.write(buffer, true);
+      file.writeDirect(buffer, true);
 
       file.close();
 
@@ -1038,7 +1038,7 @@
       // reload will think the record came from a different journal usage)
       file.position(100);
 
-      file.write(buffer, true);
+      file.writeDirect(buffer, true);
 
       file.close();
 

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -2056,10 +2056,11 @@
 
    public void testSimpleAdd() throws Exception
    {
-      setup(10, 10 * 1024, true);
+      setup(2, 10 * 1024, true);
       createJournal();
       startJournal();
       load();
+      this.sync = true;
       add(1);
       stopJournal();
       createJournal();

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -224,19 +224,19 @@
       ByteBuffer bb3 = factory.wrapBuffer(bytes3);
       
       long initialPos = sf.position();
-      sf.write(bb1, true);
+      sf.writeDirect(bb1, true);
       long bytesWritten = sf.position() - initialPos;
 
       assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
 
       initialPos = sf.position();
-      sf.write(bb2, true);
+      sf.writeDirect(bb2, true);
       bytesWritten = sf.position() - initialPos;
 
       assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten);
 
       initialPos = sf.position();
-      sf.write(bb3, true);
+      sf.writeDirect(bb3, true);
       bytesWritten = sf.position() - initialPos;
 
       assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten);
@@ -296,20 +296,20 @@
          ByteBuffer bb3 = factory.wrapBuffer(bytes3);
 
          long initialPos = sf.position();
-         sf.write(bb1, true);
+         sf.writeDirect(bb1, true);
          long bytesWritten = sf.position() - initialPos;
 
          assertEquals(bb1.limit(), bytesWritten);
 
          initialPos = sf.position();
-         sf.write(bb2, true);
+         sf.writeDirect(bb2, true);
          bytesWritten = sf.position() - initialPos;
 
          
          assertEquals(bb2.limit(), bytesWritten);
 
          initialPos = sf.position();
-         sf.write(bb3, true);
+         sf.writeDirect(bb3, true);
          bytesWritten = sf.position() - initialPos;
 
          assertEquals(bb3.limit(), bytesWritten);
@@ -373,7 +373,7 @@
       ByteBuffer bb1 = factory.wrapBuffer(bytes1);
 
       long initialPos = sf.position();
-      sf.write(bb1, true);
+      sf.writeDirect(bb1, true);
       long bytesWritten = sf.position() - initialPos;
 
       assertEquals(bb1.limit(), bytesWritten);
@@ -385,7 +385,7 @@
          
          bb1 = factory.wrapBuffer(bytes1);
          
-         sf.write(bb1, true);
+         sf.writeDirect(bb1, true);
 
          fail("Should throw exception");
       }
@@ -396,7 +396,7 @@
 
       sf.open();
 
-      sf.write(bb1, true);
+      sf.writeDirect(bb1, true);
 
       sf.close();
    }

Copied: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java (from rev 8261, trunk/tests/src/org/hornetq/tests/unit/core/asyncio/TimedBufferTest.java)
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.impl.TimedBuffer;
+import org.hornetq.core.journal.impl.TimedBufferObserver;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A TimedBufferTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TimedBufferTest extends UnitTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   IOCallback dummyCallback = new IOCallback()
+   {
+
+      public void done()
+      {
+      }
+
+      public void onError(final int errorCode, final String errorMessage)
+      {
+      }
+
+      public void waitCompletion() throws Exception
+      {
+      }
+   };
+
+   public void testFillBuffer()
+   {
+      final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+      final AtomicInteger flushTimes = new AtomicInteger(0);
+      class TestObserver implements TimedBufferObserver
+      {
+         public void flushBuffer(final ByteBuffer buffer, final List<IOCallback> callbacks)
+         {
+            buffers.add(buffer);
+            flushTimes.incrementAndGet();
+         }
+
+         /* (non-Javadoc)
+          * @see org.hornetq.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
+          */
+         public ByteBuffer newBuffer(final int minSize, final int maxSize)
+         {
+            return ByteBuffer.allocate(maxSize);
+         }
+
+         public int getRemainingBytes()
+         {
+            return 1024 * 1024;
+         }
+      }
+
+      TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false, false); // Any big timeout
+
+      timedBuffer.setObserver(new TestObserver());
+
+      int x = 0;
+      for (int i = 0; i < 10; i++)
+      {
+         byte[] bytes = new byte[10];
+         for (int j = 0; j < 10; j++)
+         {
+            bytes[j] = getSamplebyte(x++);
+         }
+
+         timedBuffer.checkSize(10);
+         timedBuffer.addBytes(bytes, false, dummyCallback);
+      }
+
+      assertEquals(1, flushTimes.get());
+
+      ByteBuffer flushedBuffer = buffers.get(0);
+
+      assertEquals(100, flushedBuffer.limit());
+
+      assertEquals(100, flushedBuffer.capacity());
+
+      flushedBuffer.rewind();
+
+      for (int i = 0; i < 100; i++)
+      {
+         assertEquals(getSamplebyte(i), flushedBuffer.get());
+      }
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -23,6 +23,7 @@
 import org.hornetq.core.journal.IOCallback;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.TimedBuffer;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 
@@ -438,7 +439,7 @@
          return data.position();
       }
 
-      public synchronized void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+      public synchronized void writeDirect(final ByteBuffer bytes, final boolean sync, final IOCallback callback)
       {
          if (!open)
          {
@@ -491,9 +492,9 @@
          }
       }
 
-      public void write(final ByteBuffer bytes, final boolean sync) throws Exception
+      public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
       {
-         write(bytes, sync, null);
+         writeDirect(bytes, sync, null);
       }
 
       private void checkAndResize(final int size)
@@ -606,7 +607,7 @@
        */
       public void write(HornetQBuffer bytes, boolean sync, IOCallback callback) throws Exception
       {
-         write(ByteBuffer.wrap(bytes.array()), sync, callback);
+         writeDirect(ByteBuffer.wrap(bytes.array()), sync, callback);
 
       }
 
@@ -615,7 +616,7 @@
        */
       public void write(HornetQBuffer bytes, boolean sync) throws Exception
       {
-         write(ByteBuffer.wrap(bytes.array()), sync);
+         writeDirect(ByteBuffer.wrap(bytes.array()), sync);
       }
 
       /* (non-Javadoc)
@@ -628,6 +629,15 @@
          return file != null && file.data != null && file.data.capacity() > 0;
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.SequentialFile#setTimedBuffer(org.hornetq.core.journal.impl.TimedBuffer)
+       */
+      public void setTimedBuffer(TimedBuffer buffer)
+      {
+         // TODO Auto-generated method stub
+         
+      }
+
    }
 
    /* (non-Javadoc)

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java	2009-11-12 02:47:08 UTC (rev 8266)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PageImplTest.java	2009-11-12 04:39:27 UTC (rev 8267)
@@ -162,7 +162,7 @@
       
       buffer.rewind();
       
-      file.write(buffer, true);
+      file.writeDirect(buffer, true);
       
       impl.close();
 



More information about the hornetq-commits mailing list