[jboss-cvs] JBoss Messaging SVN: r3852 - in trunk/src/main/org/jboss/messaging/core/journal: impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Mar 7 10:46:57 EST 2008


Author: timfox
Date: 2008-03-07 10:46:57 -0500 (Fri, 07 Mar 2008)
New Revision: 3852

Added:
   trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
Modified:
   trunk/src/main/org/jboss/messaging/core/journal/Journal.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
Log:
More journal work


Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-03-06 18:22:05 UTC (rev 3851)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-03-07 15:46:57 UTC (rev 3852)
@@ -21,7 +21,9 @@
   */
 package org.jboss.messaging.core.journal;
 
+import java.util.Map;
 
+
 /**
  * 
  * A Journal
@@ -31,11 +33,9 @@
  */
 public interface Journal
 {
-	void preAllocateFiles(int numFiles) throws Exception;
+	RecordHandle add(long id, byte[] bytes) throws Exception;
 	
-	void add(long id, byte[] bytes) throws Exception;
+	void delete(RecordHandle handle) throws Exception;
 	
-	void delete(long id) throws Exception;
-	
-	void load() throws Exception;
+	Map<Long, byte[]> load() throws Exception;
 }

Added: trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java	2008-03-07 15:46:57 UTC (rev 3852)
@@ -0,0 +1,33 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal;
+
+/**
+ * 
+ * A RecordHandle
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface RecordHandle
+{
+}

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-03-06 18:22:05 UTC (rev 3851)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2008-03-07 15:46:57 UTC (rev 3852)
@@ -32,19 +32,22 @@
  */
 public interface SequentialFile
 {
-	long getOrderingID();
+	/*
+	 * Creates the file if it doesn't already exist, then opens it
+	 */
+	void open() throws Exception;
 	
-	void load() throws Exception;
+	String getFileName();
 	
-	void create() throws Exception;
+	void preAllocate(int size, byte fillCharacter) throws Exception;
 	
-	void initialise(long orderingID, int size) throws Exception;
-	
 	void delete() throws Exception;
 
 	void write(ByteBuffer bytes) throws Exception;
 	   
 	void read(ByteBuffer bytes) throws Exception;
 	
+	void reset() throws Exception;
+	
 	void close() throws Exception;
 }

Added: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java	2008-03-07 15:46:57 UTC (rev 3852)
@@ -0,0 +1,38 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal;
+
+import java.util.List;
+
+/**
+ * 
+ * A SequentialFileFactory
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface SequentialFileFactory
+{
+	SequentialFile createSequentialFile(String fileName, boolean sync) throws Exception;
+	
+	List<String> listFiles(String journalDir, String extension) throws Exception;
+}

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2008-03-06 18:22:05 UTC (rev 3851)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java	2008-03-07 15:46:57 UTC (rev 3852)
@@ -37,16 +37,20 @@
 {
 	private final SequentialFile file;
 	
+	private final long orderingID;
+	
 	private final Set<Long> ids = new HashSet<Long>();
 	
 	private int offset;
-	
-	public JournalFile(SequentialFile file)
+		
+	public JournalFile(final SequentialFile file, final long orderingID)
 	{
 		this.file = file;
+		
+		this.orderingID = orderingID;
 	}
 	
-	public void extendOffset(int delta)
+	public void extendOffset(final int delta)
 	{
 		offset += delta;
 	}
@@ -56,9 +60,34 @@
 		return offset;
 	}
 	
+	public long getOrderingID()
+	{
+		return orderingID;
+	}
+	
+	public void resetOffset()
+	{
+		offset = 0;
+	}
+	
 	public SequentialFile getFile()
 	{
 		return file;
 	}
 	
+	public void addID(final long id)
+	{
+		ids.add(id);
+	}
+	
+	public void removeID(final long id)
+	{
+		ids.remove(id);
+	}
+	
+	public boolean isEmpty()
+	{
+		return ids.isEmpty();
+	}
+	
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-03-06 18:22:05 UTC (rev 3851)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
@@ -21,8 +21,6 @@
   */
 package org.jboss.messaging.core.journal.impl;
 
