[jboss-cvs] JBoss Messaging SVN: r3884 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Mar 17 08:04:45 EDT 2008
Author: timfox
Date: 2008-03-17 08:04:44 -0400 (Mon, 17 Mar 2008)
New Revision: 3884
Added:
trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java
trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FakeSequentialFileFactoryTest.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTest.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/NIOSequentialFileFactoryTest.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFile.java
Removed:
trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
Log:
More journal work
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -23,6 +23,7 @@
import java.util.List;
+import org.jboss.messaging.core.server.MessagingComponent;
/**
*
@@ -31,34 +32,40 @@
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
-public interface Journal
+public interface Journal extends MessagingComponent
{
// Non transactional operations
- RecordHandle appendAddRecord(long id, byte[] record) throws Exception;
+ void appendAddRecord(long id, byte[] record) throws Exception;
- void appendUpdateRecord(RecordHandle handle, byte[] record) throws Exception;
+ void appendUpdateRecord(long id, byte[] record) throws Exception;
- void appendDeleteRecord(RecordHandle handle) throws Exception;
+ void appendDeleteRecord(long id) throws Exception;
// Transactional operations
- RecordHandle appendAddRecordTransactional(long txID, long id, byte[] record, boolean done) throws Exception;
+ void appendAddRecordTransactional(long txID, long id, byte[] record, boolean done) throws Exception;
- void appendUpdateRecordTransactional(long txID, RecordHandle handle, byte[] record, boolean done) throws Exception;
+ void appendUpdateRecordTransactional(long txID, long id, byte[] record, boolean done) throws Exception;
- void appendDeleteRecordTransactional(long txID, RecordHandle handle, boolean done) throws Exception;
+ void appendDeleteRecordTransactional(long txID, long id, boolean done) throws Exception;
+ //XA operations
-// RecordHandle appendAddRecordPrepare(long txID, long id, byte[] record, boolean done) throws Exception;
-//
-// void appendUpdateRecordPrepare(long txID, RecordHandle handle, byte[] record, boolean done) throws Exception;
-//
-// void appendDeleteRecordPrepare(long txID, RecordHandle handle, boolean done) throws Exception;
+ void appendAddRecordPrepare(long txID, long id, byte[] record, boolean done) throws Exception;
+
+ void appendUpdateRecordPrepare(long txID, long id, byte[] record, boolean done) throws Exception;
+
+ void appendDeleteRecordPrepare(long txID, long id, boolean done) throws Exception;
+
+ void appendXACommitRecord(long txID) throws Exception;
+
+ void appendXARollbackRecord(long txID) throws Exception;
// Load
- List<RecordHistory> load() throws Exception;
+ void load(List<RecordInfo> committedRecords,
+ List<PreparedTransactionInfo> preparedTransactions) throws Exception;
}
Added: trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,27 @@
+package org.jboss.messaging.core.journal;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ *
+ * A PreparedTransactionInfo
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class PreparedTransactionInfo
+{
+ public final long id;
+
+ public final List<RecordInfo> records = new ArrayList<RecordInfo>();
+
+ public final Set<Long> recordsToDelete = new HashSet<Long>();
+
+ public PreparedTransactionInfo(final long id)
+ {
+ this.id = id;
+ }
+}
Deleted: trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordHandle.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -1,34 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.journal;
-
-/**
- *
- * A RecordHandle
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public interface RecordHandle
-{
- public long getID();
-}
Deleted: trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordHistory.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -1,38 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.journal;
-
-import java.util.List;
-
-/**
- *
- * A RecordHistory
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public interface RecordHistory
-{
- RecordHandle getHandle();
-
- List<byte[]> getRecords();
-}
Added: trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,40 @@
+package org.jboss.messaging.core.journal;
+
+
+/**
+ *
+ * A RecordInfo
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RecordInfo
+{
+ public RecordInfo(final long id, final byte[] data, final boolean isUpdate)
+ {
+ this.id = id;
+
+ this.data = data;
+
+ this.isUpdate = isUpdate;
+ }
+
+ public final long id;
+
+ public final byte[] data;
+
+ public boolean isUpdate;
+
+ public int hashCode()
+ {
+ return (int)((id >>> 32) ^ id);
+ }
+
+ public boolean equals(Object other)
+ {
+ RecordInfo r = (RecordInfo)other;
+
+ return r.id == this.id;
+ }
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -43,11 +43,11 @@
void delete() throws Exception;
- void write(ByteBuffer bytes) throws Exception;
+ int write(ByteBuffer bytes, boolean sync) throws Exception;
int read(ByteBuffer bytes) throws Exception;
- void reset() throws Exception;
+ void position(int pos) throws Exception;
void close() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -34,5 +34,5 @@
{
SequentialFile createSequentialFile(String fileName, boolean sync) throws Exception;
- List<String> listFiles(String journalDir, String extension) throws Exception;
+ List<String> listFiles(String extension) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -39,9 +39,11 @@
private final long orderingID;
- private int refCount;
-
private int offset;
+
+ private final Set<Long> positives = new HashSet<Long>();
+
+ private final Set<Long> negatives = new HashSet<Long>();
public JournalFile(final SequentialFile file, final long orderingID)
{
@@ -73,21 +75,15 @@
public SequentialFile getFile()
{
return file;
- }
+ }
- public void incRefCount()
+ public void addPositive(final long id)
{
- refCount++;
+ this.positives.add(id);
}
- public void decRefCount()
+ public void addNegative(final long id)
{
- refCount--;
+ this.negatives.add(id);
}
-
- public boolean isEmpty()
- {
- return refCount == 0;
- }
-
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -25,19 +25,20 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import org.jboss.messaging.core.journal.Journal;
-import org.jboss.messaging.core.journal.RecordHandle;
-import org.jboss.messaging.core.journal.RecordHistory;
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
@@ -53,6 +54,12 @@
{
private static final Logger log = Logger.getLogger(JournalImpl.class);
+ private static final int STATE_STOPPED = 0;
+
+ private static final int STATE_STARTED = 1;
+
+ private static final int STATE_LOADED = 2;
+
// The sizes of primitive types
private static final int SIZE_LONG = 8;
@@ -61,6 +68,10 @@
private static final int SIZE_BYTE = 1;
+ public static final int MIN_FILE_SIZE = 1024;
+
+ public static final int MIN_TASK_PERIOD = 5000;
+
//Record markers - they must be all unique
public static final byte ADD_RECORD = 11;
@@ -75,21 +86,31 @@
public static final byte DELETE_RECORD_TX = 16;
+ public static final byte ADD_RECORD_PREPARE = 17;
+
+ public static final byte UPDATE_RECORD_PREPARE = 18;
+
+ public static final byte DELETE_RECORD_PREPARE = 19;
+
+ public static final byte XA_COMMIT_RECORD = 20;
+
+ public static final byte XA_ROLLBACK_RECORD = 21;
+
//End markers - they must all be unique
- public static final byte DONE = 21;
+ public static final byte DONE = 31;
- public static final byte TX_CONTINUE = 22;
+ public static final byte TX_CONTINUE = 32;
- public static final byte TX_DONE = 23;
+ public static final byte TX_DONE = 33;
+ public static final byte XA_CONTINUE = 34;
+
+ public static final byte XA_DONE = 35;
+
public static final byte FILL_CHARACTER = 74; // Letter 'J'
-
-
-
- private final String journalDir;
-
+
private final int fileSize;
private final int minFiles;
@@ -111,11 +132,6 @@
private final Queue<JournalFile> availableFiles = new ConcurrentLinkedQueue<JournalFile>();
- private final Map<Long, TransactionInfo> transactions = new ConcurrentHashMap<Long, TransactionInfo>();
-
- private final Map<Long, List<RecordHandle>> transactionalDeletes =
- new ConcurrentHashMap<Long, List<RecordHandle>>();
-
/*
* We use a semaphore rather than synchronized since it performs better when contended
*/
@@ -125,23 +141,49 @@
private volatile JournalFile currentFile ;
- private volatile boolean loaded;
+ private volatile int state;
private volatile long lastOrderingID;
private final Timer timer = new Timer(true);
- private final TimerTask reclaimerTask = new ReclaimerTask();
+ private TimerTask reclaimerTask;
- private final TimerTask availableFilesTask = new AvailableFilesTask();
+ private TimerTask availableFilesTask;
-
- public JournalImpl(final String journalDir, final int fileSize, final int minFiles, final int minAvailableFiles,
+ public JournalImpl(final int fileSize, final int minFiles, final int minAvailableFiles,
final boolean sync, final SequentialFileFactory fileFactory, final long taskPeriod,
final String filePrefix, final String fileExtension)
{
- this.journalDir = journalDir;
+ if (fileSize < MIN_FILE_SIZE)
+ {
+ throw new IllegalArgumentException("File size cannot be less than " + MIN_FILE_SIZE + " bytes");
+ }
+ if (minFiles < 2)
+ {
+ throw new IllegalArgumentException("minFiles cannot be less than 2");
+ }
+ if (minAvailableFiles < 2)
+ {
+ throw new IllegalArgumentException("minAvailableFiles cannot be less than 2");
+ }
+ if (fileFactory == null)
+ {
+ throw new NullPointerException("fileFactory is null");
+ }
+ if (taskPeriod < MIN_TASK_PERIOD)
+ {
+ throw new IllegalArgumentException("taskPeriod cannot be less than " + MIN_TASK_PERIOD);
+ }
+ if (filePrefix == null)
+ {
+ throw new NullPointerException("filePrefix is null");
+ }
+ if (fileExtension == null)
+ {
+ throw new NullPointerException("fileExtension is null");
+ }
this.fileSize = fileSize;
@@ -167,77 +209,123 @@
return ByteBuffer.allocateDirect(size);
}
- public RecordHandle appendAddRecord(final long id, final byte[] record) throws Exception
+ public void appendAddRecord(final long id, final byte[] record) throws Exception
{
- if (!loaded)
+ if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
- //TODO optimise to avoid creating a new byte buffer
-
int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
- bb.put(ADD_RECORD);
+ bb.put(ADD_RECORD);
+ bb.putLong(id);
+ bb.putInt(record.length);
+ bb.put(record);
+ bb.put(DONE);
- bb.putLong(id);
-
- bb.putInt(record.length);
-
- bb.put(record);
-
- bb.put(DONE);
-
bb.flip();
lock.acquire();
try
{
- checkFile(size);
-
- currentFile.getFile().write(bb);
-
+ checkFile(size);
+ currentFile.getFile().write(bb, true);
currentFile.extendOffset(size);
-
- RecordHandleImpl rh = new RecordHandleImpl(id);
-
- rh.addFile(currentFile);
-
- return rh;
}
finally
{
lock.release();
}
}
-
- public RecordHandle appendAddRecordTransactional(final long txID, final long id,
- final byte[] record, final boolean done) throws Exception
+
+ public void appendUpdateRecord(final long id, final byte[] record) throws Exception
{
- if (!loaded)
+ if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
//TODO optimise to avoid creating a new byte buffer
- int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
- bb.put(ADD_RECORD);
+ bb.put(UPDATE_RECORD);
+ bb.putLong(id);
+ bb.putInt(record.length);
+ bb.put(record);
+ bb.put(DONE);
+ bb.flip();
+
+ lock.acquire();
+
+ try
+ {
+ checkFile(size);
+ currentFile.getFile().write(bb, true);
+ currentFile.extendOffset(size);
+ }
+ finally
+ {
+ lock.release();
+ }
+ }
+
+ public void appendDeleteRecord(long id) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
+
+ buffer.put(DELETE_RECORD);
+ buffer.putLong(id);
+ buffer.put(DONE);
+
+ buffer.flip();
+
+ lock.acquire();
+
+ try
+ {
+ checkFile(size);
+ currentFile.getFile().write(buffer, true);
+ currentFile.extendOffset(size);
+ }
+ finally
+ {
+ lock.release();
+ }
+ }
+
+ public void appendAddRecordTransactional(final long txID, final long id,
+ final byte[] record, final boolean done) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
+
+ ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+
+ bb.put(ADD_RECORD_TX);
bb.putLong(txID);
-
bb.putLong(id);
-
bb.putInt(record.length);
-
bb.put(record);
-
+
if (done)
{
bb.put(TX_DONE);
@@ -246,34 +334,16 @@
{
bb.put(TX_CONTINUE);
}
-
+
bb.flip();
-
+
lock.acquire();
-
+
try
{
- checkFile(size);
-
- currentFile.getFile().write(bb);
-
- currentFile.extendOffset(size);
-
- RecordHandleImpl rh = new RecordHandleImpl(id);
-
- rh.addFile(currentFile);
-
- if (done)
- {
- List<RecordHandle> list = transactionalDeletes.remove(txID);
-
- if (list != null)
- {
- releaseDeletes(list);
- }
- }
-
- return rh;
+ checkFile(size);
+ currentFile.getFile().write(bb, done);
+ currentFile.extendOffset(size);
}
finally
{
@@ -281,30 +351,32 @@
}
}
- public void appendUpdateRecord(final RecordHandle handle, final byte[] record) throws Exception
+ public void appendUpdateRecordTransactional(final long txID, final long id,
+ final byte[] record, final boolean done) throws Exception
{
- if (!loaded)
+ if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
+
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
- RecordHandleImpl rh = (RecordHandleImpl)handle;
-
- //TODO optimise to avoid creating a new byte buffer
-
- int size = SIZE_BYTE + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
-
ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
- bb.put(UPDATE_RECORD);
-
- bb.putLong(rh.getID());
-
- bb.putInt(record.length);
-
+ bb.put(UPDATE_RECORD_TX);
+ bb.putLong(txID);
+ bb.putLong(id);
+ bb.putInt(record.length);
bb.put(record);
- bb.put(DONE);
+ if (done)
+ {
+ bb.put(TX_DONE);
+ }
+ else
+ {
+ bb.put(TX_CONTINUE);
+ }
bb.flip();
@@ -312,13 +384,9 @@
try
{
- checkFile(size);
-
- currentFile.getFile().write(bb);
-
- currentFile.extendOffset(size);
-
- rh.addFile(currentFile);
+ checkFile(size);
+ currentFile.getFile().write(bb, done);
+ currentFile.extendOffset(size);
}
finally
{
@@ -326,39 +394,113 @@
}
}
- public void appendUpdateRecordTransactional(final long txID, final RecordHandle handle,
- final byte[] record, final boolean done) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id, final boolean done) throws Exception
{
- if (!loaded)
+ if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
- RecordHandleImpl rh = (RecordHandleImpl)handle;
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
- //TODO optimise to avoid creating a new byte buffer
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
+ buffer.put(DELETE_RECORD_TX);
+ buffer.putLong(txID);
+ buffer.putLong(id);
+
+ if (done)
+ {
+ buffer.put(TX_DONE);
+ }
+ else
+ {
+ buffer.put(TX_CONTINUE);
+ }
+
+ buffer.flip();
+
+ lock.acquire();
+
+ try
+ {
+ checkFile(size);
+ currentFile.getFile().write(buffer, done);
+ currentFile.extendOffset(size);
+ }
+ finally
+ {
+ lock.release();
+ }
+ }
+
+
+ public void appendAddRecordPrepare(final long txID, final long id, final byte[] record, final boolean done) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
-
+
ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
-
- bb.put(UPDATE_RECORD);
-
+
+ bb.put(ADD_RECORD_PREPARE);
bb.putLong(txID);
+ bb.putLong(id);
+ bb.putInt(record.length);
+ bb.put(record);
+
+ if (done)
+ {
+ bb.put(XA_DONE);
+ }
+ else
+ {
+ bb.put(XA_CONTINUE);
+ }
+
+ bb.flip();
+
+ lock.acquire();
+
+ try
+ {
+ checkFile(size);
+ currentFile.getFile().write(bb, done);
+ currentFile.extendOffset(size);
+ }
+ finally
+ {
+ lock.release();
+ }
+ }
+
+ public void appendUpdateRecordPrepare(final long txID, final long id, final byte[] record, final boolean done) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
- bb.putLong(rh.getID());
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT + record.length + SIZE_BYTE;
- bb.putInt(record.length);
+ ByteBuffer bb = ByteBuffer.wrap(new byte[size]);
+ bb.put(UPDATE_RECORD_PREPARE);
+ bb.putLong(txID);
+ bb.putLong(id);
+ bb.putInt(record.length);
bb.put(record);
if (done)
{
- bb.put(TX_DONE);
+ bb.put(XA_DONE);
}
else
{
- bb.put(TX_CONTINUE);
+ bb.put(XA_CONTINUE);
}
bb.flip();
@@ -367,58 +509,39 @@
try
{
- checkFile(size);
-
- currentFile.getFile().write(bb);
-
- currentFile.extendOffset(size);
-
- rh.addFile(currentFile);
-
- if (done)
- {
- List<RecordHandle> list = transactionalDeletes.remove(txID);
-
- if (list != null)
- {
- releaseDeletes(list);
- }
- }
+ checkFile(size);
+ currentFile.getFile().write(bb, done);
+ currentFile.extendOffset(size);
}
finally
{
lock.release();
- }
+ }
}
- private void releaseDeletes(final List<RecordHandle> deletes)
+ public void appendDeleteRecordPrepare(final long txID, final long id, final boolean done) throws Exception
{
- for (RecordHandle handle: deletes)
+ if (state != STATE_LOADED)
{
- RecordHandleImpl rh = (RecordHandleImpl)handle;
-
- rh.recordDeleted();
- }
- }
-
- public void appendDeleteRecord(final RecordHandle handle) throws Exception
- {
- if (!loaded)
- {
throw new IllegalStateException("Journal must be loaded first");
}
- RecordHandleImpl rh = (RecordHandleImpl)handle;
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
- int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
-
ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
- buffer.put(DELETE_RECORD);
+ buffer.put(DELETE_RECORD_PREPARE);
+ buffer.putLong(txID);
+ buffer.putLong(id);
- buffer.putLong(rh.getID());
-
- buffer.put(DONE);
+ if (done)
+ {
+ buffer.put(XA_DONE);
+ }
+ else
+ {
+ buffer.put(XA_CONTINUE);
+ }
buffer.flip();
@@ -426,13 +549,9 @@
try
{
- checkFile(size);
-
- currentFile.getFile().write(buffer);
-
- currentFile.extendOffset(size);
-
- rh.recordDeleted();
+ checkFile(size);
+ currentFile.getFile().write(buffer, done);
+ currentFile.extendOffset(size);
}
finally
{
@@ -440,33 +559,51 @@
}
}
- public void appendDeleteRecordTransactional(final long txID, final RecordHandle handle, final boolean done) throws Exception
+ public void appendXACommitRecord(final long txID) throws Exception
{
- if (!loaded)
+ if (state != STATE_LOADED)
{
throw new IllegalStateException("Journal must be loaded first");
}
- RecordHandleImpl rh = (RecordHandleImpl)handle;
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
- int size = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
-
ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
- buffer.put(DELETE_RECORD);
+ buffer.put(XA_COMMIT_RECORD);
+ buffer.putLong(txID);
+ buffer.put(XA_DONE);
- buffer.putLong(txID);
+ buffer.flip();
- buffer.putLong(rh.getID());
+ lock.acquire();
- if (done)
- {
- buffer.put(TX_DONE);
+ try
+ {
+ checkFile(size);
+ currentFile.getFile().write(buffer, true);
+ currentFile.extendOffset(size);
}
- else
+ finally
{
- buffer.put(TX_CONTINUE);
+ lock.release();
+ }
+ }
+
+ public void appendXARollbackRecord(final long txID) throws Exception
+ {
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
}
+
+ int size = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[size]);
+
+ buffer.put(XA_ROLLBACK_RECORD);
+ buffer.putLong(txID);
+ buffer.put(XA_DONE);
buffer.flip();
@@ -474,52 +611,32 @@
try
{
- checkFile(size);
-
- currentFile.getFile().write(buffer);
-
+ checkFile(size);
+ currentFile.getFile().write(buffer, true);
currentFile.extendOffset(size);
-
- //It's a transactional delete so we need to make sure file doesn't get deleted
- rh.addFile(currentFile);
-
- List<RecordHandle> list = transactionalDeletes.get(txID);
-
- if (list == null)
- {
- list = new ArrayList<RecordHandle>();
-
- transactionalDeletes.put(txID, list);
- }
-
- list.add(handle);
-
- if (done)
- {
- transactionalDeletes.remove(txID);
-
- releaseDeletes(list);
- }
}
finally
{
lock.release();
- }
- }
+ }
+ }
- public List<RecordHistory> load() throws Exception
+ public void load(final List<RecordInfo> committedRecords,
+ final List<PreparedTransactionInfo> preparedTransactions) throws Exception
{
- if (loaded)
+ if (state != STATE_STARTED)
{
- throw new IllegalStateException("Journal is already loaded");
+ throw new IllegalStateException("Journal must be in started state");
}
- log.info("Loading...");
+ Set<Long> recordsToDelete = new HashSet<Long>();
- List<String> fileNames = fileFactory.listFiles(journalDir, fileExtension);
+ Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
- log.info("There are " + fileNames.size() + " files in directory");
+ List<RecordInfo> records = new ArrayList<RecordInfo>();
+ List<String> fileNames = fileFactory.listFiles(fileExtension);
+
List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
for (String fileName: fileNames)
@@ -535,14 +652,12 @@
bb.flip();
long orderingID = bb.getLong();
+
+ orderedFiles.add(new JournalFile(file, orderingID));
- file.reset();
-
- orderedFiles.add(new JournalFile(file, orderingID));
+ file.close();
}
- log.info("minFiles is " + minFiles);
-
int createNum = minFiles - orderedFiles.size();
//Preallocate some more if necessary
@@ -552,10 +667,8 @@
orderedFiles.add(file);
- log.info("Created new file");
+ file.getFile().close();
}
-
- log.info("Done creating new ones");
//Now order them by ordering id - we can't use the file name for ordering since we can re-use files
@@ -572,14 +685,14 @@
Collections.sort(orderedFiles, new JournalFileComparator());
- Map<Long, RecordHistory> histories = new LinkedHashMap<Long, RecordHistory>();
-
+ int lastDataPos = -1;
+
for (JournalFile file: orderedFiles)
- {
- log.info("Loading file, ordering id is " + file.getOrderingID());
-
+ {
ByteBuffer bb = ByteBuffer.wrap(new byte[fileSize]);
+ file.getFile().open();
+
int bytesRead = file.getFile().read(bb);
if (bytesRead != fileSize)
@@ -601,14 +714,12 @@
{
byte recordType = bb.get();
- log.info("recordtype is " + recordType);
-
int pos = bb.position();
switch(recordType)
{
case ADD_RECORD:
- {
+ {
long id = bb.getLong();
int size = bb.getInt();
@@ -624,9 +735,9 @@
repairFrom(pos, file);
}
else
- {
- handleAddRecord(id, file, record, histories);
-
+ {
+ records.add(new RecordInfo(id, record, false));
+
hasData = true;
}
@@ -650,7 +761,7 @@
}
else
{
- handleUpdateRecord(id, file, record, histories);
+ records.add(new RecordInfo(id, record, true));
hasData = true;
}
@@ -669,7 +780,7 @@
}
else
{
- handleDeleteRecord(id, histories);
+ recordsToDelete.add(id);
hasData = true;
}
@@ -696,8 +807,24 @@
}
else
{
- handleTransactionalRecord(ADD_RECORD, txID, id, file, record, end, histories);
+ TransactionHolder tx = transactions.get(txID);
+ if (tx == null)
+ {
+ tx = new TransactionHolder(txID);
+
+ transactions.put(txID, tx);
+ }
+
+ tx.recordInfos.add(new RecordInfo(id, record, false));
+
+ if (end == TX_DONE)
+ {
+ transactions.remove(txID);
+
+ records.addAll(tx.recordInfos);
+ }
+
hasData = true;
}
@@ -723,8 +850,22 @@
}
else
{
- handleTransactionalRecord(UPDATE_RECORD, txID, id, file, record, end, histories);
+ TransactionHolder tx = transactions.get(txID);
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
+
+ tx.recordInfos.add(new RecordInfo(id, record, true));
+
+ if (end == TX_DONE)
+ {
+ transactions.remove(txID);
+
+ records.addAll(tx.recordInfos);
+ }
+
hasData = true;
}
@@ -744,13 +885,200 @@
}
else
{
- handleTransactionalRecord(DELETE_RECORD, txID, id, file, null, end, histories);
+ TransactionHolder tx = transactions.get(txID);
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
+
+ tx.recordsToDelete.add(id);
+
+ if (end == TX_DONE)
+ {
+ transactions.remove(txID);
+
+ records.addAll(tx.recordInfos);
+
+ recordsToDelete.addAll(tx.recordsToDelete);
+ }
+
hasData = true;
}
break;
}
+ case ADD_RECORD_PREPARE:
+ {
+ long txID = bb.getLong();
+
+ long id = bb.getLong();
+
+ int size = bb.getInt();
+
+ byte[] record = new byte[size];
+
+ bb.get(record);
+
+ byte end = bb.get();
+
+ if (end != XA_DONE && end != XA_CONTINUE)
+ {
+ repairFrom(pos, file);
+ }
+ else
+ {
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
+ {
+ tx = new TransactionHolder(txID);
+
+ transactions.put(txID, tx);
+ }
+
+ tx.recordInfos.add(new RecordInfo(id, record, false));
+
+ if (end == XA_DONE)
+ {
+ tx.prepared = true;
+ }
+
+ hasData = true;
+ }
+
+ break;
+ }
+ case UPDATE_RECORD_PREPARE:
+ {
+ long txID = bb.getLong();
+
+ long id = bb.getLong();
+
+ int size = bb.getInt();
+
+ byte[] record = new byte[size];
+
+ bb.get(record);
+
+ byte end = bb.get();
+
+ if (end != XA_CONTINUE && end != XA_DONE)
+ {
+ repairFrom(pos, file);
+ }
+ else
+ {
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
+
+ tx.recordInfos.add(new RecordInfo(id, record, true));
+
+ if (end == XA_DONE)
+ {
+ tx.prepared = true;
+ }
+
+ hasData = true;
+ }
+
+ break;
+ }
+ case DELETE_RECORD_PREPARE:
+ {
+ long txID = bb.getLong();
+
+ long id = bb.getLong();
+
+ byte end = bb.get();
+
+ if (end != XA_CONTINUE && end != XA_DONE)
+ {
+ repairFrom(pos, file);
+ }
+ else
+ {
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
+
+ tx.recordsToDelete.add(id);
+
+ if (end == XA_DONE)
+ {
+ tx.prepared = true;
+ }
+
+ hasData = true;
+ }
+
+ break;
+ }
+ case XA_COMMIT_RECORD:
+ {
+ long txID = bb.getLong();
+
+ byte end = bb.get();
+
+ if (end != XA_DONE)
+ {
+ repairFrom(pos, file);
+ }
+ else
+ {
+ TransactionHolder tx = transactions.remove(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
+
+ if (!tx.prepared)
+ {
+ throw new IllegalStateException("Can't commit transaction - it is not prepared " + txID);
+ }
+
+ records.addAll(tx.recordInfos);
+
+ recordsToDelete.addAll(tx.recordsToDelete);
+ }
+
+ break;
+ }
+ case XA_ROLLBACK_RECORD:
+ {
+ long txID = bb.getLong();
+
+ byte end = bb.get();
+
+ if (end != XA_DONE)
+ {
+ repairFrom(pos, file);
+ }
+ else
+ {
+ TransactionHolder tx = transactions.remove(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx with id " + txID);
+ }
+
+ if (!tx.prepared)
+ {
+ throw new IllegalStateException("Can't roll back transaction - it is not prepared " + txID);
+ }
+ }
+
+ break;
+ }
case FILL_CHARACTER:
{
//End of records in file - we check the file only contains fill characters from this point
@@ -773,110 +1101,157 @@
" is corrupt, invalid record type " + recordType);
}
}
+
+ if (recordType != FILL_CHARACTER)
+ {
+ lastDataPos = pos;
+ }
}
-
+
if (hasData)
- {
- log.info("Adding to files");
+ {
+ files.add(file);
- files.add(file);
+ //Files are always maintained closed - there may be a lot of them and we don't want to run out
+ //of file handles
+ file.getFile().close();
}
else
- {
- log.info("Adding to available files");
-
+ {
//Empty files with no data
availableFiles.add(file);
+
+ //Position it ready for writing
+ file.getFile().position(SIZE_LONG);
}
- }
-
+ }
+
//Now it's possible that some of the files are no longer needed
checkFilesForReclamation();
+
+ //Check we have enough available files
+ checkAndCreateAvailableFiles();
+
for (JournalFile file: files)
{
- currentFile = file;
+ currentFile = file;
}
- //Check we have enough available files
+ if (currentFile != null)
+ {
+ currentFile.getFile().open();
- checkAndCreateAvailableFiles();
-
- if (currentFile == null)
+ currentFile.getFile().position(lastDataPos);
+ }
+ else
{
currentFile = availableFiles.remove();
files.add(currentFile);
}
- //Close all files apart from the current one
+ startTasks();
+
+ for (RecordInfo record: records)
+ {
+ if (!recordsToDelete.contains(record.id))
+ {
+ committedRecords.add(record);
+ }
+ }
- for (JournalFile file: files)
+ for (TransactionHolder transaction: transactions.values())
{
- if (file != currentFile)
+ if (!transaction.prepared)
{
- file.getFile().close();
+ log.warn("Uncommitted transaction with id " + transaction.transactionID + " found and discarded");
}
+ else
+ {
+ PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID);
+
+ info.records.addAll(transaction.recordInfos);
+
+ info.recordsToDelete.addAll(transaction.recordsToDelete);
+
+ preparedTransactions.add(info);
+ }
}
-
- startTasks();
-
- loaded = true;
-
- return new ArrayList<RecordHistory>(histories.values());
+
+ state = STATE_LOADED;
}
-
- private void repairFrom(int pos, JournalFile file) throws Exception
- {
- log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
- " in the record that starts at position " + pos + ". " +
- "The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
-
- file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER);
- }
-
+
public void checkAndCreateAvailableFiles() throws Exception
{
- log.info("Checking if we need to create more files");
-
+ //log.info("min available " + minAvailableFiles + " avail: " + availableFiles.size() + " files: " + files.size());
int filesToCreate = minAvailableFiles - availableFiles.size();
for (int i = 0; i < filesToCreate; i++)
{
JournalFile file = createFile();
+ //log.info("Creating new file");
+
availableFiles.add(file);
}
}
- public void stop() throws Exception
+ // MessagingComponent implementation ---------------------------------------------------
+
+ public synchronized void start()
{
- reclaimerTask.cancel();
+ if (state != STATE_STOPPED)
+ {
+ throw new IllegalStateException("Journal is not stopped");
+ }
- availableFilesTask.cancel();
+ state = STATE_STARTED;
+ }
+
+ public synchronized void stop() throws Exception
+ {
+ if (state == STATE_STOPPED)
+ {
+ throw new IllegalStateException("Journal is already stopped");
+ }
- for (JournalFile file: files)
+ if (reclaimerTask != null)
{
- file.getFile().close();
+ reclaimerTask.cancel();
}
+ if (availableFilesTask != null)
+ {
+ availableFilesTask.cancel();
+ }
+
+ if (currentFile != null)
+ {
+ currentFile.getFile().close();
+ }
+
for (JournalFile file: availableFiles)
{
file.getFile().close();
}
- this.currentFile = null;
+ currentFile = null;
files.clear();
- availableFiles.clear();
+ availableFiles.clear();
+
+ state = STATE_STOPPED;
}
public void startTasks()
{
+ reclaimerTask = new ReclaimerTask();
timer.schedule(reclaimerTask, taskPeriod, taskPeriod);
+ availableFilesTask = new AvailableFilesTask();
timer.schedule(availableFilesTask, taskPeriod, taskPeriod);
}
@@ -894,11 +1269,10 @@
public void checkFilesForReclamation() throws Exception
{
- log.info("checking files for reclamation");
-
for (JournalFile file: files)
{
- if (file.isEmpty() && file != currentFile)
+ //TODO reclamation
+ if (false && file != currentFile)
{
//File can be reclaimed
@@ -917,135 +1291,39 @@
//Note we MUST re-fill it - otherwise we won't be able to detect corrupt records
sf.fill(0, fileSize, FILL_CHARACTER);
- sf.write(bb);
+ sf.write(bb, true);
JournalFile jf = new JournalFile(sf, newOrderingID);
+ sf.position(SIZE_LONG);
+
+ jf.resetOffset();
+
+ jf.extendOffset(SIZE_LONG);
+
availableFiles.add(jf);
}
}
}
// Private -----------------------------------------------------------------------------
-
- private void playTransaction(final TransactionInfo tx, final Map<Long, RecordHistory> histories)
- {
- for (TransactionEntry entry: tx.entries)
- {
- switch (entry.type)
- {
- case ADD_RECORD:
- {
- handleAddRecord(entry.id, entry.file, entry.record, histories);
-
- break;
- }
- case UPDATE_RECORD:
- {
- handleUpdateRecord(entry.id, entry.file, entry.record, histories);
-
- break;
- }
- case DELETE_RECORD:
- {
- handleDeleteRecord(entry.id, histories);
-
- break;
- }
- default:
- {
- throw new IllegalStateException("Invalid record type " + entry.type);
- }
- }
- }
- }
-
- private void handleAddRecord(final long id, final JournalFile file,
- final byte[] record, final Map<Long, RecordHistory> histories)
- {
- RecordHandleImpl handle = new RecordHandleImpl(id);
- handle.addFile(file);
-
- RecordHistoryImpl history = new RecordHistoryImpl(handle);
-
- history.addRecord(record);
-
- histories.put(id, history);
- }
-
- private void handleUpdateRecord(final long id, final JournalFile file,
- final byte[] record, final Map<Long, RecordHistory> histories)
- {
- RecordHistoryImpl history = (RecordHistoryImpl)histories.get(id);
-
- if (history == null)
- {
- throw new IllegalStateException("Cannot find record (update) " + id);
- }
-
- RecordHandleImpl handle = (RecordHandleImpl)history.getHandle();
-
- handle.addFile(file);
-
- history.addRecord(record);
- }
-
- private void handleDeleteRecord(final long id, final Map<Long, RecordHistory> histories)
- {
- RecordHistoryImpl history = (RecordHistoryImpl)histories.remove(id);
-
- if (history == null)
- {
- throw new IllegalStateException("Cannot find record (delete) " + id);
- }
-
- RecordHandleImpl handle = (RecordHandleImpl)history.getHandle();
-
- handle.recordDeleted();
- }
-
- private void handleTransactionalRecord(final byte type, final long txID, final long id,
- final JournalFile file, final byte[] record,
- final byte end,
- final Map<Long, RecordHistory> histories)
+ private void repairFrom(int pos, JournalFile file) throws Exception
{
- TransactionInfo tx = transactions.get(txID);
+ log.warn("Corruption has been detected in file: " + file.getFile().getFileName() +
+ " in the record that starts at position " + pos + ". " +
+ "The most likely cause is that a crash occurred in the previous run. The corrupt record will be discarded.");
- if (tx == null)
- {
- tx = new TransactionInfo();
-
- transactions.put(txID, tx);
- }
+ file.getFile().fill(pos, fileSize - pos, FILL_CHARACTER);
- TransactionEntry entry = new TransactionEntry(type, id, file, record);
-
- tx.entries.add(entry);
-
- if (end == TX_DONE)
- {
- transactions.remove(txID);
-
- playTransaction(tx, histories);
- }
- else if (end == TX_CONTINUE)
- {
- //
- }
- else
- {
- throw new IllegalStateException("Invalid transaction marker " + end);
- }
+ file.getFile().position(pos);
}
private JournalFile createFile() throws Exception
{
- log.info("Creating a new file");
-
long orderingID = generateOrderingID();
- String fileName = journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
+ String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync);
@@ -1059,12 +1337,14 @@
bb.flip();
- sequentialFile.write(bb);
+ sequentialFile.write(bb, true);
- sequentialFile.reset();
+ sequentialFile.position(SIZE_LONG);
JournalFile info = new JournalFile(sequentialFile, orderingID);
+ info.extendOffset(SIZE_LONG);
+
return info;
}
@@ -1096,46 +1376,19 @@
{
throw new IllegalArgumentException("Record is too large to store " + size);
}
-
+
if (currentFile == null || fileSize - currentFile.getOffset() < size)
{
- log.info("Getting new file");
+ checkAndCreateAvailableFiles();
- if (currentFile != null)
- {
- currentFile.getFile().close();
- }
+ currentFile.getFile().close();
- log.info("Getting new file");
-
- checkAndCreateAvailableFiles();
-
currentFile = availableFiles.remove();
files.add(currentFile);
}
}
- private static class TransactionInfo
- {
- final List<TransactionEntry> entries = new ArrayList<TransactionEntry>();
- }
-
- private static class TransactionEntry
- {
- TransactionEntry(final byte type, final long id, final JournalFile file, final byte[] record)
- {
- this.type = type;
- this.id = id;
- this.file = file;
- this.record = record;
- }
- final byte type;
- final long id;
- final JournalFile file;
- final byte[] record;
- }
-
private class ReclaimerTask extends TimerTask
{
public boolean cancel()
@@ -1149,7 +1402,7 @@
{
try
{
- checkFilesForReclamation();
+ //checkFilesForReclamation();
}
catch (Exception e)
{
@@ -1174,7 +1427,7 @@
{
try
{
- checkAndCreateAvailableFiles();
+ //checkAndCreateAvailableFiles();
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -39,9 +39,9 @@
public class NIOSequentialFile implements SequentialFile
{
private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
-
- private static final int LONG_LENGTH = 8;
-
+
+ private String journalDir;
+
private String fileName;
private boolean sync;
@@ -49,9 +49,13 @@
private File file;
private FileChannel channel;
+
+ private RandomAccessFile rfile;
- public NIOSequentialFile(final String fileName, final boolean sync)
+ public NIOSequentialFile(final String journalDir, final String fileName, final boolean sync)
{
+ this.journalDir = journalDir;
+
this.fileName = fileName;
this.sync = sync;
@@ -64,18 +68,20 @@
public void open() throws Exception
{
- file = new File(fileName);
+ file = new File(journalDir + "/" + fileName);
- RandomAccessFile rfile = new RandomAccessFile(file, "rw");
+ rfile = new RandomAccessFile(file, "rw");
channel = rfile.getChannel();
+
+ //log.info("Opened file");
}
public void fill(final int position, final int size, final byte fillCharacter) throws Exception
{
ByteBuffer bb = ByteBuffer.allocateDirect(size);
- for (int i = 0; i < size - LONG_LENGTH; i++)
+ for (int i = 0; i < size; i++)
{
bb.put(fillCharacter);
}
@@ -94,36 +100,51 @@
public void close() throws Exception
{
channel.close();
+
+ rfile.close();
+
+ channel = null;
+
+ rfile = null;
+
+ file = null;
+
+ //log.info("Closed file");
}
public void delete() throws Exception
- {
- close();
+ {
+ file.delete();
- file.delete();
+ close();
}
public int read(ByteBuffer bytes) throws Exception
{
+ //log.info("reading, position is " + channel.position());
+
int bytesRead = channel.read(bytes);
- log.info("Read " + bytesRead + " bytes");
+ //log.info("Read " + bytesRead + " bytes");
return bytesRead;
}
- public void write(ByteBuffer bytes) throws Exception
+ public int write(ByteBuffer bytes, boolean sync) throws Exception
{
- channel.write(bytes);
+ int bytesRead = channel.write(bytes);
- if (sync)
+ if (sync && this.sync)
{
channel.force(false);
}
+
+ return bytesRead;
}
- public void reset() throws Exception
+ public void position(final int pos) throws Exception
{
- channel.position(0);
+ //log.info("Positioning to " + pos);
+ channel.position(pos);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -23,7 +23,6 @@
import java.io.File;
import java.io.FilenameFilter;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -39,12 +38,19 @@
*/
public class NIOSequentialFileFactory implements SequentialFileFactory
{
+ private final String journalDir;
+
+ public NIOSequentialFileFactory(final String journalDir)
+ {
+ this.journalDir = journalDir;
+ }
+
public SequentialFile createSequentialFile(final String fileName, final boolean sync)
{
- return new NIOSequentialFile(fileName, sync);
+ return new NIOSequentialFile(journalDir, fileName, sync);
}
- public List<String> listFiles(final String journalDir, final String extension) throws Exception
+ public List<String> listFiles(final String extension) throws Exception
{
File dir = new File(journalDir);
@@ -52,7 +58,7 @@
{
public boolean accept(File file, String name)
{
- return name.endsWith(".jbm");
+ return name.endsWith("." + extension);
}
};
Deleted: trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHandleImpl.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -1,45 +0,0 @@
-package org.jboss.messaging.core.journal.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.jboss.messaging.core.journal.RecordHandle;
-
-/**
- *
- * A RecordHandleImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class RecordHandleImpl implements RecordHandle
-{
- private final long id;
-
- private List<JournalFile> files = new ArrayList<JournalFile>();
-
- public RecordHandleImpl(final long id)
- {
- this.id = id;
- }
-
- public void addFile(JournalFile file)
- {
- files.add(file);
-
- file.incRefCount();
- }
-
- public long getID()
- {
- return id;
- }
-
- public void recordDeleted()
- {
- for (JournalFile file: files)
- {
- file.decRefCount();
- }
- }
-}
Deleted: trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/RecordHistoryImpl.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -1,41 +0,0 @@
-package org.jboss.messaging.core.journal.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.jboss.messaging.core.journal.RecordHandle;
-import org.jboss.messaging.core.journal.RecordHistory;
-
-/**
- *
- * A RecordHistoryImpl
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class RecordHistoryImpl implements RecordHistory
-{
- private final List<byte[]> records = new ArrayList<byte[]>();
-
- private final RecordHandle handle;
-
- public RecordHistoryImpl(final RecordHandle handle)
- {
- this.handle = handle;
- }
-
- public RecordHandle getHandle()
- {
- return handle;
- }
-
- public List<byte[]> getRecords()
- {
- return records;
- }
-
- public void addRecord(final byte[] record)
- {
- records.add(record);
- }
-}
Added: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,32 @@
+package org.jboss.messaging.core.journal.impl;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.jboss.messaging.core.journal.RecordInfo;
+
+/**
+ *
+ * A TransactionHolder
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class TransactionHolder
+{
+ public TransactionHolder(final long id)
+ {
+ this.transactionID = id;
+ }
+
+ public final long transactionID;
+
+ public final List<RecordInfo> recordInfos = new ArrayList<RecordInfo>();
+
+ public final Set<Long> recordsToDelete = new HashSet<Long>();
+
+ public boolean prepared;
+
+}
Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FakeSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FakeSequentialFileFactoryTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/FakeSequentialFileFactoryTest.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,46 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory;
+
+/**
+ *
+ * A FakeSequentialFileFactoryTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class FakeSequentialFileFactoryTest extends SequentialFileFactoryTestBase
+{
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ protected SequentialFileFactory createFactory()
+ {
+ return new FakeSequentialFileFactory();
+ }
+
+}
Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTest.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,45 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory;
+
+/**
+ *
+ * A JournalImplTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class JournalImplTest extends JournalImplTestBase
+{
+ protected void prepareDirectory() throws Exception
+ {
+ //NOOP
+ }
+
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new FakeSequentialFileFactory();
+ }
+}
Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalImplTestBase.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,1345 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+
+import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.test.unit.RandomUtil;
+import org.jboss.messaging.test.unit.UnitTestCase;
+
+/**
+ *
+ * A JournalImplTestBase
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public abstract class JournalImplTestBase extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(JournalImplTestBase.class);
+
+ private List<RecordInfo> records = new LinkedList<RecordInfo>();
+
+ private Journal journal;
+
+ private int recordLength = 1024;
+
+ private Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+
+ private int minFiles;
+
+ private int minAvailableFiles;
+
+ private int fileSize;
+
+ private boolean sync;
+
+ private String filePrefix = "jbm";
+
+ private String fileExtension = "jbm";
+
+ private SequentialFileFactory fileFactory;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ prepareDirectory();
+
+ fileFactory = getFileFactory();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ if (journal != null)
+ {
+ try
+ {
+ journal.stop();
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
+
+ protected abstract void prepareDirectory() throws Exception;
+
+ protected abstract SequentialFileFactory getFileFactory() throws Exception;
+
+ // General tests
+ // =============
+
+ public void testState() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ try
+ {
+ load();
+ fail("Should throw exception");
+ }
+ catch (IllegalStateException e)
+ {
+ //OK
+ }
+ try
+ {
+ stopJournal();
+ fail("Should throw exception");
+ }
+ catch (IllegalStateException e)
+ {
+ //OK
+ }
+ startJournal();
+ try
+ {
+ startJournal();
+ fail("Should throw exception");
+ }
+ catch (IllegalStateException e)
+ {
+ //OK
+ }
+ stopJournal();
+ startJournal();
+ load();
+ try
+ {
+ load();
+ fail("Should throw exception");
+ }
+ catch (IllegalStateException e)
+ {
+ //OK
+ }
+ try
+ {
+ startJournal();
+ fail("Should throw exception");
+ }
+ catch (IllegalStateException e)
+ {
+ //OK
+ }
+ stopJournal();
+ }
+
+ public void testParams() throws Exception
+ {
+ try
+ {
+ new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, 10, true, fileFactory, 5000, filePrefix, fileExtension);
+
+ fail("Should throw exception");
+ }
+ catch (IllegalArgumentException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ new JournalImpl(10 * 1024, 1, 10, true, fileFactory, 5000, filePrefix, fileExtension);
+
+ fail("Should throw exception");
+ }
+ catch (IllegalArgumentException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ new JournalImpl(10 * 1024, 10, 1, true, fileFactory, 5000, filePrefix, fileExtension);
+
+ fail("Should throw exception");
+ }
+ catch (IllegalArgumentException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ new JournalImpl(10 * 1024, 10, 10, true, null, 5000, filePrefix, fileExtension);
+
+ fail("Should throw exception");
+ }
+ catch (NullPointerException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ new JournalImpl(10 * 1024, 10, 10, true, fileFactory, JournalImpl.MIN_TASK_PERIOD - 1, filePrefix, fileExtension);
+
+ fail("Should throw exception");
+ }
+ catch (IllegalArgumentException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ new JournalImpl(10 * 1024, 10, 10, true, fileFactory, 5000, null, fileExtension);
+
+ fail("Should throw exception");
+ }
+ catch (NullPointerException e)
+ {
+ //Ok
+ }
+
+ try
+ {
+ new JournalImpl(10 * 1024, 10, 10, true, fileFactory, 5000, filePrefix, null);
+
+ fail("Should throw exception");
+ }
+ catch (NullPointerException e)
+ {
+ //Ok
+ }
+
+ }
+
+ // Non transactional tests
+ // =======================
+
+ public void testSimpleAdd() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAdd() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,2,3,4,5,6,7,8,9,10);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAddNonContiguous() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testSimpleAddUpdate() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1);
+ update(1);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAddUpdate() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,2,3,4,5,6,7,8,9,10);
+ update(1,2,4,7,9,10);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAddUpdateAll() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,2,3,4,5,6,7,8,9,10);
+ update(1,2,3,4,5,6,7,8,9,10);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAddUpdateNonContiguous() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ add(3,7,10,13,56,100,200,202,203);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAddUpdateAllNonContiguous() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ update(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testSimpleAddUpdateDelete() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1);
+ update(1);
+ delete(1);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAddUpdateDelete() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,2,3,4,5,6,7,8,9,10);
+ update(1,2,4,7,9,10);
+ delete(1,4,7,9,10);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAddUpdateDeleteAll() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,2,3,4,5,6,7,8,9,10);
+ update(1,2,3,4,5,6,7,8,9,10);
+ update(1,2,3,4,5,6,7,8,9,10);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAddUpdateDeleteNonContiguous() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ add(3,7,10,13,56,100,200,202,203);
+ delete(3,10,56,100,200,203);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAddUpdateDeleteAllNonContiguous() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ update(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ delete(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAddUpdateDeleteDifferentOrder() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ update(203, 202, 201, 200, 102, 100, 1, 3, 5, 7, 10, 13, 56);
+ delete(56, 13, 10, 7, 5, 3, 1, 203, 202, 201, 200, 102, 100);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleAddUpdateDeleteDifferentRecordLengths() throws Exception
+ {
+ setup(10, 10, 2048, true);
+ createJournal();
+ startJournal();
+ load();
+
+ for (int i = 0; i < 1000; i++)
+ {
+ byte[] record = generateRecord(10 + (int)(1500 * Math.random()));
+
+ journal.appendAddRecord(i, record);
+
+ records.add(new RecordInfo(i, record, false));
+ }
+
+ for (int i = 0; i < 1000; i++)
+ {
+ byte[] record = generateRecord(10 + (int)(1024 * Math.random()));
+
+ journal.appendUpdateRecord(i, record);
+
+ records.add(new RecordInfo(i, record, true));
+ }
+
+ for (int i = 0; i < 1000; i++)
+ {
+ journal.appendDeleteRecord(i);
+
+ removeRecordsForID(i);
+ }
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ stopJournal();
+ }
+
+ public void testAddUpdateDeleteManySmallFileSize() throws Exception
+ {
+ final int numberAdds = 10000;
+
+ final int numberUpdates = 5000;
+
+ final int numberDeletes = 3000;
+
+ long[] adds = new long[numberAdds];
+
+ for (int i = 0; i < numberAdds; i++)
+ {
+ adds[i] = i;
+ }
+
+ long[] updates = new long[numberUpdates];
+
+ for (int i = 0; i < numberUpdates; i++)
+ {
+ updates[i] = i;
+ }
+
+ long[] deletes = new long[numberDeletes];
+
+ for (int i = 0; i < numberDeletes; i++)
+ {
+ deletes[i] = i;
+ }
+
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(adds);
+ update(updates);
+ delete(deletes);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ public void testAddUpdateDeleteManyLargeFileSize() throws Exception
+ {
+ final int numberAdds = 10000;
+
+ final int numberUpdates = 5000;
+
+ final int numberDeletes = 3000;
+
+ long[] adds = new long[numberAdds];
+
+ for (int i = 0; i < numberAdds; i++)
+ {
+ adds[i] = i;
+ }
+
+ long[] updates = new long[numberUpdates];
+
+ for (int i = 0; i < numberUpdates; i++)
+ {
+ updates[i] = i;
+ }
+
+ long[] deletes = new long[numberDeletes];
+
+ for (int i = 0; i < numberDeletes; i++)
+ {
+ deletes[i] = i;
+ }
+
+ setup(10, 10, 10 * 1024 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(adds);
+ update(updates);
+ delete(deletes);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ // Transactional tests
+ // ===================
+
+ public void testSimpleTransaction() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ addTx(1, false, 1);
+ updateTx(1, false, 1);
+ deleteTx(1, true, 1);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testTransactionDontDeleteAll() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ addTx(1, false, 1, 2, 3);
+ updateTx(1, false, 1, 2);
+ deleteTx(1, true, 1);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testTransactionDeleteAll() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ addTx(1, false, 1, 2, 3);
+ updateTx(1, false, 1, 2);
+ deleteTx(1, true, 1, 2, 3);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testTransactionUpdateFromBeforeTx() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1, 2, 3);
+ addTx(1, false, 4, 5, 6);
+ updateTx(1, true, 1, 5);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testTransactionDeleteFromBeforeTx() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1, 2, 3);
+ addTx(1, false, 4, 5, 6);
+ deleteTx(1, true, 1, 2, 3, 4, 5, 6);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testTransactionChangesNotVisibleOutsideTX() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1, 2, 3);
+ addTx(1, false, 4, 5, 6);
+ updateTx(1, false, 1, 2, 4, 5);
+ deleteTx(1, false, 1, 2, 3, 4, 5, 6);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleTransactionsDifferentIDs() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+
+ addTx(1, false, 1, 2, 3, 4, 5, 6);
+ updateTx(1, false, 1, 3, 5);
+ deleteTx(1, false, 1, 2, 3, 4, 5, 6);
+
+ addTx(2, false, 11, 12, 13, 14, 15, 16);
+ updateTx(2, false, 11, 13, 15);
+ deleteTx(2, false, 11, 12, 13, 14, 15, 16);
+
+ addTx(3, false, 21, 22, 23, 24, 25, 26);
+ updateTx(3, false, 21, 23, 25);
+ deleteTx(3, false, 21, 22, 23, 24, 25, 26);
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleInterleavedTransactionsDifferentIDs() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+
+ addTx(1, false, 1, 2, 3, 4, 5, 6);
+
+ addTx(3, false, 21, 22, 23, 24, 25, 26);
+
+ updateTx(1, false, 1, 3, 5);
+
+ addTx(2, false, 11, 12, 13, 14, 15, 16);
+
+ deleteTx(1, false, 1, 2, 3, 4, 5, 6);
+
+ updateTx(2, false, 11, 13, 15);
+
+ updateTx(3, false, 21, 23, 25);
+
+ deleteTx(2, false, 11, 12, 13, 14, 15, 16);
+
+ deleteTx(3, false, 21, 22, 23, 24, 25, 26);
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testMultipleInterleavedTransactionsSameIDs() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+
+ add(1, 2, 3, 4, 5, 6, 7, 8);
+
+ addTx(1, false, 9, 10, 11, 12);
+
+ addTx(2, false, 13, 14, 15, 16, 17);
+
+ addTx(3, false, 18, 19, 20, 21, 22);
+
+ updateTx(1, false, 1, 2, 3);
+
+ updateTx(2, true, 4, 5, 6);
+
+ updateTx(3, false, 7, 8);
+
+ deleteTx(1, true, 1, 2);
+
+ deleteTx(3, true, 7, 8);
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testTransactionMixed() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ add(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ addTx(1, false, 675, 676, 677, 700, 703);
+ update(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ updateTx(1, false, 677, 700);
+ delete(1,3,5,7,10,13,56,100,102,200,201,202,203);
+ deleteTx(1, true, 703, 675, 1,3,5,7,10);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testTransactionAddDeleteDifferentOrder() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ addTx(1, false, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ deleteTx(1, true, 9, 8, 5, 3, 7, 6, 2, 1, 4);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+
+ // XA tests
+ // ========
+
+ public void testXASimpleNotPrepared() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ this.addPrepare(1, false, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ this.updatePrepare(1, false, 1, 2, 3, 4, 7, 8);
+ this.deletePrepare(1, false, 1, 2, 3, 4, 5);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testXASimplePrepared() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ this.addPrepare(1, false, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ this.updatePrepare(1, false, 1, 2, 3, 4, 7, 8);
+ this.deletePrepare(1, true, 1, 2, 3, 4, 5);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testXASimpleCommit() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ this.addPrepare(1, false, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ this.updatePrepare(1, false, 1, 2,3, 4, 7, 8);
+ this.deletePrepare(1, true, 1, 2, 3, 4, 5);
+ this.xaCommit(1);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testXASimpleRollback() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ this.addPrepare(1, false, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ this.updatePrepare(1, false, 1, 2,3, 4, 7, 8);
+ this.deletePrepare(1, true, 1, 2, 3, 4, 5);
+ this.xaRollback(1);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testXAChangesNotVisibleNotPrepared() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ this.add(1, 2, 3, 4, 5, 6);
+ this.addPrepare(1, false, 7, 8, 9, 10);
+ this.updatePrepare(1, false, 1, 2, 3, 7, 8, 9);
+ this.deletePrepare(1, false, 1, 2, 3, 4, 5);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testXAChangesNotVisiblePrepared() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ this.add(1, 2, 3, 4, 5, 6);
+ this.addPrepare(1, false, 7, 8, 9, 10);
+ this.updatePrepare(1, false, 1, 2, 3, 7, 8, 9);
+ this.deletePrepare(1, true, 1, 2, 3, 4, 5);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testXAChangesNotVisibleRollback() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ this.add(1, 2, 3, 4, 5, 6);
+ this.addPrepare(1, false, 7, 8, 9, 10);
+ this.updatePrepare(1, false, 1, 2, 3, 7, 8, 9);
+ this.deletePrepare(1, true, 1, 2, 3, 4, 5);
+ this.xaRollback(1);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testXAChangesisibleCommit() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ this.add(1, 2, 3, 4, 5, 6);
+ this.addPrepare(1, false, 7, 8, 9, 10);
+ this.updatePrepare(1, false, 1, 2, 3, 7, 8, 9);
+ this.deletePrepare(1, true, 1, 2, 3, 4, 5);
+ this.xaCommit(1);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+ }
+
+ public void testXAMultiple() throws Exception
+ {
+ setup(10, 10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+ this.add(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ this.addPrepare(1, false, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
+ this.addPrepare(2, false, 21, 22, 23, 24, 25, 26, 27);
+ this.updatePrepare(1, false, 1, 3, 6, 11, 14, 17);
+ this.addPrepare(3, false, 28, 29, 30, 31, 32, 33, 34, 35);
+ this.updatePrepare(3, false, 7, 8, 9, 10);
+ this.deletePrepare(2, true, 4, 5, 6, 23, 25, 27);
+ this.deletePrepare(1, true, 1, 2, 11, 14, 15);
+ this.deletePrepare(3, true, 28, 31, 32, 9);
+
+ this.xaCommit(1);
+ this.xaRollback(2);
+ this.xaCommit(3);
+ }
+
+ // Private ---------------------------------------------------------------------------------
+
+ private void setup(int minFiles, int minAvailableFiles, int fileSize, boolean sync)
+ {
+ this.minFiles = minFiles;
+ this.minAvailableFiles = minAvailableFiles;
+ this.fileSize = fileSize;
+ this.sync = sync;
+ }
+
+ public void createJournal() throws Exception
+ {
+ journal =
+ new JournalImpl(fileSize, minFiles, minAvailableFiles, sync, fileFactory, 5000, filePrefix, fileExtension);
+ }
+
+ private void startJournal() throws Exception
+ {
+ journal.start();
+ }
+
+ private void stopJournal() throws Exception
+ {
+ journal.stop();
+ }
+
+ private void loadAndCheck() throws Exception
+ {
+ List<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
+
+ List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
+
+ journal.load(committedRecords, preparedTransactions);
+
+ checkRecordsEquivalent(records, committedRecords);
+
+ //check prepared transactions
+
+ List<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
+
+ for (Map.Entry<Long, TransactionHolder> entry : transactions.entrySet())
+ {
+ if (entry.getValue().prepared)
+ {
+ PreparedTransactionInfo info = new PreparedTransactionInfo(entry.getKey());
+
+ info.records.addAll(entry.getValue().records);
+
+ info.recordsToDelete.addAll(entry.getValue().deletes);
+
+ prepared.add(info);
+ }
+ }
+
+ checkTransactionsEquivalent(prepared, preparedTransactions);
+ }
+
+
+
+ private void load() throws Exception
+ {
+ journal.load(null, null);
+ }
+
+ private void add(long... arguments) throws Exception
+ {
+ for (int i = 0; i < arguments.length; i++)
+ {
+ byte[] record = generateRecord(recordLength);
+
+ journal.appendAddRecord(arguments[i], record);
+
+ records.add(new RecordInfo(arguments[i], record, false));
+ }
+ }
+
+ private void update(long... arguments) throws Exception
+ {
+ for (int i = 0; i < arguments.length; i++)
+ {
+ byte[] updateRecord = generateRecord(recordLength);
+
+ journal.appendUpdateRecord(arguments[i], updateRecord);
+
+ records.add(new RecordInfo(arguments[i], updateRecord, true));
+ }
+ }
+
+ private void delete(long... arguments) throws Exception
+ {
+ for (int i = 0; i < arguments.length; i++)
+ {
+ journal.appendDeleteRecord(arguments[i]);
+
+ removeRecordsForID(arguments[i]);
+ }
+ }
+
+ private void addTx(long txID, boolean done, long... arguments) throws Exception
+ {
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
+ {
+ tx = new TransactionHolder();
+
+ transactions.put(txID, tx);
+ }
+
+ for (int i = 0; i < arguments.length; i++)
+ {
+ byte[] record = generateRecord(recordLength);
+
+ boolean useDone = done ? i == arguments.length - 1 : false;
+
+ journal.appendAddRecordTransactional(txID, arguments[i], record, useDone);
+
+ tx.records.add(new RecordInfo(arguments[i], record, false));
+
+ }
+
+ if (done)
+ {
+ commitTx(txID);
+ }
+ }
+
+ private void addPrepare(long txID, boolean done, long... arguments) throws Exception
+ {
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
+ {
+ tx = new TransactionHolder();
+
+ transactions.put(txID, tx);
+ }
+
+ for (int i = 0; i < arguments.length; i++)
+ {
+ byte[] record = generateRecord(recordLength);
+
+ boolean useDone = done ? i == arguments.length - 1 : false;
+
+ journal.appendAddRecordPrepare(txID, arguments[i], record, useDone);
+
+ tx.records.add(new RecordInfo(arguments[i], record, false));
+
+ }
+
+ if (done)
+ {
+ tx.prepared = true;
+ }
+ }
+
+ private void updateTx(long txID, boolean done, long... arguments) throws Exception
+ {
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx " + txID);
+ }
+
+ for (int i = 0; i < arguments.length; i++)
+ {
+ byte[] updateRecord = generateRecord(recordLength);
+
+ boolean useDone = done ? i == arguments.length - 1 : false;
+
+ journal.appendUpdateRecordTransactional(txID, arguments[i], updateRecord, useDone);
+
+ tx.records.add(new RecordInfo(arguments[i], updateRecord, true));
+ }
+
+ if (done)
+ {
+ commitTx(txID);
+ }
+ }
+
+ private void updatePrepare(long txID, boolean done, long... arguments) throws Exception
+ {
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx " + txID);
+ }
+
+ if (tx.prepared)
+ {
+ throw new IllegalStateException("Transaction is already prepared");
+ }
+
+ for (int i = 0; i < arguments.length; i++)
+ {
+ byte[] updateRecord = generateRecord(recordLength);
+
+ boolean useDone = done ? i == arguments.length - 1 : false;
+
+ journal.appendUpdateRecordPrepare(txID, arguments[i], updateRecord, useDone);
+
+ tx.records.add(new RecordInfo(arguments[i], updateRecord, true));
+ }
+
+ if (done)
+ {
+ tx.prepared = true;
+ }
+ }
+
+ private void deleteTx(long txID, boolean done, long... arguments) throws Exception
+ {
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx " + txID);
+ }
+
+ for (int i = 0; i < arguments.length; i++)
+ {
+ boolean useDone = done ? i == arguments.length - 1 : false;
+
+ journal.appendDeleteRecordTransactional(txID, arguments[i], useDone);
+
+ tx.deletes.add(arguments[i]);
+ }
+
+ if (done)
+ {
+ commitTx(txID);
+ }
+ }
+
+ private void deletePrepare(long txID, boolean done, long... arguments) throws Exception
+ {
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx " + txID);
+ }
+
+ if (tx.prepared)
+ {
+ throw new IllegalStateException("Transaction is already prepared");
+ }
+
+ for (int i = 0; i < arguments.length; i++)
+ {
+ boolean useDone = done ? i == arguments.length - 1 : false;
+
+ journal.appendDeleteRecordPrepare(txID, arguments[i], useDone);
+
+ tx.deletes.add(arguments[i]);
+ }
+
+ if (done)
+ {
+ tx.prepared = true;
+ }
+ }
+
+ private void xaCommit(long txID) throws Exception
+ {
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx " + txID);
+ }
+
+ if (!tx.prepared)
+ {
+ throw new IllegalStateException("Transaction is not prepared");
+ }
+
+ journal.appendXACommitRecord(txID);
+
+ this.commitTx(txID);
+ }
+
+ private void xaRollback(long txID) throws Exception
+ {
+ TransactionHolder tx = transactions.remove(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx " + txID);
+ }
+
+ if (!tx.prepared)
+ {
+ throw new IllegalStateException("Transaction is not prepared");
+ }
+
+ journal.appendXARollbackRecord(txID);
+ }
+
+ private void commitTx(long txID)
+ {
+ TransactionHolder tx = transactions.remove(txID);
+
+ if (tx == null)
+ {
+ throw new IllegalStateException("Cannot find tx " + txID);
+ }
+
+ records.addAll(tx.records);
+
+ for (Long l: tx.deletes)
+ {
+ removeRecordsForID(l);
+ }
+ }
+
+ private void removeRecordsForID(long id)
+ {
+ for (ListIterator<RecordInfo> iter = records.listIterator(); iter.hasNext();)
+ {
+ RecordInfo info = iter.next();
+
+ if (info.id == id)
+ {
+ iter.remove();
+ }
+ }
+ }
+
+
+ private void checkTransactionsEquivalent(List<PreparedTransactionInfo> expected, List<PreparedTransactionInfo> actual)
+ {
+ assertEquals("Lists not same length", expected.size(), actual.size());
+
+ Iterator<PreparedTransactionInfo> iterExpected = expected.iterator();
+
+ Iterator<PreparedTransactionInfo> iterActual = actual.iterator();
+
+ while (iterExpected.hasNext())
+ {
+ PreparedTransactionInfo rexpected = iterExpected.next();
+
+ PreparedTransactionInfo ractual = iterActual.next();
+
+ assertEquals("ids not same", rexpected.id, ractual.id);
+
+ checkRecordsEquivalent(rexpected.records, ractual.records);
+
+ assertEquals("deletes size not same", rexpected.recordsToDelete.size(), ractual.recordsToDelete.size());
+
+ Iterator<Long> iterDeletesExpected = rexpected.recordsToDelete.iterator();
+
+ Iterator<Long> iterDeletesActual = ractual.recordsToDelete.iterator();
+
+ while (iterDeletesExpected.hasNext())
+ {
+ long lexpected = iterDeletesExpected.next();
+
+ long lactual = iterDeletesActual.next();
+
+ assertEquals("Delete ids not same", lexpected, lactual);
+ }
+ }
+ }
+
+ private void checkRecordsEquivalent(List<RecordInfo> expected, List<RecordInfo> actual)
+ {
+ assertEquals("Lists not same length", expected.size(), actual.size());
+
+ Iterator<RecordInfo> iterExpected = expected.iterator();
+
+ Iterator<RecordInfo> iterActual = actual.iterator();
+
+ while (iterExpected.hasNext())
+ {
+ RecordInfo rexpected = iterExpected.next();
+
+ RecordInfo ractual = iterActual.next();
+
+ assertEquals("ids not same", rexpected.id, ractual.id);
+
+ assertEquals("type not same", rexpected.isUpdate, ractual.isUpdate);
+
+ assertByteArraysEquivalent(rexpected.data, ractual.data);
+ }
+ }
+
+ private byte[] generateRecord(int length)
+ {
+ byte[] record = new byte[length];
+ for (int i = 0; i < length; i++)
+ {
+ record[i] = RandomUtil.randomByte();
+ }
+ return record;
+ }
+
+ class TransactionHolder
+ {
+ List<RecordInfo> records = new ArrayList<RecordInfo>();
+
+ List<Long> deletes = new ArrayList<Long>();
+
+ boolean prepared;
+ }
+
+}
Deleted: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/JournalTest.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -1,236 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.messaging.core.journal.impl.test.unit;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.test.unit.fakes.FakeSequentialFileFactory.FakeSequentialFile;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.test.unit.UnitTestCase;
-
-/**
- *
- * A JournalTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class JournalTest extends UnitTestCase
-{
- private static final Logger log = Logger.getLogger(JournalTest.class);
-
- private String journalDir = System.getProperty("user.home") + "/journal-test";
-
- private FakeSequentialFileFactory factory = new FakeSequentialFileFactory();
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- File file = new File(journalDir);
-
- deleteDirectory(file);
-
- file.mkdir();
- }
-
- public void testLoad() throws Exception
- {
- final int minFiles = 10;
-
- final int minAvailableFiles = 10;
-
- final int fileSize = 10 * 1024;
-
- final boolean sync = true;
-
- final String filePrefix = "jbm";
-
- final String fileExtension = "jbm";
-
- long timeStart = System.currentTimeMillis();
-
- JournalImpl journal =
- new JournalImpl(journalDir, fileSize, minFiles, minAvailableFiles, sync, factory, 5000, filePrefix, fileExtension);
-
- journal.load();
-
- long timeEnd = System.currentTimeMillis();
-
- assertEquals(1, journal.getFiles().size());
- assertEquals(minFiles - 1, journal.getAvailableFiles().size());
-
- assertEquals(minFiles, factory.getFileMap().size());
-
- for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
- {
- FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
-
- assertEquals(sync, file.isSync());
-
- assertTrue(file.isOpen());
-
- byte[] bytes = file.getData().array();
-
- assertEquals(fileSize, bytes.length);
-
- //First four bytes should be ordering id timestamp
-
- ByteBuffer bb = ByteBuffer.wrap(bytes, 0, 8);
- long orderingID = bb.getLong();
-
- String expectedFilename =
- journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
-
- assertEquals(expectedFilename, file.getFileName());
-
- log.info("Ordering id is " + orderingID);
-
- assertTrue(orderingID >= timeStart);
-
- assertTrue(orderingID <= timeEnd);
-
- for (int i = 8; i < bytes.length; i++)
- {
- if (bytes[i] != JournalImpl.FILL_CHARACTER)
- {
- fail("Not filled correctly");
- }
- }
- }
-
- journal.stop();
-
- for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
- {
- FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
-
- assertFalse(file.isOpen());
- }
-
- assertEquals(0, journal.getFiles().size());
- assertEquals(0, journal.getAvailableFiles().size());
-
- //Now reload
-
- journal = new JournalImpl(journalDir, fileSize, minFiles, minAvailableFiles, sync, factory, 5000, filePrefix, fileExtension);
-
-
- log.info("******** reloading");
-
- journal.load();
-
- assertEquals(1, journal.getFiles().size());
- assertEquals(minFiles - 1, journal.getAvailableFiles().size());
-
- assertEquals(minFiles, factory.getFileMap().size());
-
- for (Map.Entry<String, FakeSequentialFile> entry: factory.getFileMap().entrySet())
- {
- FakeSequentialFile file = (FakeSequentialFile)entry.getValue();
-
- assertEquals(sync, file.isSync());
-
- assertTrue(file.isOpen());
-
- byte[] bytes = file.getData().array();
-
- assertEquals(fileSize, bytes.length);
-
- //First four bytes should be ordering id timestamp
-
- ByteBuffer bb = ByteBuffer.wrap(bytes, 0, 8);
- long orderingID = bb.getLong();
-
- String expectedFilename =
- journalDir + "/" + filePrefix + "-" + orderingID + "." + fileExtension;
-
- assertEquals(expectedFilename, file.getFileName());
-
- log.info("Ordering id is " + orderingID);
-
- assertTrue(orderingID >= timeStart);
-
- assertTrue(orderingID <= timeEnd);
-
- for (int i = 8; i < bytes.length; i++)
- {
- if (bytes[i] != JournalImpl.FILL_CHARACTER)
- {
- fail("Not filled correctly");
- }
- }
- }
-
-
- }
-
-// public void test1() throws Exception
-// {
-// File file = new File(journalDir);
-//
-// JournalImpl journal = new JournalImpl(journalDir, 10 * 1024 * 1024, 10, true, factory);
-//
-// journal.load();
-//
-// long start = System.currentTimeMillis();
-//
-// byte[] bytes = new byte[1024];
-//
-// for (int i = 0; i < bytes.length; i++)
-// {
-// if (i % 100 == 0)
-// {
-// bytes[i] = '\n';
-// }
-// else
-// {
-// bytes[i] = 'T';
-// }
-// }
-//
-// final int numIts = 50000;
-//
-// for (int i = 0; i < numIts; i++)
-// {
-// journal.add(1, bytes);
-// }
-//
-// long end = System.currentTimeMillis();
-//
-// long numbytes = numIts * 1024;
-//
-// double actualRate = 1000 * (double)numbytes / ( end - start);
-//
-// log.info("Rate: (bytes/sec) " + actualRate);
-//
-// double recordRate = 1000 * (double)numIts / ( end - start);
-//
-// log.info("Rate: (records/sec) " + recordRate);
-//
-// }
-
-}
Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/NIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/NIOSequentialFileFactoryTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/NIOSequentialFileFactoryTest.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+
+/**
+ *
+ * A NIOSequentialFileFactoryTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class NIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase
+{
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ protected SequentialFileFactory createFactory()
+ {
+ return new NIOSequentialFileFactory(journalDir);
+ }
+
+}
Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/RealJournalImplTest.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.io.File;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ *
+ * A RealJournalImplTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealJournalImplTest extends JournalImplTestBase
+{
+ private static final Logger log = Logger.getLogger(RealJournalImplTest.class);
+
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ protected void prepareDirectory() throws Exception
+ {
+ File file = new File(journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+ }
+
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new NIOSequentialFileFactory(journalDir);
+ }
+}
Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/SequentialFileFactoryTestBase.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,326 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.core.journal.impl.test.unit;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.test.unit.UnitTestCase;
+
+/**
+ *
+ * A SequentialFileFactoryTestBase
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public abstract class SequentialFileFactoryTestBase extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(SequentialFileFactoryTestBase.class);
+
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ factory = createFactory();
+ }
+
+ protected abstract SequentialFileFactory createFactory();
+
+ private SequentialFileFactory factory;
+
+ public void testCreateAndListFiles() throws Exception
+ {
+ List<String> expectedFiles = new ArrayList<String>();
+
+ final int numFiles = 10;
+
+ for (int i = 0; i < numFiles; i++)
+ {
+ String fileName = UUID.randomUUID().toString() + ".jbm";
+
+ expectedFiles.add(fileName);
+
+ SequentialFile sf = factory.createSequentialFile(fileName, false);
+
+ sf.open();
+
+ assertEquals(fileName, sf.getFileName());
+ }
+
+ //Create a couple with a different extension - they shouldn't be picked up
+
+ SequentialFile sf1 = factory.createSequentialFile("different.file", false);
+ sf1.open();
+
+ SequentialFile sf2 = factory.createSequentialFile("different.cheese", false);
+ sf2.open();
+
+ List<String> fileNames = factory.listFiles("jbm");
+
+ assertEquals(expectedFiles.size(), fileNames.size());
+
+ for (String fileName: expectedFiles)
+ {
+ assertTrue(fileNames.contains(fileName));
+ }
+
+ fileNames = factory.listFiles("file");
+
+ assertEquals(1, fileNames.size());
+
+ assertTrue(fileNames.contains("different.file"));
+
+ fileNames = factory.listFiles("cheese");
+
+ assertEquals(1, fileNames.size());
+
+ assertTrue(fileNames.contains("different.cheese"));
+ }
+
+
+ public void testFill() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("fill.jbm", true);
+
+ sf.open();
+
+ checkFill(sf, 0, 100, (byte)'X');
+
+ checkFill(sf, 13, 300, (byte)'Y');
+
+ checkFill(sf, 0, 1, (byte)'Z');
+
+ checkFill(sf, 100, 1, (byte)'A');
+
+ checkFill(sf, 1000, 10000, (byte)'B');
+ }
+
+ public void testDelete() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("delete-me.jbm", true);
+
+ sf.open();
+
+ SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", true);
+
+ sf2.open();
+
+ List<String> fileNames = factory.listFiles("jbm");
+
+ assertEquals(2, fileNames.size());
+
+ assertTrue(fileNames.contains("delete-me.jbm"));
+
+ assertTrue(fileNames.contains("delete-me2.jbm"));
+
+ sf.delete();
+
+ fileNames = factory.listFiles("jbm");
+
+ assertEquals(1, fileNames.size());
+
+ assertTrue(fileNames.contains("delete-me2.jbm"));
+
+ }
+
+ public void testWriteandRead() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("write.jbm", true);
+
+ sf.open();
+
+ String s1 = "aardvark";
+ byte[] bytes1 = s1.getBytes("UTF-8");
+ ByteBuffer bb1 = ByteBuffer.wrap(bytes1);
+
+ String s2 = "hippopotamus";
+ byte[] bytes2 = s2.getBytes("UTF-8");
+ ByteBuffer bb2 = ByteBuffer.wrap(bytes2);
+
+ String s3 = "echidna";
+ byte[] bytes3 = s3.getBytes("UTF-8");
+ ByteBuffer bb3 = ByteBuffer.wrap(bytes3);
+
+ int bytesWritten = sf.write(bb1, true);
+
+ assertEquals(bytes1.length, bytesWritten);
+
+ bytesWritten = sf.write(bb2, true);
+
+ assertEquals(bytes2.length, bytesWritten);
+
+ bytesWritten = sf.write(bb3, true);
+
+ assertEquals(bytes3.length, bytesWritten);
+
+ sf.position(0);
+
+ byte[] rbytes1 = new byte[bytes1.length];
+
+ byte[] rbytes2 = new byte[bytes2.length];
+
+ byte[] rbytes3 = new byte[bytes3.length];
+
+ ByteBuffer rb1 = ByteBuffer.wrap(rbytes1);
+ ByteBuffer rb2 = ByteBuffer.wrap(rbytes2);
+ ByteBuffer rb3 = ByteBuffer.wrap(rbytes3);
+
+ int bytesRead = sf.read(rb1);
+ assertEquals(rbytes1.length, bytesRead);
+ assertByteArraysEquivalent(bytes1, rbytes1);
+
+ bytesRead = sf.read(rb2);
+ assertEquals(rbytes2.length, bytesRead);
+ assertByteArraysEquivalent(bytes2, rbytes2);
+
+ bytesRead = sf.read(rb3);
+ assertEquals(rbytes3.length, bytesRead);
+ assertByteArraysEquivalent(bytes3, rbytes3);
+ }
+
+ public void testPosition() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("position.jbm", true);
+
+ sf.open();
+
+ String s1 = "orange";
+ byte[] bytes1 = s1.getBytes("UTF-8");
+ ByteBuffer bb1 = ByteBuffer.wrap(bytes1);
+
+ String s2 = "grapefruit";
+ byte[] bytes2 = s2.getBytes("UTF-8");
+ ByteBuffer bb2 = ByteBuffer.wrap(bytes2);
+
+ String s3 = "lemon";
+ byte[] bytes3 = s3.getBytes("UTF-8");
+ ByteBuffer bb3 = ByteBuffer.wrap(bytes3);
+
+ int bytesWritten = sf.write(bb1, true);
+
+ assertEquals(bytes1.length, bytesWritten);
+
+ bytesWritten = sf.write(bb2, true);
+
+ assertEquals(bytes2.length, bytesWritten);
+
+ bytesWritten = sf.write(bb3, true);
+
+ assertEquals(bytes3.length, bytesWritten);
+
+ byte[] rbytes1 = new byte[bytes1.length];
+
+ byte[] rbytes2 = new byte[bytes2.length];
+
+ byte[] rbytes3 = new byte[bytes3.length];
+
+ ByteBuffer rb1 = ByteBuffer.wrap(rbytes1);
+ ByteBuffer rb2 = ByteBuffer.wrap(rbytes2);
+ ByteBuffer rb3 = ByteBuffer.wrap(rbytes3);
+
+ sf.position(bytes1.length + bytes2.length);
+
+ int bytesRead = sf.read(rb3);
+ assertEquals(rbytes3.length, bytesRead);
+ assertByteArraysEquivalent(bytes3, rbytes3);
+
+ sf.position(bytes1.length);
+
+ bytesRead = sf.read(rb2);
+ assertEquals(rbytes2.length, bytesRead);
+ assertByteArraysEquivalent(bytes2, rbytes2);
+
+ sf.position(0);
+
+ bytesRead = sf.read(rb1);
+ assertEquals(rbytes1.length, bytesRead);
+ assertByteArraysEquivalent(bytes1, rbytes1);
+ }
+
+ public void testOpenClose() throws Exception
+ {
+ SequentialFile sf = factory.createSequentialFile("openclose.jbm", true);
+
+ sf.open();
+
+ String s1 = "cheesecake";
+ byte[] bytes1 = s1.getBytes("UTF-8");
+ ByteBuffer bb1 = ByteBuffer.wrap(bytes1);
+
+ int bytesWritten = sf.write(bb1, true);
+
+ assertEquals(bytes1.length, bytesWritten);
+
+ sf.close();
+
+ try
+ {
+ sf.write(bb1, true);
+
+ fail("Should throw exception");
+ }
+ catch (Exception e)
+ {
+ //OK
+ }
+
+ sf.open();
+
+ sf.write(bb1, true);
+ }
+
+ // Private ---------------------------------
+
+ private void checkFill(SequentialFile file, int pos, int size, byte fillChar) throws Exception
+ {
+ file.fill(pos, size, fillChar);
+
+ file.close();
+
+ file.open();
+
+ file.position(pos);
+
+ byte[] bytes = new byte[size];
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+ int bytesRead = file.read(bb);
+
+ assertEquals(size, bytesRead);
+
+ for (int i = 0; i < size; i++)
+ {
+ //log.info(" i is " + i);
+ assertEquals(fillChar, bytes[i]);
+ }
+
+ }
+
+}
Added: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFile.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFile.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFile.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -0,0 +1,6 @@
+package org.jboss.messaging.core.journal.impl.test.unit.fakes;
+
+public class FakeSequentialFile
+{
+
+}
Modified: trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java 2008-03-17 10:31:09 UTC (rev 3883)
+++ trunk/tests/src/org/jboss/messaging/core/journal/impl/test/unit/fakes/FakeSequentialFileFactory.java 2008-03-17 12:04:44 UTC (rev 3884)
@@ -58,15 +58,25 @@
{
sf.data.position(0);
- log.info("positioning data to 0");
+ //log.info("positioning data to 0");
}
return sf;
}
- public List<String> listFiles(String journalDir, String extension)
+ public List<String> listFiles(final String extension)
{
- return new ArrayList<String>(fileMap.keySet());
+ List<String> files = new ArrayList<String>();
+
+ for (String s: fileMap.keySet())
+ {
+ if (s.endsWith("." + extension))
+ {
+ files.add(s);
+ }
+ }
+
+ return files;
}
public Map<String, FakeSequentialFile> getFileMap()
@@ -101,7 +111,7 @@
public boolean isOpen()
{
- log.info("is open" + System.identityHashCode(this) +" open is now " + open);
+ //log.info("is open" + System.identityHashCode(this) +" open is now " + open);
return open;
}
@@ -116,7 +126,10 @@
{
open = false;
- log.info("Calling close " + System.identityHashCode(this) +" open is now " + open);
+ if (data != null)
+ {
+ data.position(0);
+ }
}
public void delete() throws Exception
@@ -141,7 +154,7 @@
public void open() throws Exception
{
- log.info("open called");
+ //log.info("open called");
if (open)
{
@@ -158,18 +171,18 @@
throw new IllegalStateException("Is closed");
}
- log.info("pre-allocate called " + size +" , " + fillCharacter);
+ checkAndResize(pos + size);
- byte[] bytes = new byte[size];
+ //log.info("size is " + size + " pos is " + pos);
for (int i = pos; i < size + pos; i++)
{
- bytes[i] = fillCharacter;
- }
-
- data = ByteBuffer.wrap(bytes);
+ data.array()[i] = fillCharacter;
+
+ //log.info("Filling " + pos + " with char " + fillCharacter);
+ }
}
-
+
public int read(ByteBuffer bytes) throws Exception
{
if (!open)
@@ -177,11 +190,11 @@
throw new IllegalStateException("Is closed");
}
- log.info("read called " + bytes.array().length);
+ //log.info("read called " + bytes.array().length);
byte[] bytesRead = new byte[bytes.array().length];
- log.info("reading, data pos is " + data.position() + " data size is " + data.array().length);
+ //log.info("reading, data pos is " + data.position() + " data size is " + data.array().length);
data.get(bytesRead);
@@ -190,30 +203,56 @@
return bytesRead.length;
}
- public void reset() throws Exception
+ public void position(int pos) throws Exception
{
if (!open)
{
throw new IllegalStateException("Is closed");
}
- log.info("reset called");
+ //log.info("reset called");
- data.position(0);
+ data.position(pos);
}
- public void write(ByteBuffer bytes) throws Exception
+ public int write(ByteBuffer bytes, boolean sync) throws Exception
{
if (!open)
{
throw new IllegalStateException("Is closed");
}
- log.info("write called, position is " + data.position() + " bytes is " + bytes.array().length);
+ int position = data == null ? 0 : data.position();
+ checkAndResize(bytes.capacity() + position);
+
+ //log.info("write called, position is " + data.position() + " bytes is " + bytes.array().length);
+
data.put(bytes);
+
+ return bytes.array().length;
}
+
+ private void checkAndResize(int size)
+ {
+ int oldpos = data == null ? 0 : data.position();
+
+ if (data == null || data.array().length < size)
+ {
+ byte[] newBytes = new byte[size];
+
+ if (data != null)
+ {
+ System.arraycopy(data.array(), 0, newBytes, 0, data.array().length);
+ }
+
+ data = ByteBuffer.wrap(newBytes);
+
+ data.position(oldpos);
+ }
+ }
+
}
}
More information about the jboss-cvs-commits
mailing list