[jboss-cvs] JBoss Messaging SVN: r7181 - in trunk: examples/core/perf/server0 and 23 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Jun 3 10:06:16 EDT 2009


Author: timfox
Date: 2009-06-03 10:06:16 -0400 (Wed, 03 Jun 2009)
New Revision: 7181

Modified:
   trunk/examples/core/perf/perf.properties
   trunk/examples/core/perf/server0/jbm-configuration.xml
   trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java
   trunk/src/config/common/schema/jbm-configuration.xsd
   trunk/src/config/common/version.properties
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
   trunk/src/main/org/jboss/messaging/core/config/Configuration.java
   trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
   trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
   trunk/src/main/org/jboss/messaging/core/journal/Journal.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
   trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
   trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/remoting/SynchronousCloseTest.java
   trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
   trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java
   trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java
Log:
more fixes to AIO

Modified: trunk/examples/core/perf/perf.properties
===================================================================
--- trunk/examples/core/perf/perf.properties	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/examples/core/perf/perf.properties	2009-06-03 14:06:16 UTC (rev 7181)
@@ -1,10 +1,10 @@
-num-messages=500000
-num-warmup-messages=50000
+num-messages=100000
+num-warmup-messages=0
 message-size=1000
-durable=false
+durable=true
 transacted=false
 batch-size=1000
-drain-queue=true
+drain-queue=false
 queue-name=perfQueue
 throttle-rate=-1
 address=perfAddress
@@ -14,6 +14,6 @@
 tcp-buffer=1048576
 tcp-no-delay=false
 send-window=1048576
-pre-ack=true
+pre-ack=false
 block-ack=true
 block-persistent=true

Modified: trunk/examples/core/perf/server0/jbm-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/jbm-configuration.xml	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/examples/core/perf/server0/jbm-configuration.xml	2009-06-03 14:06:16 UTC (rev 7181)
@@ -15,7 +15,13 @@
    
    <security-enabled>false</security-enabled>
    
-   <persistence-enabled>false</persistence-enabled>
+   <persistence-enabled>true</persistence-enabled>
+
+   <journal-sync-non-transactional>true</journal-sync-non-transactional>
+   <journal-type>ASYNCIO</journal-type>
+   <journal-aio-buffer-timeout>1</journal-aio-buffer-timeout>
+   <journal-min-files>10</journal-min-files>
+
    
    <queues>
 	   <queue name="perfQueue">

Modified: trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -384,6 +384,8 @@
             double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
             log.info(String.format("sent %6d messages in %2.2fs", i, duration));
          }
