[jboss-cvs] JBoss Messaging SVN: r3871 - in trunk: src/main/org/jboss/messaging/core/bindingmanager and 4 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Mar 12 06:02:42 EDT 2008
Author: timfox
Date: 2008-03-12 06:02:42 -0400 (Wed, 12 Mar 2008)
New Revision: 3871
Added:
trunk/src/main/org/jboss/messaging/core/bindingmanager/
trunk/src/main/org/jboss/messaging/core/bindingmanager/BindingManager.java
trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java
Modified:
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
Log:
More journal work
Added: trunk/src/main/org/jboss/messaging/core/bindingmanager/BindingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/bindingmanager/BindingManager.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/bindingmanager/BindingManager.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.bindingmanager;
+
+import org.jboss.messaging.core.postoffice.Binding;
+
+/**
+ *
+ * A BindingManager
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface BindingManager
+{
+ /**
+ * Add a binding into the store
+ * @param binding The binding to add
+ * @throws Exception
+ */
+ void addBinding(Binding binding) throws Exception;
+
+ /**
+ * Delete a binding from the store
+ * @param binding The binding to delete
+ * @throws Exception
+ */
+ void deleteBinding(Binding binding) throws Exception;
+}
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -21,7 +21,7 @@
*/
package org.jboss.messaging.core.journal;
-import java.util.Map;
+import java.util.List;
/**
@@ -33,9 +33,32 @@
*/
public interface Journal
{
- RecordHandle add(long id, byte[] bytes) throws Exception;
+ // Non transactional operations
- void delete(RecordHandle handle) throws Exception;
+ RecordHandle appendAddRecord(long id, byte[] record) throws Exception;
- Map<Long, byte[]> load() throws Exception;
+ void appendUpdateRecord(RecordHandle handle, byte[] record) throws Exception;
+
+ void appendDeleteRecord(RecordHandle handle) throws Exception;
+
+ // Transactional operations
+
+ RecordHandle appendAddRecordTransactional(long txID, long id, byte[] record, boolean done) throws Exception;
+
+ void appendUpdateRecordTransactional(long txID, RecordHandle handle, byte[] record, boolean done) throws Exception;
+
+ void appendDeleteRecordTransactional(long txID, RecordHandle handle, boolean done) throws Exception;
+
+
+// RecordHandle appendAddRecordPrepare(long txID, long id, byte[] record, boolean done) throws Exception;
+//
+// void appendUpdateRecordPrepare(long txID, RecordHandle handle, byte[] record, boolean done) throws Exception;
+//
+// void appendDeleteRecordPrepare(long txID, RecordHandle handle, boolean done) throws Exception;
+
+
+ // Load
+
+ List<RecordHistory> load() throws Exception;
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java 2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -30,4 +30,5 @@
*/
public interface RecordHandle
{
+ public long getID();
}
Added: trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -0,0 +1,38 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal;
+
+import java.util.List;
+
+/**
+ *
+ * A RecordHistory
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public interface RecordHistory
+{
+ RecordHandle getHandle();
+
+ List<byte[]> getRecords();
+}
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -39,13 +39,13 @@
String getFileName();
- void preAllocate(int size, byte fillCharacter) throws Exception;
+ void fill(int position, int size, byte fillCharacter) throws Exception;
void delete() throws Exception;
void write(ByteBuffer bytes) throws Exception;
- void read(ByteBuffer bytes) throws Exception;
+ int read(ByteBuffer bytes) throws Exception;
void reset() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -39,7 +39,7 @@
private final long orderingID;
- private final Set<Long> ids = new HashSet<Long>();
+ private int refCount;
private int offset;
@@ -75,19 +75,19 @@
return file;
}
- public void addID(final long id)
+ public void incRefCount()
{
- ids.add(id);
+ refCount++;
}
- public void removeID(final long id)
+ public void decRefCount()
{
- ids.remove(id);
+ refCount--;
}
public boolean isEmpty()
{
- return ids.isEmpty();
+ return refCount == 0;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -25,14 +25,19 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import org.jboss.messaging.core.journal.Journal;
import org.jboss.messaging.core.journal.RecordHandle;
+import org.jboss.messaging.core.journal.RecordHistory;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
@@ -48,38 +53,69 @@
{
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final int INT_LENGTH = 4;
+ // The sizes of primitive types
- private static final int LONG_LENGTH = 8;
+ private static final int SIZE_LONG = 8;
+
+ private static final int SIZE_INT = 4;
+
+ private static final int SIZE_BYTE = 1;
+
+ //Record markers - they must be all unique
+
+ public static final byte ADD_RECORD = 11;
- public static final byte ADD_RECORD = 1;
+ public static final byte UPDATE_RECORD = 12;
- public static final byte DELETE_RECORD = 2;
+ public static final byte DELETE_RECORD = 13;
- public static final byte FILL_CHARACTER = (byte)'J';
+ public static final byte ADD_RECORD_TX = 14;
- public static final String JOURNAL_FILE_PREFIX = "jbm";
+ public static final byte UPDATE_RECORD_TX = 15;
- public static final String JOURNAL_FILE_EXTENSION = "jbm";
+ public static final byte DELETE_RECORD_TX = 16;
-
+ //End markers - they must all be unique
+ public static final byte DONE = 21;
+
+ public static final byte TX_CONTINUE = 22;
+
+ public static final byte TX_DONE = 23;
+
+ public static final byte FILL_CHARACTER = 74; // Letter 'J'
+
+
+
+
private final String journalDir;
private final int fileSize;
- private final int numFiles;
+ private final int minFiles;
+ private final int minAvailableFiles;
+
private final boolean sync;
private final SequentialFileFactory fileFactory;
- private final LinkedList<JournalFile> files = new LinkedList<JournalFile>();
+ private final long taskPeriod;
- private final LinkedList<JournalFile> availableFiles = new LinkedList<JournalFile>();
+ public final String filePrefix;
- private final LinkedList<JournalFile> filesToDelete = new LinkedList<JournalFile>();
-
+ public final String fileExtension;
+
+
+ private final Queue<JournalFile> files = new ConcurrentLinkedQueue<JournalFile>();
+
+ private final Queue<JournalFile> availableFiles = new ConcurrentLinkedQueue<JournalFile>();
+
+ private final Map<Long, TransactionInfo> transactions = new ConcurrentHashMap<Long, TransactionInfo>();
+
+ private final Map<Long, List<RecordHandle>> transactionalDeletes =
+ new ConcurrentHashMap<Long, List<RecordHandle>>();
+
/*
* We use a semaphore rather than synchronized since it performs better when contended
*/
@@ -92,54 +128,152 @@
private volatile boolean loaded;
private volatile long lastOrderingID;
-
- public JournalImpl(final String journalDir, final int fileSize, final int numFiles, final boolean sync,
- final SequentialFileFactory fileFactory)
+
+ private final Timer timer = new Timer(true);
+
+ private final TimerTask reclaimerTask = new ReclaimerTask();
+
+ private final TimerTask availableFilesTask = new AvailableFilesTask();
+
+
+
+ public JournalImpl(final String journalDir, final int fileSize, final int minFiles, final int minAvailableFiles,
+ final boolean sync, final SequentialFileFactory fileFactory, final long taskPeriod,
+ final String filePrefix, final String fileExtension)
{
this.journalDir = journalDir;
this.fileSize = fileSize;
- this.numFiles = numFiles;
+ this.minFiles = minFiles;
+ this.minAvailableFiles = minAvailableFiles;
+
this.sync = sync;
this.fileFactory = fileFactory;
+
+ this.taskPeriod = taskPeriod;
+
+ this.filePrefix = filePrefix;
+
+ this.fileExtension = fileExtension;
}
// Journal implementation ----------------------------------------------------------------
- public RecordHandle add(final long id, final byte[] bytes) throws Exception
+ public ByteBuffer allocateBuffer(final int size) throws Exception
{
+ return ByteBuffer.allocateDirect(size);
+ }
+
+ public RecordHandle appendAddRecord(final long id, final byte[] record) throws Exception
+ {
if (!loaded)
{
throw new IllegalStateException("Journal must be loaded first");
}
- int size = 1 + INT_LENGTH + LONG_LENGTH + bytes.length;
+ //TODO optimise to avoid creating a new byte buffer
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+
+ ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+
+ bb.put(ADD_RECORD);
+
+ bb.putLong(id);
+
+ bb.putInt(record.length);
+
+ bb.put(record);
+
+ bb.put(DONE);
+
+ bb.flip();
+
lock.acquire();
try
- {
+ {
checkFile(size);
+
+ currentFile.getFile().write(bb);
- byte[] toWrite = new byte[size];
- ByteBuffer bb = ByteBuffer.wrap(toWrite);
- bb.put(ADD_RECORD);
- bb.putLong(id);
- bb.putInt(bytes.length);
- bb.put(bytes);
+ currentFile.extendOffset(size);
- bb.flip();
+ RecordHandleImpl rh = new RecordHandleImpl(id);
+ rh.addFile(currentFile);
+
+ return rh;
+ }
+ finally
+ {
+ lock.release();
+ }
+ }
+
+ public RecordHandle appendAddRecordTransactional(final long txID, final long id,
+ final byte[] record, final boolean done) throws Exception
+ {
+ if (!loaded)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ //TODO optimise to avoid creating a new byte buffer
+
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+
+ ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+
+ bb.put(ADD_RECORD);
+
+ bb.putLong(txID);
+
+ bb.putLong(id);
+
+ bb.putInt(record.length);
+
+ bb.put(record);
+
+ if (done)
+ {
+ bb.put(TX_DONE);
+ }
+ else
+ {
+ bb.put(TX_CONTINUE);
+ }
+
+ bb.flip();
+
+ lock.acquire();
+
+ try
+ {
+ checkFile(size);
+
currentFile.getFile().write(bb);
currentFile.extendOffset(size);
- currentFile.addID(id);
+ RecordHandleImpl rh = new RecordHandleImpl(id);
- return new RecordHandleImpl(id, currentFile);
+ rh.addFile(currentFile);
+
+ if (done)
+ {
+ List<RecordHandle> list = transactionalDeletes.remove(txID);
+
+ if (list != null)
+ {
+ releaseDeletes(list);
+ }
+ }
+
+ return rh;
}
finally
{
@@ -147,7 +281,7 @@
}
}
- public void delete(RecordHandle handle) throws Exception
+ public void appendUpdateRecord(final RecordHandle handle, final byte[] record) throws Exception
{
if (!loaded)
{
@@ -156,39 +290,225 @@
RecordHandleImpl rh = (RecordHandleImpl)handle;
- int size = 1 + LONG_LENGTH;
+ //TODO optimise to avoid creating a new byte buffer
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+
+ ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+
+ bb.put(UPDATE_RECORD);
+
+ bb.putLong(rh.getID());
+
+ bb.putInt(record.length);
+
+ bb.put(record);
+
+ bb.put(DONE);
+
+ bb.flip();
+
lock.acquire();
try
- {
+ {
checkFile(size);
+
+ currentFile.getFile().write(bb);
- long id = rh.getID();
+ currentFile.extendOffset(size);
- byte[] toWrite = new byte[size];
- ByteBuffer bb = ByteBuffer.wrap(toWrite);
- bb.put(DELETE_RECORD);
- bb.putLong(id);
+ rh.addFile(currentFile);
+ }
+ finally
+ {
+ lock.release();
+ }
+ }
+
+ public void appendUpdateRecordTransactional(final long txID, final RecordHandle handle,
+ final byte[] record, final boolean done) throws Exception
+ {
+ if (!loaded)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ RecordHandleImpl rh = (RecordHandleImpl)handle;
+
+ //TODO optimise to avoid creating a new byte buffer
+
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+
+ ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+
+ bb.put(UPDATE_RECORD);
+
+ bb.putLong(txID);
+
+ bb.putLong(rh.getID());
+
+ bb.putInt(record.length);
+
+ bb.put(record);
+
+ if (done)
+ {
+ bb.put(TX_DONE);
+ }
+ else
+ {
+ bb.put(TX_CONTINUE);
+ }
+
+ bb.flip();
+
+ lock.acquire();
+
+ try
+ {
+ checkFile(size);
+
+ currentFile.getFile().write(bb);
- bb.flip();
-
- currentFile.getFile().write(bb);
-
- JournalFile addedFile = rh.getFile();
+ currentFile.extendOffset(size);
- addedFile.removeID(id);
+ rh.addFile(currentFile);
- checkAndReclaimFile(addedFile);
+ if (done)
+ {
+ List<RecordHandle> list = transactionalDeletes.remove(txID);
+
+ if (list != null)
+ {
+ releaseDeletes(list);
+ }
+ }
}
finally
{
lock.release();
+ }
+ }
+
+ private void releaseDeletes(final List<RecordHandle> deletes)
+ {
+ for (RecordHandle handle: deletes)
+ {
+ RecordHandleImpl rh = (RecordHandleImpl)handle;
+
+ rh.recordDeleted();
}
}
+
+ public void appendDeleteRecord(final RecordHandle handle) throws Exception
+ {
+ if (!loaded)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
- public Map<Long, byte[]> load() throws Exception
+ RecordHandleImpl rh = (RecordHandleImpl)handle;
+
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
+
+ buffer.put(DELETE_RECORD);
+
+ buffer.putLong(rh.getID());
+
+ buffer.put(DONE);
+
+ buffer.flip();
+
+ lock.acquire();
+
+ try
+ {
+ checkFile(size);
+
+ currentFile.getFile().write(buffer);
+
+ currentFile.extendOffset(size);
+
+ rh.recordDeleted();
+ }
+ finally
+ {
+ lock.release();
+ }
+ }
+
+ public void appendDeleteRecordTransactional(final long txID, final RecordHandle handle, final boolean done) throws Exception
{
+ if (!loaded)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ RecordHandleImpl rh = (RecordHandleImpl)handle;
+
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
+
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
+
+ buffer.put(DELETE_RECORD);
+
+ buffer.putLong(txID);
+
+ buffer.putLong(rh.getID());
+
+ if (done)
+ {
+ buffer.put(TX_DONE);
+ }
+ else
+ {
+ buffer.put(TX_CONTINUE);
+ }
+
+ buffer.flip();
+
+ lock.acquire();
+
+ try
+ {
+ checkFile(size);
+
+ currentFile.getFile().write(buffer);
+
+ currentFile.extendOffset(size);
+
+ //It's a transactional delete so we need to make sure file doesn't get deleted
+ rh.addFile(currentFile);
+
+ List<RecordHandle> list = transactionalDeletes.get(txID);
+
+ if (list == null)
+ {
+ list = new ArrayList<RecordHandle>();
+
+ transactionalDeletes.put(txID, list);
+ }
+
+ list.add(handle);
+
+ if (done)
+ {
+ transactionalDeletes.remove(txID);
+
+ releaseDeletes(list);
+ }
+ }
+ finally
+ {
+ lock.release();
+ }
+ }
+
+ public List<RecordHistory> load() throws Exception
+ {
if (loaded)
{
throw new IllegalStateException("Journal is already loaded");
@@ -196,7 +516,7 @@
log.info("Loading...");
- List<String> fileNames = fileFactory.listFiles(journalDir, JOURNAL_FILE_EXTENSION);
+ List<String> fileNames = fileFactory.listFiles(journalDir, fileExtension);
log.info("There are " + fileNames.size() + " files in directory");
@@ -208,7 +528,7 @@
file.open();
- ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
+ ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
file.read(bb);
@@ -221,9 +541,9 @@
orderedFiles.add(new JournalFile(file, orderingID));
}
- log.info("numFiles is " + numFiles);
+ log.info("minFiles is " + minFiles);
- int createNum = numFiles - orderedFiles.size();
+ int createNum = minFiles - orderedFiles.size();
//Preallocate some more if necessary
for (int i = 0; i < createNum; i++)
@@ -252,93 +572,290 @@
Collections.sort(orderedFiles, new JournalFileComparator());
- Map<Long, byte[]> records = new HashMap<Long, byte[]>();
-
- boolean filesWithData = true;
-
- outer: for (JournalFile file: orderedFiles)
+ Map<Long, RecordHistory> histories = new LinkedHashMap<Long, RecordHistory>();
+
+ for (JournalFile file: orderedFiles)
{
log.info("Loading file, ordering id is " + file.getOrderingID());
- byte[] bytes = new byte[fileSize];
+ ByteBuffer bb = ByteBuffer.wrap(new byte[fileSize]);
- ByteBuffer bb = ByteBuffer.wrap(bytes);
+ int bytesRead = file.getFile().read(bb);
- file.getFile().read(bb);
+ if (bytesRead != fileSize)
+ {
+ //deal with this better
+
+ throw new IllegalStateException("File is wrong size " + bytesRead +
+ " expected " + fileSize + " : " + file.getFile().getFileName());
+ }
bb.flip();
+ //First long is the ordering timestamp
bb.getLong();
- while (filesWithData && bb.hasRemaining())
+ boolean hasData = false;
+
+ while (bb.hasRemaining())
{
byte recordType = bb.get();
log.info("recordtype is " + recordType);
- if (recordType == ADD_RECORD)
+ int pos = bb.position();
+
+ switch(recordType)
{
- long id = bb.getLong();
+ case ADD_RECORD:
+ {
+ long id = bb.getLong();
+
+ int size = bb.getInt();
+
+ byte[] record = new byte[size];
+
+ bb.get(record);
+
+ byte end = bb.get();
+
+ if (end != DONE)
+ {
+ repairFrom(pos, file);
+ }
+ else
+ {
+ handleAddRecord(id, file, record, histories);
+
+ hasData = true;
+ }
+
+ break;
+ }
+ case UPDATE_RECORD:
+ {
+ long id = bb.getLong();
+
+ int size = bb.getInt();
+
+ byte[] record = new byte[size];
+
+ bb.get(record);
+
+ byte end = bb.get();
+
+ if (end != DONE)
+ {
+ repairFrom(pos, file);
+ }
+ else
+ {
+ handleUpdateRecord(id, file, record, histories);
+
+ hasData = true;
+ }
+
+ break;
+ }
+ case DELETE_RECORD:
+ {
+ long id = bb.getLong();
+
+ byte end = bb.get();
+
+ if (end != DONE)
+ {
+ repairFrom(pos, file);
+ }
+ else
+ {
+ handleDeleteRecord(id, histories);
+
+ hasData = true;
+ }
+
+ break;
+ }
+ case ADD_RECORD_TX:
+ {
+ long txID = bb.getLong();
+
+ long id = bb.getLong();
+
+ int size = bb.getInt();
+
+ byte[] record = new byte[size];
+
+ bb.get(record);
+
+ byte end = bb.get();
+
+ if (end != TX_CONTINUE && end != TX_DONE)
+ {
+ repairFrom(pos, file);
+ }
+ else
+ {
+ handleTransactionalRecord(ADD_RECORD, txID, id, file, record, end, histories);
+
+ hasData = true;
+ }
- int length = bb.getInt();
-
- //TODO - optimise this - no need to copy
-
- byte[] record = new byte[length];
-
- bb.get(record);
-
- records.put(id, record);
+ break;
+ }
+ case UPDATE_RECORD_TX:
+ {
+ long txID = bb.getLong();
+
+ long id = bb.getLong();
+
+ int size = bb.getInt();
+
+ byte[] record = new byte[size];
+
+ bb.get(record);
+
+ byte end = bb.get();
+
+ if (end != TX_CONTINUE && end != TX_DONE)
+ {
+ repairFrom(pos, file);
+ }
+ else
+ {
+ handleTransactionalRecord(UPDATE_RECORD, txID, id, file, record, end, histories);
+
+ hasData = true;
+ }
+
+ break;
+ }
+ case DELETE_RECORD_TX:
+ {
+ long txID = bb.getLong();
+
+ long id = bb.getLong();
+
+ byte end = bb.get();
+
+ if (end != TX_CONTINUE && end != TX_DONE)
+ {
+ repairFrom(pos, file);
+ }
+ else
+ {
+ handleTransactionalRecord(DELETE_RECORD, txID, id, file, null, end, histories);
+
+ hasData = true;
+ }
+
+ break;
+ }
+ case FILL_CHARACTER:
+ {
+ //End of records in file - we check the file only contains fill characters from this point
+ while (bb.hasRemaining())
+ {
+ byte b = bb.get();
+
+ if (b != FILL_CHARACTER)
+ {
+ throw new IllegalStateException("Corrupt file " + file.getFile().getFileName() +
+ " contains non fill character at position " + pos);
+ }
+ }
+
+ break;
+ }
+ default:
+ {
+ throw new IllegalStateException("Journal " + file.getFile().getFileName() +
+ " is corrupt, invalid record type " + recordType);
+ }
}
- else if (recordType == DELETE_RECORD)
- {
- long id = bb.getLong();
-
- records.remove(id);
- }
- else if (recordType == FILL_CHARACTER)
- {
- //Implies end of records in the file
-
- files.add(file);
-
- currentFile = file;
-
- filesWithData = false;
-
- continue outer;
- }
- else
- {
- throw new IllegalStateException("Journal " + file.getFile().getFileName() +
- " is corrupt, invalid record type " + recordType);
- }
}
- if (filesWithData)
+ if (hasData)
{
log.info("Adding to files");
+
files.add(file);
}
else
{
log.info("Adding to available files");
- //Empty files with no data of importance
+
+ //Empty files with no data
availableFiles.add(file);
- }
+ }
}
+
+ //Now it's possible that some of the files are no longer needed
+ checkFilesForReclamation();
+
+ for (JournalFile file: files)
+ {
+ currentFile = file;
+ }
+
+ //Check we have enough available files
+
+ checkAndCreateAvailableFiles();
+
+ if (currentFile == null)
+ {
+ currentFile = availableFiles.remove();
+
+ files.add(currentFile);
+ }
+
+ //Close all files apart from the current one
+
+ for (JournalFile file: files)
+ {
+ if (file != currentFile)
+ {
+ file.getFile().close();
+ }
+ }
+
+ startTasks();
+
loaded = true;
- return records;
+ return new ArrayList<RecordHistory>(histories.values());
}
+ private void repairFrom(int pos, JournalFile file) throws Exception
+ {
+ log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
+ " in the record that starts at position " + pos + ". " +
+ "The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
+
+ file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER);
+ }
+
+ public void checkAndCreateAvailableFiles() throws Exception
+ {
+ log.info("Checking if we need to create more files");
+
+ int filesToCreate = minAvailableFiles - availableFiles.size();
+
+ for (int i = 0; i < filesToCreate; i++)
+ {
+ JournalFile file = createFile();
+
+ availableFiles.add(file);
+ }
+ }
+
public void stop() throws Exception
{
- log.info("files size " + files.size());
- log.info("available files size " + availableFiles.size());
- log.info("files top delete size " + filesToDelete.size());
+ reclaimerTask.cancel();
+ availableFilesTask.cancel();
+
for (JournalFile file: files)
{
file.getFile().close();
@@ -348,85 +865,195 @@
{
file.getFile().close();
}
-
- for (JournalFile file: filesToDelete)
- {
- file.getFile().close();
- }
-
+
this.currentFile = null;
files.clear();
- availableFiles.clear();
+ availableFiles.clear();
+ }
+
+ public void startTasks()
+ {
+ timer.schedule(reclaimerTask, taskPeriod, taskPeriod);
- filesToDelete.clear();
+ timer.schedule(availableFilesTask, taskPeriod, taskPeriod);
}
// Public -----------------------------------------------------------------------------
- public LinkedList<JournalFile> getFiles()
+ public Queue<JournalFile> getFiles()
{
return files;
}
- public LinkedList<JournalFile> getAvailableFiles()
+ public Queue<JournalFile> getAvailableFiles()
{
return availableFiles;
}
- public LinkedList<JournalFile> getFilesToDelete()
+ public void checkFilesForReclamation() throws Exception
+ {
+ log.info("checking files for reclamation");
+
+ for (JournalFile file: files)
+ {
+ if (file.isEmpty() && file != currentFile)
+ {
+ //File can be reclaimed
+
+ files.remove(file);
+
+ //Re-initialise it
+
+ long newOrderingID = generateOrderingID();
+
+ ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
+
+ bb.putLong(newOrderingID);
+
+ SequentialFile sf = file.getFile();
+
+ //Note we MUST re-fill it - otherwise we won't be able to detect corrupt records
+ sf.fill(0, fileSize, FILL_CHARACTER);
+
+ sf.write(bb);
+
+ JournalFile jf = new JournalFile(sf, newOrderingID);
+
+ availableFiles.add(jf);
+ }
+ }
+ }
+
+ // Private -----------------------------------------------------------------------------
+
+ private void playTransaction(final TransactionInfo tx, final Map<Long, RecordHistory> histories)
{
- return filesToDelete;
+ for (TransactionEntry entry: tx.entries)
+ {
+ switch (entry.type)
+ {
+ case ADD_RECORD:
+ {
+ handleAddRecord(entry.id, entry.file, entry.record, histories);
+
+ break;
+ }
+ case UPDATE_RECORD:
+ {
+ handleUpdateRecord(entry.id, entry.file, entry.record, histories);
+
+ break;
+ }
+ case DELETE_RECORD:
+ {
+ handleDeleteRecord(entry.id, histories);
+
+ break;
+ }
+ default:
+ {
+ throw new IllegalStateException("Invalid record type " + entry.type);
+ }
+ }
+ }
}
- // Private -----------------------------------------------------------------------------
+ private void handleAddRecord(final long id, final JournalFile file,
+ final byte[] record, final Map<Long, RecordHistory> histories)
+ {
+ RecordHandleImpl handle = new RecordHandleImpl(id);
+
+ handle.addFile(file);
+
+ RecordHistoryImpl history = new RecordHistoryImpl(handle);
+
+ history.addRecord(record);
+
+ histories.put(id, history);
+ }
- private void checkAndReclaimFile(JournalFile file) throws Exception
- {
- if (file.isEmpty() && file != currentFile)
+ private void handleUpdateRecord(final long id, final JournalFile file,
+ final byte[] record, final Map<Long, RecordHistory> histories)
+ {
+ RecordHistoryImpl history = (RecordHistoryImpl)histories.get(id);
+
+ if (history == null)
{
- //File can be reclaimed
+ throw new IllegalStateException("Cannot find record (update) " + id);
+ }
+
+ RecordHandleImpl handle = (RecordHandleImpl)history.getHandle();
+
+ handle.addFile(file);
+
+ history.addRecord(record);
+ }
+
+ private void handleDeleteRecord(final long id, final Map<Long, RecordHistory> histories)
+ {
+ RecordHistoryImpl history = (RecordHistoryImpl)histories.remove(id);
+
+ if (history == null)
+ {
+ throw new IllegalStateException("Cannot find record (delete) " + id);
+ }
+
+ RecordHandleImpl handle = (RecordHandleImpl)history.getHandle();
+
+ handle.recordDeleted();
+ }
+
+ private void handleTransactionalRecord(final byte type, final long txID, final long id,
+ final JournalFile file, final byte[] record,
+ final byte end,
+ final Map<Long, RecordHistory> histories)
+ {
+ TransactionInfo tx = transactions.get(txID);
+
+ if (tx == null)
+ {
+ tx = new TransactionInfo();
- files.remove(file);
+ transactions.put(txID, tx);
+ }
+
+ TransactionEntry entry = new TransactionEntry(type, id, file, record);
+
+ tx.entries.add(entry);
+
+ if (end == TX_DONE)
+ {
+ transactions.remove(txID);
- //TODO - add to delete file list if there are a lot of available files
-
- //Re-initialise it
-
- long newOrderingID = generateOrderingID();
-
- ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
-
- bb.putLong(newOrderingID);
-
- SequentialFile sf = file.getFile();
-
- sf.reset();
-
- sf.write(bb);
-
- JournalFile jf = new JournalFile(sf, newOrderingID);
-
- availableFiles.add(jf);
+ playTransaction(tx, histories);
}
+ else if (end == TX_CONTINUE)
+ {
+ //
+ }
+ else
+ {
+ throw new IllegalStateException("Invalid transaction marker " + end);
+ }
}
-
+
private JournalFile createFile() throws Exception
{
log.info("Creating a new file");
long orderingID = generateOrderingID();
- String fileName = journalDir + "/" + JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JOURNAL_FILE_EXTENSION;
+ String fileName = journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync);
sequentialFile.open();
- sequentialFile.preAllocate(fileSize, FILL_CHARACTER);
+ sequentialFile.fill(0, fileSize, FILL_CHARACTER);
- ByteBuffer bb = ByteBuffer.wrap(new byte[LONG_LENGTH]);
+ ByteBuffer bb = ByteBuffer.wrap(new byte[SIZE_LONG]);
bb.putLong(orderingID);
@@ -465,7 +1092,7 @@
private void checkFile(final int size) throws Exception
{
//We take into account the first timestamp long
- if (size > fileSize - LONG_LENGTH)
+ if (size > fileSize - SIZE_LONG)
{
throw new IllegalArgumentException("Record is too large to store " + size);
}
@@ -476,23 +1103,87 @@
if (currentFile != null)
{
- currentFile.getFile().close();
-
- checkAndReclaimFile(currentFile);
+ currentFile.getFile().close();
}
log.info("Getting new file");
- if (!availableFiles.isEmpty())
+ checkAndCreateAvailableFiles();
+
+ currentFile = availableFiles.remove();
+
+ files.add(currentFile);
+ }
+ }
+
+ private static class TransactionInfo
+ {
+ final List<TransactionEntry> entries = new ArrayList<TransactionEntry>();
+ }
+
+ private static class TransactionEntry
+ {
+ TransactionEntry(final byte type, final long id, final JournalFile file, final byte[] record)
+ {
+ this.type = type;
+ this.id = id;
+ this.file = file;
+ this.record = record;
+ }
+ final byte type;
+ final long id;
+ final JournalFile file;
+ final byte[] record;
+ }
+
+ private class ReclaimerTask extends TimerTask
+ {
+ public boolean cancel()
+ {
+ timer.cancel();
+
+ return super.cancel();
+ }
+
+ public void run()
+ {
+ try
{
- currentFile = availableFiles.remove();
+ checkFilesForReclamation();
}
- else
+ catch (Exception e)
{
- currentFile = createFile();
+ log.error("Failure in running reclaimer", e);
+
+ cancel();
}
+ }
+
+ }
+
+ private class AvailableFilesTask extends TimerTask
+ {
+ public boolean cancel()
+ {
+ timer.cancel();
- files.add(currentFile);
+ return super.cancel();
}
+
+ public void run()
+ {
+ try
+ {
+ checkAndCreateAvailableFiles();
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in running availableFileChecker", e);
+
+ cancel();
+ }
+ }
+
}
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -71,7 +71,7 @@
channel = rfile.getChannel();
}
- public void preAllocate(final int size, final byte fillCharacter) throws Exception
+ public void fill(final int position, final int size, final byte fillCharacter) throws Exception
{
ByteBuffer bb = ByteBuffer.allocateDirect(size);
@@ -82,7 +82,7 @@
bb.flip();
- channel.position(0);
+ channel.position(position);
channel.write(bb);
@@ -103,11 +103,13 @@
file.delete();
}
- public void read(ByteBuffer bytes) throws Exception
+ public int read(ByteBuffer bytes) throws Exception
{
int bytesRead = channel.read(bytes);
log.info("Read " + bytesRead + " bytes");
+
+ return bytesRead;
}
public void write(ByteBuffer bytes) throws Exception
@@ -117,7 +119,7 @@
if (sync)
{
channel.force(false);
- };
+ }
}
public void reset() throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java 2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -1,5 +1,8 @@
package org.jboss.messaging.core.journal.impl;
+import java.util.ArrayList;
+import java.util.List;
+
import org.jboss.messaging.core.journal.RecordHandle;
/**
@@ -13,13 +16,18 @@
{
private final long id;
- private final JournalFile file;
+ private List<JournalFile> files = new ArrayList<JournalFile>();
- public RecordHandleImpl(final long id, final JournalFile file)
+ public RecordHandleImpl(final long id)
{
this.id = id;
+ }
+
+ public void addFile(JournalFile file)
+ {
+ files.add(file);
- this.file = file;
+ file.incRefCount();
}
public long getID()
@@ -27,8 +35,11 @@
return id;
}
- public JournalFile getFile()
+ public void recordDeleted()
{
- return file;
+ for (JournalFile file: files)
+ {
+ file.decRefCount();
+ }
}
}
Added: trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -0,0 +1,41 @@
+package org.jboss.messaging.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.messaging.core.journal.RecordHandle;
+import org.jboss.messaging.core.journal.RecordHistory;
+
+/**
+ *
+ * A RecordHistoryImpl
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RecordHistoryImpl implements RecordHistory
+{
+ private final List<byte[]> records = new ArrayList<byte[]>();
+
+ private final RecordHandle handle;
+
+ public RecordHistoryImpl(final RecordHandle handle)
+ {
+ this.handle = handle;
+ }
+
+ public RecordHandle getHandle()
+ {
+ return handle;
+ }
+
+ public List<byte[]> getRecords()
+ {
+ return records;
+ }
+
+ public void addRecord(final byte[] record)
+ {
+ records.add(record);
+ }
+}
Modified: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java 2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -25,7 +25,6 @@
import java.nio.ByteBuffer;
import java.util.Map;
-import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.impl.JournalImpl;
import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory;
import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory.FakeSequentialFile;
@@ -60,26 +59,32 @@
public void testLoad() throws Exception
{
- final int numFiles = 10;
+ final int minFiles = 10;
+ final int minAvailableFiles = 10;
+
final int fileSize = 10 * 1024;
final boolean sync = true;
+ final String filePrefix = "jbm";
+
+ final String fileExtension = "jbm";
+
long timeStart = System.currentTimeMillis();
- JournalImpl journal = new JournalImpl(journalDir, fileSize, numFiles, sync, factory);
+ JournalImpl journal =
+ new JournalImpl(journalDir, fileSize, minFiles, minAvailableFiles, sync, factory, 5000, filePrefix, fileExtension);
journal.load();
long timeEnd = System.currentTimeMillis();
assertEquals(1, journal.getFiles().size());
- assertEquals(numFiles - 1, journal.getAvailableFiles().size());
- assertEquals(0, journal.getFilesToDelete().size());
+ assertEquals(minFiles - 1, journal.getAvailableFiles().size());
+
+ assertEquals(minFiles, factory.getFileMap().size());
- assertEquals(numFiles, factory.getFileMap().size());
-
for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
{
FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
@@ -98,7 +103,7 @@
long orderingID = bb.getLong();
String expectedFilename =
- journalDir + "/" + JournalImpl.JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JournalImpl.JOURNAL_FILE_EXTENSION;
+ journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
assertEquals(expectedFilename, file.getFileName());
@@ -128,22 +133,21 @@
assertEquals(0, journal.getFiles().size());
assertEquals(0, journal.getAvailableFiles().size());
- assertEquals(0, journal.getFilesToDelete().size());
-
+
//Now reload
- journal = new JournalImpl(journalDir, fileSize, numFiles, sync, factory);
+ journal = new JournalImpl(journalDir, fileSize, minFiles, minAvailableFiles, sync, factory, 5000, filePrefix, fileExtension);
+
log.info("******** reloading");
journal.load();
assertEquals(1, journal.getFiles().size());
- assertEquals(numFiles - 1, journal.getAvailableFiles().size());
- assertEquals(0, journal.getFilesToDelete().size());
+ assertEquals(minFiles - 1, journal.getAvailableFiles().size());
+
+ assertEquals(minFiles, factory.getFileMap().size());
- assertEquals(numFiles, factory.getFileMap().size());
-
for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
{
FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
@@ -162,7 +166,7 @@
long orderingID = bb.getLong();
String expectedFilename =
- journalDir + "/" + JournalImpl.JOURNAL_FILE_PREFIX + "-" + orderingID + "." + JournalImpl.JOURNAL_FILE_EXTENSION;
+ journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
assertEquals(expectedFilename, file.getFileName());
Modified: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java 2008-03-12 03:05:48 UTC (rev 3870)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java 2008-03-12 10:02:42 UTC (rev 3871)
@@ -151,7 +151,7 @@
open = true;
}
- public void preAllocate(int size, byte fillCharacter) throws Exception
+ public void fill(int pos, int size, byte fillCharacter) throws Exception
{
if (!open)
{
@@ -162,7 +162,7 @@
byte[] bytes = new byte[size];
- for (int i = 0; i < size; i++)
+ for (int i = pos; i < size + pos; i++)
{
bytes[i] = fillCharacter;
}
@@ -170,7 +170,7 @@
data = ByteBuffer.wrap(bytes);
}
- public void read(ByteBuffer bytes) throws Exception
+ public int read(ByteBuffer bytes) throws Exception
{
if (!open)
{
@@ -186,6 +186,8 @@
data.get(bytesRead);
bytes.put(bytesRead);
+
+ return bytesRead.length;
}
public void reset() throws Exception
More information about the jboss-cvs-commits
mailing list