-import java.io.File;
-import java.io.FilenameFilter;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,9 +29,12 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Semaphore;
 
 import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.RecordHandle;
 import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
 import org.jboss.messaging.core.logging.Logger;
 
 /**
@@ -51,110 +52,230 @@
 	
 	private static final int LONG_LENGTH = 8;
 	
-	private static final byte ADD_RECORD = 1;
+	public static final byte ADD_RECORD = 1;
 	
-	private static final byte DELETE_RECORD = 2;
+	public static final byte DELETE_RECORD = 2;
 	
-	private static final String JOURNAL_FILE_PREFIX = "journal-";
+	public static final byte FILL_CHARACTER = (byte)'J';
 	
-	private static final String JOURNAL_FILE_EXTENSION = "jbm";
+	public static final String JOURNAL_FILE_PREFIX = "jbm";
 	
-   	
+	public static final String JOURNAL_FILE_EXTENSION = "jbm";
+	
+   
+	
 	private final String journalDir;
 	
 	private final int fileSize;
 	
-	private JournalFile currentFile ;
+	private final int numFiles;
 	
-	private List<JournalFile> files = new ArrayList<JournalFile>();
+	private final boolean sync;
 	
-	private LinkedList<JournalFile> availableFiles = new LinkedList<JournalFile>();
+	private final SequentialFileFactory fileFactory;
 	
-	private int fileSequence;
+	private final LinkedList<JournalFile> files = new LinkedList<JournalFile>();
 	
+	private final LinkedList<JournalFile> availableFiles = new LinkedList<JournalFile>();
 	
-	public JournalImpl(final String journalDir, final int fileSize)
+	private final LinkedList<JournalFile> filesToDelete = new LinkedList<JournalFile>();
+		
+	/*
+	 * We use a semaphore rather than synchronized since it performs better when contended
+	 */
+	
+	//TODO - improve concurrency by allowing concurrent access when an operation doesn't change the current file
+	private final Semaphore lock = new Semaphore(1, true);
+		
+	private volatile JournalFile currentFile ;
+		
+	private volatile boolean loaded;
+	
+	private volatile long lastOrderingID;
+					
+	public JournalImpl(final String journalDir, final int fileSize, final int numFiles, final boolean sync,
+			             final SequentialFileFactory fileFactory)
 	{
 		this.journalDir = journalDir;
 		
 		this.fileSize = fileSize;
+		
+		this.numFiles = numFiles;
+		
+		this.sync = sync;
+		
+		this.fileFactory = fileFactory;
 	}
 	
-	public void preAllocateFiles(int numFiles) throws Exception
+	// Journal implementation ----------------------------------------------------------------
+	
+	public RecordHandle add(final long id, final byte[] bytes) throws Exception
 	{
-		log.info("Pre-allocating " + numFiles + " files");
-		
-		for (int i = 0; i < numFiles; i++)
+		if (!loaded)
 		{
-			JournalFile info = createFile();
-			
-			availableFiles.add(info);
+			throw new IllegalStateException("Journal must be loaded first");
 		}
 		
-		log.info("Done pre-allocate");
-	}
-		
-	public void add(long id, byte[] bytes) throws Exception
-	{
 		int size = 1 + INT_LENGTH + LONG_LENGTH + bytes.length;
 		
-		checkFile(size);
+		lock.acquire();
 		
-		byte[] toWrite = new byte[size];
-		ByteBuffer bb = ByteBuffer.wrap(toWrite);
-		bb.put(ADD_RECORD);		
-		bb.putLong(id);
-		bb.putInt(bytes.length);
-		bb.put(bytes);
-		
-		bb.flip();
-		
-		currentFile.getFile().write(bb);		
-		
-		currentFile.extendOffset(size);
+		try
+		{   		
+   		checkFile(size);
+   		
+   		byte[] toWrite = new byte[size];
+   		ByteBuffer bb = ByteBuffer.wrap(toWrite);
+   		bb.put(ADD_RECORD);		
+   		bb.putLong(id);
+   		bb.putInt(bytes.length);
+   		bb.put(bytes);
+   		
+   		bb.flip();
+   		
+   		currentFile.getFile().write(bb);		
+   		
+   		currentFile.extendOffset(size);
+   		
+   		currentFile.addID(id);
+   		
+   		return new RecordHandleImpl(id, currentFile);
+		}
+		finally
+		{
+			lock.release();
+		}
 	}
 	