+         
+       //  log.info("sent message " + i);
 
          if (tbl != null)
          {

Modified: trunk/src/config/common/schema/jbm-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/jbm-configuration.xsd	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/config/common/schema/jbm-configuration.xsd	2009-06-03 14:06:16 UTC (rev 7181)
@@ -194,10 +194,7 @@
 				</xsd:element>
 				<xsd:element name="journal-aio-buffer-size"
 					type="xsd:long" maxOccurs="1" minOccurs="0">
-				</xsd:element>
-				<xsd:element name="journal-aio-flush-on-sync"
-					type="xsd:boolean" maxOccurs="1" minOccurs="0">
-				</xsd:element>
+				</xsd:element>				
 				<xsd:element name="journal-sync-transactional"
 					type="xsd:boolean" maxOccurs="1" minOccurs="0">
 				</xsd:element>

Modified: trunk/src/config/common/version.properties
===================================================================
--- trunk/src/config/common/version.properties	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/config/common/version.properties	2009-06-03 14:06:16 UTC (rev 7181)
@@ -1,7 +1,7 @@
-messaging.version.versionName=Stilton
+messaging.version.versionName=tadpole
 messaging.version.majorVersion=2
 messaging.version.minorVersion=0
 messaging.version.microVersion=0
-messaging.version.incrementingVersion=101
+messaging.version.incrementingVersion=102
 messaging.version.versionSuffix=BETA1-SNAPSHOT
 messaging.version.versionTag=beta1
\ No newline at end of file

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -25,17 +25,13 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.jboss.messaging.core.asyncio.AIOCallback;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.JBMThreadFactory;
+import org.jboss.messaging.utils.TokenBucketLimiter;
+import org.jboss.messaging.utils.TokenBucketLimiterImpl;
 
 /**
  * A TimedBuffer
@@ -54,10 +50,8 @@
 
    private final TimedBufferObserver bufferObserver;
 
-   private final CheckTimer timerRunnable = new CheckTimer();
+   private CheckTimer timerRunnable = new CheckTimer();
 
-   private volatile ScheduledFuture<?> futureTimerRunnable;
-
    private final long timeout;
 
    private final int bufferSize;
@@ -68,14 +62,16 @@
 
    private final Lock lock = new ReentrantReadWriteLock().writeLock();
 
-   private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
-
    // used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
    private volatile long timeLastAdd = 0;
 
    // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
    private volatile long timeLastSync = 0;
 
+   private Thread timerThread;
+   
+   private boolean started;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -86,12 +82,55 @@
    {
       bufferSize = size;
       this.bufferObserver = bufferObserver;
-      this.timeout = timeout;
+      this.timeout = timeout;      
       currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
       currentBuffer.limit(0);
-      callbacks = new ArrayList<AIOCallback>();
+      callbacks = new ArrayList<AIOCallback>();      
    }
+   
+   public synchronized void start()
+   {
+      if (started)
+      {
+         return;
+      }
+      
+      timerRunnable = new CheckTimer();
+      
+      timerThread = new Thread(timerRunnable, "jbm-aio-timer");
 
+      timerThread.start();
+      
+      started = true;
+      
+      log.info("started timed buffer");
+   }
+
+   public synchronized void stop()
+   {
+      if (!started)
+      {
+         return;
+      }
+      
+      timerRunnable.close();
+
+      while (timerThread.isAlive())
+      {
+         try
+         {
+            timerThread.join();
+         }
+         catch (InterruptedException e)
+         {
+         }
+      }
+      
+      started = false;
+      
+      log.info("stopped timedbuffer");
+   }
+
    public void lock()
    {
       lock.lock();
@@ -136,31 +175,26 @@
       {
          return true;
       }
-
    }
 
    public synchronized void addBytes(final ByteBuffer bytes, final boolean sync, final AIOCallback callback)
    {
-      timeLastAdd = System.currentTimeMillis();
-      
-      
+      long now = System.nanoTime();
+
+      timeLastAdd = now;
+
       if (sync)
       {
          // We should flush on the next timeout, no matter what other activity happens on the buffer
          if (timeLastSync == 0)
          {
-            timeLastSync = System.currentTimeMillis();
+            timeLastSync = now;
          }
       }
 
       currentBuffer.put(bytes);
       callbacks.add(callback);
 
-      if (futureTimerRunnable == null)
-      {
-         futureTimerRunnable = schedule.scheduleAtFixedRate(timerRunnable, timeout, timeout, TimeUnit.MILLISECONDS);
-      }
-      
       if (currentBuffer.position() == currentBuffer.capacity())
       {
          flush();
@@ -181,18 +215,11 @@
 
          callbacks = new ArrayList<AIOCallback>();
 
-      }
+         timeLastAdd = 0;
+         timeLastSync = 0;
 
-      if (futureTimerRunnable != null)
-      {
-         futureTimerRunnable.cancel(false);
-         futureTimerRunnable = null;
+         currentBuffer.limit(0);
       }
-
-      timeLastAdd = 0;
-      timeLastSync = 0;
-
-      currentBuffer.limit(0);
    }
 
    // Package protected ---------------------------------------------
@@ -203,15 +230,16 @@
 
    private void checkTimer()
    {
-      final long now = System.currentTimeMillis();
+      final long now = System.nanoTime();
 
       // if inactive for more than the timeout
       // of if a sync happened at more than the the timeout ago
-      if (now - timeLastAdd >= timeout || timeLastSync != 0 && now - timeLastSync >= timeout)
+      if (timeLastAdd != 0 && now - timeLastAdd >= timeout || timeLastSync != 0 && now - timeLastSync >= timeout)
       {
          lock.lock();
          try
          {
+            // log.info("** flushing because of timer");
             flush();
          }
          finally
@@ -223,29 +251,36 @@
 
    // Inner classes -------------------------------------------------
 
-   class CheckTimer implements Runnable
+   private class CheckTimer implements Runnable
    {
-      public void run()
-      {
-         checkTimer();
-      }
-   }
+      private volatile boolean closed = false;
 
-   // TODO: is there a better place to get this schedule service from?
-   static class ScheduledSingleton
-   {
-      private static ScheduledExecutorService scheduleService;
+      private TokenBucketLimiter limiter = new TokenBucketLimiterImpl(4000, false);
 
-      private static synchronized ScheduledExecutorService getScheduledService()
+      public void run()
       {
-         if (scheduleService == null)
+         while (!closed)
          {
-            ThreadFactory factory = new JBMThreadFactory("JBM-buffer-scheduled-control", true);
+            // log.info(System.identityHashCode(this) + " firing");
+            checkTimer();
 
-            scheduleService = Executors.newSingleThreadScheduledExecutor(factory);
+            // try
+            // {
+            // Thread.sleep(1);
+            // }
+            // catch (InterruptedException ignore)
+            // {
+            // }
+
+            // limiter.limit();
+
+            Thread.yield();
          }
+      }
 
-         return scheduleService;
+      public void close()
+      {
+         closed = true;
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -200,14 +200,10 @@
    
    int getAIOBufferSize();
    
-   void setAIOBufferTimeout(int timeout);
+   void setAIOBufferTimeout(long timeout);
    
-   int getAIOBufferTimeout();
+   long getAIOBufferTimeout();
    
-   void setAIOFlushOnSync(boolean flush);
-   
-   boolean isAIOFlushOnSync();
-
    boolean isCreateBindingsDir();
 
    void setCreateBindingsDir(boolean create);

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -97,10 +97,8 @@
 
    public static final int DEFAULT_JOURNAL_MAX_AIO = 500;
    
-   public static final boolean DEFAULT_JOURNAL_AIO_FLUSH_SYNC = false;
+   public static final long DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT = 500;
    
-   public static final int DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT = 1;
-   
    public static final int DEFAULT_JOURNAL_AIO_BUFFER_SIZE = 128 * 1024;
 
    public static final boolean DEFAULT_WILDCARD_ROUTING_ENABLED = true;
@@ -238,11 +236,9 @@
    protected int journalMinFiles = DEFAULT_JOURNAL_MIN_FILES;
 
    protected int journalMaxAIO = DEFAULT_JOURNAL_MAX_AIO;
+     
+   protected long journalAIOBufferTimeout = DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT;
    
-   protected boolean journalAIOFlushSync = DEFAULT_JOURNAL_AIO_FLUSH_SYNC;
-   
-   protected int journalAIOBufferTimeout = DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT;
-   
    protected int journalAIOBufferSize = DEFAULT_JOURNAL_AIO_BUFFER_SIZE;
 
    protected boolean wildcardRoutingEnabled = DEFAULT_WILDCARD_ROUTING_ENABLED;
@@ -708,27 +704,16 @@
       jmxManagementEnabled = enabled;
    }
 
-
-   public void setAIOBufferTimeout(int timeout)
+   public void setAIOBufferTimeout(long timeout)
    {
       this.journalAIOBufferTimeout = timeout;
    }
    
-   public int getAIOBufferTimeout()
+   public long getAIOBufferTimeout()
    {
       return journalAIOBufferTimeout;
    }
 
-   public void setAIOFlushOnSync(boolean flush)
-   {
-      journalAIOFlushSync = flush;
-   }
-
-   public boolean isAIOFlushOnSync()
-   {
-      return journalAIOFlushSync;
-   }
-
    public int getAIOBufferSize()
    {
       return journalAIOBufferSize;

Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -297,11 +297,9 @@
       journalSyncNonTransactional = getBoolean(e, "journal-sync-non-transactional", journalSyncNonTransactional);
 
       journalFileSize = getInteger(e, "journal-file-size", journalFileSize);
+
+      journalAIOBufferTimeout = getLong(e, "journal-aio-buffer-timeout", DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT);
       
-      journalAIOFlushSync = getBoolean(e, "journal-aio-flush-on-sync", DEFAULT_JOURNAL_AIO_FLUSH_SYNC);
-      
-      journalAIOBufferTimeout = getInteger(e, "journal-aio-buffer-timeout", DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT);
-      
       journalAIOBufferSize = getInteger(e, "journal-aio-buffer-size", DEFAULT_JOURNAL_AIO_BUFFER_SIZE);
 
       journalMinFiles = getInteger(e, "journal-min-files", journalMinFiles);

Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -38,35 +38,33 @@
 {
    // Non transactional operations
 
-   void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
+   void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
-   void appendAddRecord(long id, byte recordType, EncodingSupport record) throws Exception;
-   
    void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+   
+   void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
-   void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
+   void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
 
-   void appendUpdateRecord(long id, byte recordType, EncodingSupport record) throws Exception;
+   void appendDeleteRecord(long id, boolean sync) throws Exception;
 
-   void appendDeleteRecord(long id) throws Exception;
-
    // Transactional operations
 
-   void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
+   void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
-   void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+   void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
 
-   void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
+   void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record, boolean sync) throws Exception;
 
-   void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+   void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
 
-   void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception;
+   void appendDeleteRecordTransactional(long txID, long id, byte[] record, boolean sync) throws Exception;
 
-   void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception;
+   void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record, boolean sync) throws Exception;
 
-   void appendDeleteRecordTransactional(long txID, long id) throws Exception;
+   void appendDeleteRecordTransactional(long txID, long id, boolean sync) throws Exception;
 
-   void appendCommitRecord(long txID) throws Exception;
+   void appendCommitRecord(long txID, boolean sync) throws Exception;
 
    /** 
     * 
@@ -79,9 +77,9 @@
     * @param transactionData - extra user data for the prepare
     * @throws Exception
     */
-   void appendPrepareRecord(long txID, EncodingSupport transactionData) throws Exception;
+   void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception;
 
-   void appendRollbackRecord(long txID) throws Exception;
+   void appendRollbackRecord(long txID, boolean sync) throws Exception;
 
    // Load
 

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -80,8 +80,6 @@
 
    long size() throws Exception;
    
-   void flush();
-   
    void renameTo(String newFileName) throws Exception;
 
    void lockBuffer();

Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -50,9 +50,9 @@
 
    int getMinFiles();
 
-   boolean isSyncTransactional();
+  // boolean isSyncTransactional();
 
-   boolean isSyncNonTransactional();
+  // boolean isSyncNonTransactional();
 
    String getFilePrefix();
 

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -35,7 +35,6 @@
 import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
 import org.jboss.messaging.core.asyncio.impl.TimedBuffer;
 import org.jboss.messaging.core.asyncio.impl.TimedBufferObserver;
-import org.jboss.messaging.core.exception.MessagingException;
 import org.jboss.messaging.core.journal.BufferCallback;
 import org.jboss.messaging.core.journal.IOCallback;
 import org.jboss.messaging.core.journal.SequentialFile;
@@ -71,7 +70,7 @@
 
    private final TimedBuffer timedBuffer;
 
-   private BufferCallback bufferCallback;
+   private final BufferCallback bufferCallback;
    
    private boolean buffering = true;
 
@@ -86,7 +85,7 @@
 
    public AIOSequentialFile(final SequentialFileFactory factory,
                             final int bufferSize,
-                            final int bufferTimeoutMilliseconds,
+                            final long bufferTimeoutMilliseconds,
                             final String journalDir,
                             final String fileName,
                             final int maxIO,
@@ -130,11 +129,6 @@
       return timedBuffer.checkSize(size);
    }
    
-   public void flush()
-   {
-      timedBuffer.flush();
-   }
-
    public void lockBuffer()
    {
       timedBuffer.lock();
@@ -151,7 +145,8 @@
       opened = false;
             
       timedBuffer.flush();
-
+      timedBuffer.stop();
+      
       final CountDownLatch donelatch = new CountDownLatch(1);
 
       executor.execute(new Runnable()
@@ -238,8 +233,8 @@
    }
 
    public void open() throws Exception
-   {
-      open(maxIO);
+   {            
+      open(maxIO);            
    }
 
    /* (non-Javadoc)
@@ -253,13 +248,13 @@
 
    public synchronized void open(final int currentMaxIO) throws Exception
    {
+      timedBuffer.start();
       opened = true;
       aioFile = newFile();
       aioFile.open(journalDir + "/" + fileName, currentMaxIO);
       position.set(0);
       aioFile.setBufferCallback(bufferCallback);
       this.fileSize = aioFile.size();
-
    }
 
    public void setBufferCallback(final BufferCallback callback)

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -42,15 +42,11 @@
  *
  */
 public class AIOSequentialFileFactory extends AbstractSequentialFactory
-{
-   
-   
-
+{     
    private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
 
    private static final boolean trace = log.isTraceEnabled();
-   
-   
+      
    private final ReuseBuffersController buffersControl = new ReuseBuffersController();
 
    // This method exists just to make debug easier.
@@ -68,18 +64,17 @@
    
 
    private final Executor pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), true));
-
    
-   final int bufferSize;
+   private final int bufferSize;
    
-   final int bufferTimeout;
+   private final long bufferTimeout;
    
    public AIOSequentialFileFactory(final String journalDir)
    {
       this(journalDir, ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE, ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT);
    }
 
-   public AIOSequentialFileFactory(final String journalDir, int bufferSize, int bufferTimeout)
+   public AIOSequentialFileFactory(final String journalDir, int bufferSize, long bufferTimeout)
    {
       super(journalDir);
       this.bufferSize = bufferSize;

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -171,13 +171,6 @@
 
    private final int minFiles;
 
-   private final boolean syncTransactional;
-
-   private final boolean syncNonTransactional;
-   
-   // used on AIO
-   private final boolean flushOnSync;
-
    private final SequentialFileFactory fileFactory;
 
    public final String filePrefix;
@@ -210,9 +203,6 @@
 
    public JournalImpl(final int fileSize,
                       final int minFiles,
-                      final boolean syncTransactional,
-                      final boolean syncNonTransactional,
-                      final boolean flushOnSync,
                       final SequentialFileFactory fileFactory,
                       final String filePrefix,
                       final String fileExtension,
@@ -253,12 +243,6 @@
 
       this.minFiles = minFiles;
 
-      this.syncTransactional = syncTransactional;
-
-      this.syncNonTransactional = syncNonTransactional;
-      
-      this.flushOnSync = flushOnSync;
-
       this.fileFactory = fileFactory;
 
       this.filePrefix = filePrefix;
@@ -271,16 +255,11 @@
    // Journal implementation
    // ----------------------------------------------------------------
 
-   public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
+   public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
    {
-      appendAddRecord(id, recordType, new ByteArrayEncoding(record), syncNonTransactional);
+      appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
 
-   public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
-   {
-      appendAddRecord(id, recordType, record, syncNonTransactional);
-   }
-
    public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
@@ -322,12 +301,12 @@
       }
    }
 
-   public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
+   public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
    {
-      appendUpdateRecord(id, recordType, new ByteArrayEncoding(record));
+      appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
    }
 
-   public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
+   public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -354,12 +333,12 @@
       bb.writeInt(size);
 
       
-      IOCallback callback = getSyncCallback(syncNonTransactional);
+      IOCallback callback = getSyncCallback(sync);
       
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, callback);
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, callback);
 
          posFiles.addUpdateFile(usedFile);
       }
@@ -374,7 +353,7 @@
       }
    }
 
-   public void appendDeleteRecord(final long id) throws Exception
+   public void appendDeleteRecord(final long id, final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -397,12 +376,12 @@
       bb.putLong(id);
       bb.putInt(size);
       
-      IOCallback callback = getSyncCallback(syncNonTransactional);
+      IOCallback callback = getSyncCallback(sync);
 
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb, syncNonTransactional, callback);
+         JournalFile usedFile = appendRecord(bb, sync, callback);
 
          posFiles.addDelete(usedFile);
       }
@@ -417,16 +396,18 @@
       }
    }
 
-   public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
+   public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record,
+                                            final boolean sync) throws Exception
    {
-      appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+      appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record), sync);
 
    }
 
    public void appendAddRecordTransactional(final long txID,
                                             final long id,
                                             final byte recordType,
-                                            final EncodingSupport record) throws Exception
+                                            final EncodingSupport record,
+                                            final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -451,7 +432,7 @@
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
 
          JournalTransaction tx = getTransactionInfo(txID);
 
@@ -466,15 +447,17 @@
    public void appendUpdateRecordTransactional(final long txID,
                                                final long id,
                                                final byte recordType,
-                                               final byte[] record) throws Exception
+                                               final byte[] record,
+                                               final boolean sync) throws Exception
    {
-      appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+      appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record), sync);
    }
 
    public void appendUpdateRecordTransactional(final long txID,
                                                final long id,
                                                final byte recordType,
-                                               final EncodingSupport record) throws Exception
+                                               final EncodingSupport record,
+                                               final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -497,7 +480,7 @@
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
 
          JournalTransaction tx = getTransactionInfo(txID);
 
@@ -509,12 +492,14 @@
       }
    }
 
-   public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
+   public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record,
+                                               final boolean sync) throws Exception
    {
-      appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
+      appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record), sync);
    }
 
-   public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
+   public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record,
+                                               final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -539,7 +524,7 @@
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
 
          JournalTransaction tx = getTransactionInfo(txID);
 
