[jboss-cvs] JBoss Messaging SVN: r3871 - in trunk: src/main/org/jboss/messaging/core/bindingmanager and 4 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Mar 12 06:02:42 EDT 2008


Author: timfox
Date: 2008-03-12 06:02:42 -0400 (Wed, 12 Mar 2008)
New Revision: 3871

Added:
   trunk/src/main/org/jboss/messaging/core/bindingmanager/
   trunk/src/main/org/jboss/messaging/core/bindingmanager/BindingManager.java
   trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java
Modified:
   trunk/src/main/org/jboss/messaging/core/journal/Journal.java
   trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
   trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
Log:
More journal work


Added: trunk/src/main/org/jboss/messaging/core/bindingmanager/BindingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/bindingmanager/BindingManager.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/bindingmanager/BindingManager.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -0,0 +1,48 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.bindingmanager;
+
+import org.jboss.messaging.core.postoffice.Binding;
+
+/**
+ * 
+ * A BindingManager
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface BindingManager
+{
+   /**
+    * Add a binding into the store
+    * @param binding The binding to add
+    * @throws Exception
+    */
+   void addBinding(Binding binding) throws Exception;
+   
+   /**
+    * Delete a binding from the store
+    * @param binding The binding to delete
+    * @throws Exception
+    */
+   void deleteBinding(Binding binding) throws Exception;
+}

Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -21,7 +21,7 @@
   */
 package org.jboss.messaging.core.journal;
 