-	public void delete(long id) throws Exception
+	public void delete(RecordHandle handle) throws Exception
 	{
-		int size = 1 + LONG_LENGTH;
+		if (!loaded)
+		{
+			throw new IllegalStateException("Journal must be loaded first");
+		}
 		
-		checkFile(size);
+		RecordHandleImpl rh = (RecordHandleImpl)handle;
 		
-		byte[] toWrite = new byte[size];
-		ByteBuffer bb = ByteBuffer.wrap(toWrite);
-		bb.put(DELETE_RECORD);
-		bb.putLong(id);
+		int size = 1 + LONG_LENGTH;
 		
-		bb.flip();
+		lock.acquire();
 		
-		currentFile.getFile().write(bb);			
+		try
+		{		
+   		checkFile(size);
+   		
+   		long id = rh.getID();
+   		
+   		byte[] toWrite = new byte[size];
+   		ByteBuffer bb = ByteBuffer.wrap(toWrite);
+   		bb.put(DELETE_RECORD);
+   		bb.putLong(id);
+   		
+   		bb.flip();
+   		
+   		currentFile.getFile().write(bb);	
+   		
+   		JournalFile addedFile = rh.getFile();
+   		   		
+   		addedFile.removeID(id);
+   		
+   		checkAndReclaimFile(addedFile);
+		}
+		finally
+		{
+			lock.release();
+		}
 	}
-	
-	private boolean loaded;
-	
-	public void load() throws Exception
+		
+	public Map<Long, byte[]> load() throws Exception
 	{
 		if (loaded)
 		{
 			throw new IllegalStateException("Journal is already loaded");
 		}
 		
-		loadFiles();
+		log.info("Loading...");
 		
-		Map<Long, byte[]> records = new HashMap<Long, byte[]>();
+		List<String> fileNames = fileFactory.listFiles(journalDir, JOURNAL_FILE_EXTENSION);
+		
+		log.info("There are " + fileNames.size() + " files in directory");
+		
+		List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
 				
-		for (JournalFile file: this.files)
+		for (String fileName: fileNames)
 		{
+			SequentialFile file = fileFactory.createSequentialFile(fileName, sync);
+			
+			file.open();
+			
+			ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
+			
+			file.read(bb);
+			
+			bb.flip();
+			
+			long orderingID = bb.getLong();
+			
+			file.reset();
+							
+			orderedFiles.add(new JournalFile(file, orderingID));
+		}
+		
+		log.info("numFiles is " + numFiles);
+		
+		int createNum = numFiles - orderedFiles.size();
+		
+		//Preallocate some more if necessary
+		for (int i = 0; i < createNum; i++)
+		{
+			JournalFile file = createFile();
+			
+			orderedFiles.add(file);
+			
+			log.info("Created new file");
+		}
+		
+		log.info("Done creating new ones");
+			
+		//Now order them by ordering id - we can't use the file name for ordering since we can re-use files
+		
+		class JournalFileComparator implements Comparator<JournalFile>
+		{
+			public int compare(JournalFile f1, JournalFile f2)
+	      {
+	         long id1 = f1.getOrderingID();
+	         long id2 = f2.getOrderingID();
+
+	         return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
+	      }
+		}
+
+		Collections.sort(orderedFiles, new JournalFileComparator());
+		
+		Map<Long, byte[]> records = new HashMap<Long, byte[]>();
+		
+		boolean filesWithData = true;
+		
+		outer: for (JournalFile file: orderedFiles)
+		{
+			log.info("Loading file, ordering id is " + file.getOrderingID());
+			
 			byte[] bytes = new byte[fileSize];
 			
 			ByteBuffer bb = ByteBuffer.wrap(bytes);
 			
 			file.getFile().read(bb);
 			
-			while (true)
+			bb.flip();
+			
+			bb.getLong();
+			
+			while (filesWithData && bb.hasRemaining())
 			{
 				byte recordType = bb.get();
 				
+				log.info("recordtype is " + recordType);
+				
 				if (recordType == ADD_RECORD)
 				{
 					long id = bb.getLong();
@@ -175,92 +296,193 @@
 					
 					records.remove(id);
 				}
+				else if (recordType == FILL_CHARACTER)
+				{										
+					//Implies end of records in the file
+					
+					files.add(file);
+					
+					currentFile = file;
+					
+					filesWithData = false;
+															
+					continue outer;
+				}		
 				else
 				{
-					//Implies end of records in the file
-					break;
-					
-					//TODO set currentFile and offset
+					throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+							                         " is corrupt, invalid record type " + recordType);
 				}
 			}
-			
-		}
+				
+			if (filesWithData)
+			{
+				log.info("Adding to files");
+				files.add(file);
+			}
+			else
+			{
+				log.info("Adding to available files");
+				//Empty files with no data of importance
+				availableFiles.add(file);
+			}
+		}				
 		