@@ -551,7 +536,8 @@
       }
    }
 
-   public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
+   public void appendDeleteRecordTransactional(final long txID, final long id,
+                                               final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -572,7 +558,7 @@
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+         JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
 
          JournalTransaction tx = getTransactionInfo(txID);
 
@@ -597,7 +583,7 @@
     * @param transactionData - extra user data for the prepare
     * @throws Exception
     */
-   public void appendPrepareRecord(final long txID, final EncodingSupport transactionData) throws Exception
+   public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -608,12 +594,12 @@
 
       ByteBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
 
-      IOCallback callback = getTransactionCallback(txID);
+      IOCallback callback = getTransactionCallback(txID, sync);
 
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+         JournalFile usedFile = appendRecord(bb, sync, callback);
 
          tx.prepare(usedFile);
       }
@@ -646,7 +632,7 @@
     *
     * @see JournalImpl#writeTransaction(byte, long, org.jboss.messaging.core.journal.impl.JournalImpl.JournalTransaction, EncodingSupport)
     */
-   public void appendCommitRecord(final long txID) throws Exception
+   public void appendCommitRecord(final long txID, final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -662,12 +648,12 @@
 
       ByteBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
 
-      IOCallback callback = getTransactionCallback(txID);
+      IOCallback callback = getTransactionCallback(txID, sync);
 
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+         JournalFile usedFile = appendRecord(bb, sync, callback);
 
          transactionCallbacks.remove(txID);
 
@@ -686,7 +672,7 @@
 
    }
 
-   public void appendRollbackRecord(final long txID) throws Exception
+   public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -709,12 +695,12 @@
       bb.putLong(txID);
       bb.putInt(size);
 
