[jboss-cvs] JBoss Messaging SVN: r7432 - in branches/clebert_temp_expirement: src/main/org/jboss/messaging/core/asyncio/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jun 22 20:55:33 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-22 20:55:33 -0400 (Mon, 22 Jun 2009)
New Revision: 7432

Modified:
   branches/clebert_temp_expirement/native/src/JNICallbackAdapter.cpp
   branches/clebert_temp_expirement/native/src/JNICallbackAdapter.h
   branches/clebert_temp_expirement/native/src/JNI_AsynchronousFileImpl.cpp
   branches/clebert_temp_expirement/native/src/Version.h
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
   branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
   branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Uploading initial changes

Modified: branches/clebert_temp_expirement/native/src/JNICallbackAdapter.cpp
===================================================================
--- branches/clebert_temp_expirement/native/src/JNICallbackAdapter.cpp	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/native/src/JNICallbackAdapter.cpp	2009-06-23 00:55:33 UTC (rev 7432)
@@ -22,12 +22,15 @@
 #include <iostream>
 #include "JavaUtilities.h"
 
-JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController, jobject _bufferReference) : CallbackAdapter()
+jobject nullObj = NULL;
+
+JNICallbackAdapter::JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead) : CallbackAdapter()
 {
 	controller = _controller;
 	callback = _callback;
 	fileController = _fileController;
 	bufferReference = _bufferReference;
+	isRead = _isRead;
 }
 
 JNICallbackAdapter::~JNICallbackAdapter()
@@ -36,7 +39,7 @@
 
 void JNICallbackAdapter::done(THREAD_CONTEXT threadContext)
 {
-	JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback, bufferReference); 
+	JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->done, callback,  isRead ? nullObj : bufferReference); 
 	release(threadContext);
 }
 
@@ -44,7 +47,7 @@
 {
 	controller->log(threadContext, 0, "Libaio event generated errors, callback object was informed about it");
 	jstring strError = JNI_ENV(threadContext)->NewStringUTF(error.data());
-	JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error, callback, (jint)errorCode, strError);
+	JNI_ENV(threadContext)->CallVoidMethod(fileController, controller->error, callback, isRead ? nullObj : bufferReference, (jint)errorCode, strError);
 	release(threadContext);
 }
 

Modified: branches/clebert_temp_expirement/native/src/JNICallbackAdapter.h
===================================================================
--- branches/clebert_temp_expirement/native/src/JNICallbackAdapter.h	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/native/src/JNICallbackAdapter.h	2009-06-23 00:55:33 UTC (rev 7432)
@@ -34,6 +34,8 @@
 	jobject callback;
 	jobject fileController;
 	jobject bufferReference;
+	// Is this a read operation
+	short isRead;
 
 	void release(THREAD_CONTEXT threadContext)
 	{
@@ -47,7 +49,7 @@
 	
 public:
 	// _ob must be a global Reference (use createGloblReferente before calling the constructor)
-	JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController, jobject _bufferReference);
+	JNICallbackAdapter(AIOController * _controller, jobject _callback, jobject _fileController, jobject _bufferReference, short _isRead);
 	virtual ~JNICallbackAdapter();
 
 	void done(THREAD_CONTEXT threadContext);

Modified: branches/clebert_temp_expirement/native/src/JNI_AsynchronousFileImpl.cpp
===================================================================
--- branches/clebert_temp_expirement/native/src/JNI_AsynchronousFileImpl.cpp	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/native/src/JNI_AsynchronousFileImpl.cpp	2009-06-23 00:55:33 UTC (rev 7432)
@@ -57,7 +57,7 @@
 		controller->done = env->GetMethodID(clazz,"callbackDone","(Lorg/jboss/messaging/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;)V");
 		if (!controller->done) return 0;
 
-		controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/jboss/messaging/core/asyncio/AIOCallback;ILjava/lang/String;)V");
+		controller->error = env->GetMethodID(clazz, "callbackError", "(Lorg/jboss/messaging/core/asyncio/AIOCallback;Ljava/nio/ByteBuffer;ILjava/lang/String;)V");
         if (!controller->error) return 0;
 
         jclass loggerClass = env->GetObjectClass(logger);
@@ -103,7 +103,7 @@
 			return;
 		}
 
-		CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
+		CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), true);
 
 		controller->fileOutput.read(env, position, (size_t)size, buffer, adapter);
 	}
@@ -186,7 +186,7 @@
 		}
 
 
-		CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer));
+		CallbackAdapter * adapter = new JNICallbackAdapter(controller, env->NewGlobalRef(callback), env->NewGlobalRef(objThis), env->NewGlobalRef(jbuffer), false);
 
 		controller->fileOutput.write(env, position, (size_t)size, buffer, adapter);
 	}

Modified: branches/clebert_temp_expirement/native/src/Version.h
===================================================================
--- branches/clebert_temp_expirement/native/src/Version.h	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/native/src/Version.h	2009-06-23 00:55:33 UTC (rev 7432)
@@ -1,5 +1,5 @@
 
 #ifndef _VERSION_NATIVE_AIO
-#define _VERSION_NATIVE_AIO 21
+#define _VERSION_NATIVE_AIO 22
 #endif
 

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -55,7 +55,7 @@
 
    private static boolean loaded = false;
 
-   private static int EXPECTED_NATIVE_VERSION = 21;
+   private static int EXPECTED_NATIVE_VERSION = 22;
 
    public static void addMax(final int io)
    {
@@ -289,11 +289,11 @@
                }
                catch (MessagingException e)
                {
-                  callbackError(aioCallback, e.getCode(), e.getMessage());
+                  callbackError(aioCallback, directByteBuffer, e.getCode(), e.getMessage());
                }
                catch (RuntimeException e)
                {
-                  callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+                  callbackError(aioCallback, directByteBuffer, MessagingException.INTERNAL_ERROR, e.getMessage());
                }
             }
          });