-import java.util.Map;
+import java.util.List;
 
 
 /**
@@ -33,9 +33,32 @@
  */
 public interface Journal
 {
-	RecordHandle add(long id, byte[] bytes) throws Exception;
+	// Non transactional operations
 	
-	void delete(RecordHandle handle) throws Exception;
+	RecordHandle appendAddRecord(long id, byte[] record) throws Exception;
 	
-	Map<Long, byte[]> load() throws Exception;
+	void appendUpdateRecord(RecordHandle handle, byte[] record) throws Exception;
+	
+	void appendDeleteRecord(RecordHandle handle) throws Exception;
+	
+	// Transactional operations
+	
+	RecordHandle appendAddRecordTransactional(long txID, long id, byte[] record, boolean done) throws Exception;
+	
+	void appendUpdateRecordTransactional(long txID, RecordHandle handle, byte[] record, boolean done) throws Exception;
+	
+	void appendDeleteRecordTransactional(long txID, RecordHandle handle, boolean done) throws Exception;
+	
+	
+//	RecordHandle appendAddRecordPrepare(long txID, long id, byte[] record, boolean done) throws Exception;
+//	
+//	void appendUpdateRecordPrepare(long txID, RecordHandle handle, byte[] record, boolean done) throws Exception;
+//	
+//	void appendDeleteRecordPrepare(long txID, RecordHandle handle, boolean done) throws Exception;
+
+	
+	// Load
+	
+	List<RecordHistory> load() throws Exception;
+	
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java	2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -30,4 +30,5 @@
  */
 public interface RecordHandle
 {
+	public long getID();
 }

Added: trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -0,0 +1,38 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal;
+
+import java.util.List;
+
+/**
+ * 
+ * A RecordHistory
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface RecordHistory
+{
+	RecordHandle getHandle();
+	
+	List<byte[]> getRecords();
+}

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -39,13 +39,13 @@
 	
 	String getFileName();
 	
-	void preAllocate(int size, byte fillCharacter) throws Exception;
+	void fill(int position, int size, byte fillCharacter) throws Exception;
 	
 	void delete() throws Exception;
 
 	void write(ByteBuffer bytes) throws Exception;
 	   
-	void read(ByteBuffer bytes) throws Exception;
+	int read(ByteBuffer bytes) throws Exception;
 	
 	void reset() throws Exception;
 	

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -39,7 +39,7 @@
 	
 	private final long orderingID;
 	
-	private final Set<Long> ids = new HashSet<Long>();
+	private int refCount;
 	
 	private int offset;
 		
@@ -75,19 +75,19 @@
 		return file;
 	}
 	
-	public void addID(final long id)
+	public void incRefCount()
 	{
-		ids.add(id);
+		refCount++;
 	}
 	
-	public void removeID(final long id)
+	public void decRefCount()
 	{
-		ids.remove(id);
+		refCount--;
 	}
 	
 	public boolean isEmpty()
 	{
-		return ids.isEmpty();
+		return refCount == 0;
 	}
 	
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -25,14 +25,19 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Semaphore;
 
 import org.jboss.messaging.core.journal.Journal;
 import org.jboss.messaging.core.journal.RecordHandle;
+import org.jboss.messaging.core.journal.RecordHistory;
 import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
@@ -48,38 +53,69 @@
 {
 	private static final Logger log = Logger.getLogger(JournalImpl.class);
 	
-	private static final int INT_LENGTH = 4;
+	// The sizes of primitive types
 	
-	private static final int LONG_LENGTH = 8;
+	private static final int SIZE_LONG = 8;
+   
+   private static final int SIZE_INT = 4;
+   
+   private static final int SIZE_BYTE = 1;
+   
+   //Record markers - they must be all unique
+   
+	public static final byte ADD_RECORD = 11;
 	
-	public static final byte ADD_RECORD = 1;
+	public static final byte UPDATE_RECORD = 12;
 	
-	public static final byte DELETE_RECORD = 2;
+	public static final byte DELETE_RECORD = 13;
 	
-	public static final byte FILL_CHARACTER = (byte)'J';
+	public static final byte ADD_RECORD_TX = 14;
 	
-	public static final String JOURNAL_FILE_PREFIX = "jbm";
+	public static final byte UPDATE_RECORD_TX = 15;
 	
-	public static final String JOURNAL_FILE_EXTENSION = "jbm";
+	public static final byte DELETE_RECORD_TX = 16;
 	
-   
+	//End markers - they must all be unique
 	
+	public static final byte DONE = 21;
+	
+	public static final byte TX_CONTINUE = 22;
+	
+	public static final byte TX_DONE = 23;
+	
+	public static final byte FILL_CHARACTER = 74; // Letter 'J' 
+		
+	
+	
+	  	
 	private final String journalDir;
 	
 	private final int fileSize;
 	
-	private final int numFiles;
+	private final int minFiles;
 	
+	private final int minAvailableFiles;
+	
 	private final boolean sync;
 	
 	private final SequentialFileFactory fileFactory;
 	
-	private final LinkedList<JournalFile> files = new LinkedList<JournalFile>();
+	private final long taskPeriod;
 	
-	private final LinkedList<JournalFile> availableFiles = new LinkedList<JournalFile>();
+	public final String filePrefix;
 	
-	private final LinkedList<JournalFile> filesToDelete = new LinkedList<JournalFile>();
-		
+	public final String fileExtension;
+	 
+	
+	private final Queue<JournalFile> files = new ConcurrentLinkedQueue<JournalFile>();
+	
+	private final Queue<JournalFile> availableFiles = new ConcurrentLinkedQueue<JournalFile>();
+	
+	private final Map<Long, TransactionInfo> transactions = new ConcurrentHashMap<Long, TransactionInfo>();
+	
+	private final Map<Long, List<RecordHandle>> transactionalDeletes =
+		new ConcurrentHashMap<Long, List<RecordHandle>>();
+			
 	/*
 	 * We use a semaphore rather than synchronized since it performs better when contended
 	 */
@@ -92,54 +128,152 @@
 	private volatile boolean loaded;
 	
 	private volatile long lastOrderingID;
-					
-	public JournalImpl(final String journalDir, final int fileSize, final int numFiles, final boolean sync,
-			             final SequentialFileFactory fileFactory)
+	
+	private final Timer timer = new Timer(true);
+	
+	private final TimerTask reclaimerTask = new ReclaimerTask();
+	
+	private final TimerTask availableFilesTask = new AvailableFilesTask();
+	
+
+	
+	public JournalImpl(final String journalDir, final int fileSize, final int minFiles, final int minAvailableFiles,
+			             final boolean sync, final SequentialFileFactory fileFactory, final long taskPeriod,
+			             final String filePrefix, final String fileExtension)
 	{
 		this.journalDir = journalDir;
 		
 		this.fileSize = fileSize;
 		
-		this.numFiles = numFiles;
+		this.minFiles = minFiles;
 		
+		this.minAvailableFiles = minAvailableFiles;
+		
 		this.sync = sync;
 		
 		this.fileFactory = fileFactory;
+		
+		this.taskPeriod = taskPeriod;
+		
+		this.filePrefix = filePrefix;
+		
+		this.fileExtension = fileExtension;
 	}
 	
 	// Journal implementation ----------------------------------------------------------------
 	
-	public RecordHandle add(final long id, final byte[] bytes) throws Exception
+	public ByteBuffer allocateBuffer(final int size) throws Exception
 	{
+		return ByteBuffer.allocateDirect(size);
+	}
+	
+	public RecordHandle appendAddRecord(final long id, final byte[] record) throws Exception
+	{
 		if (!loaded)
 		{
 			throw new IllegalStateException("Journal must be loaded first");
 		}
 		
-		int size = 1 + INT_LENGTH + LONG_LENGTH + bytes.length;
+		//TODO optimise to avoid creating a new byte buffer
 		
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+		
+		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+		
+		bb.put(ADD_RECORD);
+		
+		bb.putLong(id);
+		
+		bb.putInt(record.length);
+		
+		bb.put(record);
+		
+		bb.put(DONE);
+		
+		bb.flip();
+			             				
 		lock.acquire();
 		
 		try
-		{   		
+		{   					
    		checkFile(size);
+   		   		
+   		currentFile.getFile().write(bb);		
    		
-   		byte[] toWrite = new byte[size];
-   		ByteBuffer bb = ByteBuffer.wrap(toWrite);
-   		bb.put(ADD_RECORD);		
-   		bb.putLong(id);
-   		bb.putInt(bytes.length);
-   		bb.put(bytes);
+   		currentFile.extendOffset(size);
    		
-   		bb.flip();
+   		RecordHandleImpl rh = new RecordHandleImpl(id);
    		
+   		rh.addFile(currentFile);
+   		
+   		return rh;
+		}
+		finally
+		{
+			lock.release();
+		}
+	}
+	
+	public RecordHandle appendAddRecordTransactional(final long txID, final long id,
+			                                           final byte[] record, final boolean done) throws Exception
+	{
+		if (!loaded)
+		{
+			throw new IllegalStateException("Journal must be loaded first");
+		}
+		
+		//TODO optimise to avoid creating a new byte buffer
+		
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+		
+		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+		
+		bb.put(ADD_RECORD);
+		
+		bb.putLong(txID);
+		
+		bb.putLong(id);
+		
+		bb.putInt(record.length);
+		
+		bb.put(record);
+		
+		if (done)
+		{
+			bb.put(TX_DONE);
+		}
+		else
+		{
+			bb.put(TX_CONTINUE);
+		}
+		
+		bb.flip();
+			             				
+		lock.acquire();
+		
+		try
+		{   					
+   		checkFile(size);
+   		   		
    		currentFile.getFile().write(bb);		
    		
    		currentFile.extendOffset(size);
    		
-   		currentFile.addID(id);
+   		RecordHandleImpl rh = new RecordHandleImpl(id);
    		
-   		return new RecordHandleImpl(id, currentFile);
+   		rh.addFile(currentFile);
+   		
+   		if (done)
+   		{   		
+      		List<RecordHandle> list = transactionalDeletes.remove(txID);
+      		
+      		if (list != null)
+   			{
+   				releaseDeletes(list);
+   			}      		
+   		} 
+   		
+   		return rh;
 		}
 		finally
 		{
@@ -147,7 +281,7 @@
 		}
 	}
 	
-	public void delete(RecordHandle handle) throws Exception
+	public void appendUpdateRecord(final RecordHandle handle, final byte[] record) throws Exception
 	{
 		if (!loaded)
 		{
@@ -156,39 +290,225 @@
 		
 		RecordHandleImpl rh = (RecordHandleImpl)handle;
 		
-		int size = 1 + LONG_LENGTH;
+		//TODO optimise to avoid creating a new byte buffer
 		
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+		
+		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+		
+		bb.put(UPDATE_RECORD);
+		
+		bb.putLong(rh.getID());
+		
+		bb.putInt(record.length);
+		
+		bb.put(record);
+		
+		bb.put(DONE);
+		
+		bb.flip();
+		
 		lock.acquire();
 		
 		try
-		{		
+		{   		
    		checkFile(size);
+   		   		
+   		currentFile.getFile().write(bb);		
    		
-   		long id = rh.getID();
+   		currentFile.extendOffset(size);
    		
-   		byte[] toWrite = new byte[size];
-   		ByteBuffer bb = ByteBuffer.wrap(toWrite);
-   		bb.put(DELETE_RECORD);
-   		bb.putLong(id);
+   		rh.addFile(currentFile);
+		}
+		finally
+		{
+			lock.release();
+		}				
+	}
+	
+	public void appendUpdateRecordTransactional(final long txID, final RecordHandle handle,
+			final byte[] record, final boolean done) throws Exception
+	{
+		if (!loaded)
+		{
+			throw new IllegalStateException("Journal must be loaded first");
+		}
+		
+		RecordHandleImpl rh = (RecordHandleImpl)handle;
+		
+		//TODO optimise to avoid creating a new byte buffer
+		
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+		
+		ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+		
+		bb.put(UPDATE_RECORD);
+		
+		bb.putLong(txID);
+		
+		bb.putLong(rh.getID());
+		
+		bb.putInt(record.length);
+		
+		bb.put(record);
+		
+	   if (done)
+	   {
+	   	bb.put(TX_DONE);
+	   }
+	   else
+	   {
+	   	bb.put(TX_CONTINUE);
+	   }
+		
+		bb.flip();
+		
+		lock.acquire();
+		
+		try
+		{   		
+   		checkFile(size);
+   		   		
+   		currentFile.getFile().write(bb);		
    		
-   		bb.flip();
-   		
-   		currentFile.getFile().write(bb);	
-   		
-   		JournalFile addedFile = rh.getFile();
+   		currentFile.extendOffset(size);
    		   		
-   		addedFile.removeID(id);
+   		rh.addFile(currentFile);
    		
-   		checkAndReclaimFile(addedFile);
+   		if (done)
+   		{   		
+      		List<RecordHandle> list = transactionalDeletes.remove(txID);
+      		
+      		if (list != null)
+   			{
+   				releaseDeletes(list);
+   			}      		
+   		}   		
 		}
 		finally
 		{
 			lock.release();
+		}				
+	}
+	
+	private void releaseDeletes(final List<RecordHandle> deletes)
+	{
+		for (RecordHandle handle: deletes)
+		{
+			RecordHandleImpl rh = (RecordHandleImpl)handle;
+			
+			rh.recordDeleted();
 		}
 	}
+	
+	public void appendDeleteRecord(final RecordHandle handle) throws Exception
+	{
+		if (!loaded)
+		{
+			throw new IllegalStateException("Journal must be loaded first");
+		}
 		
-	public Map<Long, byte[]> load() throws Exception
+		RecordHandleImpl rh = (RecordHandleImpl)handle;
+		
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+		
+		ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
+		
+		buffer.put(DELETE_RECORD);
+		
+		buffer.putLong(rh.getID());
+		
+		buffer.put(DONE);
+				
+		buffer.flip();
+								
+		lock.acquire();
+		
+		try
+		{   		
+   		checkFile(size);
+   		   		
+   		currentFile.getFile().write(buffer);		
+   		
+   		currentFile.extendOffset(size);
+   		
+   		rh.recordDeleted();
+		}
+		finally
+		{
+			lock.release();
+		}				
+	}
+	
+	public void appendDeleteRecordTransactional(final long txID, final RecordHandle handle, final boolean done) throws Exception
 	{
+		if (!loaded)
+		{
+			throw new IllegalStateException("Journal must be loaded first");
+		}
+		
+		RecordHandleImpl rh = (RecordHandleImpl)handle;
+		
+		int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
+		
+		ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
+		
+		buffer.put(DELETE_RECORD);
+		
+		buffer.putLong(txID);
+		
+		buffer.putLong(rh.getID());
+		
+		if (done)
+		{
+			buffer.put(TX_DONE);
+		}
+		else
+		{
+			buffer.put(TX_CONTINUE);
+		}
+				
+		buffer.flip();
+								
+		lock.acquire();
+		
+		try
+		{   		
+   		checkFile(size);
+   		   		
+   		currentFile.getFile().write(buffer);		
+   		
+   		currentFile.extendOffset(size);
+   		
+   		//It's a transactional delete so we need to make sure file doesn't get deleted
+   		rh.addFile(currentFile);
+   		
+   		List<RecordHandle> list = transactionalDeletes.get(txID);
+   		
+   		if (list == null)
+   		{
+   			list = new ArrayList<RecordHandle>();
+   			
+   			transactionalDeletes.put(txID, list);
+   		}
+   		
+   		list.add(handle);
+   		
+   		if (done)
+   		{
+   			transactionalDeletes.remove(txID);
+   			
+   			releaseDeletes(list);
+   		}
+		}
+		finally
+		{
+			lock.release();
+		}				
+	}	
+		
+	public List<RecordHistory> load() throws Exception
+	{
 		if (loaded)
 		{
 			throw new IllegalStateException("Journal is already loaded");
@@ -196,7 +516,7 @@
 		
 		log.info("Loading...");
 		
-		List<String> fileNames = fileFactory.listFiles(journalDir, JOURNAL_FILE_EXTENSION);
+		List<String> fileNames = fileFactory.listFiles(journalDir, fileExtension);
 		
 		log.info("There are " + fileNames.size() + " files in directory");
 		
@@ -208,7 +528,7 @@
 			
 			file.open();
 			
-			ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
+			ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
 			
 			file.read(bb);
 			
@@ -221,9 +541,9 @@
 			orderedFiles.add(new JournalFile(file, orderingID));
 		}
 		
-		log.info("numFiles is " + numFiles);
+		log.info("minFiles is " + minFiles);
 		
-		int createNum = numFiles - orderedFiles.size();
+		int createNum = minFiles - orderedFiles.size();
 		
 		//Preallocate some more if necessary
 		for (int i = 0; i < createNum; i++)
@@ -252,93 +572,290 @@
 
 		Collections.sort(orderedFiles, new JournalFileComparator());
 		
-		Map<Long, byte[]> records = new HashMap<Long, byte[]>();
-		
-		boolean filesWithData = true;
-		
-		outer: for (JournalFile file: orderedFiles)
+		Map<Long, RecordHistory> histories = new LinkedHashMap<Long, RecordHistory>();
+				
+		for (JournalFile file: orderedFiles)
 		{
 			log.info("Loading file, ordering id is " + file.getOrderingID());
 			
-			byte[] bytes = new byte[fileSize];
+			ByteBuffer bb = ByteBuffer.wrap(new byte[fileSize]);
 			
-			ByteBuffer bb = ByteBuffer.wrap(bytes);
+			int bytesRead = file.getFile().read(bb);
 			
-			file.getFile().read(bb);
+			if (bytesRead != fileSize)
+			{
+				//deal with this better
+				
+				throw new IllegalStateException("File is wrong size " + bytesRead +
+						                          " expected " + fileSize + " : " + file.getFile().getFileName());
+			}
 			
 			bb.flip();
 			
+			//First long is the ordering timestamp
 			bb.getLong();
 			
-			while (filesWithData && bb.hasRemaining())
+			boolean hasData = false;
+			
+			while (bb.hasRemaining())
 			{
 				byte recordType = bb.get();
 				
 				log.info("recordtype is " + recordType);
 				
-				if (recordType == ADD_RECORD)
+				int pos = bb.position();
+				
+				switch(recordType)
 				{
-					long id = bb.getLong();
+					case ADD_RECORD:
+					{					
+						long id = bb.getLong();
+						
+						int size = bb.getInt();
+						
+						byte[] record = new byte[size];
+						
+						bb.get(record);
+						
+						byte end = bb.get();
+						
+						if (end != DONE)
+						{
+							repairFrom(pos, file);
+						}
+						else
+						{						
+							handleAddRecord(id, file, record, histories);
+							
+							hasData = true;							
+						}
+												
+						break;
+					}										
+					case UPDATE_RECORD:						
+					{
+						long id = bb.getLong();
+						
+						int size = bb.getInt();
+						
+						byte[] record = new byte[size];
+						
+						bb.get(record);
+						
+						byte end = bb.get();
+						
+						if (end != DONE)
+						{
+							repairFrom(pos, file);
+						}
+						else
+						{					
+							handleUpdateRecord(id, file, record, histories);
+							
+							hasData = true;							
+						}
+												
+						break;
+					}					
+					case DELETE_RECORD:						
+					{
+						long id = bb.getLong();
+						
+						byte end = bb.get();
+						
+						if (end != DONE)
+						{
+							repairFrom(pos, file);
+						}
+						else
+						{						
+							handleDeleteRecord(id, histories);
+							
+							hasData = true;							
+						}
+						
+						break;
+					}					
+					case ADD_RECORD_TX:
+					{					
+						long txID = bb.getLong();
+						
+						long id = bb.getLong();
+						
+						int size = bb.getInt();
+						
+						byte[] record = new byte[size];
+						
+						bb.get(record);
+						
+						byte end = bb.get();
+						
+						if (end != TX_CONTINUE && end != TX_DONE)
+						{
+							repairFrom(pos, file);
+						}
+						else
+						{						
+							handleTransactionalRecord(ADD_RECORD, txID, id, file, record, end, histories);
+							
+							hasData = true;							
+						}
 					
-					int length = bb.getInt();
-					
-					//TODO - optimise this - no need to copy
-					
-					byte[] record = new byte[length];
-					
-					bb.get(record);
-					
-					records.put(id, record);
+						break;
+					}		
+					case UPDATE_RECORD_TX:
+					{					
+						long txID = bb.getLong();
+						
+						long id = bb.getLong();
+						
+						int size = bb.getInt();
+						
+						byte[] record = new byte[size];
+						
+						bb.get(record);
+						
+						byte end = bb.get();
+						
+						if (end != TX_CONTINUE && end != TX_DONE)
+						{
+							repairFrom(pos, file);
+						}
+						else
+						{					
+							handleTransactionalRecord(UPDATE_RECORD, txID, id, file, record, end, histories);
+							
+							hasData = true;							
+						}
+											
+						break;
+					}	
+					case DELETE_RECORD_TX:
+					{					
+						long txID = bb.getLong();
+						
+						long id = bb.getLong();
+						
+						byte end = bb.get();
+						
+						if (end != TX_CONTINUE && end != TX_DONE)
+						{
+							repairFrom(pos, file);
+						}
+						else
+						{					
+							handleTransactionalRecord(DELETE_RECORD, txID, id, file, null, end, histories);
+							
+							hasData = true;							
+						}
+											
+						break;
+					}	
+					case FILL_CHARACTER:						
+					{
+						//End of records in file - we check the file only contains fill characters from this point
+						while (bb.hasRemaining())
+						{
+							byte b = bb.get();
+							
+							if (b != FILL_CHARACTER)
+							{
+								throw new IllegalStateException("Corrupt file " + file.getFile().getFileName() +
+										" contains non fill character at position " + pos);
+							}
+						}
+						
+						break;						
+					}					
+					default:						
+					{
+						throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+								                         " is corrupt, invalid record type " + recordType);
+					}
 				}
-				else if (recordType == DELETE_RECORD)
-				{
-					long id = bb.getLong();
-					
-					records.remove(id);
-				}
-				else if (recordType == FILL_CHARACTER)
-				{										
-					//Implies end of records in the file
-					
-					files.add(file);
-					
-					currentFile = file;
-					
-					filesWithData = false;
-															
-					continue outer;
-				}		
-				else
-				{
-					throw new IllegalStateException("Journal " + file.getFile().getFileName() +
-							                         " is corrupt, invalid record type " + recordType);
-				}
 			}
 				
-			if (filesWithData)
+			if (hasData)
 			{
 				log.info("Adding to files");
+				
 				files.add(file);
 			}
 			else
 			{
 				log.info("Adding to available files");
-				//Empty files with no data of importance
+				
+				//Empty files with no data
 				availableFiles.add(file);
-			}
+			}								
 		}				
+						
+		//Now it's possible that some of the files are no longer needed
 		
+		checkFilesForReclamation();
+		
+		for (JournalFile file: files)
+		{
+			currentFile = file;
+		}
+		
+		//Check we have enough available files
+		
+		checkAndCreateAvailableFiles();
+				
+		if (currentFile == null)
+		{
+			currentFile = availableFiles.remove();
+			
+			files.add(currentFile);
+		}				
+		
+		//Close all files apart from the current one
+		
+		for (JournalFile file: files)
+		{
+			if (file != currentFile)
+			{
+				file.getFile().close();
+			}
+		}
+						
+		startTasks();
+		
 		loaded = true;
 		
-		return records;
+		return new ArrayList<RecordHistory>(histories.values());
 	}
 	
+	private void repairFrom(int pos, JournalFile file) throws Exception
+	{
+		log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
+				   " in the record that starts at position " + pos + ". " + 
+				   "The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
+		
+		file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER);
+	}
+	
+	public void checkAndCreateAvailableFiles() throws Exception
+	{
+		log.info("Checking if we need to create more files");
+		
+		int filesToCreate = minAvailableFiles - availableFiles.size();
+		
+		for (int i = 0; i < filesToCreate; i++)
+		{
+			JournalFile file = createFile();
+			
+			availableFiles.add(file);
+		}
+	}
+	
 	public void stop() throws Exception
 	{
-		log.info("files size " + files.size());
-		log.info("available files size " + availableFiles.size());
-		log.info("files top delete size " + filesToDelete.size());
+		reclaimerTask.cancel();
 		
+		availableFilesTask.cancel();
+		
 		for (JournalFile file: files)
 		{
 			file.getFile().close();
@@ -348,85 +865,195 @@
 		{
 			file.getFile().close();
 		}
-		
-		for (JournalFile file: filesToDelete)
-		{
-			file.getFile().close();
-		}
-		
+
 		this.currentFile = null;
 		
 		files.clear();
 		
-		availableFiles.clear();
+		availableFiles.clear();			
+	}
+	
+	public void startTasks()
+	{
+		timer.schedule(reclaimerTask, taskPeriod, taskPeriod);
 		
-		filesToDelete.clear();
+		timer.schedule(availableFilesTask, taskPeriod, taskPeriod);
 	}
 	
 	// Public -----------------------------------------------------------------------------
 	
-	public LinkedList<JournalFile> getFiles()
+	public Queue<JournalFile> getFiles()
 	{
 		return files;
 	}
 	
-	public LinkedList<JournalFile> getAvailableFiles()
+	public Queue<JournalFile> getAvailableFiles()
 	{
 		return availableFiles;
 	}
 	
-	public LinkedList<JournalFile> getFilesToDelete()
+	public void checkFilesForReclamation() throws Exception
+	{		
+		log.info("checking files for reclamation");
+		
+		for (JournalFile file: files)
+		{		
+   		if (file.isEmpty() && file != currentFile)
+   		{
+   			//File can be reclaimed
+   			
+   			files.remove(file);
+   			
+   			//Re-initialise it
+   			
+   			long newOrderingID = generateOrderingID();
+   			
+   			ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
+   			
+   			bb.putLong(newOrderingID);
+   			
+   			SequentialFile sf = file.getFile();
+   			
+   			//Note we MUST re-fill it - otherwise we won't be able to detect corrupt records
+   			sf.fill(0, fileSize, FILL_CHARACTER);
+   			
+   			sf.write(bb);
+   			
+   			JournalFile jf = new JournalFile(sf, newOrderingID);
+   			
+   			availableFiles.add(jf);   		
+   		}
+		}
+	}
+		
+	// Private -----------------------------------------------------------------------------
+	
+	private void playTransaction(final TransactionInfo tx, final Map<Long, RecordHistory> histories)
 	{
-		return filesToDelete;
+		for (TransactionEntry entry: tx.entries)
+		{
+			switch (entry.type)
+			{
+				case ADD_RECORD:
+				{
+					handleAddRecord(entry.id, entry.file, entry.record, histories);
+					
+					break;
+				}
+				case UPDATE_RECORD:
+				{
+					handleUpdateRecord(entry.id, entry.file, entry.record, histories);
+					
+					break;
+				}
+				case DELETE_RECORD:
+				{
+					handleDeleteRecord(entry.id, histories);
+					
+					break;
+				}
+				default:
+				{
+					throw new IllegalStateException("Invalid record type " + entry.type);
+				}
+			}
+		}
 	}
 	
-	// Private -----------------------------------------------------------------------------
+	private void handleAddRecord(final long id, final JournalFile file,
+			                       final byte[] record, final Map<Long, RecordHistory> histories)
+	{
+		RecordHandleImpl handle = new RecordHandleImpl(id);
+		
+		handle.addFile(file);
+		
+		RecordHistoryImpl history = new RecordHistoryImpl(handle);
+		
+		history.addRecord(record);
+		
+		histories.put(id, history);
+	}
 	
-	private void checkAndReclaimFile(JournalFile file) throws Exception
-	{		
-		if (file.isEmpty() && file != currentFile)
+	private void handleUpdateRecord(final long id, final JournalFile file,
+         final byte[] record, final Map<Long, RecordHistory> histories)
+   {
+		RecordHistoryImpl history = (RecordHistoryImpl)histories.get(id);
+		
+		if (history == null)
 		{
-			//File can be reclaimed
+			throw new IllegalStateException("Cannot find record (update) " + id);
+		}
+		
+		RecordHandleImpl handle = (RecordHandleImpl)history.getHandle();
+		
+		handle.addFile(file);
+		
+		history.addRecord(record);	
+   }
+	
+	private void handleDeleteRecord(final long id, final Map<Long, RecordHistory> histories)
+   {
+		RecordHistoryImpl history = (RecordHistoryImpl)histories.remove(id);
+		
+		if (history == null)
+		{
+			throw new IllegalStateException("Cannot find record (delete) " + id);
+		}
+		
+		RecordHandleImpl handle = (RecordHandleImpl)history.getHandle();
+		
+		handle.recordDeleted();
+   }
+	
+	private void handleTransactionalRecord(final byte type, final long txID, final long id,
+			                                 final JournalFile file, final byte[] record,
+			                                 final byte end,
+			                                 final Map<Long, RecordHistory> histories)
+	{
+		TransactionInfo tx = transactions.get(txID);
+		
+		if (tx == null)
+		{
+			tx = new TransactionInfo();
 			
-			files.remove(file);
+			transactions.put(txID, tx);
+		}
+		
+		TransactionEntry entry = new TransactionEntry(type, id, file, record);
+		
+		tx.entries.add(entry);
+		
+		if (end == TX_DONE)
+		{
+			transactions.remove(txID);
 			
-			//TODO - add to delete file list if there are a lot of available files
-			
-			//Re-initialise it
-			
-			long newOrderingID = generateOrderingID();
-			
-			ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
-			
-			bb.putLong(newOrderingID);
-			
-			SequentialFile sf = file.getFile();
-			
-			sf.reset();
-			
-			sf.write(bb);
-			
-			JournalFile jf = new JournalFile(sf, newOrderingID);
-			
-			availableFiles.add(jf);   		
+			playTransaction(tx, histories);
 		}
+		else if (end == TX_CONTINUE)
+		{
+			//
+		}
+		else
+		{
+			throw new IllegalStateException("Invalid transaction marker " + end);
+		}
 	}
-	
+			
 	private JournalFile createFile() throws Exception
 	{
 		log.info("Creating a new file");
 		
 		long orderingID = generateOrderingID();
 		
-		String fileName = journalDir + "/" + JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JOURNAL_FILE_EXTENSION;
+		String fileName = journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
 						
 		SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync);
 		
 		sequentialFile.open();
 						
-		sequentialFile.preAllocate(fileSize, FILL_CHARACTER);
+		sequentialFile.fill(0, fileSize, FILL_CHARACTER);
 		
-		ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
+		ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
 		
 		bb.putLong(orderingID);
 		
@@ -465,7 +1092,7 @@
 	private void checkFile(final int size) throws Exception
 	{
 		//We take into account the first timestamp long
-		if (size > fileSize - LONG_LENGTH)
+		if (size > fileSize - SIZE_LONG)
 		{
 			throw new IllegalArgumentException("Record is too large to store " + size);
 		}
@@ -476,23 +1103,87 @@
 			
 			if (currentFile != null)
 			{
-				currentFile.getFile().close();
-				
-				checkAndReclaimFile(currentFile);
+				currentFile.getFile().close();								
 			}
 			
 			log.info("Getting new file");
 			
-			if (!availableFiles.isEmpty())
+			checkAndCreateAvailableFiles();
+			
+		   currentFile = availableFiles.remove();			
+
+			files.add(currentFile);
+		}
+	}
+	
+	private static class TransactionInfo
+	{
+		final List<TransactionEntry> entries = new ArrayList<TransactionEntry>();
+	}
+	
+	private static class TransactionEntry
+	{
+		TransactionEntry(final byte type, final long id, final JournalFile file, final byte[] record)
+		{
+			this.type = type;
+			this.id = id;
+			this.file = file;
+			this.record = record;
+		}
+		final byte type;
+		final long id;
+		final JournalFile file;
+		final byte[] record;
+	}
+	
+	private class ReclaimerTask extends TimerTask
+	{
+		public boolean cancel()
+		{
+			timer.cancel();
+			
+			return super.cancel();
+		}
+
+		public void run()
+		{
+			try
 			{
-				currentFile = availableFiles.remove();			
+				checkFilesForReclamation();
 			}
-			else
+			catch (Exception e)
 			{
-				currentFile = createFile();			
+				log.error("Failure in running reclaimer", e);
+				
+				cancel();
 			}
+		}
+		
+	}
+	
+	private class AvailableFilesTask extends TimerTask
+	{
+		public boolean cancel()
+		{
+			timer.cancel();
 			
-			files.add(currentFile);
+			return super.cancel();
 		}
+
+		public void run()
+		{
+			try
+			{
+				checkAndCreateAvailableFiles();
+			}
+			catch (Exception e)
+			{
+				log.error("Failure in running availableFileChecker", e);
+				
+				cancel();
+			}
+		}
+		
 	}
+	
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -71,7 +71,7 @@
 		channel = rfile.getChannel();		
 	}
 	
-	public void preAllocate(final int size, final byte fillCharacter) throws Exception
+	public void fill(final int position, final int size, final byte fillCharacter) throws Exception
 	{
 		ByteBuffer bb = ByteBuffer.allocateDirect(size);
 		
@@ -82,7 +82,7 @@
 		
 		bb.flip();
 
-		channel.position(0);
+		channel.position(position);
 
 		channel.write(bb);
 
@@ -103,11 +103,13 @@
 		file.delete();
 	}
 
-	public void read(ByteBuffer bytes) throws Exception
+	public int read(ByteBuffer bytes) throws Exception
 	{
 		int bytesRead = channel.read(bytes);
 		
 		log.info("Read " + bytesRead + " bytes");
+		
+		return bytesRead;
 	}
 
 	public void write(ByteBuffer bytes) throws Exception
@@ -117,7 +119,7 @@
 		if (sync)
 		{
 			channel.force(false);
-		};
+		}
 	}
 
 	public void reset() throws Exception

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java	2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -1,5 +1,8 @@
 package org.jboss.messaging.core.journal.impl;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.jboss.messaging.core.journal.RecordHandle;
 
 /**
@@ -13,13 +16,18 @@
 {
 	private final long id;
 	
-	private final JournalFile file;
+	private List<JournalFile> files = new ArrayList<JournalFile>();
 	
-	public RecordHandleImpl(final long id, final JournalFile file)
+	public RecordHandleImpl(final long id)
 	{
 		this.id = id;
+	}
+	
+	public void addFile(JournalFile file)
+	{
+		files.add(file);
 		
-		this.file = file;
+		file.incRefCount();
 	}
 	
 	public long getID()
@@ -27,8 +35,11 @@
 		return id;
 	}
 	
-	public JournalFile getFile()
+	public void recordDeleted()
 	{
-		return file;
+		for (JournalFile file: files)
+		{
+			file.decRefCount();
+		}
 	}
 }

Added: trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -0,0 +1,41 @@
+package org.jboss.messaging.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.messaging.core.journal.RecordHandle;
+import org.jboss.messaging.core.journal.RecordHistory;
+
+/**
+ * 
+ * A RecordHistoryImpl
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RecordHistoryImpl implements RecordHistory
+{
+	private final List<byte[]> records = new ArrayList<byte[]>();
+	
+	private final RecordHandle handle;
+	
+	public RecordHistoryImpl(final RecordHandle handle)
+	{
+		this.handle = handle;
+	}
+	
+	public RecordHandle getHandle()
+	{
+		return handle;
+	}
+
+	public List<byte[]> getRecords()
+	{
+		return records;
+	}
+	
+	public void addRecord(final byte[] record)
+	{
+		records.add(record);
+	}	
+}

Modified: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java	2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -25,7 +25,6 @@
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import org.jboss.messaging.core.journal.SequentialFile;
 import org.jboss.messaging.core.journal.impl.JournalImpl;
 import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory;
 import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory.FakeSequentialFile;
@@ -60,26 +59,32 @@
 	
 	public void testLoad() throws Exception
 	{
-		final int numFiles = 10;
+		final int minFiles = 10;
 		
+		final int minAvailableFiles = 10;
+		
 		final int fileSize = 10 * 1024;
 		
 		final boolean sync = true;
 		
+		final String filePrefix = "jbm";
+		
+		final String fileExtension = "jbm";
+		
 		long timeStart = System.currentTimeMillis();
 		
-		JournalImpl journal = new JournalImpl(journalDir, fileSize, numFiles, sync, factory);
+		JournalImpl journal =
+			new JournalImpl(journalDir, fileSize, minFiles, minAvailableFiles, sync, factory, 5000, filePrefix, fileExtension);
 		
 		journal.load();
 		
 		long timeEnd = System.currentTimeMillis();
 		
 		assertEquals(1, journal.getFiles().size());
-		assertEquals(numFiles - 1, journal.getAvailableFiles().size());
-		assertEquals(0, journal.getFilesToDelete().size());
+		assertEquals(minFiles - 1, journal.getAvailableFiles().size());
+	
+		assertEquals(minFiles, factory.getFileMap().size());
 		
-		assertEquals(numFiles, factory.getFileMap().size());
-		
 		for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
 		{
 			FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
@@ -98,7 +103,7 @@
 			long orderingID = bb.getLong();
 			
 			String expectedFilename =
-				journalDir + "/" + JournalImpl.JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JournalImpl.JOURNAL_FILE_EXTENSION;
+				journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
 			
 			assertEquals(expectedFilename, file.getFileName());
 			
@@ -128,22 +133,21 @@
 		
 		assertEquals(0, journal.getFiles().size());
 		assertEquals(0, journal.getAvailableFiles().size());
-		assertEquals(0, journal.getFilesToDelete().size());
-		
+
 		//Now reload
 		
-		journal = new JournalImpl(journalDir, fileSize, numFiles, sync, factory);
+		journal = new JournalImpl(journalDir, fileSize, minFiles, minAvailableFiles, sync, factory, 5000, filePrefix, fileExtension);
 		
+		
 		log.info("******** reloading");
 		
 		journal.load();
 		
 		assertEquals(1, journal.getFiles().size());
-		assertEquals(numFiles - 1, journal.getAvailableFiles().size());
-		assertEquals(0, journal.getFilesToDelete().size());
+		assertEquals(minFiles - 1, journal.getAvailableFiles().size());
+	
+		assertEquals(minFiles, factory.getFileMap().size());
 		
-		assertEquals(numFiles, factory.getFileMap().size());
-		
 		for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
 		{
 			FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
@@ -162,7 +166,7 @@
 			long orderingID = bb.getLong();
 			
 			String expectedFilename =
-				journalDir + "/" + JournalImpl.JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JournalImpl.JOURNAL_FILE_EXTENSION;
+				journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
 			
 			assertEquals(expectedFilename, file.getFileName());
 			

Modified: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java	2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java	2008-03-12 10:02:42 UTC (rev 3871)
@@ -151,7 +151,7 @@
 			open = true;
 		}
 
-		public void preAllocate(int size, byte fillCharacter) throws Exception
+		public void fill(int pos, int size, byte fillCharacter) throws Exception
 		{		
 			if (!open)
 			{
@@ -162,7 +162,7 @@
 			
 			byte[] bytes = new byte[size];
 			
-			for (int i = 0; i < size; i++)
+			for (int i = pos; i < size + pos; i++)
 			{
 				bytes[i] = fillCharacter;
 			}
@@ -170,7 +170,7 @@
 			data = ByteBuffer.wrap(bytes);		
 		}
 
-		public void read(ByteBuffer bytes) throws Exception
+		public int read(ByteBuffer bytes) throws Exception
 		{
 			if (!open)
 			{
@@ -186,6 +186,8 @@
 			data.get(bytesRead);
 			
 			bytes.put(bytesRead);
+			
+			return bytesRead.length;
 		}
 
 		public void reset() throws Exception




More information about the jboss-cvs-commits mailing list