-      IOCallback callback = getTransactionCallback(txID);
+      IOCallback callback = getTransactionCallback(txID, sync);
 
       lock.lock();
       try
       {
-         JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+         JournalFile usedFile = appendRecord(bb, sync, callback);
 
          transactionCallbacks.remove(txID);
 
@@ -1476,12 +1462,7 @@
    /** Method for use on testcases.
     *  It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
    public void debugWait() throws Exception
-   {
-      if (currentFile != null)
-      {
-         currentFile.getFile().flush();
-      }
-      
+   {         
       for (TransactionCallback callback : transactionCallbacks.values())
       {
          callback.waitCompletion();
@@ -1572,16 +1553,6 @@
       return minFiles;
    }
 
-   public boolean isSyncTransactional()
-   {
-      return syncTransactional;
-   }
-
-   public boolean isSyncNonTransactional()
-   {
-      return syncNonTransactional;
-   }
-
    public String getFilePrefix()
    {
       return filePrefix;
@@ -2029,12 +2000,6 @@
          if (callback != null)
          {
             currentFile.getFile().write(bb, sync, callback);
-
-            // This is defaulted to false. The user is telling us to not wait the buffer timeout when a commit or sync is called
-            if (flushOnSync && sync)
-            {
-               currentFile.getFile().flush();
-            }
          }
          else
          {
@@ -2268,11 +2233,9 @@
       }
    }
 
-
-
-   private IOCallback getTransactionCallback(final long transactionId) throws MessagingException
+   private IOCallback getTransactionCallback(final long transactionId, final boolean sync) throws MessagingException
    {
-      if (fileFactory.isSupportsCallbacks() && syncTransactional)
+      if (sync && fileFactory.isSupportsCallbacks())
       {
          TransactionCallback callback = transactionCallbacks.get(transactionId);
 

Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -92,10 +92,8 @@
    
    int getAIOBufferSize();
    
-   int getAIOBufferTimeout();
+   long getAIOBufferTimeout();
    
-   boolean isAIOFlushOnSync();
-
    public long getPagingMaxGlobalSizeBytes();
 
    public String getPagingDirectory();

Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -22,6 +22,25 @@
 
 package org.jboss.messaging.core.management.impl;
 
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.NotificationEmitter;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.transaction.xa.Xid;
+
 import org.jboss.messaging.core.config.Configuration;
 import org.jboss.messaging.core.config.TransportConfiguration;
 import org.jboss.messaging.core.exception.MessagingException;
@@ -40,24 +59,6 @@
 import org.jboss.messaging.core.transaction.impl.XidImpl;
 import org.jboss.messaging.utils.SimpleString;
 
-import javax.management.ListenerNotFoundException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.NotificationBroadcasterSupport;
-import javax.management.NotificationEmitter;
-import javax.management.NotificationFilter;
-import javax.management.NotificationListener;
-import javax.transaction.xa.Xid;
-import java.text.DateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
 /**
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * 
@@ -156,23 +157,16 @@
       return configuration.getInterceptorClassNames().toArray(new String[configuration.getInterceptorClassNames().size()]);
    }
 
-   
-   
    public int getAIOBufferSize()
    {
       return configuration.getAIOBufferSize();
    }
    
-   public int getAIOBufferTimeout()
+   public long getAIOBufferTimeout()
    {
       return configuration.getAIOBufferTimeout();
    }
    
-   public boolean isAIOFlushOnSync()
-   {
-      return configuration.isAIOFlushOnSync();
-   }
-
    public String getJournalDirectory()
    {
       return configuration.getJournalDirectory();

Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -96,16 +96,11 @@
       return localControl.getAIOBufferSize();
    }
 
-   public int getAIOBufferTimeout()
+   public long getAIOBufferTimeout()
    {
       return localControl.getAIOBufferTimeout();
    }
 
-   public boolean isAIOFlushOnSync()
-   {
-      return localControl.isAIOFlushOnSync();
-   }
-
    public String getJournalDirectory()
    {
       return localControl.getJournalDirectory();

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -138,6 +138,10 @@
 
    private final Executor executor;
 
+   private final boolean syncTransactional;
+
+   private final boolean syncNonTransactional;
+
    public JournalStorageManager(final Configuration config, final Executor executor)
    {
       this.executor = executor;
@@ -158,7 +162,7 @@
 
       SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
 
-      bindingsJournal = new JournalImpl(1024 * 1024, 2, true, true, false, bindingsFF, "jbm-bindings", "bindings", 1);
+      bindingsJournal = new JournalImpl(1024 * 1024, 2, bindingsFF, "jbm-bindings", "bindings", 1);
 
       String journalDir = config.getJournalDirectory();
 
@@ -167,6 +171,10 @@
          throw new NullPointerException("journal-dir is null");
       }
 
+      syncNonTransactional = config.isJournalSyncNonTransactional();
+
+      syncTransactional = config.isJournalSyncTransactional();
+
       checkAndCreateDir(journalDir, config.isCreateJournalDir());
 
       SequentialFileFactory journalFF = null;
@@ -181,7 +189,9 @@
          }
          else
          {
-            journalFF = new AIOSequentialFileFactory(journalDir, config.getAIOBufferSize(), config.getAIOBufferTimeout());
+            journalFF = new AIOSequentialFileFactory(journalDir,
+                                                     config.getAIOBufferSize(),
+                                                     config.getAIOBufferTimeout());
             log.info("AIO loaded successfully");
          }
       }
@@ -196,10 +206,7 @@
       }
 
       messageJournal = new JournalImpl(config.getJournalFileSize(),
-                                       config.getJournalMinFiles(),
-                                       config.isJournalSyncTransactional(),
-                                       config.isJournalSyncNonTransactional(),
-                                       config.isAIOFlushOnSync(),
+                                       config.getJournalMinFiles(),                                      
                                        journalFF,
                                        "jbm-data",
                                        "jbm",
@@ -254,37 +261,40 @@
    // Non transactional operations
 
    public void storeMessage(final ServerMessage message) throws Exception
-   {  
+   {
       if (message.getMessageID() <= 0)
       {
          throw new MessagingException(MessagingException.ILLEGAL_STATE, "MessageId was not assigned to Message");
       }
 
+      // Note that we don't sync, the add reference that comes immediately after will sync
+
       if (message instanceof LargeServerMessage)
       {
          messageJournal.appendAddRecord(message.getMessageID(),
                                         ADD_LARGE_MESSAGE,
-                                        new LargeMessageEncoding((LargeServerMessage)message));
+                                        new LargeMessageEncoding((LargeServerMessage)message),
+                                        false);
       }
       else
       {
-         messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message);
+         messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message, false);
       }
    }
 
    public void storeReference(final long queueID, final long messageID) throws Exception
    {
-      messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID));
+      messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID), syncNonTransactional);
    }
 
    public void storeAcknowledge(final long queueID, final long messageID) throws Exception
    {
-      messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID));
+      messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional);
    }
 
    public void deleteMessage(final long messageID) throws Exception
    {
-      messageJournal.appendDeleteRecord(messageID);
+      messageJournal.appendDeleteRecord(messageID, syncNonTransactional);
    }
 
    public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
@@ -292,26 +302,29 @@
       ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
                                                                          ref.getQueue().getPersistenceID());
 
-      messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), SET_SCHEDULED_DELIVERY_TIME, encoding);
+      messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+                                        SET_SCHEDULED_DELIVERY_TIME,
+                                        encoding,
+                                        syncNonTransactional);
    }
 
    public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
-      messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding);
+      messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional);
    }
 
    public void updateDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
-      messageJournal.appendUpdateRecord(recordID, DUPLICATE_ID, encoding);
+      messageJournal.appendUpdateRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional);
    }
 
    public void deleteDuplicateID(long recordID) throws Exception
    {
-      messageJournal.appendDeleteRecord(recordID);
+      messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
    }
 
    // Transactional operations
@@ -328,11 +341,16 @@
          messageJournal.appendAddRecordTransactional(txID,
                                                      message.getMessageID(),
                                                      ADD_LARGE_MESSAGE,
-                                                     new LargeMessageEncoding(((LargeServerMessage)message)));
+                                                     new LargeMessageEncoding(((LargeServerMessage)message)),
+                                                     syncTransactional);
       }
       else
       {
-         messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), ADD_MESSAGE, message);
+         messageJournal.appendAddRecordTransactional(txID,
+                                                     message.getMessageID(),
+                                                     ADD_MESSAGE,
+                                                     message,
+                                                     syncTransactional);
       }
 
    }
@@ -343,7 +361,7 @@
       {
          // Instead of updating the record, we delete the old one as that is
          // better for reclaiming
-         messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID());
+         messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID(), syncTransactional);
       }
 
       pageTransaction.setRecordID(generateUniqueID());
@@ -351,22 +369,31 @@
       messageJournal.appendAddRecordTransactional(txID,
                                                   pageTransaction.getRecordID(),
                                                   PAGE_TRANSACTION,
-                                                  pageTransaction);
+                                                  pageTransaction,
+                                                  syncTransactional);
    }
 
    public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
-      messageJournal.appendUpdateRecordTransactional(txID, messageID, ADD_REF, new RefEncoding(queueID));
+      messageJournal.appendUpdateRecordTransactional(txID,
+                                                     messageID,
+                                                     ADD_REF,
+                                                     new RefEncoding(queueID),
+                                                     syncTransactional);
    }
 
    public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
-      messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID));
+      messageJournal.appendUpdateRecordTransactional(txID,
+                                                     messageID,
+                                                     ACKNOWLEDGE_REF,
+                                                     new RefEncoding(queueID),
+                                                     syncTransactional);
    }
 
    public void deletePageTransactional(final long txID, final long recordID) throws Exception
    {
-      messageJournal.appendDeleteRecordTransactional(txID, recordID);
+      messageJournal.appendDeleteRecordTransactional(txID, recordID, syncTransactional);
    }
 
    public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
@@ -377,27 +404,28 @@
       messageJournal.appendUpdateRecordTransactional(txID,
                                                      ref.getMessage().getMessageID(),
                                                      SET_SCHEDULED_DELIVERY_TIME,
-                                                     encoding);
+                                                     encoding,
+                                                     syncTransactional);
    }
 
    public void deleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
    {
-      messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
+      messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID), syncTransactional);
    }
 
    public void prepare(final long txID, final Xid xid) throws Exception
    {
-      messageJournal.appendPrepareRecord(txID, new XidEncoding(xid));
+      messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional);
    }
 
    public void commit(final long txID) throws Exception
    {
-      messageJournal.appendCommitRecord(txID);
+      messageJournal.appendCommitRecord(txID, syncTransactional);
    }
 
    public void rollback(final long txID) throws Exception
    {
-      messageJournal.appendRollbackRecord(txID);
+      messageJournal.appendRollbackRecord(txID, syncTransactional);
    }
 
    public void storeDuplicateIDTransactional(final long txID,
@@ -407,7 +435,7 @@
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
-      messageJournal.appendAddRecordTransactional(txID, recordID, DUPLICATE_ID, encoding);
+      messageJournal.appendAddRecordTransactional(txID, recordID, DUPLICATE_ID, encoding, syncTransactional);
    }
 
    public void updateDuplicateIDTransactional(final long txID,
@@ -417,12 +445,12 @@
    {
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
-      messageJournal.appendUpdateRecordTransactional(txID, recordID, DUPLICATE_ID, encoding);
+      messageJournal.appendUpdateRecordTransactional(txID, recordID, DUPLICATE_ID, encoding, syncTransactional);
    }
 
    public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception
    {
-      messageJournal.appendDeleteRecordTransactional(txID, recordID);
+      messageJournal.appendDeleteRecordTransactional(txID, recordID, syncTransactional);
    }
 
    // Other operations
@@ -432,7 +460,10 @@
       DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getPersistenceID(),
                                                                                ref.getDeliveryCount());
 
-      messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), UPDATE_DELIVERY_COUNT, updateInfo);
+      messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+                                        UPDATE_DELIVERY_COUNT,
+                                        updateInfo,
+                                        syncNonTransactional);
    }
 
    private static final class AddMessageRecord
@@ -873,12 +904,12 @@
 
       queue.setPersistenceID(id);
 
-      bindingsJournal.appendAddRecord(id, QUEUE_BINDING_RECORD, bindingEncoding);
+      bindingsJournal.appendAddRecord(id, QUEUE_BINDING_RECORD, bindingEncoding, true);
    }
 
    public void deleteQueueBinding(final long queueBindingID) throws Exception
    {
-      bindingsJournal.appendDeleteRecord(queueBindingID);
+      bindingsJournal.appendDeleteRecord(queueBindingID, true);
    }
 
    public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
@@ -1077,11 +1108,11 @@
       private volatile long nextID;
 
       public BatchingIDGenerator(final long start, final long checkpointSize)
-      {         
+      {
          this.counter = new AtomicLong(start);
 
          this.checkpointSize = checkpointSize;
-         
+
          nextID = start + checkpointSize;
       }
 
@@ -1109,9 +1140,9 @@
       }
 
       private synchronized void saveCheckPoint(final long id)
-      {        
+      {
          if (id >= nextID)
-         {            
+         {
             storeID(id);
 
             nextID += checkpointSize;

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -73,22 +73,7 @@
       return new AIOSequentialFileFactory(getTestDir());
    }
 
-   public void testAddSync() throws Exception
-   {
-      setup(10, 10 * 1024, true);
-      this.flushOnSync = false;
-      createJournal();
-      startJournal();
-      load();
 
-      for (int i = 0; i < 1000; i++)
-      {
-         journal.appendAddRecord(i, (byte)1, new byte[] { 10, 12 });
-      }
-
-      stopJournal();
-   }
-
    @Override
    protected int getAlignment()
    {

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -99,8 +99,7 @@
       assertEquals(conf.getJournalMinFiles(), serverControl.getJournalMinFiles());
       assertEquals(conf.getJournalMaxAIO(), serverControl.getJournalMaxAIO());
       assertEquals(conf.getAIOBufferSize(), serverControl.getAIOBufferSize());
-      assertEquals(conf.getAIOBufferTimeout(), serverControl.getAIOBufferTimeout());
-      assertEquals(conf.isAIOFlushOnSync(), serverControl.isAIOFlushOnSync());
+      assertEquals(conf.getAIOBufferTimeout(), serverControl.getAIOBufferTimeout());      
       assertEquals(conf.isCreateBindingsDir(), serverControl.isCreateBindingsDir());
       assertEquals(conf.isCreateJournalDir(), serverControl.isCreateJournalDir());      
       assertEquals(conf.getPagingDirectory(), serverControl.getPagingDirectory());

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -422,9 +422,9 @@
             return (Integer)proxy.retrieveAttributeValue("AIOBufferSize");
          }
 
-         public int getAIOBufferTimeout()
+         public long getAIOBufferTimeout()
          {
-            return (Integer)proxy.retrieveAttributeValue("AIOBufferTimeout");
+            return (Long)proxy.retrieveAttributeValue("AIOBufferTimeout");
          }
 
          public boolean isAIOFlushOnSync()

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/SynchronousCloseTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/SynchronousCloseTest.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/SynchronousCloseTest.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -111,19 +111,11 @@
 
       ClientSessionFactory sf = createSessionFactory();
 
-      for (int i = 0; i < 100; i++)
+      for (int i = 0; i < 2000; i++)
       {
          ClientSession session = sf.createSession(false, true, true);
 
-         assertEquals(1, server.getMessagingServerControl().listRemoteAddresses().length);
-
-         log.info("closing session");
          session.close();
-         log.info("closed session");
-
-         // Thread.sleep(10000);
-
-         assertEquals(0, server.getMessagingServerControl().listRemoteAddresses().length);
       }
 
       sf.close();

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -18,7 +18,7 @@
  * License along with this software; if not, write to the Free
  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */ 
+ */
 
 package org.jboss.messaging.tests.performance.journal;
 
@@ -44,43 +44,43 @@
 public abstract class JournalImplTestUnit extends JournalImplTestBase
 {
    private static final Logger log = Logger.getLogger(JournalImplTestUnit.class);
-   
+
    protected void tearDown() throws Exception
    {
       super.tearDown();
-      
+
       assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
    }
-   
+
    public void testAddUpdateDeleteManyLargeFileSize() throws Exception
    {
       final int numberAdds = 1000;
-      
+
       final int numberUpdates = 500;
-      
+
       final int numberDeletes = 300;
-                  
+
       long[] adds = new long[numberAdds];
-      
+
       for (int i = 0; i < numberAdds; i++)
       {
          adds[i] = i;
       }
-      
+
       long[] updates = new long[numberUpdates];
-      
+
       for (int i = 0; i < numberUpdates; i++)
       {
          updates[i] = i;
       }
-      
+
       long[] deletes = new long[numberDeletes];
-      
+
       for (int i = 0; i < numberDeletes; i++)
       {
          deletes[i] = i;
       }
-      
+
       setup(10, 10 * 1024 * 1024, true);
       createJournal();
       startJournal();
@@ -92,38 +92,38 @@
       createJournal();
       startJournal();
       loadAndCheck();
-      
+
    }
-   
+
    public void testAddUpdateDeleteManySmallFileSize() throws Exception
    {
       final int numberAdds = 1000;
-      
+
       final int numberUpdates = 500;
-      
+
       final int numberDeletes = 300;
-                  
+
       long[] adds = new long[numberAdds];
-      
+
       for (int i = 0; i < numberAdds; i++)
       {
          adds[i] = i;
       }
-      
+
       long[] updates = new long[numberUpdates];
-      
+
       for (int i = 0; i < numberUpdates; i++)
       {
          updates[i] = i;
       }
-      
+
       long[] deletes = new long[numberDeletes];
-      
+
       for (int i = 0; i < numberDeletes; i++)
       {
          deletes[i] = i;
       }
-      
+
       setup(10, 10 * 1024, true);
       createJournal();
       startJournal();
@@ -137,112 +137,110 @@
       createJournal();
       startJournal();
       loadAndCheck();
-      
+
    }
-   
+
    public void testReclaimAndReload() throws Exception
    {
       setup(2, 10 * 1024 * 1024, false);
       createJournal();
       startJournal();
       load();
-      
+
       long start = System.currentTimeMillis();
-      
-                  
+
       byte[] record = generateRecord(recordLength);
-      
+
       int NUMBER_OF_RECORDS = 1000;
 
       for (int count = 0; count < NUMBER_OF_RECORDS; count++)
       {
-         journal.appendAddRecord(count, (byte)0, record);
-         
+         journal.appendAddRecord(count, (byte)0, record, true);
+
          if (count >= NUMBER_OF_RECORDS / 2)
          {
-            journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2);
+            journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2, true);
          }
-         
+
          if (count % 100 == 0)
          {
             log.debug("Done: " + count);
          }
       }
-      
+
       long end = System.currentTimeMillis();
-      
+
       double rate = 1000 * ((double)NUMBER_OF_RECORDS) / (end - start);
-      
+
       log.info("Rate of " + rate + " adds/removes per sec");
-      
+
       log.debug("Reclaim status = " + debugJournal());
-               
+
       stopJournal();
       createJournal();
       startJournal();
       journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>());
-      
+
       assertEquals(NUMBER_OF_RECORDS / 2, journal.getIDMapSize());
-      
+
       stopJournal();
    }
-   
+
    public void testSpeedNonTransactional() throws Exception
    {
-      for (int i=0;i<1;i++)
+      for (int i = 0; i < 1; i++)
       {
          this.setUp();
-         System.gc(); Thread.sleep(500);
+         System.gc();
+         Thread.sleep(500);
          internaltestSpeedNonTransactional();
          this.tearDown();
       }
    }