@@ -308,11 +308,11 @@
          }
          catch (MessagingException e)
          {
-            callbackError(aioCallback, e.getCode(), e.getMessage());
+            callbackError(aioCallback, directByteBuffer, e.getCode(), e.getMessage());
          }
          catch (RuntimeException e)
          {
-            callbackError(aioCallback, MessagingException.INTERNAL_ERROR, e.getMessage());
+            callbackError(aioCallback, directByteBuffer, MessagingException.INTERNAL_ERROR, e.getMessage());
          }
       }
 
@@ -418,7 +418,9 @@
       writeSemaphore.release();
       pendingWrites.down();
       callback.done();
-      if (bufferCallback != null)
+      
+      // The buffer is not sent on callback for read operations
+      if (bufferCallback != null && buffer != null)
       {
          bufferCallback.bufferDone(buffer);
       }
@@ -426,12 +428,18 @@
 
    // Called by the JNI layer.. just ignore the
    // warning
-   private void callbackError(final AIOCallback callback, final int errorCode, final String errorMessage)
+   private void callbackError(final AIOCallback callback, final ByteBuffer buffer, final int errorCode, final String errorMessage)
    {
       log.warn("CallbackError: " + errorMessage);
       writeSemaphore.release();
       pendingWrites.down();
       callback.onError(errorCode, errorMessage);
+
+      // The buffer is not sent on callback for read operations
+      if (bufferCallback != null && buffer != null)
+      {
+         bufferCallback.bufferDone(buffer);
+      }
    }
 
    private void pollEvents()

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -81,6 +81,14 @@
 
    void appendRollbackRecord(long txID, boolean sync) throws Exception;
 
+
+   /**
+    * Eliminate deleted records of the journal
+    * @throws Exception 
+    */
+   void compact() throws Exception;
+   
+
    // Load
 
    long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions) throws Exception;

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -40,13 +40,11 @@
    List<String> listFiles(String extension) throws Exception;
 
    boolean isSupportsCallbacks();
-
+   
    ByteBuffer newBuffer(int size);
    
    void releaseBuffer(ByteBuffer buffer);
    
-   void controlBuffersLifeCycle(boolean value);
-   
    /** The factory may need to do some initialization before the file is activated.
     *  this was added as a hook for AIO to initialize the Observer on TimedBuffer.
     *  It could be eventually done the same on NIO if we implement TimedBuffer on NIO */
@@ -71,5 +69,9 @@
     * Create the directory if it doesn't exist yet
     */
    void createDirs() throws Exception;
+   
+   // used on tests only
+   void testFlush();
 
+
 }

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/TestableJournal.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -70,7 +70,7 @@
    void setAutoReclaim(boolean autoReclaim);
 
    boolean isAutoReclaim();
+
    
-   
 
 }

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -111,6 +111,11 @@
       }
    }
 
+   public void testFlush()
+   {
+      timedBuffer.flush();
+   }
+
    public void deactivate(SequentialFile file)
    {
       timedBuffer.flush();
@@ -140,18 +145,6 @@
       return AsynchronousFileImpl.isLoaded();
    }
 