+		loaded = true;
 		
+		return records;
 	}
 	
-	// Private -----------------------------------------------------------------------------
-	
-	private void loadFiles() throws Exception
+	public void stop() throws Exception
 	{
-		File dir = new File(journalDir);
+		log.info("files size " + files.size());
+		log.info("available files size " + availableFiles.size());
+		log.info("files top delete size " + filesToDelete.size());
 		
-		FilenameFilter fnf = new FilenameFilter()
+		for (JournalFile file: files)
 		{
-			public boolean accept(File file, String name)
-			{
-				return name.endsWith(".jbm");
-			}
-		};
+			file.getFile().close();
+		}
 		
-		String[] fileNames = dir.list(fnf);
-		
-		List<JournalFile> files = new ArrayList<JournalFile>(fileNames.length);
-				
-		for (String fileName: fileNames)
+		for (JournalFile file: availableFiles)
 		{
-			SequentialFile file = new NIOSequentialFile(fileName, true);
-			
-			file.load();
-			
-			files.add(new JournalFile(file));
+			file.getFile().close();
 		}
 		
-		//Now order them by ordering id - we can't use the file name for ordering since we can re-use files
-		
-		class JournalFileComparator implements Comparator<JournalFile>
+		for (JournalFile file: filesToDelete)
 		{
-			public int compare(JournalFile f1, JournalFile f2)
-	      {
-	         long id1 = f1.getFile().getOrderingID();
-	         long id2 = f2.getFile().getOrderingID();
-
-	         return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
-	      }
+			file.getFile().close();
 		}
-
-		Collections.sort(files, new JournalFileComparator());
 		
-		for (JournalFile file: files)
+		this.currentFile = null;
+		
+		files.clear();
+		
+		availableFiles.clear();
+		
+		filesToDelete.clear();
+	}
+	
+	// Public -----------------------------------------------------------------------------
+	
+	public LinkedList<JournalFile> getFiles()
+	{
+		return files;
+	}
+	
+	public LinkedList<JournalFile> getAvailableFiles()
+	{
+		return availableFiles;
+	}
+	
+	public LinkedList<JournalFile> getFilesToDelete()
+	{
+		return filesToDelete;
+	}
+	
+	// Private -----------------------------------------------------------------------------
+	
+	private void checkAndReclaimFile(JournalFile file) throws Exception
+	{		
+		if (file.isEmpty() && file != currentFile)
 		{
+			//File can be reclaimed
 			
+			files.remove(file);
+			
+			//TODO - add to delete file list if there are a lot of available files
+			
+			//Re-initialise it
+			
+			long newOrderingID = generateOrderingID();
+			
+			ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
+			
+			bb.putLong(newOrderingID);
+			
+			SequentialFile sf = file.getFile();
+			
+			sf.reset();
+			
+			sf.write(bb);
+			
+			JournalFile jf = new JournalFile(sf, newOrderingID);
+			
+			availableFiles.add(jf);   		
 		}
 	}
 	
 	private JournalFile createFile() throws Exception
 	{
-		int fileNo = fileSequence++;
+		log.info("Creating a new file");
 		
-		String fileName = journalDir + "/" + JOURNAL_FILE_PREFIX + fileNo + "." + JOURNAL_FILE_EXTENSION;
+		long orderingID = generateOrderingID();
 		
-		SequentialFile sequentialFile = new NIOSequentialFile(fileName, true);
+		String fileName = journalDir + "/" + JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JOURNAL_FILE_EXTENSION;
+						
+		SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync);
 		