-   
+
    public void testSpeedTransactional() throws Exception
    {
-      Journal journal =
-         new JournalImpl(10 * 1024 * 1024, 10, true, true, false, getFileFactory(),
-               "jbm-data", "jbm", 5000);
-      
+      Journal journal = new JournalImpl(10 * 1024 * 1024, 10, getFileFactory(), "jbm-data", "jbm", 5000);
+
       journal.start();
-      
+
       journal.load(new ArrayList<RecordInfo>(), null);
-      
+
       try
       {
          final int numMessages = 50050;
-         
+
          SimpleEncoding data = new SimpleEncoding(1024, (byte)'j');
-         
+
          long start = System.currentTimeMillis();
-         
+
          int count = 0;
          double rates[] = new double[50];
          for (int i = 0; i < 50; i++)
          {
             long startTrans = System.currentTimeMillis();
-            for (int j=0; j<1000; j++)
+            for (int j = 0; j < 1000; j++)
             {
-               journal.appendAddRecordTransactional(i, count++, (byte)0, data);
+               journal.appendAddRecordTransactional(i, count++, (byte)0, data, true);
             }
-            
-            journal.appendCommitRecord(i);
-            
+
+            journal.appendCommitRecord(i, true);
+
             long endTrans = System.currentTimeMillis();
-   
+
             rates[i] = 1000 * (double)1000 / (endTrans - startTrans);
          }
-         
+
          long end = System.currentTimeMillis();
-         
-         for (double rate: rates)
+
+         for (double rate : rates)
          {
             log.info("Transaction Rate = " + rate + " records/sec");
-            
+
          }
-         
+
          double rate = 1000 * (double)numMessages / (end - start);
-         
+
          log.info("Rate " + rate + " records/sec");
       }
       finally
@@ -251,54 +249,48 @@
       }
 
    }
-   
+
    private void internaltestSpeedNonTransactional() throws Exception
-   {      
+   {
       final long numMessages = 10000;
-      
-      int numFiles =  (int)(((numMessages * 1024 + 512) / (10 * 1024 * 1024)) * 1.3);
-      
-      if (numFiles<2) numFiles = 2;
-      
+
+      int numFiles = (int)(((numMessages * 1024 + 512) / (10 * 1024 * 1024)) * 1.3);
+
+      if (numFiles < 2)
+         numFiles = 2;
+
       log.debug("num Files=" + numFiles);
 
-      Journal journal =
-         new JournalImpl(10 * 1024 * 1024,  numFiles, true, true, false, getFileFactory(),
-               "jbm-data", "jbm", 5000);
-      
+      Journal journal = new JournalImpl(10 * 1024 * 1024, numFiles, getFileFactory(), "jbm-data", "jbm", 5000);
+
       journal.start();
-      
+
       journal.load(new ArrayList<RecordInfo>(), null);
-            
+
       log.debug("Adding data");
       SimpleEncoding data = new SimpleEncoding(700, (byte)'j');
-      
+
       long start = System.currentTimeMillis();
-      
+
       for (int i = 0; i < numMessages; i++)
       {
-         journal.appendAddRecord(i, (byte)0, data);
+         journal.appendAddRecord(i, (byte)0, data, true);
       }
-      
+
       long end = System.currentTimeMillis();
-      
+
       double rate = 1000 * (double)numMessages / (end - start);
-      
+
       log.info("Rate " + rate + " records/sec");
 
       journal.stop();
-      
-      journal =
-         new JournalImpl(10 * 1024 * 1024,  numFiles, true, true, false, getFileFactory(),
-               "jbm-data", "jbm", 5000);
-      
+
+      journal = new JournalImpl(10 * 1024 * 1024, numFiles, getFileFactory(), "jbm-data", "jbm", 5000);
+
       journal.start();
       journal.load(new ArrayList<RecordInfo>(), null);
       journal.stop();
-      
+
    }
-   
-   
-}
 
-
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -24,7 +24,6 @@
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.concurrent.Executors;
 
 import org.jboss.messaging.core.journal.LoadManager;
 import org.jboss.messaging.core.journal.PreparedTransactionInfo;
@@ -82,7 +81,7 @@
    {
 
       SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDir());
-      JournalImpl impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, true, factory, "jbm", "jbm", 1000);
+      JournalImpl impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, factory, "jbm", "jbm", 1000);
 
       impl.start();
 
@@ -94,13 +93,13 @@
          {
             System.out.println("Append " + i);
          }
-         impl.appendAddRecord(i, (byte)0, new SimpleEncoding(1024, (byte)'f'));
+         impl.appendAddRecord(i, (byte)0, new SimpleEncoding(1024, (byte)'f'), false);
       }
 
       impl.stop();
 
       factory = new AIOSequentialFileFactory(getTestDir());
-      impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, true, factory, "jbm", "jbm", 1000);
+      impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, factory, "jbm", "jbm", 1000);
 
       impl.start();
 
@@ -113,13 +112,13 @@
             System.out.println("Delete " + i);
          }
 
-         impl.appendDeleteRecord(i);
+         impl.appendDeleteRecord(i, false);
       }
 
       impl.stop();
 
       factory = new AIOSequentialFileFactory(getTestDir());
-      impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, true, factory, "jbm", "jbm", 1000);
+      impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, factory, "jbm", "jbm", 1000);
 
       impl.start();
 
@@ -146,7 +145,7 @@
    {
 
       SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDir());
-      JournalImpl impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, true, factory, "jbm", "jbm", 1000);
+      JournalImpl impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, factory, "jbm", "jbm", 1000);
 
       impl.start();
 
@@ -158,14 +157,14 @@
          {
             System.out.println("Append " + i);
          }
-         impl.appendAddRecord(i, (byte)21, new SimpleEncoding(40, (byte)'f'));
-         impl.appendUpdateRecord(i, (byte)22, new SimpleEncoding(40, (byte)'g'));
+         impl.appendAddRecord(i, (byte)21, new SimpleEncoding(40, (byte)'f'), false);
+         impl.appendUpdateRecord(i, (byte)22, new SimpleEncoding(40, (byte)'g'), false);
       }
 
       impl.stop();
 
       factory = new AIOSequentialFileFactory(getTestDir());
-      impl = new JournalImpl(10 * 1024 * 1024, 10, true, false, true, factory, "jbm", "jbm", 1000);
+      impl = new JournalImpl(10 * 1024 * 1024, 10, factory, "jbm", "jbm", 1000);
 
       impl.start();
 
@@ -178,13 +177,13 @@
             System.out.println("Delete " + i);
          }
 
-         impl.appendDeleteRecord(i);
+         impl.appendDeleteRecord(i, false);
       }
 
       impl.stop();
 
       factory = new AIOSequentialFileFactory(getTestDir());
-      impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, true, factory, "jbm", "jbm", 1000);
+      impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, factory, "jbm", "jbm", 1000);
 
       impl.start();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -142,8 +142,7 @@
 
    public static JournalImpl createJournal(String journalType, String journalDir)
    {
-      JournalImpl journal = new JournalImpl(10485760, 2, true,
-            false, true, getFactory(journalType, journalDir), "journaltst", "tst", 5000);
+      JournalImpl journal = new JournalImpl(10485760, 2, getFactory(journalType, journalDir), "journaltst", "tst", 5000);
       return journal;
    }
    
@@ -212,25 +211,25 @@
                
                if (transactionSize != 0)
                {
-                  journal.appendAddRecordTransactional(transactionId, id, (byte)99, buffer.array());
+                  journal.appendAddRecordTransactional(transactionId, id, (byte)99, buffer.array(), false);
         
                   if (++transactionCounter == transactionSize)
                   {
                      System.out.println("Commit transaction " + transactionId);
-                     journal.appendCommitRecord(transactionId);
+                     journal.appendCommitRecord(transactionId, false);
                      transactionCounter = 0;
                      transactionId = nextID.incrementAndGet();
                   }
                }
                else
                {
-                  journal.appendAddRecord(id, (byte)99, buffer.array());
+                  journal.appendAddRecord(id, (byte)99, buffer.array(), false);
                }
             }
    
             if (transactionCounter != 0)
             {
-               journal.appendCommitRecord(transactionId);
+               journal.appendCommitRecord(transactionId, false);
             }
             
             if (transactionSize == 0)

Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -152,11 +152,11 @@
 
       for (int count = 0; count < NUMBER_OF_RECORDS; count++)
       {
-         journal.appendAddRecord(count, (byte)0, record);
+         journal.appendAddRecord(count, (byte)0, record, false);
          
          if (count >= NUMBER_OF_RECORDS / 2)
          {
-            journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2);
+            journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2, false);
          }
          
          if (count % 100 == 0)

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -110,8 +110,6 @@
 
       assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_AIO, conf.getJournalMaxAIO());
 
-      assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC, conf.isAIOFlushOnSync());
-
       assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT, conf.getAIOBufferTimeout());
 
       assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE, conf.getAIOBufferSize());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -79,8 +79,7 @@
       assertEquals("somedir2", conf.getJournalDirectory());
       assertEquals(false, conf.isCreateJournalDir());
       assertEquals(JournalType.NIO, conf.getJournalType());
-      assertEquals(10000, conf.getAIOBufferSize());      
-      assertEquals(true, conf.isAIOFlushOnSync());      
+      assertEquals(10000, conf.getAIOBufferSize());        
       assertEquals(1000, conf.getAIOBufferTimeout());      
       assertEquals(false, conf.isJournalSyncTransactional());
       assertEquals(true, conf.isJournalSyncNonTransactional());

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -532,9 +532,8 @@
       /* (non-Javadoc)
        * @see org.jboss.messaging.core.management.MessagingServerControlMBean#getAIOBufferTimeout()
        */
-      public int getAIOBufferTimeout()
+      public long getAIOBufferTimeout()
       {
-
          return 0;
       }
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -146,7 +146,7 @@
 
       try
       {
-         journalImpl = new JournalImpl(2000, 2, true, true, false, factory, "tt", "tt", 1000);
+         journalImpl = new JournalImpl(2000, 2, factory, "tt", "tt", 1000);
          fail("Supposed to throw an exception");
       }
       catch (Exception ignored)
@@ -161,7 +161,7 @@
 
       setupJournal(JOURNAL_SIZE, 10);
 
-      journalImpl.appendAddRecord(13, (byte)14, new SimpleEncoding(1, (byte)15));
+      journalImpl.appendAddRecord(13, (byte)14, new SimpleEncoding(1, (byte)15), false);
 
       journalImpl.forceMoveNextFile();
 
@@ -198,13 +198,13 @@
          {
             bytes[j] = (byte)i;
          }
-         journalImpl.appendAddRecord(i * 100l, (byte)i, bytes);
+         journalImpl.appendAddRecord(i * 100l, (byte)i, bytes, false);
       }
 
       for (int i = 25; i < 50; i++)
       {
          EncodingSupport support = new SimpleEncoding(5, (byte)i);
-         journalImpl.appendAddRecord(i * 100l, (byte)i, support);
+         journalImpl.appendAddRecord(i * 100l, (byte)i, support, false);
       }
 
       setupJournal(JOURNAL_SIZE, 1024);
