[jboss-cvs] JBoss Messaging SVN: r3852 - in trunk/src/main/org/jboss/messaging/core/journal: impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Mar 7 10:46:57 EST 2008
Author: timfox
Date: 2008-03-07 10:46:57 -0500 (Fri, 07 Mar 2008)
New Revision: 3852
Added:
trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
Modified:
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
Log:
More journal work
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-03-06 18:22:05 UTC (rev 3851)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-03-07 15:46:57 UTC (rev 3852)
@@ -21,7 +21,9 @@
*/
package org.jboss.messaging.core.journal;
+import java.util.Map;
+
/**
*
* A Journal
@@ -31,11 +33,9 @@
*/
public interface Journal
{
- void preAllocateFiles(int numFiles) throws Exception;
+ RecordHandle add(long id, byte[] bytes) throws Exception;
- void add(long id, byte[] bytes) throws Exception;
+ void delete(RecordHandle handle) throws Exception;
- void delete(long id) throws Exception;
-
- void load() throws Exception;
+ Map<Long, byte[]> load() throws Exception;
}
Added: trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java 2008-03-07 15:46:57 UTC (rev 3852)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal;
+
+/**
+ *
+ * A RecordHandle
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface RecordHandle
+{
+}
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-03-06 18:22:05 UTC (rev 3851)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-03-07 15:46:57 UTC (rev 3852)
@@ -32,19 +32,22 @@
*/
public interface SequentialFile
{
- long getOrderingID();
+ /*
+ * Creates the file if it doesn't already exist, then opens it
+ */
+ void open() throws Exception;
- void load() throws Exception;
+ String getFileName();
- void create() throws Exception;
+ void preAllocate(int size, byte fillCharacter) throws Exception;
- void initialise(long orderingID, int size) throws Exception;
-
void delete() throws Exception;
void write(ByteBuffer bytes) throws Exception;
void read(ByteBuffer bytes) throws Exception;
+ void reset() throws Exception;
+
void close() throws Exception;
}
Added: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-03-07 15:46:57 UTC (rev 3852)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal;
+
+import java.util.List;
+
+/**
+ *
+ * A SequentialFileFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface SequentialFileFactory
+{
+ SequentialFile createSequentialFile(String fileName, boolean sync) throws Exception;
+
+ List<String> listFiles(String journalDir, String extension) throws Exception;
+}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2008-03-06 18:22:05 UTC (rev 3851)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2008-03-07 15:46:57 UTC (rev 3852)
@@ -37,16 +37,20 @@
{
private final SequentialFile file;
+ private final long orderingID;
+
private final Set<Long> ids = new HashSet<Long>();
private int offset;
-
- public JournalFile(SequentialFile file)
+
+ public JournalFile(final SequentialFile file, final long orderingID)
{
this.file = file;
+
+ this.orderingID = orderingID;
}
- public void extendOffset(int delta)
+ public void extendOffset(final int delta)
{
offset += delta;
}
@@ -56,9 +60,34 @@
return offset;
}
+ public long getOrderingID()
+ {
+ return orderingID;
+ }
+
+ public void resetOffset()
+ {
+ offset = 0;
+ }
+
public SequentialFile getFile()
{
return file;
}
+ public void addID(final long id)
+ {
+ ids.add(id);
+ }
+
+ public void removeID(final long id)
+ {
+ ids.remove(id);
+ }
+
+ public boolean isEmpty()
+ {
+ return ids.isEmpty();
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-03-06 18:22:05 UTC (rev 3851)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
@@ -21,8 +21,6 @@
*/
package org.jboss.messaging.core.journal.impl;
-import java.io.File;
-import java.io.FilenameFilter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
@@ -31,9 +29,12 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Semaphore;
import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.RecordHandle;
import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
/**
@@ -51,110 +52,230 @@
private static final int LONG_LENGTH = 8;
- private static final byte ADD_RECORD = 1;
+ public static final byte ADD_RECORD = 1;
- private static final byte DELETE_RECORD = 2;
+ public static final byte DELETE_RECORD = 2;
- private static final String JOURNAL_FILE_PREFIX = "journal-";
+ public static final byte FILL_CHARACTER = (byte)'J';
- private static final String JOURNAL_FILE_EXTENSION = "jbm";
+ public static final String JOURNAL_FILE_PREFIX = "jbm";
-
+ public static final String JOURNAL_FILE_EXTENSION = "jbm";
+
+
+
private final String journalDir;
private final int fileSize;
- private JournalFile currentFile ;
+ private final int numFiles;
- private List<JournalFile> files = new ArrayList<JournalFile>();
+ private final boolean sync;
- private LinkedList<JournalFile> availableFiles = new LinkedList<JournalFile>();
+ private final SequentialFileFactory fileFactory;
- private int fileSequence;
+ private final LinkedList<JournalFile> files = new LinkedList<JournalFile>();
+ private final LinkedList<JournalFile> availableFiles = new LinkedList<JournalFile>();
- public JournalImpl(final String journalDir, final int fileSize)
+ private final LinkedList<JournalFile> filesToDelete = new LinkedList<JournalFile>();
+
+ /*
+ * We use a semaphore rather than synchronized since it performs better when contended
+ */
+
+ //TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
+ private final Semaphore lock = new Semaphore(1, true);
+
+ private volatile JournalFile currentFile ;
+
+ private volatile boolean loaded;
+
+ private volatile long lastOrderingID;
+
+ public JournalImpl(final String journalDir, final int fileSize, final int numFiles, final boolean sync,
+ final SequentialFileFactory fileFactory)
{
this.journalDir = journalDir;
this.fileSize = fileSize;
+
+ this.numFiles = numFiles;
+
+ this.sync = sync;
+
+ this.fileFactory = fileFactory;
}
- public void preAllocateFiles(int numFiles) throws Exception
+ // Journal implementation ----------------------------------------------------------------
+
+ public RecordHandle add(final long id, final byte[] bytes) throws Exception
{
- log.info("Pre-allocating " + numFiles + " files");
-
- for (int i = 0; i < numFiles; i++)
+ if (!loaded)
{
- JournalFile info = createFile();
-
- availableFiles.add(info);
+ throw new IllegalStateException("Journal must be loaded first");
}
- log.info("Done pre-allocate");
- }
-
- public void add(long id, byte[] bytes) throws Exception
- {
int size = 1 + INT_LENGTH + LONG_LENGTH + bytes.length;
- checkFile(size);
+ lock.acquire();
- byte[] toWrite = new byte[size];
- ByteBuffer bb = ByteBuffer.wrap(toWrite);
- bb.put(ADD_RECORD);
- bb.putLong(id);
- bb.putInt(bytes.length);
- bb.put(bytes);
-
- bb.flip();
-
- currentFile.getFile().write(bb);
-
- currentFile.extendOffset(size);
+ try
+ {
+ checkFile(size);
+
+ byte[] toWrite = new byte[size];
+ ByteBuffer bb = ByteBuffer.wrap(toWrite);
+ bb.put(ADD_RECORD);
+ bb.putLong(id);
+ bb.putInt(bytes.length);
+ bb.put(bytes);
+
+ bb.flip();
+
+ currentFile.getFile().write(bb);
+
+ currentFile.extendOffset(size);
+
+ currentFile.addID(id);
+
+ return new RecordHandleImpl(id, currentFile);
+ }
+ finally
+ {
+ lock.release();
+ }
}
- public void delete(long id) throws Exception
+ public void delete(RecordHandle handle) throws Exception
{
- int size = 1 + LONG_LENGTH;
+ if (!loaded)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
- checkFile(size);
+ RecordHandleImpl rh = (RecordHandleImpl)handle;
- byte[] toWrite = new byte[size];
- ByteBuffer bb = ByteBuffer.wrap(toWrite);
- bb.put(DELETE_RECORD);
- bb.putLong(id);
+ int size = 1 + LONG_LENGTH;
- bb.flip();
+ lock.acquire();
- currentFile.getFile().write(bb);
+ try
+ {
+ checkFile(size);
+
+ long id = rh.getID();
+
+ byte[] toWrite = new byte[size];
+ ByteBuffer bb = ByteBuffer.wrap(toWrite);
+ bb.put(DELETE_RECORD);
+ bb.putLong(id);
+
+ bb.flip();
+
+ currentFile.getFile().write(bb);
+
+ JournalFile addedFile = rh.getFile();
+
+ addedFile.removeID(id);
+
+ checkAndReclaimFile(addedFile);
+ }
+ finally
+ {
+ lock.release();
+ }
}
-
- private boolean loaded;
-
- public void load() throws Exception
+
+ public Map<Long, byte[]> load() throws Exception
{
if (loaded)
{
throw new IllegalStateException("Journal is already loaded");
}
- loadFiles();
+ log.info("Loading...");
- Map<Long, byte[]> records = new HashMap<Long, byte[]>();
+ List<String> fileNames = fileFactory.listFiles(journalDir, JOURNAL_FILE_EXTENSION);
+
+ log.info("There are " + fileNames.size() + " files in directory");
+
+ List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
- for (JournalFile file: this.files)
+ for (String fileName: fileNames)
{
+ SequentialFile file = fileFactory.createSequentialFile(fileName, sync);
+
+ file.open();
+
+ ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
+
+ file.read(bb);
+
+ bb.flip();
+
+ long orderingID = bb.getLong();
+
+ file.reset();
+
+ orderedFiles.add(new JournalFile(file, orderingID));
+ }
+
+ log.info("numFiles is " + numFiles);
+
+ int createNum = numFiles - orderedFiles.size();
+
+ //Preallocate some more if necessary
+ for (int i = 0; i < createNum; i++)
+ {
+ JournalFile file = createFile();
+
+ orderedFiles.add(file);
+
+ log.info("Created new file");
+ }
+
+ log.info("Done creating new ones");
+
+ //Now order them by ordering id - we can't use the file name for ordering since we can re-use files
+
+ class JournalFileComparator implements Comparator<JournalFile>
+ {
+ public int compare(JournalFile f1, JournalFile f2)
+ {
+ long id1 = f1.getOrderingID();
+ long id2 = f2.getOrderingID();
+
+ return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
+ }
+ }
+
+ Collections.sort(orderedFiles, new JournalFileComparator());
+
+ Map<Long, byte[]> records = new HashMap<Long, byte[]>();
+
+ boolean filesWithData = true;
+
+ outer: for (JournalFile file: orderedFiles)
+ {
+ log.info("Loading file, ordering id is " + file.getOrderingID());
+
byte[] bytes = new byte[fileSize];
ByteBuffer bb = ByteBuffer.wrap(bytes);
file.getFile().read(bb);
- while (true)
+ bb.flip();
+
+ bb.getLong();
+
+ while (filesWithData && bb.hasRemaining())
{
byte recordType = bb.get();
+ log.info("recordtype is " + recordType);
+
if (recordType == ADD_RECORD)
{
long id = bb.getLong();
@@ -175,92 +296,193 @@
records.remove(id);
}
+ else if (recordType == FILL_CHARACTER)
+ {
+ //Implies end of records in the file
+
+ files.add(file);
+
+ currentFile = file;
+
+ filesWithData = false;
+
+ continue outer;
+ }
else
{
- //Implies end of records in the file
- break;
-
- //TODO set currentFile and offset
+ throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+ " is corrupt, invalid record type " + recordType);
}
}
-
- }
+
+ if (filesWithData)
+ {
+ log.info("Adding to files");
+ files.add(file);
+ }
+ else
+ {
+ log.info("Adding to available files");
+ //Empty files with no data of importance
+ availableFiles.add(file);
+ }
+ }
+ loaded = true;
+ return records;
}
- // Private -----------------------------------------------------------------------------
-
- private void loadFiles() throws Exception
+ public void stop() throws Exception
{
- File dir = new File(journalDir);
+ log.info("files size " + files.size());
+ log.info("available files size " + availableFiles.size());
+ log.info("files top delete size " + filesToDelete.size());
- FilenameFilter fnf = new FilenameFilter()
+ for (JournalFile file: files)
{
- public boolean accept(File file, String name)
- {
- return name.endsWith(".jbm");
- }
- };
+ file.getFile().close();
+ }
- String[] fileNames = dir.list(fnf);
-
- List<JournalFile> files = new ArrayList<JournalFile>(fileNames.length);
-
- for (String fileName: fileNames)
+ for (JournalFile file: availableFiles)
{
- SequentialFile file = new NIOSequentialFile(fileName, true);
-
- file.load();
-
- files.add(new JournalFile(file));
+ file.getFile().close();
}
- //Now order them by ordering id - we can't use the file name for ordering since we can re-use files
-
- class JournalFileComparator implements Comparator<JournalFile>
+ for (JournalFile file: filesToDelete)
{
- public int compare(JournalFile f1, JournalFile f2)
- {
- long id1 = f1.getFile().getOrderingID();
- long id2 = f2.getFile().getOrderingID();
-
- return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
- }
+ file.getFile().close();
}
-
- Collections.sort(files, new JournalFileComparator());
- for (JournalFile file: files)
+ this.currentFile = null;
+
+ files.clear();
+
+ availableFiles.clear();
+
+ filesToDelete.clear();
+ }
+
+ // Public -----------------------------------------------------------------------------
+
+ public LinkedList<JournalFile> getFiles()
+ {
+ return files;
+ }
+
+ public LinkedList<JournalFile> getAvailableFiles()
+ {
+ return availableFiles;
+ }
+
+ public LinkedList<JournalFile> getFilesToDelete()
+ {
+ return filesToDelete;
+ }
+
+ // Private -----------------------------------------------------------------------------
+
+ private void checkAndReclaimFile(JournalFile file) throws Exception
+ {
+ if (file.isEmpty() && file != currentFile)
{
+ //File can be reclaimed
+ files.remove(file);
+
+ //TODO - add to delete file list if there are a lot of available files
+
+ //Re-initialise it
+
+ long newOrderingID = generateOrderingID();
+
+ ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
+
+ bb.putLong(newOrderingID);
+
+ SequentialFile sf = file.getFile();
+
+ sf.reset();
+
+ sf.write(bb);
+
+ JournalFile jf = new JournalFile(sf, newOrderingID);
+
+ availableFiles.add(jf);
}
}
private JournalFile createFile() throws Exception
{
- int fileNo = fileSequence++;
+ log.info("Creating a new file");
- String fileName = journalDir + "/" + JOURNAL_FILE_PREFIX + fileNo + "." + JOURNAL_FILE_EXTENSION;
+ long orderingID = generateOrderingID();
- SequentialFile sequentialFile = new NIOSequentialFile(fileName, true);
+ String fileName = journalDir + "/" + JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JOURNAL_FILE_EXTENSION;
+
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync);
- sequentialFile.create();
+ sequentialFile.open();
+
+ sequentialFile.preAllocate(fileSize, FILL_CHARACTER);
- JournalFile info = new JournalFile(sequentialFile);
+ ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
+ bb.putLong(orderingID);
+
+ bb.flip();
+
+ sequentialFile.write(bb);
+
+ sequentialFile.reset();
+
+ JournalFile info = new JournalFile(sequentialFile, orderingID);
+
return info;
}
- private void checkFile(int size) throws Exception
+ private long generateOrderingID()
{
+ long orderingID = System.currentTimeMillis();
+
+ while (orderingID == lastOrderingID)
+ {
+ //Ensure it's unique
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ orderingID = System.currentTimeMillis();
+ }
+ lastOrderingID = orderingID;
+
+ return orderingID;
+ }
+
+ private void checkFile(final int size) throws Exception
+ {
+ //We take into account the first timestamp long
+ if (size > fileSize - LONG_LENGTH)
+ {
+ throw new IllegalArgumentException("Record is too large to store " + size);
+ }
+
if (currentFile == null || fileSize - currentFile.getOffset() < size)
{
+ log.info("Getting new file");
+
if (currentFile != null)
{
currentFile.getFile().close();
+
+ checkAndReclaimFile(currentFile);
}
+ log.info("Getting new file");
+
if (!availableFiles.isEmpty())
{
currentFile = availableFiles.remove();
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-03-06 18:22:05 UTC (rev 3851)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-03-07 15:46:57 UTC (rev 3852)
@@ -27,6 +27,7 @@
import java.nio.channels.FileChannel;
import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.logging.Logger;
/**
*
@@ -37,27 +38,32 @@
*/
public class NIOSequentialFile implements SequentialFile
{
+ private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
+
private static final int LONG_LENGTH = 8;
- private final String fileName;
+ private String fileName;
- private final boolean sync;
-
+ private boolean sync;
+
private File file;
private FileChannel channel;
-
- private long orderingID;
- public NIOSequentialFile(String fileName, boolean sync) throws Exception
+ public NIOSequentialFile(final String fileName, final boolean sync)
{
this.fileName = fileName;
- this.sync = sync;
+ this.sync = sync;
}
-
- public void create() throws Exception
+
+ public String getFileName()
{
+ return fileName;
+ }
+
+ public void open() throws Exception
+ {
file = new File(fileName);
RandomAccessFile rfile = new RandomAccessFile(file, "rw");
@@ -65,55 +71,26 @@
channel = rfile.getChannel();
}
- public void load() throws Exception
+ public void preAllocate(final int size, final byte fillCharacter) throws Exception
{
- ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
-
- channel.read(bb);
-
- orderingID = bb.getLong();
- }
-
- public long getOrderingID()
- {
- return orderingID;
- }
-
- public void initialise(long orderingID, int size) throws Exception
- {
ByteBuffer bb = ByteBuffer.allocateDirect(size);
- this.orderingID = orderingID;
-
- //First 8 bytes contain the orderingID - used to order the files when loading
-
- bb.putLong(orderingID);
-
- //for debug only
for (int i = 0; i < size - LONG_LENGTH; i++)
{
- if (i % 100 == 0)
- {
- bb.put((byte)'\n');
- }
- else
- {
- bb.put((byte)'X');
- }
+ bb.put(fillCharacter);
}
- //end debug
-
+
bb.flip();
channel.position(0);
channel.write(bb);
- channel.force(false);
+ channel.force(false);
- channel.position(LONG_LENGTH);
+ channel.position(0);
}
-
+
public void close() throws Exception
{
channel.close();
@@ -128,7 +105,9 @@
public void read(ByteBuffer bytes) throws Exception
{
- channel.read(bytes);
+ int bytesRead = channel.read(bytes);
+
+ log.info("Read " + bytesRead + " bytes");
}
public void write(ByteBuffer bytes) throws Exception
@@ -138,7 +117,11 @@
if (sync)
{
channel.force(false);
- }
+ };
}
+ public void reset() throws Exception
+ {
+ channel.position(0);
+ }
}
Added: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-03-07 15:46:57 UTC (rev 3852)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+
+/**
+ *
+ * A NIOSequentialFileFactory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NIOSequentialFileFactory implements SequentialFileFactory
+{
+ public SequentialFile createSequentialFile(final String fileName, final boolean sync)
+ {
+ return new NIOSequentialFile(fileName, sync);
+ }
+
+ public List<String> listFiles(final String journalDir, final String extension) throws Exception
+ {
+ File dir = new File(journalDir);
+
+ FilenameFilter fnf = new FilenameFilter()
+ {
+ public boolean accept(File file, String name)
+ {
+ return name.endsWith(".jbm");
+ }
+ };
+
+ String[] fileNames = dir.list(fnf);
+
+ return Arrays.asList(fileNames);
+ }
+}
Added: trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java 2008-03-07 15:46:57 UTC (rev 3852)
@@ -0,0 +1,34 @@
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.RecordHandle;
+
+/**
+ *
+ * A RecordHandleImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RecordHandleImpl implements RecordHandle
+{
+ private final long id;
+
+ private final JournalFile file;
+
+ public RecordHandleImpl(final long id, final JournalFile file)
+ {
+ this.id = id;
+
+ this.file = file;
+ }
+
+ public long getID()
+ {
+ return id;
+ }
+
+ public JournalFile getFile()
+ {
+ return file;
+ }
+}
More information about the jboss-cvs-commits
mailing list