-		sequentialFile.create();
+		sequentialFile.open();
+						
+		sequentialFile.preAllocate(fileSize, FILL_CHARACTER);
 		
-		JournalFile info = new JournalFile(sequentialFile);
+		ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
 		
+		bb.putLong(orderingID);
+		
+		bb.flip();
+		
+		sequentialFile.write(bb);
+		
+		sequentialFile.reset();
+		
+		JournalFile info = new JournalFile(sequentialFile, orderingID);
+		
 		return info;
 	}
 	
-	private void checkFile(int size) throws Exception
+	private long generateOrderingID()
 	{
+		long orderingID = System.currentTimeMillis();
+		
+		while (orderingID == lastOrderingID)
+		{
+			//Ensure it's unique
+			try
+			{				
+				Thread.sleep(1);
+			}
+			catch (InterruptedException ignore)
+			{				
+			}
+			orderingID = System.currentTimeMillis();
+		}
+		lastOrderingID = orderingID;	
+		
+		return orderingID;
+	}
+	
+	private void checkFile(final int size) throws Exception
+	{
+		//The first LONG_LENGTH bytes of each file hold the ordering id (a timestamp), so a record can use at most fileSize - LONG_LENGTH bytes
+		if (size > fileSize - LONG_LENGTH)
+		{
+			throw new IllegalArgumentException("Record is too large to store " + size);
+		}
+		
 		if (currentFile == null || fileSize - currentFile.getOffset() < size)
 		{
+			log.info("Getting new file");
+			
 			if (currentFile != null)
 			{
 				currentFile.getFile().close();
+				
+				checkAndReclaimFile(currentFile);
 			}
 			
+			log.info("Getting new file");
+			
 			if (!availableFiles.isEmpty())
 			{
 				currentFile = availableFiles.remove();			

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-03-06 18:22:05 UTC (rev 3851)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2008-03-07 15:46:57 UTC (rev 3852)
@@ -27,6 +27,7 @@
 import java.nio.channels.FileChannel;
 
 import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.logging.Logger;
 
 /**
  * 
@@ -37,27 +38,32 @@
  */
 public class NIOSequentialFile implements SequentialFile
 {
+	private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
+		
 	private static final int LONG_LENGTH = 8;
 		
-	private final String fileName;
+	private String fileName;
 	
-	private final boolean sync;
-		
+	private boolean sync;
+	
 	private File file;
 	
 	private FileChannel channel;
-	
-	private long orderingID;
 		
-	public NIOSequentialFile(String fileName, boolean sync) throws Exception
+	public NIOSequentialFile(final String fileName, final boolean sync)
 	{
 		this.fileName = fileName;
 		
-		this.sync = sync;			  
+		this.sync = sync;		
 	}
-		
-	public void create() throws Exception
+	
+	public String getFileName()
 	{
+		return fileName;
+	}
+		
+	public void open() throws Exception
+	{		
 		file = new File(fileName);
 
 		RandomAccessFile rfile = new RandomAccessFile(file, "rw");
@@ -65,55 +71,26 @@
 		channel = rfile.getChannel();		
 	}
 	
-	public void load() throws Exception
+	public void preAllocate(final int size, final byte fillCharacter) throws Exception
 	{
-		ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
-		
-		channel.read(bb);
-		
-		orderingID = bb.getLong();
-	}
-	
-	public long getOrderingID()
-	{
-		return orderingID;
-	}
-	
-	public void initialise(long orderingID, int size) throws Exception
-	{
 		ByteBuffer bb = ByteBuffer.allocateDirect(size);
 		
-		this.orderingID = orderingID;
-
-		//First 8 bytes contain the orderingID - used to order the files when loading
-		
-		bb.putLong(orderingID);
-		
-		//for debug only
 		for (int i = 0; i < size - LONG_LENGTH; i++)
 		{
-			if (i % 100 == 0)
-			{
-				bb.put((byte)'\n');
-			}
-			else
-			{
-				bb.put((byte)'X');
-			}
+			bb.put(fillCharacter);			
 		}
-		//end debug
-
+		
 		bb.flip();
 
 		channel.position(0);
 
 		channel.write(bb);
 
-		channel.force(false);
+		channel.force(false);	
 		
-		channel.position(LONG_LENGTH);		
+		channel.position(0);
 	}
-
+	
 	public void close() throws Exception
 	{
 		channel.close();
@@ -128,7 +105,9 @@
 
 	public void read(ByteBuffer bytes) throws Exception
 	{
-		channel.read(bytes);
+		int bytesRead = channel.read(bytes);
+		
+		log.info("Read " + bytesRead + " bytes");
 	}
 
 	public void write(ByteBuffer bytes) throws Exception
@@ -138,7 +117,11 @@
 		if (sync)
 		{
 			channel.force(false);
-		}
+		};
 	}
 
+	public void reset() throws Exception
+	{
+		channel.position(0);
+	}
 }

Added: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java	2008-03-07 15:46:57 UTC (rev 3852)
@@ -0,0 +1,63 @@
+/*
+  * JBoss, Home of Professional Open Source
+  * Copyright 2005, JBoss Inc., and individual contributors as indicated
+  * by the @authors tag. See the copyright.txt in the distribution for a
+  * full listing of individual contributors.
+  *
+  * This is free software; you can redistribute it and/or modify it
+  * under the terms of the GNU Lesser General Public License as
+  * published by the Free Software Foundation; either version 2.1 of
+  * the License, or (at your option) any later version.
+  *
+  * This software is distributed in the hope that it will be useful,
+  * but WITHOUT ANY WARRANTY; without even the implied warranty of
+  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+  * Lesser General Public License for more details.
+  *
+  * You should have received a copy of the GNU Lesser General Public
+  * License along with this software; if not, write to the Free
+  * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+  * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+  */
+package org.jboss.messaging.core.journal.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+
+/**
+ * 
+ * A NIOSequentialFileFactory
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NIOSequentialFileFactory implements SequentialFileFactory
+{
+	public SequentialFile createSequentialFile(final String fileName, final boolean sync)
+	{
+		return new NIOSequentialFile(fileName, sync);
+	}
+
+	public List<String> listFiles(final String journalDir, final String extension) throws Exception
+	{
+		File dir = new File(journalDir);
+		
+		FilenameFilter fnf = new FilenameFilter()
+		{
+			public boolean accept(File file, String name)
+			{
+				return name.endsWith(".jbm");
+			}
+		};
+		
+		String[] fileNames = dir.list(fnf);
+		
+		return Arrays.asList(fileNames);
+	}
+}

Added: trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java	                        (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java	2008-03-07 15:46:57 UTC (rev 3852)
@@ -0,0 +1,34 @@
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.RecordHandle;
+
+/**
+ * 
+ * A RecordHandleImpl
+ * 
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RecordHandleImpl implements RecordHandle
+{
+	private final long id;
+	
+	private final JournalFile file;
+	
+	public RecordHandleImpl(final long id, final JournalFile file)
+	{
+		this.id = id;
+		
+		this.file = file;
+	}
+	
+	public long getID()
+	{
+		return id;
+	}
+	
+	public JournalFile getFile()
+	{
+		return file;
+	}
+}




More information about the jboss-cvs-commits mailing list