-   public void controlBuffersLifeCycle(boolean value)
-   {
-      if (value)
-      {
-         buffersControl.enable();
-      }
-      else
-      {
-         buffersControl.disable();
-      }
-   }
-
    public ByteBuffer newBuffer(int size)
    {
       if (size % 512 != 0)
@@ -223,23 +216,10 @@
        * and ready to be reused or GCed */
       private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
 
-      /** During reload we may disable/enable buffer reuse */
-      private boolean enabled = true;
-
       private boolean stopped = false;
 
       final BufferCallback callback = new LocalBufferCallback();
 
-      public void enable()
-      {
-         this.enabled = true;
-      }
-
-      public void disable()
-      {
-         this.enabled = false;
-      }
-
       public ByteBuffer newBuffer(final int size)
       {
          // if a new buffer wasn't requested in 10 seconds, we clear the queue
@@ -314,26 +294,23 @@
             synchronized (ReuseBuffersController.this)
             {
 
-               if (enabled)
+               if (stopped)
                {
-                  if (stopped)
+                  releaseBuffer(buffer);
+               }
+               else
+               {
+                  bufferReuseLastTime = System.currentTimeMillis();
+
+                  // If a buffer has any other than the configured bufferSize, the buffer
+                  // will be just sent to GC
+                  if (buffer.capacity() == bufferSize)
                   {
-                     releaseBuffer(buffer);
+                     reuseBuffersQueue.offer(buffer);
                   }
                   else
                   {
-                     bufferReuseLastTime = System.currentTimeMillis();
-
-                     // If a buffer has any other than the configured bufferSize, the buffer
-                     // will be just sent to GC
-                     if (buffer.capacity() == bufferSize)
-                     {
-                        reuseBuffersQueue.offer(buffer);
-                     }
-                     else
-                     {
-                        releaseBuffer(buffer);
-                     }
+                     releaseBuffer(buffer);
                   }
                }
             }

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -53,10 +53,6 @@
    }
 
    
-   public void controlBuffersLifeCycle(boolean value)
-   {
-   }
-   
    public void stop()
    {
    }
@@ -76,6 +72,10 @@
    public void deactivate(SequentialFile file)
    {
    }
+   
+   public void testFlush()
+   {
+   }
 
    /** 
     * Create the directory if it doesn't exist yet

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -44,6 +44,12 @@
    void incPosCount();
 
    void decPosCount();
+   
+   void incPendingTransaction();
+   
+   void decPendingTransaction();
+   
+   int getPendingTransactions();
 
    void setCanReclaim(boolean canDelete);
 

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -48,6 +48,8 @@
 
    private long offset;
 
+   private final AtomicInteger pendingTransactions = new AtomicInteger(0);
+   
    private final AtomicInteger posCount = new AtomicInteger(0);
 
    private boolean canReclaim;
@@ -104,7 +106,23 @@
    {
       posCount.decrementAndGet();
    }
+   
+   public void incPendingTransaction()
+   {
+      pendingTransactions.incrementAndGet();
+   }
+   
+   public void decPendingTransaction()
+   {
+      pendingTransactions.decrementAndGet();
+   }
+   
+   public int getPendingTransactions()
+   {
+      return pendingTransactions.get();
+   }
 
+
    public void extendOffset(final int delta)
    {
       offset += delta;

Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -47,6 +47,10 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 
 import org.jboss.messaging.core.buffers.ChannelBuffer;
 import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -92,14 +96,16 @@
 
    private static final Logger log = Logger.getLogger(JournalImpl.class);
 
-   private static final boolean trace = log.isTraceEnabled();
+   //private static final boolean trace = log.isTraceEnabled();
+   private static final boolean trace = true;
 
    // This method exists just to make debug easier.
    // I could replace log.trace by log.info temporarily while I was debugging
    // Journal
    private static final void trace(final String message)
    {
-      log.trace(message);
+      System.out.println(message);
+      //log.trace(message);
    }
 
    // The sizes of primitive types
@@ -112,7 +118,7 @@
 
    public static final int MIN_FILE_SIZE = 1024;
 
-   public static final int SIZE_HEADER = 4;
+   public static final int SIZE_HEADER = SIZE_INT * 2;
 
    public static final int BASIC_SIZE = SIZE_BYTE + SIZE_INT + SIZE_INT;
 
@@ -160,9 +166,9 @@
 
    // Attributes ----------------------------------------------------
 
-   private boolean autoReclaim = true;
+   private volatile boolean autoReclaim = true;
 
-   private final AtomicInteger nextOrderingId = new AtomicInteger(0);
+   private final AtomicInteger nextFileID = new AtomicInteger(0);
 
    // used for Asynchronous IO only (ignored on NIO).
    private final int maxAIO;
@@ -183,16 +189,23 @@
 
    private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
 
-   private final ConcurrentMap<Long, PosFiles> posFilesMap = new ConcurrentHashMap<Long, PosFiles>();
+   // Compacting may replace this structure
+   private volatile ConcurrentMap<Long, RecordFilesRelationship> recordsRelationshipMap = new ConcurrentHashMap<Long, RecordFilesRelationship>();
 
    private final ConcurrentMap<Long, JournalTransaction> transactionInfos = new ConcurrentHashMap<Long, JournalTransaction>();
 
-   private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
-
    private ExecutorService filesExecutor = null;
 
    private final Semaphore lock = new Semaphore(1);
 
+   private final ReadWriteLock compactingLock = new ReentrantReadWriteLock();
+
+   /** We don't lock the journal while compacting, however during a short time before we start, and after we finish,
+    *  we need to rearrange the referenceCounting structures*/
+   private final Lock readLockCompact = compactingLock.readLock();
+
+   private final Lock writeLockCompact = compactingLock.writeLock();
+
    private volatile JournalFile currentFile;
 
    private volatile int state;
@@ -267,32 +280,44 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      int recordLength = record.getEncodeSize();
+      IOCallback callback = null;
 
-      int size = SIZE_ADD_RECORD + recordLength;
+      readLockCompact.lock();
 
-      ChannelBuffer bb = newBuffer(size);
+      try
+      {
 
-      bb.writeByte(ADD_RECORD);
-      bb.writeInt(-1); // skip ID part
-      bb.writeLong(id);
-      bb.writeInt(recordLength);
-      bb.writeByte(recordType);
-      record.encode(bb);
-      bb.writeInt(size);
+         int recordLength = record.getEncodeSize();
 
-      IOCallback callback = getSyncCallback(sync);
+         int size = SIZE_ADD_RECORD + recordLength;
 
-      lock.acquire();
-      try
-      {
-         JournalFile usedFile = appendRecord(bb, sync, callback);
+         ChannelBuffer bb = newBuffer(size);
 
-         posFilesMap.put(id, new PosFiles(usedFile));
+         bb.writeByte(ADD_RECORD);
+         bb.writeInt(-1); // skip ID part
+         bb.writeLong(id);
+         bb.writeInt(recordLength);
+         bb.writeByte(recordType);
+         record.encode(bb);
+         bb.writeInt(size);
+
+         callback = getSyncCallback(sync);
+
+         lock.acquire();
+         try
+         {
+            JournalFile usedFile = appendRecord(bb, sync, callback);
+
+            recordsRelationshipMap.put(id, new RecordFilesRelationship(usedFile));
+         }
+         finally
+         {
+            lock.release();
+         }
       }
       finally
       {
-         lock.release();
+         readLockCompact.unlock();
       }
 
       if (callback != null)
@@ -313,37 +338,49 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      PosFiles posFiles = posFilesMap.get(id);
+      IOCallback callback = null;
 
-      if (posFiles == null)
+      readLockCompact.lock();
+
+      try
       {
-         throw new IllegalStateException("Cannot find add info " + id);
-      }
 
-      int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+         RecordFilesRelationship posFiles = recordsRelationshipMap.get(id);
 
-      ChannelBuffer bb = newBuffer(size);
+         if (posFiles == null)
+         {
+            throw new IllegalStateException("Cannot find add info " + id);
+         }
 
-      bb.writeByte(UPDATE_RECORD);
-      bb.writeInt(-1); // skip ID part
-      bb.writeLong(id);
-      bb.writeInt(record.getEncodeSize());
-      bb.writeByte(recordType);
-      record.encode(bb);
-      bb.writeInt(size);
+         int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
 
-      IOCallback callback = getSyncCallback(sync);
+         ChannelBuffer bb = newBuffer(size);
 
-      lock.acquire();
-      try
-      {
-         JournalFile usedFile = appendRecord(bb, sync, callback);
+         bb.writeByte(UPDATE_RECORD);
+         bb.writeInt(-1); // skip ID part
+         bb.writeLong(id);
+         bb.writeInt(record.getEncodeSize());
+         bb.writeByte(recordType);
+         record.encode(bb);
+         bb.writeInt(size);
 
-         posFiles.addUpdateFile(usedFile);
+         callback = getSyncCallback(sync);
+
+         lock.acquire();
+         try
+         {
+            JournalFile usedFile = appendRecord(bb, sync, callback);
+
+            posFiles.addUpdateFile(usedFile);
+         }
+         finally
+         {
+            lock.release();
+         }
       }
       finally
       {
-         lock.release();
+         readLockCompact.unlock();
       }
 
       if (callback != null)
@@ -359,34 +396,46 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      PosFiles posFiles = posFilesMap.remove(id);
+      readLockCompact.lock();
 
-      if (posFiles == null)
+      IOCallback callback = null;
+
+      try
       {
-         throw new IllegalStateException("Cannot find add info " + id);
-      }
 
-      int size = SIZE_DELETE_RECORD;
+         RecordFilesRelationship posFiles = recordsRelationshipMap.remove(id);
 
-      ChannelBuffer bb = newBuffer(size);
+         if (posFiles == null)
+         {
+            throw new IllegalStateException("Cannot find add info " + id);
+         }
 
-      bb.writeByte(DELETE_RECORD);
-      bb.writeInt(-1); // skip ID part
-      bb.writeLong(id);
-      bb.writeInt(size);
+         int size = SIZE_DELETE_RECORD;
 
-      IOCallback callback = getSyncCallback(sync);
+         ChannelBuffer bb = newBuffer(size);
 
-      lock.acquire();
-      try
-      {
-         JournalFile usedFile = appendRecord(bb, sync, callback);
+         bb.writeByte(DELETE_RECORD);
+         bb.writeInt(-1); // skip ID part
+         bb.writeLong(id);
+         bb.writeInt(size);
 
-         posFiles.addDelete(usedFile);
+         callback = getSyncCallback(sync);
+
+         lock.acquire();
+         try
+         {
+            JournalFile usedFile = appendRecord(bb, sync, callback);
+
+            posFiles.addDelete(usedFile);
+         }
+         finally
+         {
+            lock.release();
+         }
       }
       finally
       {
-         lock.release();
+         readLockCompact.unlock();
       }
 
       if (callback != null)
@@ -416,33 +465,43 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      int recordLength = record.getEncodeSize();
+      readLockCompact.lock();
 
-      int size = SIZE_ADD_RECORD_TX + recordLength;
+      try
+      {
 
-      ChannelBuffer bb = newBuffer(size);
+         int recordLength = record.getEncodeSize();
 
-      bb.writeByte(ADD_RECORD_TX);
-      bb.writeInt(-1); // skip ID part
-      bb.writeLong(txID);
-      bb.writeLong(id);
-      bb.writeInt(recordLength);
-      bb.writeByte(recordType);
-      record.encode(bb);
-      bb.writeInt(size);
+         int size = SIZE_ADD_RECORD_TX + recordLength;
 
-      lock.acquire();
-      try
-      {
-         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
+         ChannelBuffer bb = newBuffer(size);
 
-         JournalTransaction tx = getTransactionInfo(txID);
+         bb.writeByte(ADD_RECORD_TX);
+         bb.writeInt(-1); // skip ID part
+         bb.writeLong(txID);
+         bb.writeLong(id);
+         bb.writeInt(recordLength);
+         bb.writeByte(recordType);
+         record.encode(bb);
+         bb.writeInt(size);
 
-         tx.addPositive(usedFile, id);
+         lock.acquire();
+         try
+         {
+            JournalTransaction tx = getTransactionInfo(txID);
+
+            JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+
+            tx.addPositive(usedFile, id);
+         }
+         finally
+         {
+            lock.release();
+         }
       }
       finally
       {
-         lock.release();
+         readLockCompact.unlock();
       }
    }
 
@@ -466,31 +525,41 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
+      readLockCompact.lock();
 
-      ChannelBuffer bb = newBuffer(size);
-
-      bb.writeByte(UPDATE_RECORD_TX);
-      bb.writeInt(-1); // skip ID part
-      bb.writeLong(txID);
-      bb.writeLong(id);
-      bb.writeInt(record.getEncodeSize());
-      bb.writeByte(recordType);
-      record.encode(bb);
-      bb.writeInt(size);
-
-      lock.acquire();
       try
       {
-         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
 
-         JournalTransaction tx = getTransactionInfo(txID);
+         int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
 
-         tx.addPositive(usedFile, id);
+         ChannelBuffer bb = newBuffer(size);
+
+         bb.writeByte(UPDATE_RECORD_TX);
+         bb.writeInt(-1); // skip ID part
+         bb.writeLong(txID);
+         bb.writeLong(id);
+         bb.writeInt(record.getEncodeSize());
+         bb.writeByte(recordType);
+         record.encode(bb);
+         bb.writeInt(size);
+
+         lock.acquire();
+         try
+         {
+            JournalTransaction tx = getTransactionInfo(txID);
+
+            JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+
+            tx.addPositive(usedFile, id);
+         }
+         finally
+         {
+            lock.release();
+         }
       }
       finally
       {
-         lock.release();
+         readLockCompact.unlock();
       }
    }
 
@@ -509,33 +578,42 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      int size = SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
+      readLockCompact.lock();
 
-      ChannelBuffer bb = newBuffer(size);
-
-      bb.writeByte(DELETE_RECORD_TX);
-      bb.writeInt(-1); // skip ID part
-      bb.writeLong(txID);
-      bb.writeLong(id);
-      bb.writeInt(record != null ? record.getEncodeSize() : 0);
-      if (record != null)
-      {
-         record.encode(bb);
-      }
-      bb.writeInt(size);
-
-      lock.acquire();
       try
       {
-         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
+         int size = SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
 
-         JournalTransaction tx = getTransactionInfo(txID);
+         ChannelBuffer bb = newBuffer(size);
 
-         tx.addNegative(usedFile, id);
+         bb.writeByte(DELETE_RECORD_TX);
+         bb.writeInt(-1); // skip ID part
+         bb.writeLong(txID);
+         bb.writeLong(id);
+         bb.writeInt(record != null ? record.getEncodeSize() : 0);
+         if (record != null)
+         {
+            record.encode(bb);
+         }
+         bb.writeInt(size);
+
+         lock.acquire();
+         try
+         {
+            JournalTransaction tx = getTransactionInfo(txID);
+
+            JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+
+            tx.addNegative(usedFile, id);
+         }
+         finally
+         {
+            lock.release();
+         }
       }
       finally
       {
-         lock.release();
+         readLockCompact.unlock();
       }
    }
 