@@ -233,7 +233,7 @@
             bytes[j] = (byte)'x';
          }
 
-         journalImpl.appendUpdateRecord(i * 100l, (byte)i, bytes);
+         journalImpl.appendUpdateRecord(i * 100l, (byte)i, bytes, false);
       }
 
       setupJournal(JOURNAL_SIZE, 1024);
@@ -291,7 +291,7 @@
 
       for (int i = 0; i < 50; i++)
       {
-         journalImpl.appendAddRecord(i, (byte)1, new SimpleEncoding(1, (byte)'x'));
+         journalImpl.appendAddRecord(i, (byte)1, new SimpleEncoding(1, (byte)'x'), false);
       }
 
       journalImpl.forceMoveNextFile();
@@ -304,7 +304,7 @@
 
       for (int i = 10; i < 50; i++)
       {
-         journalImpl.appendDeleteRecord(i);
+         journalImpl.appendDeleteRecord(i, false);
       }
 
       journalImpl.debugWait();
@@ -337,7 +337,7 @@
 
       for (int i = 0; i < 50; i++)
       {
-         journalImpl.appendAddRecord(i, (byte)1, new SimpleEncoding(1, (byte)'x'));
+         journalImpl.appendAddRecord(i, (byte)1, new SimpleEncoding(1, (byte)'x'), false);
       }
 
       // as the request to a new file is asynchronous, we need to make sure the
@@ -348,12 +348,12 @@
 
       for (int i = 0; i < 50; i++)
       {
-         journalImpl.appendDeleteRecord(i);
+         journalImpl.appendDeleteRecord(i, false);
       }
 
       journalImpl.forceMoveNextFile();
 
-      journalImpl.appendAddRecord(1000, (byte)1, new SimpleEncoding(1, (byte)'x'));
+      journalImpl.appendAddRecord(1000, (byte)1, new SimpleEncoding(1, (byte)'x'), false);
 
       journalImpl.debugWait();
 
@@ -390,7 +390,7 @@
       assertEquals(0, records.size());
       assertEquals(0, transactions.size());
 
-      journalImpl.appendAddRecordTransactional(1, 1, (byte)1, new SimpleEncoding(1, (byte)1));
+      journalImpl.appendAddRecordTransactional(1, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
 
       setupJournal(JOURNAL_SIZE, 100);
 
@@ -399,7 +399,7 @@
 
       try
       {
-         journalImpl.appendCommitRecord(1l);
+         journalImpl.appendCommitRecord(1l, false);
          // This was supposed to throw an exception, as the transaction was
          // forgotten (interrupted by a reload).
          fail("Supposed to throw exception");
@@ -429,7 +429,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(77l, 1, (byte)1, new SimpleEncoding(1, (byte)1));
+         journalImpl.appendAddRecordTransactional(77l, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
          journalImpl.forceMoveNextFile();
       }
 
@@ -437,7 +437,7 @@
 
       assertEquals(12, factory.listFiles("tt").size());
 
-      journalImpl.appendAddRecordTransactional(78l, 1, (byte)1, new SimpleEncoding(1, (byte)1));
+      journalImpl.appendAddRecordTransactional(78l, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
 
       assertEquals(12, factory.listFiles("tt").size());
 
@@ -448,7 +448,7 @@
 
       try
       {
-         journalImpl.appendCommitRecord(77l);
+         journalImpl.appendCommitRecord(77l, false);
          // This was supposed to throw an exception, as the transaction was
          // forgotten (interrupted by a reload).
          fail("Supposed to throw exception");
@@ -478,11 +478,11 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(1, (byte)1));
+         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(1, (byte)1), false);
          journalImpl.forceMoveNextFile();
       }
 
-      journalImpl.appendCommitRecord(1l);
+      journalImpl.appendCommitRecord(1l, false);
 
       journalImpl.debugWait();
 
@@ -501,17 +501,17 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendDeleteRecordTransactional(2l, i);
+         journalImpl.appendDeleteRecordTransactional(2l, i, false);
          journalImpl.forceMoveNextFile();
       }
 
-      journalImpl.appendCommitRecord(2l);
+      journalImpl.appendCommitRecord(2l, false);
 
-      journalImpl.appendAddRecord(100, (byte)1, new SimpleEncoding(5, (byte)1));
+      journalImpl.appendAddRecord(100, (byte)1, new SimpleEncoding(5, (byte)1), false);
 
       journalImpl.forceMoveNextFile();
 
-      journalImpl.appendAddRecord(101, (byte)1, new SimpleEncoding(5, (byte)1));
+      journalImpl.appendAddRecord(101, (byte)1, new SimpleEncoding(5, (byte)1), false);
 
       journalImpl.checkAndReclaimFiles();
 
@@ -536,9 +536,9 @@
       journalImpl.appendAddRecordTransactional(1l,
                                                2l,
                                                (byte)3,
-                                               new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4));
+                                               new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4), false);
 
-      journalImpl.appendCommitRecord(1l);
+      journalImpl.appendCommitRecord(1l, false);
 
       journalImpl.debugWait();
 
@@ -561,13 +561,13 @@
 
       for (int i = 0; i < 20; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
          journalImpl.forceMoveNextFile();
       }
 
       journalImpl.forceMoveNextFile();
 
-      journalImpl.appendCommitRecord(1l);
+      journalImpl.appendCommitRecord(1l, false);
 
       SequentialFile file = factory.createSequentialFile("tt-1.tt", 1);
 
@@ -626,16 +626,16 @@
 
       for (int i = 0; i < 20; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
-         journalImpl.appendAddRecordTransactional(2l, i + 20l, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+         journalImpl.appendAddRecordTransactional(2l, i + 20l, (byte)0, new SimpleEncoding(1, (byte)15), false);
          journalImpl.forceMoveNextFile();
       }
 
       journalImpl.forceMoveNextFile();
 
-      journalImpl.appendCommitRecord(1l);
+      journalImpl.appendCommitRecord(1l, false);
 
-      journalImpl.appendCommitRecord(2l);
+      journalImpl.appendCommitRecord(2l, false);
 
       SequentialFile file = factory.createSequentialFile("tt-1.tt", 1);
 
@@ -695,7 +695,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecord(i, (byte)0, new SimpleEncoding(1, (byte)0));
+         journalImpl.appendAddRecord(i, (byte)0, new SimpleEncoding(1, (byte)0), false);
          journalImpl.forceMoveNextFile();
       }
 
@@ -707,7 +707,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendDeleteRecord(i);
+         journalImpl.appendDeleteRecord(i, false);
       }
 
       journalImpl.forceMoveNextFile();
@@ -734,19 +734,19 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
          journalImpl.forceMoveNextFile();
       }
 
       for (int i = 10; i < 20; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
          journalImpl.forceMoveNextFile();
       }
 
       journalImpl.forceMoveNextFile();
 
-      journalImpl.appendCommitRecord(1l);
+      journalImpl.appendCommitRecord(1l, false);
 
       SequentialFile file = factory.createSequentialFile("tt-1.tt", 1);
 
@@ -796,22 +796,22 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
       }
 
       journalImpl.forceMoveNextFile();
       SimpleEncoding xidEncoding = new SimpleEncoding(10, (byte)'a');
 
-      journalImpl.appendPrepareRecord(1l, xidEncoding);
-      journalImpl.appendCommitRecord(1l);
+      journalImpl.appendPrepareRecord(1l, xidEncoding, false);
+      journalImpl.appendCommitRecord(1l, false);
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendDeleteRecordTransactional(2l, i);
+         journalImpl.appendDeleteRecordTransactional(2l, i, false);
       }
 
-      journalImpl.appendCommitRecord(2l);
-      journalImpl.appendAddRecord(100l, (byte)0, new SimpleEncoding(1, (byte)10)); // Add
+      journalImpl.appendCommitRecord(2l, false);
+      journalImpl.appendAddRecord(100l, (byte)0, new SimpleEncoding(1, (byte)10), false); // Add
       // anything
       // to
       // keep
@@ -841,10 +841,10 @@
          {
             journalImpl.forceMoveNextFile();
          }
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
       }
 
-      journalImpl.appendCommitRecord(1l);
+      journalImpl.appendCommitRecord(1l, false);
 
       for (int i = 0; i < 10; i++)
       {
@@ -852,10 +852,10 @@
          {
             journalImpl.forceMoveNextFile();
          }
-         journalImpl.appendDeleteRecordTransactional(2l, i);
+         journalImpl.appendDeleteRecordTransactional(2l, i, false);
       }
 
-      journalImpl.appendCommitRecord(2l);
+      journalImpl.appendCommitRecord(2l, false);
       journalImpl.forceMoveNextFile();
       journalImpl.checkAndReclaimFiles();
 
@@ -876,11 +876,11 @@
 
       SimpleEncoding xid = new SimpleEncoding(10, (byte)1);
 
-      journalImpl.appendAddRecord(10l, (byte)0, new SimpleEncoding(10, (byte)0));
+      journalImpl.appendAddRecord(10l, (byte)0, new SimpleEncoding(10, (byte)0), false);
 
-      journalImpl.appendDeleteRecordTransactional(1l, 10l, new SimpleEncoding(100, (byte)'j'));
+      journalImpl.appendDeleteRecordTransactional(1l, 10l, new SimpleEncoding(100, (byte)'j'), false);
 
-      journalImpl.appendPrepareRecord(1, xid);
+      journalImpl.appendPrepareRecord(1, xid, false);
 
       journalImpl.debugWait();
 
@@ -907,7 +907,7 @@
          assertEquals((byte)1, transactions.get(0).extraData[i]);
       }
 
-      journalImpl.appendCommitRecord(1l);
+      journalImpl.appendCommitRecord(1l, false);
 
       journalImpl.debugWait();
 
@@ -929,7 +929,7 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1));
+         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
          journalImpl.forceMoveNextFile();
       }
 
@@ -937,7 +937,7 @@
 
       SimpleEncoding xid1 = new SimpleEncoding(10, (byte)1);
 
-      journalImpl.appendPrepareRecord(1l, xid1);
+      journalImpl.appendPrepareRecord(1l, xid1, false);
 
       assertEquals(12, factory.listFiles("tt").size());
 
@@ -958,7 +958,7 @@
 
       assertEquals(12, factory.listFiles("tt").size());
 
-      journalImpl.appendCommitRecord(1l);
+      journalImpl.appendCommitRecord(1l, false);
 
       setupJournal(JOURNAL_SIZE, 1024);
 
@@ -968,12 +968,12 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendDeleteRecordTransactional(2l, i);
+         journalImpl.appendDeleteRecordTransactional(2l, i, false);
       }
 
       SimpleEncoding xid2 = new SimpleEncoding(15, (byte)2);
 
