[jboss-cvs] JBoss Messaging SVN: r3884 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 2 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Mar 17 08:04:45 EDT 2008


Author: timfox
Date: 2008-03-17 08:04:44 -0400 (Mon, 17 Mar 2008)
New Revision: 3884

Added:
   trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
   trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FakeSequentialFileFactoryTest.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTest.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/NIOSequentialFileFactoryTest.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFile.java
Removed:
   trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
   trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/journal/Journal.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
Log:
More journal work


Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -23,6 +23,7 @@
 
 import java.util.List;
 
+import org.jboss.messaging.core.server.MessagingComponent;
 
 /**
  * 
@@ -31,34 +32,40 @@
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  *
  */
-public interface Journal
+public interface Journal extends MessagingComponent
 {
 	// Non transactional operations
 	
-	RecordHandle appendAddRecord(long id, byte[] record) throws Exception;
+	void appendAddRecord(long id, byte[] record) throws Exception;
 	
-	void appendUpdateRecord(RecordHandle handle, byte[] record) throws Exception;
+	void appendUpdateRecord(long id, byte[] record) throws Exception;
 	
-	void appendDeleteRecord(RecordHandle handle) throws Exception;
+	void appendDeleteRecord(long id) throws Exception;
 	
 	// Transactional operations
 	
-	RecordHandle appendAddRecordTransactional(long txID, long id, byte[] record, boolean done) throws Exception;
+	void appendAddRecordTransactional(long txID, long id, byte[] record, boolean done) throws Exception;
 	
-	void appendUpdateRecordTransactional(long txID, RecordHandle handle, byte[] record, boolean done) throws Exception;
+	void appendUpdateRecordTransactional(long txID, long id, byte[] record, boolean done) throws Exception;
 	
-	void appendDeleteRecordTransactional(long txID, RecordHandle handle, boolean done) throws Exception;
+	void appendDeleteRecordTransactional(long txID, long id, boolean done) throws Exception;
 	
+	//XA operations
 	
-//	RecordHandle appendAddRecordPrepare(long txID, long id, byte[] record, boolean done) throws Exception;
-//	
-//	void appendUpdateRecordPrepare(long txID, RecordHandle handle, byte[] record, boolean done) throws Exception;
-//	
-//	void appendDeleteRecordPrepare(long txID, RecordHandle handle, boolean done) throws Exception;
+	void appendAddRecordPrepare(long txID, long id, byte[] record, boolean done) throws Exception;
+	
+	void appendUpdateRecordPrepare(long txID, long id, byte[] record, boolean done) throws Exception;
+	
+	void appendDeleteRecordPrepare(long txID, long id, boolean done) throws Exception;
+	
+	void appendXACommitRecord(long txID) throws Exception;
+	
+	void appendXARollbackRecord(long txID) throws Exception;
 
 	
 	// Load
 	
-	List<RecordHistory> load() throws Exception;
+	void load(List<RecordInfo> committedRecords,
+			    List<PreparedTransactionInfo> preparedTransactions) throws Exception;
 	
 }

Added: trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,27 @@
+package org.jboss.messaging.core.journal;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * 
+ * A PreparedTransactionInfo: the id of a prepared transaction, the records it holds, and the ids of the records it deletes.
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class PreparedTransactionInfo
+{
+	public final long id;
+	
+	public final List<RecordInfo> records = new ArrayList<RecordInfo>();
+	
+	public final Set<Long> recordsToDelete = new HashSet<Long>();
+	
+	public PreparedTransactionInfo(final long id)
+	{
+		this.id = id;
+	}
+}

Deleted: trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -1,34 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.messaging.core.journal;
-
-/**
- * 
- * A RecordHandle
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public interface RecordHandle
-{
-	public long getID();
-}

Deleted: trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -1,38 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.messaging.core.journal;
-
-import java.util.List;
-
-/**
- * 
- * A RecordHistory
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public interface RecordHistory
-{
-	RecordHandle getHandle();
-	
-	List<byte[]> getRecords();
-}

Added: trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,40 @@
+package org.jboss.messaging.core.journal;
+
+
+/**
+ * 
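+ * A RecordInfo: the id and data of a journal record, flagged as an add or an update. Equality and hashing use the id only.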
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RecordInfo
+{
+	public RecordInfo(final long id, final byte[] data, final boolean isUpdate)
+	{
+		this.id = id;
+		
+		this.data = data;
+		
+		this.isUpdate = isUpdate;
+	}
+	
+	public final long id;
+	
+	public final byte[] data;
+	
+	public boolean isUpdate;
+	
+	public int hashCode()
+	{
+		return (int)((id >>> 32) ^ id);
+	}
+	
+	public boolean equals(Object other)
+	{
+		RecordInfo r = (RecordInfo)other;
+		
+		return r.id == this.id;		
+	}
+	
+}

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -43,11 +43,11 @@
 	
 	void delete() throws Exception;
 
-	void write(ByteBuffer bytes) throws Exception;
+	int write(ByteBuffer bytes, boolean sync) throws Exception;
 	   
 	int read(ByteBuffer bytes) throws Exception;
 	
-	void reset() throws Exception;
+	void position(int pos) throws Exception;
 	
 	void close() throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -34,5 +34,5 @@
 {
 	SequentialFile createSequentialFile(String fileName, boolean sync) throws Exception;
 	
-	List<String> listFiles(String journalDir, String extension) throws Exception;
+	List<String> listFiles(String extension) throws Exception;
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -39,9 +39,11 @@
 	
 	private final long orderingID;
 	
-	private int refCount;
-	
 	private int offset;
+	
+	private final Set<Long> positives = new HashSet<Long>();
+	
+	private final Set<Long> negatives = new HashSet<Long>();
 		
 	public JournalFile(final SequentialFile file, final long orderingID)
 	{
@@ -73,21 +75,15 @@
 	public SequentialFile getFile()
 	{
 		return file;
-	}
+	}	
 	
-	public void incRefCount()
+	public void addPositive(final long id)
 	{
-		refCount++;
+		this.positives.add(id);
 	}
 	
-	public void decRefCount()
+	public void addNegative(final long id)
 	{
-		refCount--;
+		this.negatives.add(id);
 	}
-	
-	public boolean isEmpty()
-	{
-		return refCount == 0;
-	}
-	
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -25,19 +25,20 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Semaphore;
 
 import org.jboss.messaging.core.journal.Journal;
-import org.jboss.messaging.core.journal.RecordHandle;
-import org.jboss.messaging.core.journal.RecordHistory;
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
@@ -53,6 +54,12 @@
 {
 	private static final Logger log = Logger.getLogger(JournalImpl.class);
 	
+	private static final int STATE_STOPPED = 0;
+	
+	private static final int STATE_STARTED = 1;
+	
+	private static final int STATE_LOADED = 2;
+	
 	// The sizes of primitive types
 	
 	private static final int SIZE_LONG = 8;
@@ -61,6 +68,10 @@
    
    private static final int SIZE_BYTE = 1;
    
+   public static final int MIN_FILE_SIZE = 1024;
+   
+   public static final int MIN_TASK_PERIOD = 5000;
+   
    //Record markers - they must be all unique
    
 	public static final byte ADD_RECORD = 11;
@@ -75,21 +86,31 @@
 	
 	public static final byte DELETE_RECORD_TX = 16;
 	
+	public static final byte ADD_RECORD_PREPARE = 17;
+	
+	public static final byte UPDATE_RECORD_PREPARE = 18;
+	
+	public static final byte DELETE_RECORD_PREPARE = 19;
+	
+	public static final byte XA_COMMIT_RECORD = 20;
+	
+	public static final byte XA_ROLLBACK_RECORD = 21;
+	
 	//End markers - they must all be unique
 	
-	public static final byte DONE = 21;
+	public static final byte DONE = 31;
 	
-	public static final byte TX_CONTINUE = 22;
+	public static final byte TX_CONTINUE = 32;
 	
-	public static final byte TX_DONE = 23;
+	public static final byte TX_DONE = 33;
 	
+	public static final byte XA_CONTINUE = 34;
+	
+	public static final byte XA_DONE = 35;
+	
 	public static final byte FILL_CHARACTER = 74; // Letter 'J' 
 		
-	
-	
-	  	
-	private final String journalDir;
-	
+
 	private final int fileSize;
 	
 	private final int minFiles;
@@ -111,11 +132,6 @@
 	
 	private final Queue<JournalFile> availableFiles = new ConcurrentLinkedQueue<JournalFile>();
 	
-	private final Map<Long, TransactionInfo> transactions = new ConcurrentHashMap<Long, TransactionInfo>();
-	
-	private final Map<Long, List<RecordHandle>> transactionalDeletes =
-		new ConcurrentHashMap<Long, List<RecordHandle>>();
-			
 	/*
 	 * We use a semaphore rather than synchronized since it performs better when contended
 	 */
@@ -125,23 +141,49 @@
 		
 	private volatile JournalFile currentFile ;
 		
-	private volatile boolean loaded;
+	private volatile int state;
 	
 	private volatile long lastOrderingID;
 	
 	private final Timer timer = new Timer(true);
 	
-	private final TimerTask reclaimerTask = new ReclaimerTask();
+	private TimerTask reclaimerTask;
 	
-	private final TimerTask availableFilesTask = new AvailableFilesTask();
+	private TimerTask availableFilesTask;
 	
-
 	
-	public JournalImpl(final String journalDir, final int fileSize, final int minFiles, final int minAvailableFiles,
+	public JournalImpl(final int fileSize, final int minFiles, final int minAvailableFiles,
 			             final boolean sync, final SequentialFileFactory fileFactory, final long taskPeriod,
 			             final String filePrefix, final String fileExtension)
 	{
-		this.journalDir = journalDir;
+		if (fileSize < MIN_FILE_SIZE)
+		{
+			throw new IllegalArgumentException("File size cannot be less than " + MIN_FILE_SIZE + " bytes");
+		}
+		if (minFiles < 2)
+		{
+			throw new IllegalArgumentException("minFiles cannot be less than 2");
+		}
+		if (minAvailableFiles < 2)
+		{
+			throw new IllegalArgumentException("minAvailableFiles cannot be less than 2");
+		}
+		if (fileFactory == null)
+		{
+			throw new NullPointerException("fileFactory is null");
+		}
+		if (taskPeriod < MIN_TASK_PERIOD)
+		{
+			throw new IllegalArgumentException("taskPeriod cannot be less than " + MIN_TASK_PERIOD);
+		}
+		if (filePrefix == null)
+		{
+			throw new NullPointerException("filePrefix is null");
+		}
+		if (fileExtension == null)
+		{
+			throw new NullPointerException("fileExtension is null");
+		}
 		
 		this.fileSize = fileSize;
 		
@@ -167,77 +209,123 @@
 		return ByteBuffer.allocateDirect(size);
 	}
 	
-	public RecordHandle appendAddRecord(final long id, final byte[] record) throws Exception
+	public void appendAddRecord(final long id, final byte[] record) throws Exception
 	{
-		if (!loaded)
+		if (state != STATE_LOADED)
 		{
 			throw new IllegalStateException("Journal must be loaded first");
 		}
 		
-		//TODO optimise to avoid creating a new byte buffer
-		
 		int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
 		
 		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
 		
-		bb.put(ADD_RECORD);
+		bb.put(ADD_RECORD);		
+		bb.putLong(id);		
+		bb.putInt(record.length);		
+		bb.put(record);		
+		bb.put(DONE);	
 		
-		bb.putLong(id);
-		
-		bb.putInt(record.length);
-		
-		bb.put(record);
-		
-		bb.put(DONE);
-		
 		bb.flip();
 			             				
 		lock.acquire();
 		
 		try
 		{   					
-   		checkFile(size);
-   		   		
-   		currentFile.getFile().write(bb);		
-   		
+   		checkFile(size);   		   	
+   		currentFile.getFile().write(bb, true);		   		
    		currentFile.extendOffset(size);
-   		
-   		RecordHandleImpl rh = new RecordHandleImpl(id);
-   		
-   		rh.addFile(currentFile);
-   		
-   		return rh;
 		}
 		finally
 		{
 			lock.release();
 		}
 	}
-	
-	public RecordHandle appendAddRecordTransactional(final long txID, final long id,
-			                                           final byte[] record, final boolean done) throws Exception
+			
+	public void appendUpdateRecord(final long id, final byte[] record) throws Exception
 	{
-		if (!loaded)
+		if (state != STATE_LOADED)
 		{
 			throw new IllegalStateException("Journal must be loaded first");
 		}
 		
 		//TODO optimise to avoid creating a new byte buffer
 		
-		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
 		
 		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
 		
-		bb.put(ADD_RECORD);
+		bb.put(UPDATE_RECORD);		
+		bb.putLong(id);		
+		bb.putInt(record.length);		
+		bb.put(record);		
+		bb.put(DONE);
 		
+		bb.flip();
+		
+		lock.acquire();
+		
+		try
+		{   		
+   		checkFile(size);   		   		
+   		currentFile.getFile().write(bb, true);		   		
+   		currentFile.extendOffset(size);
+		}
+		finally
+		{
+			lock.release();
+		}				
+	}
+			
+	public void appendDeleteRecord(long id) throws Exception
+	{
+		if (state != STATE_LOADED)
+		{
+			throw new IllegalStateException("Journal must be loaded first");
+		}
+		
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+		
+		ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
+		
+		buffer.put(DELETE_RECORD);		
+		buffer.putLong(id);		
+		buffer.put(DONE);
+		
+		buffer.flip();
+								
+		lock.acquire();
+		
+		try
+		{   		
+   		checkFile(size);   		   		
+   		currentFile.getFile().write(buffer, true);
+   		currentFile.extendOffset(size);
+		}
+		finally
+		{
+			lock.release();
+		}				
+	}		
+
+	public void appendAddRecordTransactional(final long txID, final long id,
+			                                   final byte[] record, final boolean done) throws Exception
+   {
+		if (state != STATE_LOADED)
+		{
+			throw new IllegalStateException("Journal must be loaded first");
+		}
+
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+
+		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+
+		bb.put(ADD_RECORD_TX);
 		bb.putLong(txID);
-		
 		bb.putLong(id);
-		
 		bb.putInt(record.length);
-		
 		bb.put(record);
-		
+
 		if (done)
 		{
 			bb.put(TX_DONE);
@@ -246,34 +334,16 @@
 		{
 			bb.put(TX_CONTINUE);
 		}
-		
+
 		bb.flip();
-			             				
+
 		lock.acquire();
-		
+
 		try
 		{   					
-   		checkFile(size);
-   		   		
-   		currentFile.getFile().write(bb);		
-   		
-   		currentFile.extendOffset(size);
-   		
-   		RecordHandleImpl rh = new RecordHandleImpl(id);
-   		
-   		rh.addFile(currentFile);
-   		
-   		if (done)
-   		{   		
-      		List<RecordHandle> list = transactionalDeletes.remove(txID);
-      		
-      		if (list != null)
-   			{
-   				releaseDeletes(list);
-   			}      		
-   		} 
-   		
-   		return rh;
+			checkFile(size);
+			currentFile.getFile().write(bb, done);		
+			currentFile.extendOffset(size);
 		}
 		finally
 		{
@@ -281,30 +351,32 @@
 		}
 	}
 	
-	public void appendUpdateRecord(final RecordHandle handle, final byte[] record) throws Exception
+	public void appendUpdateRecordTransactional(final long txID, final long id,
+			final byte[] record, final boolean done) throws Exception
 	{
-		if (!loaded)
+		if (state != STATE_LOADED)
 		{
 			throw new IllegalStateException("Journal must be loaded first");
 		}
+
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
 		
-		RecordHandleImpl rh = (RecordHandleImpl)handle;
-		
-		//TODO optimise to avoid creating a new byte buffer
-		
-		int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
-		
 		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
 		
-		bb.put(UPDATE_RECORD);
-		
-		bb.putLong(rh.getID());
-		
-		bb.putInt(record.length);
-		
+		bb.put(UPDATE_RECORD_TX);		
+		bb.putLong(txID);		
+		bb.putLong(id);		
+		bb.putInt(record.length);		
 		bb.put(record);
 		
-		bb.put(DONE);
+	   if (done)
+	   {
+	   	bb.put(TX_DONE);
+	   }
+	   else
+	   {
+	   	bb.put(TX_CONTINUE);
+	   }
 		
 		bb.flip();
 		
@@ -312,13 +384,9 @@
 		
 		try
 		{   		
-   		checkFile(size);
-   		   		
-   		currentFile.getFile().write(bb);		
-   		
-   		currentFile.extendOffset(size);
-   		
-   		rh.addFile(currentFile);
+   		checkFile(size);   		   	
+   		currentFile.getFile().write(bb, done);		   		
+   		currentFile.extendOffset(size);   		   		   		
 		}
 		finally
 		{
@@ -326,39 +394,113 @@
 		}				
 	}
 	
-	public void appendUpdateRecordTransactional(final long txID, final RecordHandle handle,
-			final byte[] record, final boolean done) throws Exception
+	public void appendDeleteRecordTransactional(final long txID, final long id, final boolean done) throws Exception
 	{
-		if (!loaded)
+		if (state != STATE_LOADED)
 		{
 			throw new IllegalStateException("Journal must be loaded first");
 		}
 		
-		RecordHandleImpl rh = (RecordHandleImpl)handle;
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
 		
-		//TODO optimise to avoid creating a new byte buffer
+		ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
 		
+		buffer.put(DELETE_RECORD_TX);		
+		buffer.putLong(txID);		
+		buffer.putLong(id);
+		
+		if (done)
+		{
+			buffer.put(TX_DONE);
+		}
+		else
+		{
+			buffer.put(TX_CONTINUE);
+		}
+				
+		buffer.flip();
+								
+		lock.acquire();
+		
+		try
+		{   		
+   		checkFile(size);   		   	
+   		currentFile.getFile().write(buffer, done);		   		
+   		currentFile.extendOffset(size);   		
+		}
+		finally
+		{
+			lock.release();
+		}				
+	}	
+	
+	
+	public void appendAddRecordPrepare(final long txID, final long id, final byte[] record, final boolean done) throws Exception
+	{
+		if (state != STATE_LOADED)
+		{
+			throw new IllegalStateException("Journal must be loaded first");
+		}
+
 		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
-		
+
 		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-		
-		bb.put(UPDATE_RECORD);
-		
+
+		bb.put(ADD_RECORD_PREPARE);
 		bb.putLong(txID);
+		bb.putLong(id);
+		bb.putInt(record.length);
+		bb.put(record);
+
+		if (done)
+		{
+			bb.put(XA_DONE);
+		}
+		else
+		{
+			bb.put(XA_CONTINUE);
+		}
+
+		bb.flip();
+
+		lock.acquire();
+
+		try
+		{   					
+			checkFile(size);
+			currentFile.getFile().write(bb, done);		
+			currentFile.extendOffset(size);			
+		}
+		finally
+		{
+			lock.release();
+		}
+	}
+	
+	public void appendUpdateRecordPrepare(final long txID, final long id, final byte[] record, final boolean done) throws Exception
+	{
+		if (state != STATE_LOADED)
+		{
+			throw new IllegalStateException("Journal must be loaded first");
+		}
 		
-		bb.putLong(rh.getID());
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
 		
-		bb.putInt(record.length);
+		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
 		
+		bb.put(UPDATE_RECORD_PREPARE);		
+		bb.putLong(txID);		
+		bb.putLong(id);		
+		bb.putInt(record.length);		
 		bb.put(record);
 		
 	   if (done)
 	   {
-	   	bb.put(TX_DONE);
+	   	bb.put(XA_DONE);
 	   }
 	   else
 	   {
-	   	bb.put(TX_CONTINUE);
+	   	bb.put(XA_CONTINUE);
 	   }
 		
 		bb.flip();
@@ -367,58 +509,39 @@
 		
 		try
 		{   		
-   		checkFile(size);
-   		   		
-   		currentFile.getFile().write(bb);		
-   		
-   		currentFile.extendOffset(size);
-   		   		
-   		rh.addFile(currentFile);
-   		
-   		if (done)
-   		{   		
-      		List<RecordHandle> list = transactionalDeletes.remove(txID);
-      		
-      		if (list != null)
-   			{
-   				releaseDeletes(list);
-   			}      		
-   		}   		
+   		checkFile(size);   		   		
+   		currentFile.getFile().write(bb, done);		   		
+   		currentFile.extendOffset(size);   		   		   		
 		}
 		finally
 		{
 			lock.release();
-		}				
+		}		
 	}
 	
-	private void releaseDeletes(final List<RecordHandle> deletes)
+	public void appendDeleteRecordPrepare(final long txID, final long id, final boolean done) throws Exception
 	{
-		for (RecordHandle handle: deletes)
+		if (state != STATE_LOADED)
 		{
-			RecordHandleImpl rh = (RecordHandleImpl)handle;
-			
-			rh.recordDeleted();
-		}
-	}
-	
-	public void appendDeleteRecord(final RecordHandle handle) throws Exception
-	{
-		if (!loaded)
-		{
 			throw new IllegalStateException("Journal must be loaded first");
 		}
 		
-		RecordHandleImpl rh = (RecordHandleImpl)handle;
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
 		
-		int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
-		
 		ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
 		
-		buffer.put(DELETE_RECORD);
+		buffer.put(DELETE_RECORD_PREPARE);		
+		buffer.putLong(txID);		
+		buffer.putLong(id);
 		
-		buffer.putLong(rh.getID());
-		
-		buffer.put(DONE);
+		if (done)
+		{
+			buffer.put(XA_DONE);
+		}
+		else
+		{
+			buffer.put(XA_CONTINUE);
+		}
 				
 		buffer.flip();
 								
@@ -426,13 +549,9 @@
 		
 		try
 		{   		
-   		checkFile(size);
-   		   		
-   		currentFile.getFile().write(buffer);		
-   		
-   		currentFile.extendOffset(size);
-   		
-   		rh.recordDeleted();
+   		checkFile(size);   		   	
+   		currentFile.getFile().write(buffer, done);		   		
+   		currentFile.extendOffset(size);   		   		
 		}
 		finally
 		{
@@ -440,33 +559,51 @@
 		}				
 	}
 	
-	public void appendDeleteRecordTransactional(final long txID, final RecordHandle handle, final boolean done) throws Exception
+	public void appendXACommitRecord(final long txID) throws Exception
 	{
-		if (!loaded)
+		if (state != STATE_LOADED)
 		{
 			throw new IllegalStateException("Journal must be loaded first");
 		}
 		
-		RecordHandleImpl rh = (RecordHandleImpl)handle;
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
 		
-		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
-		
 		ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
 		
-		buffer.put(DELETE_RECORD);
+		buffer.put(XA_COMMIT_RECORD);		
+		buffer.putLong(txID);		
+		buffer.put(XA_DONE);		
 		
-		buffer.putLong(txID);
+		buffer.flip();
 		
-		buffer.putLong(rh.getID());
+		lock.acquire();
 		
-		if (done)
-		{
-			buffer.put(TX_DONE);
+		try
+		{   		
+   		checkFile(size);   		   		
+   		currentFile.getFile().write(buffer, true);		   		
+   		currentFile.extendOffset(size);
 		}
-		else
+		finally
 		{
-			buffer.put(TX_CONTINUE);
+			lock.release();
+		}		
+	}
+	
+	public void appendXARollbackRecord(final long txID) throws Exception
+	{
+		if (state != STATE_LOADED)
+		{
+			throw new IllegalStateException("Journal must be loaded first");
 		}
+		
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+		
+		ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
+		
+		buffer.put(XA_ROLLBACK_RECORD);		
+		buffer.putLong(txID);		
+		buffer.put(XA_DONE);
 				
 		buffer.flip();
 								
@@ -474,52 +611,32 @@
 		
 		try
 		{   		
-   		checkFile(size);
-   		   		
-   		currentFile.getFile().write(buffer);		
-   		
+   		checkFile(size);   		   	
+   		currentFile.getFile().write(buffer, true);		   		
    		currentFile.extendOffset(size);
-   		
-   		//It's a transactional delete so we need to make sure file doesn't get deleted
-   		rh.addFile(currentFile);
-   		
-   		List<RecordHandle> list = transactionalDeletes.get(txID);
-   		
-   		if (list == null)
-   		{
-   			list = new ArrayList<RecordHandle>();
-   			
-   			transactionalDeletes.put(txID, list);
-   		}
-   		
-   		list.add(handle);
-   		
-   		if (done)
-   		{
-   			transactionalDeletes.remove(txID);
-   			
-   			releaseDeletes(list);
-   		}
 		}
 		finally
 		{
 			lock.release();
-		}				
-	}	
+		}		
+	}
 		
-	public List<RecordHistory> load() throws Exception
+	public void load(final List<RecordInfo> committedRecords,
+		              final List<PreparedTransactionInfo> preparedTransactions) throws Exception
 	{
-		if (loaded)
+		if (state != STATE_STARTED)
 		{
-			throw new IllegalStateException("Journal is already loaded");
+			throw new IllegalStateException("Journal must be in started state");
 		}
 		
-		log.info("Loading...");
+		Set<Long> recordsToDelete = new HashSet<Long>();
 		
-		List<String> fileNames = fileFactory.listFiles(journalDir, fileExtension);
+		Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
 		
-		log.info("There are " + fileNames.size() + " files in directory");
+		List<RecordInfo> records = new ArrayList<RecordInfo>();
 		
+		List<String> fileNames = fileFactory.listFiles(fileExtension);
+		
 		List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
 				
 		for (String fileName: fileNames)
@@ -535,14 +652,12 @@
 			bb.flip();
 			
 			long orderingID = bb.getLong();
+						
+			orderedFiles.add(new JournalFile(file, orderingID));
 			
-			file.reset();
-							
-			orderedFiles.add(new JournalFile(file, orderingID));
+			file.close();
 		}
 		
-		log.info("minFiles is " + minFiles);
-		
 		int createNum = minFiles - orderedFiles.size();
 		
 		//Preallocate some more if necessary
@@ -552,10 +667,8 @@
 			
 			orderedFiles.add(file);
 			
-			log.info("Created new file");
+			file.getFile().close();
 		}
-		
-		log.info("Done creating new ones");
 			
 		//Now order them by ordering id - we can't use the file name for ordering since we can re-use files
 		
@@ -572,14 +685,14 @@
 
 		Collections.sort(orderedFiles, new JournalFileComparator());
 		
-		Map<Long, RecordHistory> histories = new LinkedHashMap<Long, RecordHistory>();
-				
+		int lastDataPos = -1;
+		
 		for (JournalFile file: orderedFiles)
-		{
-			log.info("Loading file, ordering id is " + file.getOrderingID());
-			
+		{	
 			ByteBuffer bb = ByteBuffer.wrap(new byte[fileSize]);
 			
+			file.getFile().open();
+			
 			int bytesRead = file.getFile().read(bb);
 			
 			if (bytesRead != fileSize)
@@ -601,14 +714,12 @@
 			{
 				byte recordType = bb.get();
 				
-				log.info("recordtype is " + recordType);
-				
 				int pos = bb.position();
 				
 				switch(recordType)
 				{
 					case ADD_RECORD:
-					{					
+					{									
 						long id = bb.getLong();
 						
 						int size = bb.getInt();
@@ -624,9 +735,9 @@
 							repairFrom(pos, file);
 						}
 						else
-						{						
-							handleAddRecord(id, file, record, histories);
-							
+						{																				
+							records.add(new RecordInfo(id, record, false));
+
 							hasData = true;							
 						}
 												
@@ -650,7 +761,7 @@
 						}
 						else
 						{					
-							handleUpdateRecord(id, file, record, histories);
+							records.add(new RecordInfo(id, record, true));
 							
 							hasData = true;							
 						}
@@ -669,7 +780,7 @@
 						}
 						else
 						{						
-							handleDeleteRecord(id, histories);
+							recordsToDelete.add(id);
 							
 							hasData = true;							
 						}
@@ -696,8 +807,24 @@
 						}
 						else
 						{						
-							handleTransactionalRecord(ADD_RECORD, txID, id, file, record, end, histories);
+							TransactionHolder tx = transactions.get(txID);
 							
+							if (tx == null)
+							{
+								tx = new TransactionHolder(txID);
+								
+								transactions.put(txID, tx);
+							}
+							
+							tx.recordInfos.add(new RecordInfo(id, record, false));
+							
+							if (end == TX_DONE)
+							{
+								transactions.remove(txID);
+								
+								records.addAll(tx.recordInfos);
+							}
+
 							hasData = true;							
 						}
 					
@@ -723,8 +850,22 @@
 						}
 						else
 						{					
-							handleTransactionalRecord(UPDATE_RECORD, txID, id, file, record, end, histories);
+							TransactionHolder tx = transactions.get(txID);
 							
+							if (tx == null)
+							{
+								throw new IllegalStateException("Cannot find tx with id " + txID);
+							}
+							
+							tx.recordInfos.add(new RecordInfo(id, record, true));
+							
+							if (end == TX_DONE)
+							{
+								transactions.remove(txID);
+								
+								records.addAll(tx.recordInfos);
+							}
+							
 							hasData = true;							
 						}
 											
@@ -744,13 +885,200 @@
 						}
 						else
 						{					
-							handleTransactionalRecord(DELETE_RECORD, txID, id, file, null, end, histories);
+							TransactionHolder tx = transactions.get(txID);
 							
+							if (tx == null)
+							{
+								throw new IllegalStateException("Cannot find tx with id " + txID);
+							}
+							
+							tx.recordsToDelete.add(id);
+							
+							if (end == TX_DONE)
+							{
+								transactions.remove(txID);
+								
+								records.addAll(tx.recordInfos);
+								
+								recordsToDelete.addAll(tx.recordsToDelete);
+							} 
+							
 							hasData = true;							
 						}
 											
 						break;
 					}	
+					case ADD_RECORD_PREPARE:
+					{
+						long txID = bb.getLong();
+						
+						long id = bb.getLong();
+						
+						int size = bb.getInt();
+						
+						byte[] record = new byte[size];
+						
+						bb.get(record);
+						
+						byte end = bb.get();
+						
+						if (end != XA_DONE && end != XA_CONTINUE)
+						{
+							repairFrom(pos, file);
+						}
+						else
+						{						
+							TransactionHolder tx = transactions.get(txID);
+							
+							if (tx == null)
+							{
+								tx = new TransactionHolder(txID);
+								
+								transactions.put(txID, tx);
+							}
+							
+							tx.recordInfos.add(new RecordInfo(id, record, false));
+							
+							if (end == XA_DONE)
+							{
+								tx.prepared = true;
+							}
+							
+							hasData = true;							
+						}
+						
+						break;
+					}
+					case UPDATE_RECORD_PREPARE:
+					{
+						long txID = bb.getLong();
+						
+						long id = bb.getLong();
+						
+						int size = bb.getInt();
+						
+						byte[] record = new byte[size];
+						
+						bb.get(record);
+						
+						byte end = bb.get();
+						
+						if (end != XA_CONTINUE && end != XA_DONE)
+						{
+							repairFrom(pos, file);
+						}
+						else
+						{					
+							TransactionHolder tx = transactions.get(txID);
+							
+							if (tx == null)
+							{
+								throw new IllegalStateException("Cannot find tx with id " + txID);
+							}
+							
+							tx.recordInfos.add(new RecordInfo(id, record, true));
+							
+							if (end == XA_DONE)
+							{
+								tx.prepared = true;
+							}
+							
+							hasData = true;							
+						}
+											
+						break;
+					}
+					case DELETE_RECORD_PREPARE:
+					{
+						long txID = bb.getLong();
+						
+						long id = bb.getLong();
+						
+						byte end = bb.get();
+						
+						if (end != XA_CONTINUE && end != XA_DONE)
+						{
+							repairFrom(pos, file);
+						}
+						else
+						{					
+							TransactionHolder tx = transactions.get(txID);
+							
+							if (tx == null)
+							{
+								throw new IllegalStateException("Cannot find tx with id " + txID);
+							}
+							
+							tx.recordsToDelete.add(id);
+							
+							if (end == XA_DONE)
+							{
+								tx.prepared = true;
+							}	
+							
+							hasData = true;							
+						}
+											
+						break;
+					}
+					case XA_COMMIT_RECORD:
+					{
+						long txID = bb.getLong();
+						
+						byte end = bb.get();
+						
+						if (end != XA_DONE)
+						{
+							repairFrom(pos, file);
+						}
+						else
+						{
+							TransactionHolder tx = transactions.remove(txID);
+							
+							if (tx == null)
+							{
+								throw new IllegalStateException("Cannot find tx with id " + txID);
+							}
+							
+							if (!tx.prepared)
+							{
+								throw new IllegalStateException("Can't commit transaction - it is not prepared " + txID);
+							}
+							
+							records.addAll(tx.recordInfos);
+							
+							recordsToDelete.addAll(tx.recordsToDelete);
+						}
+						
+						break;
+					}
+					case XA_ROLLBACK_RECORD:
+					{
+						long txID = bb.getLong();
+						
+						byte end = bb.get();
+						
+						if (end != XA_DONE)
+						{
+							repairFrom(pos, file);
+						}
+						else
+						{
+							TransactionHolder tx = transactions.remove(txID);
+							
+							if (tx == null)
+							{
+								throw new IllegalStateException("Cannot find tx with id " + txID);
+							}
+							
+							if (!tx.prepared)
+							{
+								throw new IllegalStateException("Can't roll back transaction - it is not prepared " + txID);
+							}
+						}
+						
+						break;
+					}
 					case FILL_CHARACTER:						
 					{
 						//End of records in file - we check the file only contains fill characters from this point
@@ -773,110 +1101,157 @@
 								                         " is corrupt, invalid record type " + recordType);
 					}
 				}
+				
+				if (recordType != FILL_CHARACTER)
+				{
+					lastDataPos = pos;
+				}
 			}
-				
+						
 			if (hasData)
-			{
-				log.info("Adding to files");
+			{			
+				files.add(file);
 				
-				files.add(file);
+				//Files are always maintained closed - there may be a lot of them and we don't want to run out
+				//of file handles
+				file.getFile().close();				
 			}
 			else
-			{
-				log.info("Adding to available files");
-				
+			{				
 				//Empty files with no data
 				availableFiles.add(file);
+				
+				//Position it ready for writing
+				file.getFile().position(SIZE_LONG);
 			}								
-		}				
-						
+		}			
+									
 		//Now it's possible that some of the files are no longer needed
 		
 		checkFilesForReclamation();
+				
+		//Check we have enough available files
 		
+		checkAndCreateAvailableFiles();
+						
 		for (JournalFile file: files)
 		{
-			currentFile = file;
+			currentFile = file;						
 		}
 		
-		//Check we have enough available files
+		if (currentFile != null)
+		{		
+			currentFile.getFile().open();
 		
-		checkAndCreateAvailableFiles();
-				
-		if (currentFile == null)
+			currentFile.getFile().position(lastDataPos);
+		}
+		else
 		{
 			currentFile = availableFiles.remove();
 			
 			files.add(currentFile);
 		}				
 		
-		//Close all files apart from the current one
+		startTasks();
+				
+		for (RecordInfo record: records)
+		{
+			if (!recordsToDelete.contains(record.id))
+			{
+				committedRecords.add(record);
+			}
+		}
 		
-		for (JournalFile file: files)
+		for (TransactionHolder transaction: transactions.values())
 		{
-			if (file != currentFile)
+			if (!transaction.prepared)
 			{
-				file.getFile().close();
+				log.warn("Uncommitted transaction with id " + transaction.transactionID + " found and discarded");
 			}
+			else
+			{
+				PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID);
+				
+				info.records.addAll(transaction.recordInfos);
+				
+				info.recordsToDelete.addAll(transaction.recordsToDelete);
+				
+				preparedTransactions.add(info);
+			}
 		}
-						
-		startTasks();
-		
-		loaded = true;
-		
-		return new ArrayList<RecordHistory>(histories.values());
+				
+		state = STATE_LOADED;
 	}
-	
-	private void repairFrom(int pos, JournalFile file) throws Exception
-	{
-		log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
-				   " in the record that starts at position " + pos + ". " + 
-				   "The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
-		
-		file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER);
-	}
-	
+			
 	public void checkAndCreateAvailableFiles() throws Exception
 	{
-		log.info("Checking if we need to create more files");
-		
+		//log.info("min available " + minAvailableFiles + " avail: " + availableFiles.size() + " files: " + files.size());
 		int filesToCreate = minAvailableFiles - availableFiles.size();
 		
 		for (int i = 0; i < filesToCreate; i++)
 		{
 			JournalFile file = createFile();
 			
+			//log.info("Creating new file");
+			
 			availableFiles.add(file);
 		}
 	}
 	
-	public void stop() throws Exception
+	// MessagingComponent implementation ---------------------------------------------------
+	// start(): only moves the state from STOPPED to STARTED; it opens no files and schedules no tasks itself.
+	public synchronized void start()
 	{
-		reclaimerTask.cancel();
+		if (state != STATE_STOPPED)
+		{
+			throw new IllegalStateException("Journal is not stopped");
+		}
 		
-		availableFilesTask.cancel();
+		state = STATE_STARTED;
+	}
+	// stop(): cancels the timer tasks if they exist, closes the current and the available files, clears both file collections and marks the journal STOPPED.
+	public synchronized void stop() throws Exception
+	{
+		if (state == STATE_STOPPED)
+		{
+			throw new IllegalStateException("Journal is already stopped");
+		}
 		
-		for (JournalFile file: files)
+		if (reclaimerTask != null) // presumably null until startTasks() has assigned it - TODO confirm the field has no initializer
 		{
-			file.getFile().close();
+			reclaimerTask.cancel();
 		}
 		
+		if (availableFilesTask != null)
+		{
+			availableFilesTask.cancel();
+		}
+		// Unlike the removed loop over 'files', only the current file is closed; load() closes each data file right after reading it.
+		if (currentFile != null)
+		{
+			currentFile.getFile().close();
+		}
+		
 		for (JournalFile file: availableFiles)
 		{
 			file.getFile().close();
 		}
 
-		this.currentFile = null;
+		currentFile = null;
 		
 		files.clear();
 		
-		availableFiles.clear();			
+		availableFiles.clear();		
+		
+		state = STATE_STOPPED;
 	}
 	
 	public void startTasks()
 	{
+		reclaimerTask = new ReclaimerTask(); // fresh instance per call: java.util.Timer rejects a TimerTask that was already scheduled or cancelled, and stop() cancels this one
 		timer.schedule(reclaimerTask, taskPeriod, taskPeriod);
 		
+		availableFilesTask = new AvailableFilesTask(); // same reason; each task first runs after taskPeriod and then repeats every taskPeriod
 		timer.schedule(availableFilesTask, taskPeriod, taskPeriod);
 	}
 	
@@ -894,11 +1269,10 @@
 	
 	public void checkFilesForReclamation() throws Exception
 	{		
-		log.info("checking files for reclamation");
-		
 		for (JournalFile file: files)
 		{		
-   		if (file.isEmpty() && file != currentFile)
+			//TODO reclamation
+   		if (false && file != currentFile)
    		{
    			//File can be reclaimed
    			
@@ -917,135 +1291,39 @@
    			//Note we MUST re-fill it - otherwise we won't be able to detect corrupt records
    			sf.fill(0, fileSize, FILL_CHARACTER);
    			
-   			sf.write(bb);
+   			sf.write(bb, true);
    			
    			JournalFile jf = new JournalFile(sf, newOrderingID);
    			
+   			sf.position(SIZE_LONG);
+   			
+   			jf.resetOffset();
+   			
+   			jf.extendOffset(SIZE_LONG);
+   			
    			availableFiles.add(jf);   		
    		}
 		}
 	}
 		
 	// Private -----------------------------------------------------------------------------
-	
-	private void playTransaction(final TransactionInfo tx, final Map<Long, RecordHistory> histories)
-	{
-		for (TransactionEntry entry: tx.entries)
-		{
-			switch (entry.type)
-			{
-				case ADD_RECORD:
-				{
-					handleAddRecord(entry.id, entry.file, entry.record, histories);
-					
-					break;
-				}
-				case UPDATE_RECORD:
-				{
-					handleUpdateRecord(entry.id, entry.file, entry.record, histories);
-					
-					break;
-				}
-				case DELETE_RECORD:
-				{
-					handleDeleteRecord(entry.id, histories);
-					
-					break;
-				}
-				default:
-				{
-					throw new IllegalStateException("Invalid record type " + entry.type);
-				}
-			}
-		}
-	}
-	
-	private void handleAddRecord(final long id, final JournalFile file,
-			                       final byte[] record, final Map<Long, RecordHistory> histories)
-	{
-		RecordHandleImpl handle = new RecordHandleImpl(id);
 		
-		handle.addFile(file);
-		
-		RecordHistoryImpl history = new RecordHistoryImpl(handle);
-		
-		history.addRecord(record);
-		
-		histories.put(id, history);
-	}
-	
-	private void handleUpdateRecord(final long id, final JournalFile file,
-         final byte[] record, final Map<Long, RecordHistory> histories)
-   {
-		RecordHistoryImpl history = (RecordHistoryImpl)histories.get(id);
-		
-		if (history == null)
-		{
-			throw new IllegalStateException("Cannot find record (update) " + id);
-		}
-		
-		RecordHandleImpl handle = (RecordHandleImpl)history.getHandle();
-		
-		handle.addFile(file);
-		
-		history.addRecord(record);	
-   }
-	
-	private void handleDeleteRecord(final long id, final Map<Long, RecordHistory> histories)
-   {
-		RecordHistoryImpl history = (RecordHistoryImpl)histories.remove(id);
-		
-		if (history == null)
-		{
-			throw new IllegalStateException("Cannot find record (delete) " + id);
-		}
-		
-		RecordHandleImpl handle = (RecordHandleImpl)history.getHandle();
-		
-		handle.recordDeleted();
-   }
-	
-	private void handleTransactionalRecord(final byte type, final long txID, final long id,
-			                                 final JournalFile file, final byte[] record,
-			                                 final byte end,
-			                                 final Map<Long, RecordHistory> histories)
+	private void repairFrom(int pos, JournalFile file) throws Exception // logs a warning, fills the file from pos with FILL_CHARACTER, then positions the file at pos
 	{
-		TransactionInfo tx = transactions.get(txID);
+		log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
+				   " in the record that starts at position " + pos + ". " + 
+				   "The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
 		
-		if (tx == null)
-		{
-			tx = new TransactionInfo();
-			
-			transactions.put(txID, tx);
-		}
+		file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER); // fills fileSize - pos bytes starting at pos. NOTE(review): load() captures pos after reading the record-type byte, so the byte at pos - 1 looks like it is left unfilled - confirm
 		
-		TransactionEntry entry = new TransactionEntry(type, id, file, record);
-		
-		tx.entries.add(entry);
-		
-		if (end == TX_DONE)
-		{
-			transactions.remove(txID);
-			
-			playTransaction(tx, histories);
-		}
-		else if (end == TX_CONTINUE)
-		{
-			//
-		}
-		else
-		{
-			throw new IllegalStateException("Invalid transaction marker " + end);
-		}
+		file.getFile().position(pos); // new in this revision: leave the file positioned at pos after the fill
 	}
 			
 	private JournalFile createFile() throws Exception
 	{
-		log.info("Creating a new file");
-		
 		long orderingID = generateOrderingID();
 		
-		String fileName = journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
+		String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
 						
 		SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync);
 		
@@ -1059,12 +1337,14 @@
 		
 		bb.flip();
 		
-		sequentialFile.write(bb);
+		sequentialFile.write(bb, true);
 		
-		sequentialFile.reset();
+		sequentialFile.position(SIZE_LONG);
 		
 		JournalFile info = new JournalFile(sequentialFile, orderingID);
 		
+		info.extendOffset(SIZE_LONG);
+		
 		return info;
 	}
 	
@@ -1096,46 +1376,19 @@
 		{
 			throw new IllegalArgumentException("Record is too large to store " + size);
 		}
-		
+
 		if (currentFile == null || fileSize - currentFile.getOffset() < size)
 		{
-			log.info("Getting new file");
+			checkAndCreateAvailableFiles();
 			
-			if (currentFile != null)
-			{
-				currentFile.getFile().close();								
-			}
+			currentFile.getFile().close();
 			
-			log.info("Getting new file");
-			
-			checkAndCreateAvailableFiles();
-			
 		   currentFile = availableFiles.remove();			
 
 			files.add(currentFile);
 		}
 	}
 	
-	private static class TransactionInfo
-	{
-		final List<TransactionEntry> entries = new ArrayList<TransactionEntry>();
-	}
-	
-	private static class TransactionEntry
-	{
-		TransactionEntry(final byte type, final long id, final JournalFile file, final byte[] record)
-		{
-			this.type = type;
-			this.id = id;
-			this.file = file;
-			this.record = record;
-		}
-		final byte type;
-		final long id;
-		final JournalFile file;
-		final byte[] record;
-	}
-	
 	private class ReclaimerTask extends TimerTask
 	{
 		public boolean cancel()
@@ -1149,7 +1402,7 @@
 		{
 			try
 			{
-				checkFilesForReclamation();
+				//checkFilesForReclamation();
 			}
 			catch (Exception e)
 			{
@@ -1174,7 +1427,7 @@
 		{
 			try
 			{
-				checkAndCreateAvailableFiles();
+				//checkAndCreateAvailableFiles();
 			}
 			catch (Exception e)
 			{

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -39,9 +39,9 @@
 public class NIOSequentialFile implements SequentialFile
 {
 	private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
-		
-	private static final int LONG_LENGTH = 8;
-		
+			
+	private String journalDir;
+	
 	private String fileName;
 	
 	private boolean sync;
@@ -49,9 +49,13 @@
 	private File file;
 	
 	private FileChannel channel;
+	
+	private RandomAccessFile rfile;
 		
-	public NIOSequentialFile(final String fileName, final boolean sync)
+	public NIOSequentialFile(final String journalDir, final String fileName, final boolean sync)
 	{
+		this.journalDir = journalDir;
+		
 		this.fileName = fileName;
 		
 		this.sync = sync;		
@@ -64,18 +68,20 @@
 		
 	public void open() throws Exception
 	{		
-		file = new File(fileName);
+		file = new File(journalDir + "/" + fileName); // fileName is now resolved inside journalDir instead of being used as the full path
 
-		RandomAccessFile rfile = new RandomAccessFile(file, "rw");
+		rfile = new RandomAccessFile(file, "rw"); // kept in a field (previously a local) so that close() can close it as well as the channel
 
 		channel = rfile.getChannel();		
+		
+		//log.info("Opened file");
 	}
 	
 	public void fill(final int position, final int size, final byte fillCharacter) throws Exception
 	{
 		ByteBuffer bb = ByteBuffer.allocateDirect(size);
 		
-		for (int i = 0; i < size - LONG_LENGTH; i++)
+		for (int i = 0; i < size; i++)
 		{
 			bb.put(fillCharacter);			
 		}
@@ -94,36 +100,51 @@
 	public void close() throws Exception
 	{
 		channel.close();
+		// Also close the RandomAccessFile that open() obtained the channel from.
+		rfile.close();
+		// Drop every reference; open() assigns file, rfile and channel again.
+		channel = null;
+		
+		rfile = null;
+		
+		file = null;
+		// NOTE(review): with the fields nulled, a second close() (or a close() before open()) dereferences a null channel.
+		//log.info("Closed file");
 	}
 
 	public void delete() throws Exception
-	{
-		close();
+	{		
+		file.delete(); // moved ahead of close() because close() now sets 'file' to null. NOTE(review): the boolean result is ignored and the channel is still open here - confirm that deleting an open file succeeds on every target platform
 		
-		file.delete();
+		close();		
 	}
 
 	public int read(ByteBuffer bytes) throws Exception
 	{
+		//log.info("reading, position is " + channel.position());
+		
 		int bytesRead = channel.read(bytes);
 		
-		log.info("Read " + bytesRead + " bytes");
+		//log.info("Read " + bytesRead + " bytes");
 		
 		return bytesRead;
 	}
 
-	public void write(ByteBuffer bytes) throws Exception
+	public int write(ByteBuffer bytes, boolean sync) throws Exception
 	{
-		channel.write(bytes);
+		int bytesRead = channel.write(bytes); // NOTE(review): despite its name this holds the number of bytes written
 		
-		if (sync)
+		if (sync && this.sync) // force only when the caller requests it AND this file was constructed with sync enabled
 		{
 			channel.force(false);
 		}
+		// Hand back the count reported by channel.write.
+		return bytesRead;
 	}
 
-	public void reset() throws Exception
+	public void position(final int pos) throws Exception
 	{
-		channel.position(0);
+		//log.info("Positioning to " + pos);
+		channel.position(pos); // moves the channel to pos; replaces the old reset(), which always moved it to 0
 	}
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -23,7 +23,6 @@
 
 import java.io.File;
 import java.io.FilenameFilter;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -39,12 +38,19 @@
  */
 public class NIOSequentialFileFactory implements SequentialFileFactory
 {
+	private final String journalDir;
+	
+	public NIOSequentialFileFactory(final String journalDir)
+	{
+		this.journalDir = journalDir;
+	}	
+	
 	public SequentialFile createSequentialFile(final String fileName, final boolean sync)
 	{
-		return new NIOSequentialFile(fileName, sync);
+		return new NIOSequentialFile(journalDir, fileName, sync);
 	}
 
-	public List<String> listFiles(final String journalDir, final String extension) throws Exception
+	public List<String> listFiles(final String extension) throws Exception
 	{
 		File dir = new File(journalDir);
 		
@@ -52,7 +58,7 @@
 		{
 			public boolean accept(File file, String name)
 			{
-				return name.endsWith(".jbm");
+				return name.endsWith("." + extension);
 			}
 		};
 		

Deleted: trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -1,45 +0,0 @@
-package org.jboss.messaging.core.journal.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.jboss.messaging.core.journal.RecordHandle;
-
-/**
- * 
- * A RecordHandleImpl
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class RecordHandleImpl implements RecordHandle
-{
-	private final long id;
-	
-	private List<JournalFile> files = new ArrayList<JournalFile>();
-	
-	public RecordHandleImpl(final long id)
-	{
-		this.id = id;
-	}
-	
-	public void addFile(JournalFile file)
-	{
-		files.add(file);
-		
-		file.incRefCount();
-	}
-	
-	public long getID()
-	{
-		return id;
-	}
-	
-	public void recordDeleted()
-	{
-		for (JournalFile file: files)
-		{
-			file.decRefCount();
-		}
-	}
-}

Deleted: trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -1,41 +0,0 @@
-package org.jboss.messaging.core.journal.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.jboss.messaging.core.journal.RecordHandle;
-import org.jboss.messaging.core.journal.RecordHistory;
-
-/**
- * 
- * A RecordHistoryImpl
- * 
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class RecordHistoryImpl implements RecordHistory
-{
-	private final List<byte[]> records = new ArrayList<byte[]>();
-	
-	private final RecordHandle handle;
-	
-	public RecordHistoryImpl(final RecordHandle handle)
-	{
-		this.handle = handle;
-	}
-	
-	public RecordHandle getHandle()
-	{
-		return handle;
-	}
-
-	public List<byte[]> getRecords()
-	{
-		return records;
-	}
-	
-	public void addRecord(final byte[] record)
-	{
-		records.add(record);
-	}	
-}

Added: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,32 @@
+package org.jboss.messaging.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.jboss.messaging.core.journal.RecordInfo;
+
+/**
+ * A TransactionHolder collects what is known about one transaction: its id, the
+ * RecordInfo entries attached to it, the ids of the records it deletes, and
+ * whether it has been marked prepared. All state is exposed as public fields.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class TransactionHolder
+{
+	public TransactionHolder(final long id)
+	{
+		this.transactionID = id;
+	}
+	// Transaction id supplied to the constructor.
+	public final long transactionID;
+	// RecordInfo entries held for this transaction, in the order they were added to the list.
+	public final List<RecordInfo> recordInfos = new ArrayList<RecordInfo>();
+	// Ids of the records this transaction deletes; a set, so a repeated id is stored once.
+	public final Set<Long> recordsToDelete = new HashSet<Long>();
+	// False until set by a user of this class; JournalImpl.load() sets it on a *_PREPARE record whose end marker is XA_DONE.
+	public boolean prepared;
+	
+}

Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FakeSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FakeSequentialFileFactoryTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FakeSequentialFileFactoryTest.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,46 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory;
+
+/**
+ * A FakeSequentialFileFactoryTest: the SequentialFileFactoryTestBase subclass whose
+ * createFactory() returns a FakeSequentialFileFactory. It declares no test methods
+ * itself; presumably the base class (not shown here) supplies them - TODO confirm.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class FakeSequentialFileFactoryTest extends SequentialFileFactoryTestBase
+{
+	protected void setUp() throws Exception
+	{
+		super.setUp(); // adds nothing to the inherited set-up
+	}
+	// Returns a new FakeSequentialFileFactory on every call.
+	protected SequentialFileFactory createFactory()
+	{
+		return new FakeSequentialFileFactory();
+	}
+
+}

Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTest.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,45 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory;
+
+/**
+ * A JournalImplTest: the JournalImplTestBase subclass that hands out a
+ * FakeSequentialFileFactory and performs no directory preparation. It declares no
+ * test methods itself; presumably the base class supplies them - TODO confirm.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class JournalImplTest extends JournalImplTestBase
+{
+	protected void prepareDirectory() throws Exception
+	{				
+		//NOOP - presumably nothing to prepare because the fake factory is constructed without a directory
+	}
+	// Returns a new FakeSequentialFileFactory on every call.
+	protected SequentialFileFactory getFileFactory() throws Exception
+	{
+		return new FakeSequentialFileFactory();
+	}
+}

Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,1345 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.test.unit.RandomUtil;
+import org.jboss.messaging.test.unit.UnitTestCase;
+
+/**
+ * 
+ * A JournalImplTestBase
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public abstract class JournalImplTestBase extends UnitTestCase
+{
+	private static final Logger log = Logger.getLogger(JournalImplTestBase.class);
+	
+	private List<RecordInfo> records = new LinkedList<RecordInfo>();
+	
+	private Journal journal;
+	
+	private int recordLength = 1024;
+	
+	private Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+	
+	private int minFiles;
+	
+	private int minAvailableFiles;
+	
+	private int fileSize;
+	
+	private boolean sync;
+	
+	private String filePrefix = "jbm";
+	
+	private String fileExtension = "jbm";
+	
+	private SequentialFileFactory fileFactory;
+						
+	protected void setUp() throws Exception
+	{
+		super.setUp();
+		
+		prepareDirectory();
+		
+		fileFactory = getFileFactory();
+	}
+	
+	/**
+	 * Stops the journal created by the test (if any) and then runs the superclass tear-down.
+	 * 
+	 * The journal is stopped before super.tearDown() is called, and super.tearDown() is
+	 * executed in a finally block so that it always runs.
+	 */
+	protected void tearDown() throws Exception
+	{
+		try
+		{
+			if (journal != null)
+			{
+				try
+				{
+					journal.stop();
+				}
+				catch (Exception ignored)
+				{
+					//Best effort only - stopping a journal that is not started is expected to
+					//throw IllegalStateException (see testState), which is the case whenever
+					//the test did not leave the journal running
+				}
+			}
+		}
+		finally
+		{
+			super.tearDown();
+		}
+	}
+	
+	protected abstract void prepareDirectory() throws Exception;
+	
+	protected abstract SequentialFileFactory getFileFactory() throws Exception;
+	
+	// General tests
+	// =============
+	
+	public void testState() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		try
+		{
+			load();
+			fail("Should throw exception");
+		}
+		catch (IllegalStateException e)
+		{
+			//OK
+		}
+		try
+		{
+			stopJournal();
+			fail("Should throw exception");
+		}
+		catch (IllegalStateException e)
+		{
+			//OK
+		}
+		startJournal();
+		try
+		{
+			startJournal();
+			fail("Should throw exception");
+		}
+		catch (IllegalStateException e)
+		{
+			//OK
+		}
+		stopJournal();
+		startJournal();
+		load();
+		try
+		{
+			load();
+			fail("Should throw exception");
+		}
+		catch (IllegalStateException e)
+		{
+			//OK
+		}
+		try
+		{
+			startJournal();
+			fail("Should throw exception");
+		}
+		catch (IllegalStateException e)
+		{
+			//OK
+		}
+		stopJournal();		
+	}
+	
+	public void testParams() throws Exception
+	{
+		try
+		{
+			new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, 10, true, fileFactory, 5000, filePrefix, fileExtension);
+			
+			fail("Should throw exception");
+		}
+		catch (IllegalArgumentException e)
+		{
+			//Ok
+		}
+		
+		try
+		{
+			new JournalImpl(10 * 1024, 1, 10, true, fileFactory, 5000, filePrefix, fileExtension);
+			
+			fail("Should throw exception");
+		}
+		catch (IllegalArgumentException e)
+		{
+			//Ok
+		}
+		
+		try
+		{
+			new JournalImpl(10 * 1024, 10, 1, true, fileFactory, 5000, filePrefix, fileExtension);
+			
+			fail("Should throw exception");
+		}
+		catch (IllegalArgumentException e)
+		{
+			//Ok
+		}
+		
+		try
+		{
+			new JournalImpl(10 * 1024, 10, 10, true, null, 5000, filePrefix, fileExtension);
+			
+			fail("Should throw exception");
+		}
+		catch (NullPointerException e)
+		{
+			//Ok
+		}
+		
+		try
+		{
+			new JournalImpl(10 * 1024, 10, 10, true, fileFactory, JournalImpl.MIN_TASK_PERIOD - 1, filePrefix, fileExtension);
+			
+			fail("Should throw exception");
+		}
+		catch (IllegalArgumentException e)
+		{
+			//Ok
+		}
+		
+		try
+		{
+			new JournalImpl(10 * 1024, 10, 10, true, fileFactory, 5000, null, fileExtension);
+			
+			fail("Should throw exception");
+		}
+		catch (NullPointerException e)
+		{
+			//Ok
+		}
+		
+		try
+		{
+			new JournalImpl(10 * 1024, 10, 10, true, fileFactory, 5000, filePrefix, null);
+			
+			fail("Should throw exception");
+		}
+		catch (NullPointerException e)
+		{
+			//Ok
+		}
+		
+	}
+
+	// Non transactional tests
+	// =======================
+	
+	public void testSimpleAdd() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1);	
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	public void testMultipleAdd() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,2,3,4,5,6,7,8,9,10);		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	public void testMultipleAddNonContiguous() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,3,5,7,10,13,56,100,102,200,201,202,203);		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	public void testSimpleAddUpdate() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1);		
+		update(1);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	public void testMultipleAddUpdate() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,2,3,4,5,6,7,8,9,10);		
+		update(1,2,4,7,9,10);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	public void testMultipleAddUpdateAll() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,2,3,4,5,6,7,8,9,10);		
+		update(1,2,3,4,5,6,7,8,9,10);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	/**
+	 * Adds records with non contiguous ids, updates a subset of them, and checks that
+	 * the adds and updates are all loaded again after the journal is restarted.
+	 */
+	public void testMultipleAddUpdateNonContiguous() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+		//These ids were added by the line above, so they are updated, not added a second time
+		update(3,7,10,13,56,100,200,202,203);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	public void testMultipleAddUpdateAllNonContiguous() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+		update(1,3,5,7,10,13,56,100,102,200,201,202,203);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+		
+	public void testSimpleAddUpdateDelete() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1);		
+		update(1);
+		delete(1);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	public void testMultipleAddUpdateDelete() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,2,3,4,5,6,7,8,9,10);		
+		update(1,2,4,7,9,10);
+		delete(1,4,7,9,10);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	/**
+	 * Adds ten records, updates all of them, deletes all of them, and checks the state
+	 * that is loaded after the journal is restarted.
+	 */
+	public void testMultipleAddUpdateDeleteAll() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,2,3,4,5,6,7,8,9,10);
+		update(1,2,3,4,5,6,7,8,9,10);
+		//Delete every record - previously this line repeated the update and nothing was deleted
+		delete(1,2,3,4,5,6,7,8,9,10);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	/**
+	 * Adds records with non contiguous ids, updates a subset of them, deletes a subset of the
+	 * updated ones, and checks the state that is loaded after the journal is restarted.
+	 */
+	public void testMultipleAddUpdateDeleteNonContiguous() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+		//These ids were added by the line above, so they are updated, not added a second time
+		update(3,7,10,13,56,100,200,202,203);
+		delete(3,10,56,100,200,203);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	public void testMultipleAddUpdateDeleteAllNonContiguous() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+		update(1,3,5,7,10,13,56,100,102,200,201,202,203);
+		delete(1,3,5,7,10,13,56,100,102,200,201,202,203);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testMultipleAddUpdateDeleteDifferentOrder() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+		update(203, 202, 201, 200, 102, 100, 1, 3, 5, 7, 10, 13, 56);
+		delete(56, 13, 10, 7, 5, 3, 1, 203, 202, 201, 200, 102, 100);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+		
+	public void testMultipleAddUpdateDeleteDifferentRecordLengths() throws Exception
+	{
+		setup(10, 10, 2048, true);
+		createJournal();
+		startJournal();
+		load();
+		
+		for (int i = 0; i < 1000; i++)
+		{
+			byte[] record = generateRecord(10 + (int)(1500 * Math.random()));
+			
+			journal.appendAddRecord(i, record);
+			
+			records.add(new RecordInfo(i, record, false));
+		}
+		
+		for (int i = 0; i < 1000; i++)
+		{
+			byte[] record = generateRecord(10 + (int)(1024 * Math.random()));
+			
+			journal.appendUpdateRecord(i, record);
+			
+			records.add(new RecordInfo(i, record, true));
+		}
+		
+		for (int i = 0; i < 1000; i++)
+		{
+			journal.appendDeleteRecord(i);
+			
+			removeRecordsForID(i);
+		}
+		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+		stopJournal();			
+	}
+	
+	public void testAddUpdateDeleteManySmallFileSize() throws Exception
+	{
+		final int numberAdds = 10000;
+		
+		final int numberUpdates = 5000;
+		
+		final int numberDeletes = 3000;
+						
+		long[] adds = new long[numberAdds];
+		
+		for (int i = 0; i < numberAdds; i++)
+		{
+			adds[i] = i;
+		}
+		
+		long[] updates = new long[numberUpdates];
+		
+		for (int i = 0; i < numberUpdates; i++)
+		{
+			updates[i] = i;
+		}
+		
+		long[] deletes = new long[numberDeletes];
+		
+		for (int i = 0; i < numberDeletes; i++)
+		{
+			deletes[i] = i;
+		}
+		
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(adds);
+		update(updates);
+		delete(deletes);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+		
+	}
+	
+	public void testAddUpdateDeleteManyLargeFileSize() throws Exception
+	{
+		final int numberAdds = 10000;
+		
+		final int numberUpdates = 5000;
+		
+		final int numberDeletes = 3000;
+						
+		long[] adds = new long[numberAdds];
+		
+		for (int i = 0; i < numberAdds; i++)
+		{
+			adds[i] = i;
+		}
+		
+		long[] updates = new long[numberUpdates];
+		
+		for (int i = 0; i < numberUpdates; i++)
+		{
+			updates[i] = i;
+		}
+		
+		long[] deletes = new long[numberDeletes];
+		
+		for (int i = 0; i < numberDeletes; i++)
+		{
+			deletes[i] = i;
+		}
+		
+		setup(10, 10, 10 * 1024 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(adds);
+		update(updates);
+		delete(deletes);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+		
+	}
+	
+	// Transactional tests
+	// ===================
+	
+	public void testSimpleTransaction() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		addTx(1, false, 1);
+		updateTx(1, false, 1);		
+		deleteTx(1, true, 1);		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testTransactionDontDeleteAll() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		addTx(1, false, 1, 2, 3);
+		updateTx(1, false, 1, 2);		
+		deleteTx(1, true, 1);		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testTransactionDeleteAll() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		addTx(1, false, 1, 2, 3);
+		updateTx(1, false, 1, 2);		
+		deleteTx(1, true, 1, 2, 3);		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testTransactionUpdateFromBeforeTx() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1, 2, 3);
+		addTx(1, false, 4, 5, 6);
+		updateTx(1, true, 1, 5);	
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testTransactionDeleteFromBeforeTx() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1, 2, 3);
+		addTx(1, false, 4, 5, 6);
+		deleteTx(1, true, 1, 2, 3, 4, 5, 6);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testTransactionChangesNotVisibleOutsideTX() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1, 2, 3);
+		addTx(1, false, 4, 5, 6);
+		updateTx(1, false, 1, 2, 4, 5);
+		deleteTx(1, false, 1, 2, 3, 4, 5, 6);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testMultipleTransactionsDifferentIDs() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		
+		addTx(1, false, 1, 2, 3, 4, 5, 6);
+		updateTx(1, false, 1, 3, 5);
+		deleteTx(1, false, 1, 2, 3, 4, 5, 6);
+		
+		addTx(2, false, 11, 12, 13, 14, 15, 16);
+		updateTx(2, false, 11, 13, 15);
+		deleteTx(2, false, 11, 12, 13, 14, 15, 16);
+		
+		addTx(3, false, 21, 22, 23, 24, 25, 26);
+		updateTx(3, false, 21, 23, 25);
+		deleteTx(3, false, 21, 22, 23, 24, 25, 26);
+		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	public void testMultipleInterleavedTransactionsDifferentIDs() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		
+		addTx(1, false, 1, 2, 3, 4, 5, 6);
+		
+		addTx(3, false, 21, 22, 23, 24, 25, 26);
+				
+		updateTx(1, false, 1, 3, 5);
+		
+		addTx(2, false, 11, 12, 13, 14, 15, 16);
+				
+		deleteTx(1, false, 1, 2, 3, 4, 5, 6);
+						
+		updateTx(2, false, 11, 13, 15);
+		
+		updateTx(3, false, 21, 23, 25);
+			
+		deleteTx(2, false, 11, 12, 13, 14, 15, 16);
+		
+		deleteTx(3, false, 21, 22, 23, 24, 25, 26);
+		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	public void testMultipleInterleavedTransactionsSameIDs() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+				
+		add(1, 2, 3, 4, 5, 6, 7, 8);
+		
+		addTx(1, false, 9, 10, 11, 12);
+		
+		addTx(2, false, 13, 14, 15, 16, 17);
+		
+		addTx(3, false, 18, 19, 20, 21, 22);
+		
+		updateTx(1, false, 1, 2, 3);
+		
+		updateTx(2, true, 4, 5, 6);
+		
+		updateTx(3, false, 7, 8);
+		
+		deleteTx(1, true, 1, 2);
+		
+		deleteTx(3, true, 7, 8);
+		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();
+	}
+	
+	public void testTransactionMixed() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();
+		add(1,3,5,7,10,13,56,100,102,200,201,202,203);		
+		addTx(1, false, 675, 676, 677, 700, 703);
+		update(1,3,5,7,10,13,56,100,102,200,201,202,203);		
+		updateTx(1, false, 677, 700);		
+		delete(1,3,5,7,10,13,56,100,102,200,201,202,203);		
+		deleteTx(1, true, 703, 675, 1,3,5,7,10);		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testTransactionAddDeleteDifferentOrder() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();		
+		addTx(1, false, 1, 2, 3, 4, 5, 6, 7, 8, 9);					
+		deleteTx(1, true, 9, 8, 5, 3, 7, 6, 2, 1, 4);		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	
+	// XA tests
+	// ========
+	
+	public void testXASimpleNotPrepared() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();		
+		this.addPrepare(1, false, 1, 2, 3, 4, 5, 6, 7, 8, 9);					
+		this.updatePrepare(1, false, 1, 2, 3, 4, 7, 8);
+		this.deletePrepare(1, false, 1, 2, 3, 4, 5);		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testXASimplePrepared() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();		
+		this.addPrepare(1, false, 1, 2, 3, 4, 5, 6, 7, 8, 9);					
+		this.updatePrepare(1, false, 1, 2, 3, 4, 7, 8);
+		this.deletePrepare(1, true, 1, 2, 3, 4, 5);		
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testXASimpleCommit() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();		
+		this.addPrepare(1, false, 1, 2, 3, 4, 5, 6, 7, 8, 9);					
+		this.updatePrepare(1, false, 1, 2,3, 4, 7, 8);
+		this.deletePrepare(1, true, 1, 2, 3, 4, 5);	
+		this.xaCommit(1);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testXASimpleRollback() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();		
+		this.addPrepare(1, false, 1, 2, 3, 4, 5, 6, 7, 8, 9);					
+		this.updatePrepare(1, false, 1, 2,3, 4, 7, 8);
+		this.deletePrepare(1, true, 1, 2, 3, 4, 5);	
+		this.xaRollback(1);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testXAChangesNotVisibleNotPrepared() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();		
+		this.add(1, 2, 3, 4, 5, 6);
+		this.addPrepare(1, false, 7, 8, 9, 10);					
+		this.updatePrepare(1, false, 1, 2, 3, 7, 8, 9);
+		this.deletePrepare(1, false, 1, 2, 3, 4, 5);	
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testXAChangesNotVisiblePrepared() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();		
+		this.add(1, 2, 3, 4, 5, 6);
+		this.addPrepare(1, false, 7, 8, 9, 10);					
+		this.updatePrepare(1, false, 1, 2, 3, 7, 8, 9);
+		this.deletePrepare(1, true, 1, 2, 3, 4, 5);	
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	public void testXAChangesNotVisibleRollback() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();		
+		this.add(1, 2, 3, 4, 5, 6);
+		this.addPrepare(1, false, 7, 8, 9, 10);					
+		this.updatePrepare(1, false, 1, 2, 3, 7, 8, 9);
+		this.deletePrepare(1, true, 1, 2, 3, 4, 5);	
+		this.xaRollback(1);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	/**
+	 * Adds records 1-6 non transactionally, then in XA transaction 1 adds 7-10, updates
+	 * 1, 2, 3, 7, 8, 9 and deletes 1-5 (preparing on the last delete), commits the
+	 * transaction and checks the state loaded after the journal is restarted.
+	 * 
+	 * NOTE(review): the method name is missing a 'V' - it looks like it was meant to be
+	 * testXAChangesVisibleCommit.
+	 */
+	public void testXAChangesisibleCommit() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();		
+		this.add(1, 2, 3, 4, 5, 6);
+		this.addPrepare(1, false, 7, 8, 9, 10);					
+		this.updatePrepare(1, false, 1, 2, 3, 7, 8, 9);
+		this.deletePrepare(1, true, 1, 2, 3, 4, 5);	
+		this.xaCommit(1);
+		stopJournal();
+		createJournal();
+		startJournal();
+		loadAndCheck();		
+	}
+	
+	/**
+	 * Interleaves three XA transactions (ids 1, 2 and 3) on top of ten non transactional
+	 * records. Each transaction is prepared on its last delete; transactions 1 and 3 are
+	 * then committed and transaction 2 is rolled back.
+	 * 
+	 * NOTE(review): unlike the other tests in this class, this one does not restart the
+	 * journal and call loadAndCheck() at the end, so the resulting state is never verified -
+	 * confirm whether that is intentional.
+	 */
+	public void testXAMultiple() throws Exception
+	{
+		setup(10, 10, 10 * 1024, true);
+		createJournal();
+		startJournal();
+		load();		
+		this.add(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+		this.addPrepare(1, false, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+		this.addPrepare(2, false, 21, 22, 23, 24, 25, 26, 27);
+		this.updatePrepare(1, false, 1, 3, 6, 11, 14, 17);
+		this.addPrepare(3, false, 28, 29, 30, 31, 32, 33, 34, 35);
+		this.updatePrepare(3, false, 7, 8, 9, 10);
+		this.deletePrepare(2, true, 4, 5, 6, 23, 25, 27);
+		this.deletePrepare(1, true, 1, 2, 11, 14, 15);
+		this.deletePrepare(3, true, 28, 31, 32, 9);
+		
+		this.xaCommit(1);
+		this.xaRollback(2);
+		this.xaCommit(3);
+	}
+	
+	// Private ---------------------------------------------------------------------------------
+	
+	private void setup(int minFiles, int minAvailableFiles, int fileSize, boolean sync)
+	{		
+		this.minFiles = minFiles;
+		this.minAvailableFiles = minAvailableFiles;
+		this.fileSize = fileSize;
+		this.sync = sync;
+	}
+	
+	public void createJournal() throws Exception
+	{		
+		journal =
+			new JournalImpl(fileSize, minFiles, minAvailableFiles, sync, fileFactory, 5000, filePrefix, fileExtension);
+	}
+		
+	private void startJournal() throws Exception
+	{
+		journal.start();
+	}
+	
+	private void stopJournal() throws Exception
+	{
+		journal.stop();		
+	}
+	
+	/**
+	 * Loads the journal and compares what it returns with the state tracked by this test:
+	 * the committed records must match the records list, and the prepared transactions must
+	 * match the entries of the transactions map whose prepared flag is set.
+	 */
+	private void loadAndCheck() throws Exception
+	{
+		List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
+		
+		List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+		
+		journal.load(committedRecords, preparedTransactions);
+		
+		checkRecordsEquivalent(records, committedRecords);
+		
+		//check prepared transactions
+		
+		//Build the expected list from the tracked transactions, in the map's iteration order
+		List<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
+		
+		for (Map.Entry<Long, TransactionHolder> entry : transactions.entrySet())
+		{
+			//Transactions that are tracked but not prepared are left out of the expected list
+			if (entry.getValue().prepared)
+			{
+				PreparedTransactionInfo info = new PreparedTransactionInfo(entry.getKey());
+				
+				info.records.addAll(entry.getValue().records);
+				
+				info.recordsToDelete.addAll(entry.getValue().deletes);
+				
+				prepared.add(info);
+			}
+		}
+		
+		checkTransactionsEquivalent(prepared, preparedTransactions);
+	}
+	
+	
+	
+	private void load() throws Exception
+	{
+		journal.load(null, null);
+	}
+	
+	private void add(long... arguments) throws Exception
+	{
+		for (int i = 0; i < arguments.length; i++)
+		{		
+			byte[] record = generateRecord(recordLength);
+			
+			journal.appendAddRecord(arguments[i], record);
+			
+			records.add(new RecordInfo(arguments[i], record, false));			
+		}
+	}
+	
+	private void update(long... arguments) throws Exception
+	{
+		for (int i = 0; i < arguments.length; i++)
+		{		
+			byte[] updateRecord = generateRecord(recordLength);
+			
+			journal.appendUpdateRecord(arguments[i], updateRecord);
+			
+			records.add(new RecordInfo(arguments[i], updateRecord, true));	
+		}
+	}
+	
+	private void delete(long... arguments) throws Exception
+	{
+		for (int i = 0; i < arguments.length; i++)
+		{		
+			journal.appendDeleteRecord(arguments[i]);
+			
+			removeRecordsForID(arguments[i]);
+		}
+	}
+			
+	private void addTx(long txID, boolean done, long... arguments) throws Exception
+	{
+		TransactionHolder tx = transactions.get(txID);
+		
+		if (tx == null)
+		{
+			tx = new TransactionHolder();
+			
+			transactions.put(txID, tx);
+		}
+		
+		for (int i = 0; i < arguments.length; i++)
+		{		
+			byte[] record = generateRecord(recordLength);
+			
+			boolean useDone = done ? i == arguments.length - 1 : false;
+			
+			journal.appendAddRecordTransactional(txID, arguments[i], record, useDone);
+			
+			tx.records.add(new RecordInfo(arguments[i], record, false));
+			
+		}
+		
+		if (done)
+		{
+			commitTx(txID);
+		}
+	}
+	
+	private void addPrepare(long txID, boolean done, long... arguments) throws Exception
+	{
+		TransactionHolder tx = transactions.get(txID);
+		
+		if (tx == null)
+		{
+			tx = new TransactionHolder();
+			
+			transactions.put(txID, tx);
+		}
+		
+		for (int i = 0; i < arguments.length; i++)
+		{		
+			byte[] record = generateRecord(recordLength);
+			
+			boolean useDone = done ? i == arguments.length - 1 : false;
+			
+			journal.appendAddRecordPrepare(txID, arguments[i], record, useDone);
+			
+			tx.records.add(new RecordInfo(arguments[i], record, false));
+			
+		}
+		
+		if (done)
+		{
+			tx.prepared = true;
+		}
+	}
+	
+	private void updateTx(long txID, boolean done, long... arguments) throws Exception
+	{
+		TransactionHolder tx = transactions.get(txID);
+		
+		if (tx == null)
+		{
+			throw new IllegalStateException("Cannot find tx " + txID);
+		}
+		
+		for (int i = 0; i < arguments.length; i++)
+		{		
+			byte[] updateRecord = generateRecord(recordLength);
+			
+			boolean useDone = done ? i == arguments.length - 1 : false;
+						
+			journal.appendUpdateRecordTransactional(txID, arguments[i], updateRecord, useDone);
+			
+			tx.records.add(new RecordInfo(arguments[i], updateRecord, true));
+		}		
+		
+		if (done)
+		{
+			commitTx(txID);
+		}
+	}
+	
+	private void updatePrepare(long txID, boolean done, long... arguments) throws Exception
+	{
+		TransactionHolder tx = transactions.get(txID);
+		
+		if (tx == null)
+		{
+			throw new IllegalStateException("Cannot find tx " + txID);
+		}
+		
+		if (tx.prepared)
+		{
+			throw new IllegalStateException("Transaction is already prepared");
+		}
+		
+		for (int i = 0; i < arguments.length; i++)
+		{		
+			byte[] updateRecord = generateRecord(recordLength);
+			
+			boolean useDone = done ? i == arguments.length - 1 : false;
+						
+			journal.appendUpdateRecordPrepare(txID, arguments[i], updateRecord, useDone);
+			
+			tx.records.add(new RecordInfo(arguments[i], updateRecord, true));
+		}		
+		
+		if (done)
+		{
+			tx.prepared = true;
+		}
+	}
+	
+	private void deleteTx(long txID, boolean done, long... arguments) throws Exception
+	{
+		TransactionHolder tx = transactions.get(txID);
+		
+		if (tx == null)
+		{
+			throw new IllegalStateException("Cannot find tx " + txID);
+		}
+		
+		for (int i = 0; i < arguments.length; i++)
+		{		
+			boolean useDone = done ? i == arguments.length - 1 : false;
+						
+			journal.appendDeleteRecordTransactional(txID, arguments[i], useDone);
+			
+			tx.deletes.add(arguments[i]);			
+		}
+		
+		if (done)
+		{
+			commitTx(txID);
+		}
+	}
+	
+	private void deletePrepare(long txID, boolean done, long... arguments) throws Exception
+	{
+		TransactionHolder tx = transactions.get(txID);
+		
+		if (tx == null)
+		{
+			throw new IllegalStateException("Cannot find tx " + txID);
+		}
+		
+		if (tx.prepared)
+		{
+			throw new IllegalStateException("Transaction is already prepared");
+		}
+		
+		for (int i = 0; i < arguments.length; i++)
+		{		
+			boolean useDone = done ? i == arguments.length - 1 : false;
+						
+			journal.appendDeleteRecordPrepare(txID, arguments[i], useDone);
+			
+			tx.deletes.add(arguments[i]);			
+		}
+		
+		if (done)
+		{
+			tx.prepared = true;
+		}
+	}
+	
+	private void xaCommit(long txID) throws Exception
+	{
+		TransactionHolder tx = transactions.get(txID);
+		
+		if (tx == null)
+		{
+			throw new IllegalStateException("Cannot find tx " + txID);
+		}
+		
+		if (!tx.prepared)
+		{
+			throw new IllegalStateException("Transaction is not prepared");
+		}
+		
+		journal.appendXACommitRecord(txID);
+		
+		this.commitTx(txID);
+	}
+	
+	private void xaRollback(long txID) throws Exception
+	{
+		TransactionHolder tx = transactions.remove(txID);
+		
+		if (tx == null)
+		{
+			throw new IllegalStateException("Cannot find tx " + txID);
+		}
+		
+		if (!tx.prepared)
+		{
+			throw new IllegalStateException("Transaction is not prepared");
+		}
+		
+		journal.appendXARollbackRecord(txID);
+	}
+	
+	private void commitTx(long txID)
+	{
+		TransactionHolder tx = transactions.remove(txID);
+		
+		if (tx == null)
+		{
+			throw new IllegalStateException("Cannot find tx " + txID);
+		}
+		
+		records.addAll(tx.records);
+		
+		for (Long l: tx.deletes)
+		{
+			removeRecordsForID(l);
+		}
+	}
+	
+	/**
+	 * Removes from the expected records every entry whose id equals the given id.
+	 */
+	private void removeRecordsForID(long id)
+	{
+		Iterator<RecordInfo> remaining = records.iterator();
+		
+		while (remaining.hasNext())
+		{
+			if (remaining.next().id == id)
+			{
+				remaining.remove();
+			}
+		}
+	}
+	
+	
+	private void checkTransactionsEquivalent(List<PreparedTransactionInfo> expected, List<PreparedTransactionInfo> actual)
+	{
+		assertEquals("Lists not same length", expected.size(), actual.size());
+		
+		Iterator<PreparedTransactionInfo> iterExpected = expected.iterator();
+		
+		Iterator<PreparedTransactionInfo> iterActual = actual.iterator();
+		
+		while (iterExpected.hasNext())
+		{
+			PreparedTransactionInfo rexpected = iterExpected.next();
+			
+			PreparedTransactionInfo ractual = iterActual.next();
+			
+			assertEquals("ids not same", rexpected.id, ractual.id);
+			
+			checkRecordsEquivalent(rexpected.records, ractual.records);
+			
+			assertEquals("deletes size not same", rexpected.recordsToDelete.size(), ractual.recordsToDelete.size());
+			
+			Iterator<Long> iterDeletesExpected = rexpected.recordsToDelete.iterator();
+			
+			Iterator<Long> iterDeletesActual = ractual.recordsToDelete.iterator();
+			
+			while (iterDeletesExpected.hasNext())
+			{
+				long lexpected = iterDeletesExpected.next();
+				
+				long lactual = iterDeletesActual.next();
+				
+				assertEquals("Delete ids not same", lexpected, lactual);
+			}
+		}
+	}
+	
+	/**
+	 * Asserts that both lists have the same size and that the records at each position have
+	 * the same id, the same isUpdate flag and equivalent data.
+	 */
+	private void checkRecordsEquivalent(List<RecordInfo> expected, List<RecordInfo> actual)
+	{
+		assertEquals("Lists not same length", expected.size(), actual.size());
+		
+		Iterator<RecordInfo> actualRecords = actual.iterator();
+		
+		//The size check above guarantees the second iterator has an element for each pass
+		for (RecordInfo wanted : expected)
+		{
+			RecordInfo found = actualRecords.next();
+			
+			assertEquals("ids not same", wanted.id, found.id);
+			
+			assertEquals("type not same", wanted.isUpdate, found.isUpdate);
+			
+			assertByteArraysEquivalent(wanted.data, found.data);
+		}
+	}
+	
+	private byte[] generateRecord(int length)
+	{
+		byte[] record = new byte[length];
+		for (int i = 0; i < length; i++)
+		{
+			record[i] = RandomUtil.randomByte();
+		}
+		return record;
+	}
+	
+	class TransactionHolder
+	{
+		List<RecordInfo> records = new ArrayList<RecordInfo>();
+		
+		List<Long> deletes = new ArrayList<Long>();
+		
+		boolean prepared;
+	}
+	
+}

Deleted: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -1,236 +0,0 @@
-/*
-  * JBoss, Home of Professional Open Source
-  * Copyright 2005, JBoss Inc., and individual contributors as indicated
-  * by the @authors tag. See the copyright.txt in the distribution for a
-  * full listing of individual contributors.
-  *
-  * This is free software; you can redistribute it and/or modify it
-  * under the terms of the GNU Lesser General Public License as
-  * published by the Free Software Foundation; either version 2.1 of
-  * the License, or (at your option) any later version.
-  *
-  * This software is distributed in the hope that it will be useful,
-  * but WITHOUT ANY WARRANTY; without even the implied warranty of
-  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-  * Lesser General Public License for more details.
-  *
-  * You should have received a copy of the GNU Lesser General Public
-  * License along with this software; if not, write to the Free
-  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-  */
-package org.jboss.messaging.core.journal.impl.test.unit;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory.FakeSequentialFile;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.test.unit.UnitTestCase;
-
-/**
- * 
- * Unit test that loads, stops and reloads a JournalImpl backed by a
- * FakeSequentialFileFactory and checks the files it creates.
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class JournalTest extends UnitTestCase
-{
-	private static final Logger log = Logger.getLogger(JournalTest.class);
-	// Journal directory below the user's home directory; deleted and recreated in setUp()
-	private String journalDir = System.getProperty("user.home") + "/journal-test";
-	// Fake factory handed to every JournalImpl created by this test
-	private FakeSequentialFileFactory factory = new FakeSequentialFileFactory();
-	/** Calls super.setUp(), then deletes the journal directory and creates it again. */
-	protected void setUp() throws Exception
-	{
-		super.setUp();
-		
-		File file = new File(journalDir);
-		
-		deleteDirectory(file);
-		
-		file.mkdir();		
-	}
-	/** Loads a new journal, verifies the files it created, stops it, then reloads and verifies again. */
-	public void testLoad() throws Exception
-	{
-		final int minFiles = 10;
-		
-		final int minAvailableFiles = 10;
-		
-		final int fileSize = 10 * 1024;
-		
-		final boolean sync = true;
-		
-		final String filePrefix = "jbm";
-		
-		final String fileExtension = "jbm";
-		// Timestamps taken around construction and load(); ordering ids are checked against this window
-		long timeStart = System.currentTimeMillis();
-		
-		JournalImpl journal =
-			new JournalImpl(journalDir, fileSize, minFiles, minAvailableFiles, sync, factory, 5000, filePrefix, fileExtension);
-		
-		journal.load();
-		
-		long timeEnd = System.currentTimeMillis();
-		// Expect one file in getFiles(), the remaining minFiles - 1 in getAvailableFiles()
-		assertEquals(1, journal.getFiles().size());
-		assertEquals(minFiles - 1, journal.getAvailableFiles().size());
-	
-		assertEquals(minFiles, factory.getFileMap().size());
-		
-		for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
-		{
-			FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
-			
-			assertEquals(sync, file.isSync());
-			
-			assertTrue(file.isOpen());
-			
-			byte[] bytes = file.getData().array();
-			
-			assertEquals(fileSize, bytes.length);
-									
-			//First eight bytes should be the ordering id timestamp (read as a long below)
-			
-			ByteBuffer bb = ByteBuffer.wrap(bytes, 0, 8);
-			long orderingID = bb.getLong();
-			// The file name is expected to embed the ordering id between prefix and extension
-			String expectedFilename =
-				journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
-			
-			assertEquals(expectedFilename, file.getFileName());
-			
-			log.info("Ordering id is " + orderingID);
-			
-			assertTrue(orderingID >= timeStart);
-			
-			assertTrue(orderingID <= timeEnd);
-			// Everything after the 8-byte ordering id must still hold the fill character
-			for (int i = 8; i < bytes.length; i++)
-			{
-				if (bytes[i] != JournalImpl.FILL_CHARACTER)
-				{
-					fail("Not filled correctly");
-				}
-			}
-		}
-		// After stop() every file must be closed and both journal file lists empty
-		journal.stop();
-		
-		for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
-		{
-			FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
-			
-			assertFalse(file.isOpen());
-		}
-		
-		assertEquals(0, journal.getFiles().size());
-		assertEquals(0, journal.getAvailableFiles().size());
-
-		//Now reload
-		// The same factory instance is passed again, so the new journal works on the fake files from above
-		journal = new JournalImpl(journalDir, fileSize, minFiles, minAvailableFiles, sync, factory, 5000, filePrefix, fileExtension);
-		
-		
-		log.info("******** reloading");
-		
-		journal.load();
-		
-		assertEquals(1, journal.getFiles().size());
-		assertEquals(minFiles - 1, journal.getAvailableFiles().size());
-	
-		assertEquals(minFiles, factory.getFileMap().size());
-		
-		for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
-		{
-			FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
-			
-			assertEquals(sync, file.isSync());
-			
-			assertTrue(file.isOpen());
-			
-			byte[] bytes = file.getData().array();
-			
-			assertEquals(fileSize, bytes.length);
-									
-			//First eight bytes should be the ordering id timestamp (read as a long below)
-			
-			ByteBuffer bb = ByteBuffer.wrap(bytes, 0, 8);
-			long orderingID = bb.getLong();
-			
-			String expectedFilename =
-				journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
-			
-			assertEquals(expectedFilename, file.getFileName());
-			
-			log.info("Ordering id is " + orderingID);
-			// Same window as the first load: the ordering ids are expected to date from that load
-			assertTrue(orderingID >= timeStart);
-			
-			assertTrue(orderingID <= timeEnd);
-			
-			for (int i = 8; i < bytes.length; i++)
-			{
-				if (bytes[i] != JournalImpl.FILL_CHARACTER)
-				{
-					fail("Not filled correctly");
-				}
-			}
-		}
-		
-		
-	}
-
-//	public void test1() throws Exception
-//	{		
-//		File file = new File(journalDir);
-//		
-//		JournalImpl journal = new JournalImpl(journalDir, 10 * 1024 * 1024, 10, true, factory);
-//		
-//		journal.load();
-//		
-//		long start = System.currentTimeMillis();
-//		
-//		byte[] bytes = new byte[1024];
-//
-//		for (int i = 0; i < bytes.length; i++)
-//		{
-//			if (i % 100 == 0)
-//			{
-//				bytes[i] = '\n';
-//			}
-//			else
-//			{
-//				bytes[i] = 'T';
-//			}
-//		}
-//		
-//		final int numIts = 50000;
-//		
-//		for (int i = 0; i < numIts; i++)
-//		{
-//			journal.add(1, bytes);
-//		}
-//				
-//		long end = System.currentTimeMillis();
-//		
-//		long numbytes = numIts * 1024;
-//		
-//		double actualRate = 1000 * (double)numbytes / ( end - start);
-//      
-//      log.info("Rate: (bytes/sec) " + actualRate);
-//      
-//      double recordRate = 1000 * (double)numIts / ( end - start);
-//      
-//      log.info("Rate: (records/sec) " + recordRate);
-//		
-//	}
-
-}

Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/NIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/NIOSequentialFileFactoryTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/NIOSequentialFileFactoryTest.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,56 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+
+/**
+ * 
+ * Runs the tests inherited from SequentialFileFactoryTestBase against an
+ * NIOSequentialFileFactory constructed with a journal-test directory under user.home.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase
+{		
+	protected String journalDir = System.getProperty("user.home") + "/journal-test";
+	/** Calls super.setUp(), then deletes the journal directory and creates it again. */
+	protected void setUp() throws Exception
+	{
+		super.setUp();
+		// Note: the base class setUp() has already called createFactory() by this point
+		File file = new File(journalDir);
+		
+		deleteDirectory(file);
+		
+		file.mkdir();		
+	}
+	/** Supplies the factory under test: an NIOSequentialFileFactory constructed with journalDir. */
+	protected SequentialFileFactory createFactory()
+	{
+		return new NIOSequentialFileFactory(journalDir);
+	}
+
+}

Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,56 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * 
+ * JournalImplTestBase subclass that supplies an NIOSequentialFileFactory
+ * constructed with a journal-test directory under user.home.
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealJournalImplTest extends JournalImplTestBase
+{
+	private static final Logger log = Logger.getLogger(RealJournalImplTest.class);
+	// Directory passed to the NIOSequentialFileFactory; reset by prepareDirectory()
+	protected String journalDir = System.getProperty("user.home") + "/journal-test";
+	/** Deletes the journal directory and creates it again. */
+	protected void prepareDirectory() throws Exception
+	{				
+		File file = new File(journalDir);
+		
+		deleteDirectory(file);
+		
+		file.mkdir();		
+	}
+	/** Returns a new NIOSequentialFileFactory constructed with journalDir. */
+	protected SequentialFileFactory getFileFactory() throws Exception
+	{
+		return new NIOSequentialFileFactory(journalDir);
+	}
+}

Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,326 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.test.unit.UnitTestCase;
+
+/**
+ * 
+ * Abstract base for tests of a SequentialFileFactory and the SequentialFile
+ * instances it creates; subclasses supply the factory through createFactory().
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public abstract class SequentialFileFactoryTestBase extends UnitTestCase
+{
+	private static final Logger log = Logger.getLogger(SequentialFileFactoryTestBase.class);
+	
+	/** Calls super.setUp() and then stores the result of createFactory() in the factory field. */
+	protected void setUp() throws Exception
+	{
+		super.setUp();
+		
+		factory = createFactory();
+	}
+	/** Returns the SequentialFileFactory implementation the tests in this class run against. */
+	protected abstract SequentialFileFactory createFactory();
+	// Assigned in setUp()
+	private SequentialFileFactory factory;
+	/** Creates ten .jbm files plus two files with other extensions and checks that listFiles filters by extension. */
+	public void testCreateAndListFiles() throws Exception
+	{
+		List<String> expectedFiles = new ArrayList<String>();
+		
+		final int numFiles = 10;
+		// Each file gets a random UUID as its base name
+		for (int i = 0; i < numFiles; i++)
+		{
+			String fileName = UUID.randomUUID().toString() + ".jbm";
+			
+			expectedFiles.add(fileName);
+			
+			SequentialFile sf = factory.createSequentialFile(fileName, false);
+			
+			sf.open();
+			
+			assertEquals(fileName, sf.getFileName());
+		}				
+		
+		//Create a couple with a different extension - they shouldn't be picked up
+		
+		SequentialFile sf1 = factory.createSequentialFile("different.file", false);
+		sf1.open();
+		
+		SequentialFile sf2 = factory.createSequentialFile("different.cheese", false);
+		sf2.open();
+		// Listing by "jbm" must return exactly the ten names created in the loop
+		List<String> fileNames = factory.listFiles("jbm");
+		
+		assertEquals(expectedFiles.size(), fileNames.size());
+		
+		for (String fileName: expectedFiles)
+		{
+			assertTrue(fileNames.contains(fileName));
+		}
+		// Each of the other two extensions must match only its own single file
+		fileNames = factory.listFiles("file");
+		
+		assertEquals(1, fileNames.size());
+		
+		assertTrue(fileNames.contains("different.file"));	
+		
+		fileNames = factory.listFiles("cheese");
+		
+		assertEquals(1, fileNames.size());
+		
+		assertTrue(fileNames.contains("different.cheese"));	
+	}
+	
+	/** Fills several regions of one file, varying offset, size and fill byte, verifying each through checkFill. */
+	public void testFill() throws Exception
+	{
+		SequentialFile sf = factory.createSequentialFile("fill.jbm", true);
+		
+		sf.open();
+		
+		checkFill(sf, 0, 100, (byte)'X');
+		
+		checkFill(sf, 13, 300, (byte)'Y');
+		
+		checkFill(sf, 0, 1, (byte)'Z');
+		
+		checkFill(sf, 100, 1, (byte)'A');
+		
+		checkFill(sf, 1000, 10000, (byte)'B');
+	}
+	/** Creates two files, deletes the first and checks that only the second is still listed. */
+	public void testDelete() throws Exception
+	{
+		SequentialFile sf = factory.createSequentialFile("delete-me.jbm", true);
+		
+		sf.open();
+		
+		SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", true);
+		
+		sf2.open();
+		// Exactly the two files created above are expected under the "jbm" extension
+		List<String> fileNames = factory.listFiles("jbm");
+		
+		assertEquals(2, fileNames.size());
+		
+		assertTrue(fileNames.contains("delete-me.jbm"));
+		
+		assertTrue(fileNames.contains("delete-me2.jbm"));
+		
+		sf.delete();
+		// After the delete only the second file may remain in the listing
+		fileNames = factory.listFiles("jbm");
+		
+		assertEquals(1, fileNames.size());
+		
+		assertTrue(fileNames.contains("delete-me2.jbm"));
+		
+	}
+	/** Writes three strings one after another, repositions to 0 and reads them back in the same order. */
+	public void testWriteandRead() throws Exception
+	{
+		SequentialFile sf = factory.createSequentialFile("write.jbm", true);
+		
+		sf.open();
+		
+		String s1 = "aardvark";
+		byte[] bytes1 = s1.getBytes("UTF-8");
+		ByteBuffer bb1 = ByteBuffer.wrap(bytes1);
+		
+		String s2 = "hippopotamus";
+		byte[] bytes2 = s2.getBytes("UTF-8");
+		ByteBuffer bb2 = ByteBuffer.wrap(bytes2);
+		
+		String s3 = "echidna";
+		byte[] bytes3 = s3.getBytes("UTF-8");
+		ByteBuffer bb3 = ByteBuffer.wrap(bytes3);
+		// Each write is expected to report the full length of its source array
+		int bytesWritten = sf.write(bb1, true);
+		
+		assertEquals(bytes1.length, bytesWritten);
+		
+		bytesWritten = sf.write(bb2, true);
+		
+		assertEquals(bytes2.length, bytesWritten);
+		
+		bytesWritten = sf.write(bb3, true);
+		
+		assertEquals(bytes3.length, bytesWritten);
+		// Go back to the start so the reads below see the data in write order
+		sf.position(0);
+		
+		byte[] rbytes1 = new byte[bytes1.length];
+		
+		byte[] rbytes2 = new byte[bytes2.length];
+		
+		byte[] rbytes3 = new byte[bytes3.length];
+		// Read buffers wrap arrays sized exactly like the originals
+		ByteBuffer rb1 = ByteBuffer.wrap(rbytes1);
+		ByteBuffer rb2 = ByteBuffer.wrap(rbytes2);
+		ByteBuffer rb3 = ByteBuffer.wrap(rbytes3);
+
+		int bytesRead = sf.read(rb1);
+		assertEquals(rbytes1.length, bytesRead);		
+		assertByteArraysEquivalent(bytes1, rbytes1);
+		
+		bytesRead = sf.read(rb2);
+		assertEquals(rbytes2.length, bytesRead);		
+		assertByteArraysEquivalent(bytes2, rbytes2);
+		
+		bytesRead = sf.read(rb3);
+		assertEquals(rbytes3.length, bytesRead);		
+		assertByteArraysEquivalent(bytes3, rbytes3);				
+	}
+	/** Writes three strings and reads them back in reverse order by calling position() before each read. */
+	public void testPosition() throws Exception
+	{
+		SequentialFile sf = factory.createSequentialFile("position.jbm", true);
+		
+		sf.open();
+		
+		String s1 = "orange";
+		byte[] bytes1 = s1.getBytes("UTF-8");
+		ByteBuffer bb1 = ByteBuffer.wrap(bytes1);
+		
+		String s2 = "grapefruit";
+		byte[] bytes2 = s2.getBytes("UTF-8");
+		ByteBuffer bb2 = ByteBuffer.wrap(bytes2);
+		
+		String s3 = "lemon";
+		byte[] bytes3 = s3.getBytes("UTF-8");
+		ByteBuffer bb3 = ByteBuffer.wrap(bytes3);
+		
+		int bytesWritten = sf.write(bb1, true);
+		
+		assertEquals(bytes1.length, bytesWritten);
+		
+		bytesWritten = sf.write(bb2, true);
+		
+		assertEquals(bytes2.length, bytesWritten);
+		
+		bytesWritten = sf.write(bb3, true);
+		
+		assertEquals(bytes3.length, bytesWritten);
+		
+		byte[] rbytes1 = new byte[bytes1.length];
+		
+		byte[] rbytes2 = new byte[bytes2.length];
+		
+		byte[] rbytes3 = new byte[bytes3.length];
+		
+		ByteBuffer rb1 = ByteBuffer.wrap(rbytes1);
+		ByteBuffer rb2 = ByteBuffer.wrap(rbytes2);
+		ByteBuffer rb3 = ByteBuffer.wrap(rbytes3);
+		// The third string starts right after the first two
+		sf.position(bytes1.length + bytes2.length);
+		
+		int bytesRead = sf.read(rb3);
+		assertEquals(rbytes3.length, bytesRead);		
+		assertByteArraysEquivalent(bytes3, rbytes3);		
+		// The second string starts right after the first
+		sf.position(bytes1.length);
+		
+		bytesRead = sf.read(rb2);
+		assertEquals(rbytes2.length, bytesRead);		
+		assertByteArraysEquivalent(bytes2, rbytes2);
+		// The first string starts at offset 0
+		sf.position(0);
+		
+		bytesRead = sf.read(rb1);
+		assertEquals(rbytes1.length, bytesRead);		
+		assertByteArraysEquivalent(bytes1, rbytes1);		
+	}
+	/** Checks that write() throws on a closed file and can be called again after the file is reopened. */
+	public void testOpenClose() throws Exception
+	{
+		SequentialFile sf = factory.createSequentialFile("openclose.jbm", true);
+		
+		sf.open();
+		
+		String s1 = "cheesecake";
+		byte[] bytes1 = s1.getBytes("UTF-8");
+		ByteBuffer bb1 = ByteBuffer.wrap(bytes1);
+		
+		int bytesWritten = sf.write(bb1, true);
+		
+		assertEquals(bytes1.length, bytesWritten);
+		
+		sf.close();
+		// Writing to the closed file must be rejected with an exception
+		try
+		{
+			sf.write(bb1, true);
+			
+			fail("Should throw exception");
+		}
+		catch (Exception e)
+		{
+			//OK - the write on the closed file failed as expected
+		}
+		// After reopening, the write below only has to complete without throwing
+		sf.open();
+		// NOTE(review): bb1 is reused without rewinding after the first write - confirm it still has bytes remaining
+		sf.write(bb1, true);
+	}
+	
+	// Private ---------------------------------
+	/** Fills size bytes at pos with fillChar, closes and reopens the file, then reads the region back and checks every byte. */
+	private void checkFill(SequentialFile file, int pos, int size, byte fillChar) throws Exception
+	{
+		file.fill(pos, size, fillChar);
+		// Close and reopen before reading the region back
+		file.close();
+		
+		file.open();
+		
+		file.position(pos);
+		
+		byte[] bytes = new byte[size];
+		
+		ByteBuffer bb = ByteBuffer.wrap(bytes);
+		
+		int bytesRead = file.read(bb);
+		
+		assertEquals(size, bytesRead);
+		// Every byte read back must equal the fill character
+		for (int i = 0; i < size; i++)
+		{
+			//log.info(" i is " + i);
+			assertEquals(fillChar, bytes[i]);
+		}
+				
+	}
+	
+}

Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFile.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFile.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFile.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,6 @@
+package org.jboss.messaging.core.journal.impl.test.unit.fakes;
+/** Empty top-level class; it declares no fields or methods. */
+public class FakeSequentialFile
+{
+
+}

Modified: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java	2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java	2008-03-17 12:04:44 UTC (rev 3884)
@@ -58,15 +58,25 @@
 		{		
 			sf.data.position(0);
 			
-			log.info("positioning data to 0");
+			//log.info("positioning data to 0");
 		}
 						
 		return sf;
 	}
 	
-	public List<String> listFiles(String journalDir, String extension)
+	public List<String> listFiles(final String extension)
 	{
-		return new ArrayList<String>(fileMap.keySet());
+		List<String> files = new ArrayList<String>();
+		
+		for (String s: fileMap.keySet())
+		{
+			if (s.endsWith("." + extension))
+			{
+				files.add(s);
+			}
+		}
+		
+		return files;
 	}
 	
 	public Map<String, FakeSequentialFile> getFileMap()
@@ -101,7 +111,7 @@
 		
 		public boolean isOpen()
 		{
-			log.info("is open" + System.identityHashCode(this) +" open is now " + open);
+			//log.info("is open" + System.identityHashCode(this) +" open is now " + open);
 			return open;
 		}
 		
@@ -116,7 +126,10 @@
 		{
 			open = false;
 			
-			log.info("Calling close " + System.identityHashCode(this) +" open is now " + open);
+			if (data != null)
+			{
+				data.position(0);
+			}
 		}
 
 		public void delete() throws Exception
@@ -141,7 +154,7 @@
 
 		public void open() throws Exception
 		{
-			log.info("open called");
+			//log.info("open called");
 			
 			if (open)
 			{
@@ -158,18 +171,18 @@
 				throw new IllegalStateException("Is closed");
 			}
 			
-			log.info("pre-allocate called " + size +" , " + fillCharacter);
+			checkAndResize(pos + size);
 			
-			byte[] bytes = new byte[size];
+			//log.info("size is " + size + " pos is " + pos);
 			
 			for (int i = pos; i < size + pos; i++)
 			{
-				bytes[i] = fillCharacter;
-			}
-			
-			data = ByteBuffer.wrap(bytes);		
+				data.array()[i] = fillCharacter;
+				
+				//log.info("Filling " + pos + " with char " + fillCharacter);
+			}						
 		}
-
+		
 		public int read(ByteBuffer bytes) throws Exception
 		{
 			if (!open)
@@ -177,11 +190,11 @@
 				throw new IllegalStateException("Is closed");
 			}
 			
-			log.info("read called " + bytes.array().length);
+			//log.info("read called " + bytes.array().length);
 			
 			byte[] bytesRead = new byte[bytes.array().length];
 			
-			log.info("reading, data pos is " + data.position() + " data size is " + data.array().length);
+			//log.info("reading, data pos is " + data.position() + " data size is " + data.array().length);
 			
 			data.get(bytesRead);
 			
@@ -190,30 +203,56 @@
 			return bytesRead.length;
 		}
 
-		public void reset() throws Exception
+		public void position(int pos) throws Exception
 		{
 			if (!open)
 			{
 				throw new IllegalStateException("Is closed");
 			}
 			
-			log.info("reset called");
+			//log.info("reset called");
 			
-			data.position(0);
+			data.position(pos);
 		}
 
-		public void write(ByteBuffer bytes) throws Exception
+		public int write(ByteBuffer bytes, boolean sync) throws Exception
 		{
 			if (!open)
 			{
 				throw new IllegalStateException("Is closed");
 			}
 			
-			log.info("write called, position is " + data.position() + " bytes is " + bytes.array().length);
+			int position = data == null ? 0 : data.position();
 			
+			checkAndResize(bytes.capacity() + position);
+			
+			//log.info("write called, position is " + data.position() + " bytes is " + bytes.array().length);
+			
 			data.put(bytes);
+			
+			return bytes.array().length;
 		}
+		/** Makes sure the array backing data holds at least size bytes, allocating or growing it when it does not. */
+		private void checkAndResize(int size)
+		{
+			int oldpos = data == null ? 0 : data.position();
+			// Allocate on first use, or grow when the current backing array is shorter than requested
+			if (data == null || data.array().length < size)
+			{
+				byte[] newBytes = new byte[size];
+				
+				if (data != null)
+				{
+					System.arraycopy(data.array(), 0, newBytes, 0, data.array().length);
+				}
+				// ByteBuffer.wrap() returns a buffer positioned at 0, so the previous position is restored afterwards
+				data = ByteBuffer.wrap(newBytes);
+				
+				data.position(oldpos);
+			}
+		}
 
+
 	}
 
 }




More information about the jboss-cvs-commits mailing list