@@ -546,30 +624,40 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      int size = SIZE_DELETE_RECORD_TX;
+      readLockCompact.lock();
 
-      ChannelBuffer bb = newBuffer(size);
-
-      bb.writeByte(DELETE_RECORD_TX);
-      bb.writeInt(-1); // skip ID part
-      bb.writeLong(txID);
-      bb.writeLong(id);
-      bb.writeInt(0);
-      bb.writeInt(size);
-
-      lock.acquire();
       try
       {
-         JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
+         int size = SIZE_DELETE_RECORD_TX;
 
-         JournalTransaction tx = getTransactionInfo(txID);
+         ChannelBuffer bb = newBuffer(size);
 
-         tx.addNegative(usedFile, id);
+         bb.writeByte(DELETE_RECORD_TX);
+         bb.writeInt(-1); // skip ID part
+         bb.writeLong(txID);
+         bb.writeLong(id);
+         bb.writeInt(0);
+         bb.writeInt(size);
+
+         lock.acquire();
+         try
+         {
+            JournalTransaction tx = getTransactionInfo(txID);
+
+            JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+
+            tx.addNegative(usedFile, id);
+         }
+         finally
+         {
+            lock.release();
+         }
       }
       finally
       {
-         lock.release();
+         readLockCompact.unlock();
       }