-      journalImpl.appendPrepareRecord(2l, xid2);
+      journalImpl.appendPrepareRecord(2l, xid2, false);
 
       setupJournal(JOURNAL_SIZE, 1);
 
@@ -990,7 +990,7 @@
 
       assertEquals(12, factory.listFiles("tt").size());
 
-      journalImpl.appendCommitRecord(2l);
+      journalImpl.appendCommitRecord(2l, false);
 
       setupJournal(JOURNAL_SIZE, 1);
 
@@ -1018,11 +1018,11 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1));
+         journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
          journalImpl.forceMoveNextFile();
       }
 
-      journalImpl.appendPrepareRecord(1l, new SimpleEncoding(13, (byte)0));
+      journalImpl.appendPrepareRecord(1l, new SimpleEncoding(13, (byte)0), false);
 
       setupJournal(JOURNAL_SIZE, 100);
       assertEquals(0, records.size());
@@ -1067,11 +1067,11 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
          journalImpl.forceMoveNextFile();
       }
 
-      journalImpl.appendRollbackRecord(1l);
+      journalImpl.appendRollbackRecord(1l, false);
 
       journalImpl.forceMoveNextFile();
 
@@ -1096,10 +1096,10 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
       }
 
-      journalImpl.appendCommitRecord(1l);
+      journalImpl.appendCommitRecord(1l, false);
 
       setupJournal(JOURNAL_SIZE, 100);
 
@@ -1119,10 +1119,10 @@
 
       for (int i = 0; i < 10; i++)
       {
-         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
+         journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
       }
 
-      journalImpl.appendCommitRecord(1l);
+      journalImpl.appendCommitRecord(1l, false);
 
       setupJournal(JOURNAL_SIZE, 100);
 
@@ -1139,11 +1139,11 @@
 
       setupJournal(JOURNAL_SIZE, 1);
 
-      journalImpl.appendPrepareRecord(2l, new SimpleEncoding(10, (byte)'j'));
+      journalImpl.appendPrepareRecord(2l, new SimpleEncoding(10, (byte)'j'), false);
 
       journalImpl.forceMoveNextFile();
 
-      journalImpl.appendAddRecord(1l, (byte)0, new SimpleEncoding(10, (byte)'k'));
+      journalImpl.appendAddRecord(1l, (byte)0, new SimpleEncoding(10, (byte)'k'), false);
 
       setupJournal(JOURNAL_SIZE, 1);
 
@@ -1159,9 +1159,9 @@
 
       assertEquals(1, transactions.size());
 
-      journalImpl.appendCommitRecord(2l);
+      journalImpl.appendCommitRecord(2l, false);
 
-      journalImpl.appendDeleteRecord(1l);
+      journalImpl.appendDeleteRecord(1l, false);
 
       journalImpl.forceMoveNextFile();
 
@@ -1203,8 +1203,8 @@
                latchStart.await();
                for (int i = 0; i < NUMBER_OF_ELEMENTS; i++)
                {
-                  journalImpl.appendAddRecordTransactional(i, i, (byte)1, new SimpleEncoding(50, (byte)1));
-                  journalImpl.appendCommitRecord(i);
+                  journalImpl.appendAddRecordTransactional(i, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
+                  journalImpl.appendCommitRecord(i, false);
                   queueDelete.offer(i);
                }
                finishedOK.incrementAndGet();
@@ -1232,7 +1232,7 @@
                   {
                      break;
                   }
-                  journalImpl.appendDeleteRecord(toDelete);
+                  journalImpl.appendDeleteRecord(toDelete, false);
                }
                finishedOK.incrementAndGet();
             }
@@ -1268,20 +1268,20 @@
    {
 
       SequentialFileFactory factory = new FakeSequentialFileFactory(512, false);
-      JournalImpl impl = new JournalImpl(512 + 512 * 3, 20, true, false, false, factory, "jbm", "jbm", 1000);
+      JournalImpl impl = new JournalImpl(512 + 512 * 3, 20, factory, "jbm", "jbm", 1000);
 
       impl.start();
 
       impl.load(dummyLoader);
 
-      impl.appendAddRecord(1l, (byte)0, new SimpleEncoding(100, (byte)'a'));
-      impl.appendAddRecord(2l, (byte)0, new SimpleEncoding(100, (byte)'b'));
-      impl.appendAddRecord(3l, (byte)0, new SimpleEncoding(100, (byte)'b'));
-      impl.appendAddRecord(4l, (byte)0, new SimpleEncoding(100, (byte)'b'));
+      impl.appendAddRecord(1l, (byte)0, new SimpleEncoding(100, (byte)'a'), false);
+      impl.appendAddRecord(2l, (byte)0, new SimpleEncoding(100, (byte)'b'), false);
+      impl.appendAddRecord(3l, (byte)0, new SimpleEncoding(100, (byte)'b'), false);
+      impl.appendAddRecord(4l, (byte)0, new SimpleEncoding(100, (byte)'b'), false);
 
       impl.stop();
 
-      impl = new JournalImpl(512 + 1024 + 512, 20, true, false, false, factory, "jbm", "jbm", 1000);
+      impl = new JournalImpl(512 + 1024 + 512, 20, factory, "jbm", "jbm", 1000);
       impl.start();
       impl.load(dummyLoader);
 
@@ -1289,14 +1289,14 @@
       // specific bug caught during development
       impl.forceMoveNextFile();
 
-      impl.appendDeleteRecord(1l);
-      impl.appendDeleteRecord(2l);
-      impl.appendDeleteRecord(3l);
-      impl.appendDeleteRecord(4l);
+      impl.appendDeleteRecord(1l, false);
+      impl.appendDeleteRecord(2l, false);
+      impl.appendDeleteRecord(3l, false);
+      impl.appendDeleteRecord(4l, false);
 
       impl.stop();
 
-      impl = new JournalImpl(512 + 1024 + 512, 20, true, false, false, factory, "jbm", "jbm", 1000);
+      impl = new JournalImpl(512 + 1024 + 512, 20, factory, "jbm", "jbm", 1000);
       impl.start();
 
       ArrayList<RecordInfo> info = new ArrayList<RecordInfo>();
@@ -1364,7 +1364,7 @@
          journalImpl.stop();
       }
 
-      journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true, true, false, factory, "tt", "tt", 1000);
+      journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, factory, "tt", "tt", 1000);
 
       journalImpl.start();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -85,12 +85,12 @@
             {
                for (int i = 0; i < 10; i++)
                {
-                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
+                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), false);
                }
 
                latch.countDown();
                factory.setHoldCallbacks(false, null);
-               journalImpl.appendCommitRecord(1l);
+               journalImpl.appendCommitRecord(1l, false);
             }
             catch (Exception e)
             {
@@ -147,10 +147,10 @@
             {
                for (int i = 0; i < 10; i++)
                {
-                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
+                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), false);
                }
 
-               journalImpl.appendRollbackRecord(1l);
+               journalImpl.appendRollbackRecord(1l, false);
             }
             catch (Exception e)
             {
@@ -211,10 +211,10 @@
             {
                for (int i = 0; i < 10; i++)
                {
-                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
+                  journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), false);
                }
 
-               journalImpl.appendCommitRecord(1l);
+               journalImpl.appendCommitRecord(1l, false);
             }
             catch (Exception e)
             {
@@ -248,7 +248,7 @@
 
       try
       {
-         journalImpl.appendRollbackRecord(1l);
+         journalImpl.appendRollbackRecord(1l, false);
          fail("Supposed to throw an exception");
       }
       catch (Exception e)
@@ -268,7 +268,7 @@
       factory.setHoldCallbacks(true, null);
       factory.setGenerateErrors(true);
 
-      journalImpl.appendAddRecordTransactional(1l, 1, (byte)1, new SimpleEncoding(1, (byte)0));
+      journalImpl.appendAddRecordTransactional(1l, 1, (byte)1, new SimpleEncoding(1, (byte)0), false);
 
       factory.flushAllCallbacks();
 
@@ -277,7 +277,7 @@
 
       try
       {
-         journalImpl.appendAddRecordTransactional(1l, 2, (byte)1, new SimpleEncoding(1, (byte)0));
+         journalImpl.appendAddRecordTransactional(1l, 2, (byte)1, new SimpleEncoding(1, (byte)0), false);
          fail("Exception expected"); // An exception already happened in one
          // of the elements on this transaction.
          // We can't accept any more elements on
@@ -298,7 +298,7 @@
 
       try
       {
-         journalImpl.appendAddRecord(1l, (byte)0, new SimpleEncoding(1, (byte)0));
+         journalImpl.appendAddRecord(1l, (byte)0, new SimpleEncoding(1, (byte)0), false);
          fail("Exception expected");
       }
       catch (Exception ignored)
@@ -357,7 +357,7 @@
          journalImpl.stop();
       }
 
-      journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true, true, false, factory, "tt", "tt", 1000);
+      journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, factory, "tt", "tt", 1000);
 
       journalImpl.start();
 

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -22,7 +22,6 @@
 
 package org.jboss.messaging.tests.unit.core.journal.impl;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -69,8 +68,6 @@
 
    protected boolean sync;
    
-   protected boolean flushOnSync;
-
    protected String filePrefix = "jbm";
 
    protected String fileExtension = "jbm";
@@ -142,13 +139,12 @@
       minFiles = minFreeFiles;
       this.fileSize = fileSize;
       this.sync = sync;
-      this.flushOnSync = sync;
       maxAIO = 50;
    }
 
    public void createJournal() throws Exception
    {
-      journal = new JournalImpl(fileSize, minFiles, sync, sync, flushOnSync, fileFactory, filePrefix, fileExtension, maxAIO);
+      journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO);
       journal.setAutoReclaim(false);
    }
 
@@ -230,7 +226,7 @@
       {
          byte[] record = generateRecord(size);
 
-         journal.appendAddRecord(element, (byte)0, record);
+         journal.appendAddRecord(element, (byte)0, record, sync);
 
          records.add(new RecordInfo(element, (byte)0, record, false));
       }
@@ -244,7 +240,7 @@
       {
          byte[] updateRecord = generateRecord(recordLength);
 
-         journal.appendUpdateRecord(element, (byte)0, updateRecord);
+         journal.appendUpdateRecord(element, (byte)0, updateRecord, sync);
 
          records.add(new RecordInfo(element, (byte)0, updateRecord, true));
       }
@@ -256,7 +252,7 @@
    {
       for (long element : arguments)
       {
-         journal.appendDeleteRecord(element);
+         journal.appendDeleteRecord(element, sync);
 
          removeRecordsForID(element);
       }
@@ -274,7 +270,7 @@
          // SIZE_BYTE
          byte[] record = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
 
-         journal.appendAddRecordTransactional(txID, element, (byte)0, record);
+         journal.appendAddRecordTransactional(txID, element, (byte)0, record, sync);
 
          tx.records.add(new RecordInfo(element, (byte)0, record, false));
 
@@ -291,7 +287,7 @@
       {
          byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_UPDATE_RECORD_TX);
 
-         journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
+         journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord, sync);
 
          tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true));
       }
@@ -304,7 +300,7 @@
 
       for (long element : arguments)
       {
-         journal.appendDeleteRecordTransactional(txID, element);
+         journal.appendDeleteRecordTransactional(txID, element, sync);
 
          tx.deletes.add(new RecordInfo(element, (byte)0, null, true));
       }
@@ -325,7 +321,7 @@
       {
          throw new IllegalStateException("Transaction is already prepared");
       }
-      journal.appendPrepareRecord(txID, xid);
+      journal.appendPrepareRecord(txID, xid, sync);
 
       tx.prepared = true;
 
@@ -341,7 +337,7 @@
          throw new IllegalStateException("Cannot find tx " + txID);
       }
 
-      journal.appendCommitRecord(txID);
+      journal.appendCommitRecord(txID, sync);
 
       commitTx(txID);
 
@@ -357,7 +353,7 @@
          throw new IllegalStateException("Cannot find tx " + txID);
       }
 
-      journal.appendRollbackRecord(txID);
+      journal.appendRollbackRecord(txID, sync);
 
       journal.debugWait();
    }

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -120,7 +120,7 @@
       for (int i = 0; i < 100; i++)
       {
          System.out.println("i = " + i);
-         journal.appendAddRecord(1, (byte)1, new SimpleEncoding(2, (byte)'a'));
+         journal.appendAddRecord(1, (byte)1, new SimpleEncoding(2, (byte)'a'), false);
       }
       stopJournal();
    }
@@ -129,7 +129,7 @@
    {
       try
       {
-         new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, true, true, false, fileFactory, filePrefix, fileExtension, 1);
+         new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, fileFactory, filePrefix, fileExtension, 1);
 
          fail("Should throw exception");
       }
@@ -140,7 +140,7 @@
 
       try
       {
-         new JournalImpl(10 * 1024, 1, true, true, false, fileFactory, filePrefix, fileExtension, 1);
+         new JournalImpl(10 * 1024, 1, fileFactory, filePrefix, fileExtension, 1);
 
          fail("Should throw exception");
       }
@@ -151,7 +151,7 @@
 
       try
       {
-         new JournalImpl(10 * 1024, 10, true, true, false, null, filePrefix, fileExtension, 1);
+         new JournalImpl(10 * 1024, 10, null, filePrefix, fileExtension, 1);
 
          fail("Should throw exception");
       }
@@ -162,7 +162,7 @@
 
       try
       {
-         new JournalImpl(10 * 1024, 10, true, true, false, fileFactory, null, fileExtension, 1);
+         new JournalImpl(10 * 1024, 10, fileFactory, null, fileExtension, 1);
 
          fail("Should throw exception");
       }
@@ -173,7 +173,7 @@
 
       try
       {
-         new JournalImpl(10 * 1024, 10, true, true, false, fileFactory, filePrefix, null, 1);
+         new JournalImpl(10 * 1024, 10, fileFactory, filePrefix, null, 1);
 
          fail("Should throw exception");
       }
@@ -184,7 +184,7 @@
 
       try
       {
-         new JournalImpl(10 * 1024, 10, true, true, false, fileFactory, filePrefix, null, 0);
+         new JournalImpl(10 * 1024, 10, fileFactory, filePrefix, null, 0);
 
          fail("Should throw exception");
       }
@@ -2260,7 +2260,7 @@
       {
          byte[] record = generateRecord(10 + (int)(1500 * Math.random()));
 
-         journal.appendAddRecord(i, (byte)0, record);
+         journal.appendAddRecord(i, (byte)0, record, false);
 
          records.add(new RecordInfo(i, (byte)0, record, false));
       }
@@ -2269,14 +2269,14 @@
       {
          byte[] record = generateRecord(10 + (int)(1024 * Math.random()));
 
-         journal.appendUpdateRecord(i, (byte)0, record);
+         journal.appendUpdateRecord(i, (byte)0, record, false);
 
          records.add(new RecordInfo(i, (byte)0, record, true));
       }
 
       for (int i = 0; i < 100; i++)
       {
-         journal.appendDeleteRecord(i);
+         journal.appendDeleteRecord(i, false);
 
          removeRecordsForID(i);
       }

Modified: trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -1,4 +1,5 @@
 package org.jboss.messaging.tests.util;
+
 import java.util.ArrayList;
 
 import org.jboss.messaging.core.journal.PreparedTransactionInfo;
@@ -61,25 +62,21 @@
 
    // Inner classes -------------------------------------------------
 
-   
    public static void main(String arg[])
    {
       TimeAndCounterIDGenerator idgenerator = new TimeAndCounterIDGenerator();
       try
       {
          SequentialFileFactory fileFactory = new AIOSequentialFileFactory("/tmp"); // any dir you want
-         //SequentialFileFactory fileFactory = new NIOSequentialFileFactory("/tmp"); // any dir you want
-         JournalImpl journalExample = new JournalImpl(
-                                                      10 * 1024 * 1024, // 10M.. we believe that's the usual cilinder bufferSize.. not an exact science here
+         // SequentialFileFactory fileFactory = new NIOSequentialFileFactory("/tmp"); // any dir you want
+         JournalImpl journalExample = new JournalImpl(10 * 1024 * 1024, // 10M.. we believe that's the usual cilinder
+                                                                        // bufferSize.. not an exact science here
                                                       2, // number of files pre-allocated
-                                                      true, // sync on commit
-                                                      false, // no sync on non transactional
-                                                      false, // if aio, flush on sync
                                                       fileFactory, // AIO or NIO
                                                       "exjournal", // file name
                                                       "dat", // extension
-                                                       10000); // it's like a semaphore for callback on the AIO layer
-         
+                                                      10000); // it's like a semaphore for callback on the AIO layer
+
          ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
          ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
          journalExample.start();
@@ -87,31 +84,77 @@
          journalExample.load(committedRecords, preparedTransactions);
 
          System.out.println("Loaded Record List:");
-         
-         for (RecordInfo record: committedRecords)
+
+         for (RecordInfo record : committedRecords)
          {
-            System.out.println("Record id = " + record.id + " userType = " + record.userRecordType + " with " + record.data.length + " bytes is stored on the journal");
+            System.out.println("Record id = " + record.id +
+                               " userType = " +
+                               record.userRecordType +
+                               " with " +
+                               record.data.length +
+                               " bytes is stored on the journal");
          }
 
          System.out.println("Adding Records:");
-         
-         for (int i = 0 ; i < 10; i++)
+
+         for (int i = 0; i < 10; i++)
          {
-            journalExample.appendAddRecord(idgenerator.generateID(), (byte)1, new byte[] { 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2} );
+            journalExample.appendAddRecord(idgenerator.generateID(), (byte)1, new byte[] { 0,
+                                                                                          1,
+                                                                                          2,
+                                                                                          0,
+                                                                                          1,
+                                                                                          2,
+                                                                                          0,
+                                                                                          1,
+                                                                                          2,
+                                                                                          0,
+                                                                                          1,
+                                                                                          2,
+                                                                                          0,
+                                                                                          1,
+                                                                                          2,
+                                                                                          0,
+                                                                                          1,
+                                                                                          2,
+                                                                                          0,
+                                                                                          1,
+                                                                                          2 }, false);
          }
-         
+
          long tx = idgenerator.generateID(); // some id generation system
-         
-         for (int i = 0 ; i < 100; i++)
+
+         for (int i = 0; i < 100; i++)
          {
-            journalExample.appendAddRecordTransactional(tx, idgenerator.generateID(), (byte)2, new byte[] { 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 5});
+            journalExample.appendAddRecordTransactional(tx, idgenerator.generateID(), (byte)2, new byte[] { 0,
+                                                                                                           1,
+                                                                                                           2,
+                                                                                                           0,
+                                                                                                           1,
+                                                                                                           2,
+                                                                                                           0,
+                                                                                                           1,
+                                                                                                           2,
+                                                                                                           0,
+                                                                                                           1,
+                                                                                                           2,
+                                                                                                           0,
+                                                                                                           1,
+                                                                                                           2,
+                                                                                                           0,
+                                                                                                           1,
+                                                                                                           2,
+                                                                                                           0,
+                                                                                                           1,
+                                                                                                           2,
+                                                                                                           5 }, true);
          }
-         
+
          // After this is complete, you're sure the records are there
-         journalExample.appendCommitRecord(tx);
+         journalExample.appendCommitRecord(tx, true);
 
          System.out.println("Done!");
-         
+
          journalExample.stop();
       }
       catch (Exception e)
@@ -119,5 +162,5 @@
          e.printStackTrace();
       }
    }
-   
+
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java	2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java	2009-06-03 14:06:16 UTC (rev 7181)
@@ -20,7 +20,6 @@
  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
  */
 
-
 package org.jboss.messaging.tests.util;
 
 import java.util.ArrayList;
@@ -61,27 +60,21 @@
       try
       {
          FileConfiguration fileConf = new FileConfiguration();
-         
-         //fileConf.setConfigurationUrl(arg[0]);
-         
+
+         // fileConf.setConfigurationUrl(arg[0]);
+
          fileConf.start();
-         
-         
-         
+
          JournalImpl journal = new JournalImpl(fileConf.getJournalFileSize(),
-                            fileConf.getJournalMinFiles(),
-                            true,
-                            true,
-                            false,
-                            new NIOSequentialFileFactory(fileConf.getJournalDirectory()),
-                            "jbm-data",
-                            "jbm",
-                            fileConf.getJournalMaxAIO());
-         
-         
+                                               fileConf.getJournalMinFiles(),
+                                               new NIOSequentialFileFactory(fileConf.getJournalDirectory()),
+                                               "jbm-data",
+                                               "jbm",
+                                               fileConf.getJournalMaxAIO());
+
          ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
          ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
-         
+
          journal.start();
          journal.load(records, prepared);
 
@@ -89,23 +82,20 @@
          {
             System.out.println("There are " + prepared.size() + " prepared transactions on the journal");
          }
-         
-         
+
          System.out.println("Total of " + records.size() + " committed records");
 
-         for (RecordInfo record: records)
+         for (RecordInfo record : records)
          {
             System.out.println("user record: " + record);
          }
-         
+
          journal.checkAndReclaimFiles();
-         
+
          System.out.println("Data = " + journal.debug());
-         
+
          journal.stop();
-         
-         
-         
+
       }
       catch (Exception e)
       {
@@ -113,7 +103,7 @@
       }
 
    }
-   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------




More information about the jboss-cvs-commits mailing list