+
    }
 
    /** 
@@ -592,22 +680,34 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      JournalTransaction tx = getTransactionInfo(txID);
+      readLockCompact.lock();
 
-      ChannelBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
+      IOCallback callback = null;
 
-      IOCallback callback = getTransactionCallback(txID, sync);
-
-      lock.acquire();
       try
       {
-         JournalFile usedFile = appendRecord(bb, sync, callback);
+         JournalTransaction tx = getTransactionInfo(txID);
 
-         tx.prepare(usedFile);
+         ChannelBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
+
+         callback = getTransactionCallback(tx, sync);
+
+         lock.acquire();
+         try
+         {
+            JournalFile usedFile = appendRecord(bb, sync, callback);
+
+            tx.prepare(usedFile);
+         }
+         finally
+         {
+            lock.release();
+         }
+
       }
       finally
       {
-         lock.release();
+         readLockCompact.unlock();
       }
 
       // We should wait this outside of the lock, to increase throughput
@@ -615,6 +715,7 @@
       {
          callback.waitCompletion();
       }
+
    }
 
    /**
@@ -641,29 +742,40 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      JournalTransaction tx = transactionInfos.remove(txID);
+      readLockCompact.lock();
 
-      if (tx == null)
+      IOCallback callback = null;
+
+      try
       {
-         throw new IllegalStateException("Cannot find tx with id " + txID);
-      }
 
-      ChannelBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
+         JournalTransaction tx = transactionInfos.remove(txID);
 
-      IOCallback callback = getTransactionCallback(txID, sync);
+         if (tx == null)
+         {
+            throw new IllegalStateException("Cannot find tx with id " + txID);
+         }
 
-      lock.acquire();
-      try
-      {
-         JournalFile usedFile = appendRecord(bb, sync, callback);
+         ChannelBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
 
-         transactionCallbacks.remove(txID);
+         callback = getTransactionCallback(tx, sync);
 
-         tx.commit(usedFile);
+         lock.acquire();
+         try
+         {
+            JournalFile usedFile = appendRecord(bb, sync, callback);
+
+            tx.commit(usedFile);
+         }
+         finally
+         {
+            lock.release();
+         }
+
       }
       finally
       {
-         lock.release();
+         readLockCompact.unlock();
       }
 
       // We should wait this outside of the lock, to increase throuput
@@ -671,7 +783,6 @@
       {
          callback.waitCompletion();
       }
-
    }
 
    public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
@@ -681,36 +792,46 @@
          throw new IllegalStateException("Journal must be loaded first");
       }
 
-      JournalTransaction tx = transactionInfos.remove(txID);
+      IOCallback callback = null;
 
-      if (tx == null)
+      readLockCompact.lock();
+
+      try
       {
-         throw new IllegalStateException("Cannot find tx with id " + txID);
-      }
+         JournalTransaction tx = transactionInfos.remove(txID);
 
-      int size = SIZE_ROLLBACK_RECORD;
+         if (tx == null)
+         {
+            throw new IllegalStateException("Cannot find tx with id " + txID);
+         }
 
-      ChannelBuffer bb = newBuffer(size);
+         int size = SIZE_ROLLBACK_RECORD;
 
-      bb.writeByte(ROLLBACK_RECORD);
-      bb.writeInt(-1); // skip ID part
-      bb.writeLong(txID);
-      bb.writeInt(size);
+         ChannelBuffer bb = newBuffer(size);
 
-      IOCallback callback = getTransactionCallback(txID, sync);
+         bb.writeByte(ROLLBACK_RECORD);
+         bb.writeInt(-1); // skip ID part
+         bb.writeLong(txID);
+         bb.writeInt(size);
 
-      lock.acquire();
-      try
-      {
-         JournalFile usedFile = appendRecord(bb, sync, callback);
+         callback = getTransactionCallback(tx, sync);
 
-         transactionCallbacks.remove(txID);
+         lock.acquire();
+         try
+         {
+            JournalFile usedFile = appendRecord(bb, sync, callback);
 
-         tx.rollback(usedFile);
+            tx.rollback(usedFile);
+         }
+         finally
+         {
+            lock.release();
+         }
+
       }
       finally
       {
-         lock.release();
+         readLockCompact.unlock();
       }
 
       // We should wait this outside of the lock, to increase throuput
@@ -764,6 +885,59 @@
       return maxID;
    }
 
+   public void compact() throws Exception
+   {
+      ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot = null;
+      
+      ArrayList<JournalFile> dataFilesToProcess = new ArrayList<JournalFile>(dataFiles.size());
+
+      boolean previousReclaimValue = autoReclaim;
+
+      try
+      {
+
+         // First, we replace the recordsRelationshipMap by a new one.
+         // We need to guarantee that the journal is frozen for this short time
+         // We don't freeze the journal as we compact, only for the short time where we replace recordsRelationshipMap
+         writeLockCompact.lock();
+         try
+         {
+            autoReclaim = false;
+
+            recordsSnapshot = recordsRelationshipMap;
+
+            recordsRelationshipMap = new ConcurrentHashMap<Long, RecordFilesRelationship>();
+            
+            for (JournalFile file: dataFiles)
+            {
+               if (file.getPendingTransactions() == 0)
+               {
+                  trace("Adding " + file + " to compact list");
+                  dataFilesToProcess.add(file);
+               }
+               else
+               {
+                  trace(file + " will not be compacted as it has pending transactions");
+                  break;
+               }
+            }
+
+         }
+         finally
+         {
+            writeLockCompact.unlock();
+         }
+
+      }
+      finally
+      {
+         autoReclaim = previousReclaimValue;
+      }
+      
+      
+
+   }
+
    private boolean isInvalidSize(int bufferPos, int size)
    {
       if (size < 0)
@@ -820,8 +994,6 @@
          throw new IllegalStateException("Journal must be in started state");
       }
 
-      fileFactory.controlBuffersLifeCycle(false);
-
       final Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
 
       final List<JournalFile> orderedFiles = orderFiles();
@@ -849,7 +1021,7 @@
 
                loadManager.addRecord(info);
 
-               posFilesMap.put(info.id, new PosFiles(file));
+               recordsRelationshipMap.put(info.id, new RecordFilesRelationship(file));
             }
 
             public void updateRecord(RecordInfo info) throws Exception
@@ -862,7 +1034,7 @@
 
                loadManager.updateRecord(info);
 
-               PosFiles posFiles = posFilesMap.get(info.id);
+               RecordFilesRelationship posFiles = recordsRelationshipMap.get(info.id);
 
                if (posFiles != null)
                {
@@ -884,7 +1056,7 @@
 
                loadManager.deleteRecord(recordID);
 
-               PosFiles posFiles = posFilesMap.remove(recordID);
+               RecordFilesRelationship posFiles = recordsRelationshipMap.remove(recordID);
 
                if (posFiles != null)
                {
@@ -1123,8 +1295,6 @@
          }
       }
 
-      fileFactory.controlBuffersLifeCycle(true);
-
       // Create any more files we need
 
       // FIXME - size() involves a scan
@@ -1135,7 +1305,7 @@
          for (int i = 0; i < filesToCreate; i++)
          {
             // Keeping all files opened can be very costly (mainly on AIO)
-            freeFiles.add(createFile(false));
+            freeFiles.add(createFile(false, false));
          }
       }
 
@@ -1163,7 +1333,7 @@
       {
          currentFile = freeFiles.remove();
 
-         openFile(currentFile);
+         openFile(currentFile, true);
       }
 
       fileFactory.activate(currentFile.getFile());
@@ -1275,9 +1445,14 @@
     *  It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
    public void debugWait() throws Exception
    {
-      for (TransactionCallback callback : transactionCallbacks.values())
+      fileFactory.testFlush();
+
+      for (JournalTransaction tx : transactionInfos.values())
       {
-         callback.waitCompletion();
+         if (tx.getCallback() != null)
+         {
+            tx.getCallback().waitCompletion();
+         }
       }
 
       if (filesExecutor != null && !filesExecutor.isShutdown())
@@ -1352,7 +1527,7 @@
 
    public int getIDMapSize()
    {
-      return posFilesMap.size();
+      return recordsRelationshipMap.size();
    }
 
    public int getFileSize()
@@ -1486,7 +1661,7 @@
    // Discard the old JournalFile and set it with a new ID
    private JournalFile reinitializeFile(final JournalFile file) throws Exception
    {
-      int newOrderingID = generateOrderingID();
+      int newFileID = generateFileID();
 
       SequentialFile sf = file.getFile();
 
@@ -1494,15 +1669,16 @@
 
       sf.position(0);
 
-      ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
+      ByteBuffer bb = fileFactory.newBuffer(SIZE_HEADER);
 
-      bb.putInt(newOrderingID);
+      bb.putInt(newFileID);
+      bb.putInt(newFileID);
 
       bb.rewind();
 
       sf.write(bb, true);
 
-      JournalFile jf = new JournalFileImpl(sf, newOrderingID);
+      JournalFile jf = new JournalFileImpl(sf, newFileID);
 
       sf.position(bb.limit());
 
@@ -1511,7 +1687,6 @@
       return jf;
    }
 
-   
    private int readJournalFile(JournalFile file, JournalReader reader) throws Exception
    {
       ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
@@ -1729,13 +1904,13 @@
                reader.addRecord(new RecordInfo(recordID, userRecordType, record, false));
                break;
             }
-            
+
             case UPDATE_RECORD:
             {
                reader.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
                break;
             }
-            
+
             case DELETE_RECORD:
             {
                reader.deleteRecord(recordID);
@@ -1747,19 +1922,19 @@
                reader.addRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
                break;
             }
-            
+
             case UPDATE_RECORD_TX:
             {
                reader.updateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
                break;
             }
-            
+
             case DELETE_RECORD_TX:
             {
                reader.deleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
                break;
             }
-            
+
             case PREPARE_RECORD:
             {
 
@@ -1821,7 +1996,6 @@
 
    }
 
-   
    /** It will read the elements-summary back from the commit/prepare transaction 
     *  Pair<FileID, Counter> */
    @SuppressWarnings("unchecked")
@@ -2041,7 +2215,7 @@
 
          file.open(1);
 
-         ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
+         ByteBuffer bb = fileFactory.newBuffer(SIZE_HEADER);
 
          file.read(bb);
 
@@ -2051,9 +2225,9 @@
 
          bb = null;
 
-         if (nextOrderingId.get() < orderingID)
+         if (nextFileID.get() < orderingID)
          {
-            nextOrderingId.set(orderingID);
+            nextFileID.set(orderingID);
          }
 
          orderedFiles.add(new JournalFileImpl(file, orderingID));
@@ -2150,11 +2324,11 @@
     * @return
     * @throws Exception
     */
-   private JournalFile createFile(final boolean keepOpened) throws Exception
+   private JournalFile createFile(final boolean keepOpened, final boolean multiAIO) throws Exception
    {
-      int orderingID = generateOrderingID();
+      int fileID = generateFileID();
 
-      String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
+      String fileName = filePrefix + "-" + fileID + "." + fileExtension;
 
       if (trace)
       {
@@ -2163,19 +2337,27 @@
 
       SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, maxAIO);
 
-      sequentialFile.open();
+      if (multiAIO)
+      {
+         sequentialFile.open();
+      }
+      else
+      {
+         sequentialFile.open(1);
+      }
 
       sequentialFile.fill(0, fileSize, FILL_CHARACTER);
 
-      ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
+      ByteBuffer bb = fileFactory.newBuffer(SIZE_HEADER);
 
-      bb.putInt(orderingID);
+      bb.putInt(fileID);
+      bb.putInt(fileID);
 
       bb.rewind();
 
       sequentialFile.write(bb, true);
 
-      JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
+      JournalFile info = new JournalFileImpl(sequentialFile, fileID);
 
       if (!keepOpened)
       {
@@ -2185,16 +2367,23 @@
       return info;
    }
 
-   private void openFile(final JournalFile file) throws Exception
+   private void openFile(final JournalFile file, final boolean multiAIO) throws Exception
    {
-      file.getFile().open();
+      if (multiAIO)
+      {
+         file.getFile().open();
+      }
+      else
+      {
+         file.getFile().open(1);
+      }
 
       file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
    }
 
-   private int generateOrderingID()
+   private int generateFileID()
    {
-      return nextOrderingId.incrementAndGet();
+      return nextFileID.incrementAndGet();
    }
 
    // You need to guarantee lock.acquire() before calling this method
@@ -2272,6 +2461,17 @@
     * */
    private void pushOpenedFile() throws Exception
    {
+      JournalFile nextOpenedFile = openFile(true);
+
+      openedFiles.offer(nextOpenedFile);
+   }
+
+   /**
+    * @return
+    * @throws Exception
+    */
+   private JournalFile openFile(boolean multiAIO) throws Exception
+   {
       JournalFile nextOpenedFile = null;
       try
       {
@@ -2283,14 +2483,13 @@
 
       if (nextOpenedFile == null)
       {
-         nextOpenedFile = createFile(true);
+         nextOpenedFile = createFile(true, multiAIO);
       }
       else
       {
-         openFile(nextOpenedFile);
+         openFile(nextOpenedFile, multiAIO);
       }
-
-      openedFiles.offer(nextOpenedFile);
+      return nextOpenedFile;
    }
 
    private void closeFile(final JournalFile file)
@@ -2351,22 +2550,17 @@
       }
    }
 
-   private IOCallback getTransactionCallback(final long transactionId, final boolean sync) throws MessagingException
+   private IOCallback getTransactionCallback(final JournalTransaction tx, final boolean sync) throws MessagingException
    {
       if (sync && fileFactory.isSupportsCallbacks())
       {
-         TransactionCallback callback = transactionCallbacks.get(transactionId);
+         TransactionCallback callback = tx.getCallback();
 
          if (callback == null)
          {
             callback = new TransactionCallback();
 
-            TransactionCallback callbackCheck = transactionCallbacks.putIfAbsent(transactionId, callback);
-
-            if (callbackCheck != null)
-            {
-               callback = callbackCheck;
-            }
+            tx.setCallback(callback);
          }
 
          if (callback.errorMessage != null)
@@ -2433,14 +2627,18 @@
 
    }
 
-   /** Used on the ref-count for reclaiming */
-   private static class PosFiles
+   /** 
+    * This holds the relationship a record has with other files in regard to reference counting.
+    * Note: This class used to be called PosFiles
+    * 
+    * Used on the ref-count for reclaiming */
+   private static class RecordFilesRelationship
    {
       private final JournalFile addFile;
 
       private List<JournalFile> updateFiles;
 
-      PosFiles(final JournalFile addFile)
+      RecordFilesRelationship(final JournalFile addFile)
       {
          this.addFile = addFile;
 
@@ -2475,7 +2673,7 @@
       public String toString()
       {
          StringBuffer buffer = new StringBuffer();
-         buffer.append("PosFiles(add=" + addFile.getFile().getFileName());
+         buffer.append("RecordFilesRelationship(add=" + addFile.getFile().getFileName());
 
          if (updateFiles != null)
          {
@@ -2495,11 +2693,15 @@
 
    private class JournalTransaction
    {
+      private TransactionCallback callback;
+
       private List<Pair<JournalFile, Long>> pos;
 
       private List<Pair<JournalFile, Long>> neg;
 
-      private Set<JournalFile> transactionPos;
+      // All the files this transaction is touching on.
+      // We can't have those files being reclaimed or compacted if there is a pending transaction
+      private Set<JournalFile> pendingFiles;
 
       // Map of file id to number of elements participating on the transaction
       // in that file
@@ -2511,11 +2713,27 @@
          return numberOfElementsPerFile;
       }
 
+      /**
+       * @param callback
+       */
+      public void setCallback(TransactionCallback callback)
+      {
+         this.callback = callback;
+      }
+
+      /**
+       * @return
+       */
+      public TransactionCallback getCallback()
+      {
+         return this.callback;
+      }
+
       public void addPositive(final JournalFile file, final long id)
       {
          getCounter(file).incrementAndGet();
 
-         addTXPosCount(file);
+         addFile(file);
 
          if (pos == null)
          {
@@ -2529,7 +2747,7 @@
       {
          getCounter(file).incrementAndGet();
 
-         addTXPosCount(file);
+         addFile(file);
 
          if (neg == null)
          {
@@ -2539,19 +2757,22 @@
          neg.add(new Pair<JournalFile, Long>(file, id));
       }
 
+      /** 
+       * The caller of this method needs to guarantee lock.acquire. (unless this is being called from load what is a single thread process).
+       * */
       public void commit(final JournalFile file)
       {
          if (pos != null)
          {
             for (Pair<JournalFile, Long> p : pos)
             {
-               PosFiles posFiles = posFilesMap.get(p.b);
+               RecordFilesRelationship posFiles = recordsRelationshipMap.get(p.b);
 
                if (posFiles == null)
                {
-                  posFiles = new PosFiles(p.a);
+                  posFiles = new RecordFilesRelationship(p.a);
 
-                  posFilesMap.put(p.b, posFiles);
+                  recordsRelationshipMap.put(p.b, posFiles);
                }
                else
                {
@@ -2564,7 +2785,7 @@
          {
             for (Pair<JournalFile, Long> n : neg)
             {
-               PosFiles posFiles = posFilesMap.remove(n.b);
+               RecordFilesRelationship posFiles = recordsRelationshipMap.remove(n.b);
 
                if (posFiles != null)
                {
@@ -2576,12 +2797,17 @@
          // Now add negs for the pos we added in each file in which there were
          // transactional operations
 
-         for (JournalFile jf : transactionPos)
+         for (JournalFile jf : pendingFiles)
          {
             file.incNegCount(jf);
+            jf.decPendingTransaction();
          }
       }
 
+      /** 
+       * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+       * or else potFilesMap could be affected
+       * */
       public void rollback(final JournalFile file)
       {
          // Now add negs for the pos we added in each file in which there were
@@ -2594,45 +2820,54 @@
          // just left with a prepare when the tx
          // has actually been rolled back
 
-         for (JournalFile jf : transactionPos)
+         for (JournalFile jf : pendingFiles)
          {
             file.incNegCount(jf);
+            jf.decPendingTransaction();
          }
       }
 
+      /** 
+       * The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
+       * or else potFilesMap could be affected
+       * */
       public void prepare(final JournalFile file)
       {
          // We don't want the prepare record getting deleted before time
 
-         addTXPosCount(file);
+         addFile(file);
       }
 
+      /** Used by load, when the transaction was not loaded correctly */
       public void forget()
       {
          // The transaction was not committed or rolled back in the file, so we
          // reverse any pos counts we added
-
-         for (JournalFile jf : transactionPos)
+         for (JournalFile jf : pendingFiles)
          {
             jf.decPosCount();
+            jf.decPendingTransaction();
          }
+
       }
 
-      private void addTXPosCount(final JournalFile file)
+      private void addFile(final JournalFile file)
       {
-         if (transactionPos == null)
+         if (pendingFiles == null)
          {
-            transactionPos = new HashSet<JournalFile>();
+            pendingFiles = new HashSet<JournalFile>();
          }
 
-         if (!transactionPos.contains(file))
+         if (!pendingFiles.contains(file))
          {
-            transactionPos.add(file);
+            pendingFiles.add(file);
 
             // We add a pos for the transaction itself in the file - this
             // prevents any transactional operations
             // being deleted before a commit or rollback is written
             file.incPosCount();
+
+            file.incPendingTransaction();
          }
       }
 

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -70,7 +70,12 @@
 
       file.mkdir();
 
-      return new AIOSequentialFileFactory(getTestDir());
+      return new AIOSequentialFileFactory(getTestDir(),
+                                          ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+                                          1000000,
+                                          true,
+                                          false      
+      );
    }
 
 

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -256,7 +256,7 @@
 
          removeRecordsForID(element);
       }
-
+      
       journal.debugWait();
    }
 

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -22,6 +22,7 @@
 
 package org.jboss.messaging.tests.unit.core.journal.impl;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.jboss.messaging.core.journal.EncodingSupport;
@@ -2962,7 +2963,6 @@
       {
 
          addTx(transactionID, i);
-         updateTx(i + 100);
          if (i % 10 == 0 && i > 0)
          {
             journal.forceMoveNextFile();
@@ -3035,6 +3035,63 @@
       assertEquals(0, journal.getDataFilesCount());
    }
 
+   public void testCompactingWithPendingTransaction() throws Exception
+   {
+      setup(2, 60 * 1024, true);
+
+      createJournal();
+      startJournal();
+      load();
+      
+      long transactionID = 0;
+
+      for (int i = 0; i < 500; i++)
+      {
+         add(i);
+         if (i % 10 == 0 && i > 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         update(i);
+      }
+
+      for (int i = 500; i < 1000; i++)
+      {
+
+         addTx(transactionID, i);
+         updateTx(transactionID, i + 100);
+         if (i % 10 == 0)
+         {
+            journal.forceMoveNextFile();
+         }
+         commit(transactionID++);
+         update(i);
+      }
+
+      System.out.println("Number of Files: " + journal.getDataFilesCount());
+      
+      
+      for (int i = 0 ; i < 1000; i++)
+      {
+         if (!(i % 10 == 0))
+         {
+            delete(i);
+         }
+      }
+
+      journal.forceMoveNextFile();
+
+      System.out.println("Number of Files: " + journal.getDataFilesCount());
+
+      System.out.println("Before compact ****************************");
+      System.out.println(journal.debug());
+      System.out.println("*****************************************");
+      
+      
+      journal.compact();
+
+   }
+
    protected abstract int getAlignment();
 
 }

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -863,5 +863,27 @@
       {
          return transactionIDs;
       }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#decPendingTransaction()
+       */
+      public void decPendingTransaction()
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#getPendingTransactions()
+       */
+      public int getPendingTransactions()
+      {
+         return 0;
+      }
+
+      /* (non-Javadoc)
+       * @see org.jboss.messaging.core.journal.impl.JournalFile#incPendingTransaction()
+       */
+      public void incPendingTransaction()
+      {
+      }
    }
 }

Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-23 00:53:36 UTC (rev 7431)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-23 00:55:33 UTC (rev 7432)
@@ -680,4 +680,11 @@
    {
    }
 
+   /* (non-Javadoc)
+    * @see org.jboss.messaging.core.journal.SequentialFileFactory#testFlush()
+    */
+   public void testFlush()
+   {
+   }
+
 }




More information about the jboss-cvs-commits mailing list