[jboss-cvs] JBoss Messaging SVN: r4684 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 16 01:08:46 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-07-16 01:08:16 -0400 (Wed, 16 Jul 2008)
New Revision: 4684
Added:
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/SimpleEncoding.java
Modified:
trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
Journal Improvements
Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -54,8 +54,6 @@
boolean isSyncNonTransactional();
- long getTaskPeriod();
-
String getFilePrefix();
String getFileExtension();
@@ -65,4 +63,6 @@
long getAIOTimeout();
void forceMoveNextFile() throws Exception;
+
+ void disableAutoReclaiming();
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -53,7 +53,7 @@
int getOffset();
- long getOrderingID();
+ int getOrderingID();
void setOffset(final int offset);
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -43,7 +43,7 @@
private final SequentialFile file;
- private final long orderingID;
+ private final int orderingID;
private int offset;
@@ -53,7 +53,7 @@
private Map<JournalFile, Integer> negCounts = new HashMap<JournalFile, Integer>();
- public JournalFileImpl(final SequentialFile file, final long orderingID)
+ public JournalFileImpl(final SequentialFile file, final int orderingID)
{
this.file = file;
@@ -118,7 +118,7 @@
return offset;
}
- public long getOrderingID()
+ public int getOrderingID()
{
return orderingID;
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -34,8 +34,6 @@
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -46,6 +44,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.messaging.core.journal.EncodingSupport;
@@ -70,10 +69,8 @@
*/
public class JournalImpl implements TestableJournal
{
- private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = log.isTraceEnabled();
-
+ // Constants -----------------------------------------------------
private static final int STATE_STOPPED = 0;
private static final int STATE_STARTED = 1;
@@ -85,59 +82,66 @@
private static final int SIZE_LONG = 8;
private static final int SIZE_INT = 4;
+
+ private static final int SIZE_SHORT = 2;
private static final int SIZE_BYTE = 1;
public static final int MIN_FILE_SIZE = 1024;
- public static final int MIN_TASK_PERIOD = 1000;
+ public static final int SIZE_HEADER = 4;
+
//Record markers - they must be all unique
- public static final int SIZE_HEADER = 8;
+ public static final int BASIC_SIZE = SIZE_BYTE + SIZE_INT + SIZE_INT;
- public static final int SIZE_ADD_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_INT + SIZE_BYTE; // + record.length
+ public static final int SIZE_ADD_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_INT; // + record.length
public static final byte ADD_RECORD = 11;
- public static final byte SIZE_UPDATE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_INT + SIZE_BYTE; // + record.length;
+ public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_INT; // + record.length;
public static final byte UPDATE_RECORD = 12;
- public static final int SIZE_DELETE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+ public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT; // + record.length
+
+ public static final byte ADD_RECORD_TX = 13;
- public static final byte DELETE_RECORD = 13;
+ public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT; // + record.length
- public static final byte ADD_RECORD_TX = 14;
+ public static final byte UPDATE_RECORD_TX = 14;
- public static final int SIZE_ADD_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT + SIZE_BYTE; // Add the size of Bytes on this
+ public static final int SIZE_DELETE_RECORD = BASIC_SIZE + SIZE_LONG;
- public static final int SIZE_UPDATE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_BYTE + SIZE_LONG + SIZE_INT + SIZE_BYTE; // Add the size of Bytes on this
+ public static final byte DELETE_RECORD = 15;
- public static final byte UPDATE_RECORD_TX = 15;
+ public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + SIZE_LONG + SIZE_LONG;
- public static final int SIZE_DELETE_RECORD_TX = SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_BYTE;
-
public static final byte DELETE_RECORD_TX = 16;
- public static final int SIZE_PREPARE_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+ public static final int SIZE_PREPARE_RECORD = BASIC_SIZE + SIZE_LONG;
public static final byte PREPARE_RECORD = 17;
- public static final byte SIZE_COMMIT_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+ public static final int SIZE_COMMIT_RECORD = BASIC_SIZE + SIZE_LONG;
public static final byte COMMIT_RECORD = 18;
- public static final byte SIZE_ROLLBACK_RECORD = SIZE_BYTE + SIZE_LONG + SIZE_BYTE;
+ public static final int SIZE_ROLLBACK_RECORD = BASIC_SIZE + SIZE_LONG;
public static final byte ROLLBACK_RECORD = 19;
- public static final byte DONE = 20;
-
public static final byte FILL_CHARACTER = 74; // Letter 'J'
+ // Attributes ----------------------------------------------------
+
+ private boolean autoReclaim = true;
+
+ private AtomicInteger nextOrderingId = new AtomicInteger(0);
+
// used for Asynchronous IO only (ignored on NIO).
private final int maxAIO;
@@ -154,13 +158,10 @@
private final SequentialFileFactory fileFactory;
- private final long taskPeriod;
-
public final String filePrefix;
public final String fileExtension;
-
private final Queue<JournalFile> dataFiles = new ConcurrentLinkedQueue<JournalFile>();
private final Queue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
@@ -196,17 +197,21 @@
private volatile long lastOrderingID;
- private final Timer timer = new Timer(true);
-
- private TimerTask reclaimerTask;
-
private final AtomicLong transactionIDSequence = new AtomicLong(0);
private Reclaimer reclaimer = new Reclaimer();
+ // Static --------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(JournalImpl.class);
+
+ private static final boolean trace = log.isTraceEnabled();
+
+ // Constructors --------------------------------------------------
+
public JournalImpl(final int fileSize, final int minFiles,
final boolean syncTransactional, final boolean syncNonTransactional,
- final SequentialFileFactory fileFactory, final long taskPeriod,
+ final SequentialFileFactory fileFactory,
final String filePrefix, final String fileExtension, final int maxAIO, final long aioTimeout)
{
if (fileSize < MIN_FILE_SIZE)
@@ -221,10 +226,6 @@
{
throw new NullPointerException("fileFactory is null");
}
- if (taskPeriod < MIN_TASK_PERIOD)
- {
- throw new IllegalArgumentException("taskPeriod cannot be less than " + MIN_TASK_PERIOD);
- }
if (filePrefix == null)
{
throw new NullPointerException("filePrefix is null");
@@ -252,8 +253,6 @@
this.fileFactory = fileFactory;
- this.taskPeriod = taskPeriod;
-
this.filePrefix = filePrefix;
this.fileExtension = fileExtension;
@@ -279,11 +278,12 @@
ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
bb.putByte(ADD_RECORD);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(recordLength);
bb.putLong(id);
bb.putByte(recordType);
- bb.putInt(recordLength);
record.encode(bb);
- bb.putByte(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
@@ -298,16 +298,18 @@
throw new IllegalStateException("Journal must be loaded first");
}
+
int size = SIZE_ADD_RECORD + record.length;
ByteBuffer bb = fileFactory.newBuffer(size);
- bb.put(ADD_RECORD);
+ bb.put(ADD_RECORD);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(record.length);
bb.putLong(id);
bb.put(recordType);
- bb.putInt(record.length);
bb.put(record);
- bb.put(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
@@ -334,11 +336,12 @@
ByteBuffer bb = fileFactory.newBuffer(size);
bb.put(UPDATE_RECORD);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(record.length);
bb.putLong(id);
bb.put(recordType);
- bb.putInt(record.length);
bb.put(record);
- bb.put(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
@@ -365,11 +368,12 @@
ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
bb.putByte(UPDATE_RECORD);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(record.getEncodeSize());
bb.putLong(id);
bb.putByte(recordType);
- bb.putInt(record.getEncodeSize());
record.encode(bb);
- bb.putByte(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile = appendRecord(bb.getBuffer(), syncNonTransactional, null);
@@ -396,8 +400,9 @@
ByteBuffer bb = fileFactory.newBuffer(size);
bb.put(DELETE_RECORD);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
bb.putLong(id);
- bb.put(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
@@ -424,12 +429,13 @@
ByteBufferWrapper bb = new ByteBufferWrapper(fileFactory.newBuffer(size));
bb.putByte(ADD_RECORD_TX);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(recordLength);
bb.putLong(txID);
bb.putByte(recordType);
bb.putLong(id);
- bb.putInt(recordLength);
record.encode(bb);
- bb.putByte(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile;
@@ -453,12 +459,13 @@
ByteBuffer bb = fileFactory.newBuffer(size);
bb.put(ADD_RECORD_TX);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(record.length);
bb.putLong(txID);
bb.put(recordType);
bb.putLong(id);
- bb.putInt(record.length);
bb.put(record);
- bb.put(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile;
@@ -482,12 +489,13 @@
ByteBuffer bb = fileFactory.newBuffer(size);
bb.put(UPDATE_RECORD_TX);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(record.length);
bb.putLong(txID);
bb.put(recordType);
bb.putLong(id);
- bb.putInt(record.length);
bb.put(record);
- bb.put(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile;
@@ -512,12 +520,13 @@
bb.putByte(UPDATE_RECORD_TX);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putInt(record.getEncodeSize());
bb.putLong(txID);
bb.putByte(recordType);
bb.putLong(id);
- bb.putInt(record.getEncodeSize());
record.encode(bb);
- bb.putByte(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile;
@@ -541,9 +550,10 @@
ByteBuffer bb = fileFactory.newBuffer(size);
bb.put(DELETE_RECORD_TX);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
bb.putLong(txID);
bb.putLong(id);
- bb.put(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile;
@@ -574,8 +584,9 @@
ByteBuffer bb = fileFactory.newBuffer(size);
bb.put(PREPARE_RECORD);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
bb.putLong(txID);
- bb.put(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile;
@@ -604,8 +615,9 @@
ByteBuffer bb = fileFactory.newBuffer(size);
bb.put(COMMIT_RECORD);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
bb.putLong(txID);
- bb.put(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile;
@@ -637,8 +649,9 @@
ByteBuffer bb = fileFactory.newBuffer(size);
bb.put(ROLLBACK_RECORD);
+ bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
bb.putLong(txID);
- bb.put(DONE);
+ bb.putInt(size);
bb.rewind();
JournalFile usedFile;
@@ -674,12 +687,17 @@
file.open();
- ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG);
+ ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
file.read(bb);
- long orderingID = bb.getLong();
+ int orderingID = bb.getInt();
+ if (nextOrderingId.get() < orderingID)
+ {
+ nextOrderingId.set(orderingID);
+ }
+
orderedFiles.add(new JournalFileImpl(file, orderingID));
file.close();
@@ -691,8 +709,8 @@
{
public int compare(JournalFile f1, JournalFile f2)
{
- long id1 = f1.getOrderingID();
- long id2 = f2.getOrderingID();
+ int id1 = f1.getOrderingID();
+ int id2 = f2.getOrderingID();
return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
}
@@ -723,19 +741,113 @@
}
//First long is the ordering timestamp, we just jump its position
- bb.position(file.getFile().calculateBlockStart(SIZE_LONG));
+ bb.position(file.getFile().calculateBlockStart(SIZE_HEADER));
boolean hasData = false;
while (bb.hasRemaining())
{
- int pos = bb.position();
+ final int pos = bb.position();
byte recordType = bb.get();
+ if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
+ {
+ if (trace)
+ {
+ log.trace("Invalid record type at " + bb.position() + " file:" + file);
+ }
+ continue;
+ }
+
+ if (bb.position() + SIZE_INT > fileSize)
+ {
+ continue;
+ }
+
+ int readFileId = bb.getInt();
+
+ // The variable record portion used on Updates and Appends
+ int variableSize = 0;
+ // The record size (without the variable portion)
+ int recordSize = 0;
+
+ if (recordType >= ADD_RECORD && recordType <= UPDATE_RECORD_TX)
+ {
+ if (bb.position() + SIZE_INT > fileSize)
+ {
+ continue;
+ }
+
+ variableSize = bb.getInt();
+ }
+
switch(recordType)
{
case ADD_RECORD:
+ recordSize = SIZE_ADD_RECORD;
+ break;
+ case UPDATE_RECORD:
+ recordSize = SIZE_UPDATE_RECORD;
+ break;
+ case ADD_RECORD_TX:
+ recordSize = SIZE_ADD_RECORD_TX;
+ break;
+ case UPDATE_RECORD_TX:
+ recordSize = SIZE_UPDATE_RECORD_TX;
+ break;
+ case DELETE_RECORD:
+ recordSize = SIZE_DELETE_RECORD;
+ break;
+ case DELETE_RECORD_TX:
+ recordSize = SIZE_DELETE_RECORD_TX;
+ break;
+ case PREPARE_RECORD:
+ recordSize = SIZE_PREPARE_RECORD;
+ break;
+ case COMMIT_RECORD:
+ recordSize = SIZE_COMMIT_RECORD;
+ break;
+ case ROLLBACK_RECORD:
+ recordSize = SIZE_ROLLBACK_RECORD;
+ break;
+ default:
+ // Sanity check, this was previously tested, nothing different should be on this switch
+ throw new IllegalStateException("Record other than expected");
+
+ }
+
+ if (pos + recordSize + variableSize > fileSize)
+ {
+ continue;
+ }
+
+ int oldPos = bb.position();
+
+ bb.position(pos + variableSize + recordSize - SIZE_INT);
+
+ int checkSize = bb.getInt();
+
+ if (checkSize != variableSize + recordSize)
+ {
+ log.warn("Record at position " + pos + " is corrupted and it is being ignored");
+ bb.position(pos + SIZE_BYTE);
+ continue;
+ }
+
+ if (readFileId != file.getOrderingID())
+ {
+ //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
+ continue;
+ }
+
+ bb.position(oldPos);
+
+
+
+ switch(recordType)
+ {
+ case ADD_RECORD:
{
long id = bb.getLong();
@@ -743,22 +855,14 @@
byte userRecordType = bb.get();
- int size = bb.getInt();
- byte[] record = new byte[size];
+ byte[] record = new byte[variableSize];
+
bb.get(record);
- byte end = bb.get();
+
+ records.add(new RecordInfo(id, userRecordType, record, false));
+ hasData = true;
- if (end != DONE)
- {
- repairFrom(pos, file);
- }
- else
- {
- records.add(new RecordInfo(id, userRecordType, record, false));
- hasData = true;
-
- posFilesMap.put(id, new PosFiles(file));
- }
+ posFilesMap.put(id, new PosFiles(file));
break;
}
@@ -770,30 +874,21 @@
byte userRecordType = bb.get();
- int size = bb.getInt();
- byte[] record = new byte[size];
+ byte[] record = new byte[variableSize];
bb.get(record);
- byte end = bb.get();
+
+ records.add(new RecordInfo(id, userRecordType, record, true));
+ hasData = true;
+ file.incPosCount();
- if (end != DONE)
+ PosFiles posFiles = posFilesMap.get(id);
+
+ if (posFiles != null)
{
- repairFrom(pos, file);
- }
- else
- {
- records.add(new RecordInfo(id, userRecordType, record, true));
- hasData = true;
- file.incPosCount();
+ //It's legal for this to be null. The file(s) with the may have been deleted
+ //just leaving some updates in this file
- PosFiles posFiles = posFilesMap.get(id);
-
- if (posFiles != null)
- {
- //It's legal for this to be null. The file(s) with the may have been deleted
- //just leaving some updates in this file
-
- posFiles.addUpdateFile(file);
- }
+ posFiles.addUpdateFile(file);
}
break;
@@ -804,24 +899,15 @@
maxMessageID = Math.max(maxMessageID, id);
- byte end = bb.get();
+ recordsToDelete.add(id);
+ hasData = true;
- if (end != DONE)
+ PosFiles posFiles = posFilesMap.remove(id);
+
+ if (posFiles != null)
{
- repairFrom(pos, file);
- }
- else
- {
- recordsToDelete.add(id);
- hasData = true;
-
- PosFiles posFiles = posFilesMap.remove(id);
-
- if (posFiles != null)
- {
- posFiles.addDelete(file);
- }
- }
+ posFiles.addDelete(file);
+ }
break;
}
@@ -835,41 +921,32 @@
long id = bb.getLong();
maxMessageID = Math.max(maxMessageID, id);
- int size = bb.getInt();
- byte[] record = new byte[size];
+ byte[] record = new byte[variableSize];
bb.get(record);
- byte end = bb.get();
- if (end != DONE)
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
{
- repairFrom(pos, file);
+ tx = new TransactionHolder(txID);
+ transactions.put(txID, tx);
}
- else
- {
- TransactionHolder tx = transactions.get(txID);
+
+ tx.recordInfos.add(new RecordInfo(id, userRecordType, record, false));
+
+ TransactionNegPos tnp = transactionInfos.get(txID);
+
+ if (tnp == null)
+ {
+ tnp = new TransactionNegPos();
- if (tx == null)
- {
- tx = new TransactionHolder(txID);
- transactions.put(txID, tx);
- }
-
- tx.recordInfos.add(new RecordInfo(id, userRecordType, record, false));
-
- TransactionNegPos tnp = transactionInfos.get(txID);
-
- if (tnp == null)
- {
- tnp = new TransactionNegPos();
-
- transactionInfos.put(txID, tnp);
- }
-
- tnp.addPos(file, id);
-
- hasData = true;
+ transactionInfos.put(txID, tnp);
}
+ tnp.addPos(file, id);
+
+ hasData = true;
+
break;
}
case UPDATE_RECORD_TX:
@@ -882,41 +959,32 @@
long id = bb.getLong();
maxMessageID = Math.max(maxMessageID, id);
- int size = bb.getInt();
- byte[] record = new byte[size];
+ byte[] record = new byte[variableSize];
bb.get(record);
- byte end = bb.get();
- if (end != DONE)
+ TransactionHolder tx = transactions.get(txID);
+
+ if (tx == null)
{
- repairFrom(pos, file);
+ tx = new TransactionHolder(txID);
+ transactions.put(txID, tx);
}
- else
- {
- TransactionHolder tx = transactions.get(txID);
+
+ tx.recordInfos.add(new RecordInfo(id, userRecordType, record, true));
+
+ TransactionNegPos tnp = transactionInfos.get(txID);
+
+ if (tnp == null)
+ {
+ tnp = new TransactionNegPos();
- if (tx == null)
- {
- tx = new TransactionHolder(txID);
- transactions.put(txID, tx);
- }
-
- tx.recordInfos.add(new RecordInfo(id, userRecordType, record, true));
-
- TransactionNegPos tnp = transactionInfos.get(txID);
-
- if (tnp == null)
- {
- tnp = new TransactionNegPos();
-
- transactionInfos.put(txID, tnp);
- }
-
- tnp.addPos(file, id);
-
- hasData = true;
+ transactionInfos.put(txID, tnp);
}
+ tnp.addPos(file, id);
+
+ hasData = true;
+
break;
}
case DELETE_RECORD_TX:
@@ -926,38 +994,29 @@
long id = bb.getLong();
maxMessageID = Math.max(maxMessageID, id);
- byte end = bb.get();
+ TransactionHolder tx = transactions.get(txID);
- if (end != DONE)
+ if (tx == null)
{
- repairFrom(pos, file);
+ tx = new TransactionHolder(txID);
+ transactions.put(txID, tx);
}
- else
- {
- TransactionHolder tx = transactions.get(txID);
+
+ tx.recordsToDelete.add(id);
+
+ TransactionNegPos tnp = transactionInfos.get(txID);
+
+ if (tnp == null)
+ {
+ tnp = new TransactionNegPos();
- if (tx == null)
- {
- tx = new TransactionHolder(txID);
- transactions.put(txID, tx);
- }
-
- tx.recordsToDelete.add(id);
-
- TransactionNegPos tnp = transactionInfos.get(txID);
-
- if (tnp == null)
- {
- tnp = new TransactionNegPos();
-
- transactionInfos.put(txID, tnp);
- }
-
- tnp.addNeg(file, id);
-
- hasData = true;
+ transactionInfos.put(txID, tnp);
}
+ tnp.addNeg(file, id);
+
+ hasData = true;
+
break;
}
case PREPARE_RECORD:
@@ -965,35 +1024,27 @@
long txID = bb.getLong();
maxTransactionID = Math.max(maxTransactionID, txID);
- byte end = bb.get();
+
+ TransactionHolder tx = transactions.get(txID);
- if (end != DONE)
+ if (tx == null)
{
- repairFrom(pos, file);
+ throw new IllegalStateException("Cannot find tx with id " + txID);
}
- else
+
+ tx.prepared = true;
+
+ TransactionNegPos tnp = transactionInfos.get(txID);
+
+ if (tnp == null)
{
- TransactionHolder tx = transactions.get(txID);
-
- if (tx == null)
- {
- throw new IllegalStateException("Cannot find tx with id " + txID);
- }
-
- tx.prepared = true;
-
- TransactionNegPos tnp = transactionInfos.get(txID);
-
- if (tnp == null)
- {
- throw new IllegalStateException("Cannot find tx " + txID);
- }
-
- tnp.prepare(file);
-
- hasData = true;
+ throw new IllegalStateException("Cannot find tx " + txID);
}
+ tnp.prepare(file);
+
+ hasData = true;
+
break;
}
case COMMIT_RECORD:
@@ -1001,32 +1052,23 @@
long txID = bb.getLong();
maxTransactionID = Math.max(maxTransactionID, txID);
- byte end = bb.get();
+ TransactionHolder tx = transactions.remove(txID);
- if (end != DONE)
+ if (tx != null)
{
- repairFrom(pos, file);
- }
- else
- {
- TransactionHolder tx = transactions.remove(txID);
+ records.addAll(tx.recordInfos);
+ recordsToDelete.addAll(tx.recordsToDelete);
- if (tx != null)
+ TransactionNegPos tnp = transactionInfos.remove(txID);
+
+ if (tnp == null)
{
- records.addAll(tx.recordInfos);
- recordsToDelete.addAll(tx.recordsToDelete);
-
- TransactionNegPos tnp = transactionInfos.remove(txID);
-
- if (tnp == null)
- {
- throw new IllegalStateException("Cannot find tx " + txID);
- }
-
- tnp.commit(file);
-
- hasData = true;
+ throw new IllegalStateException("Cannot find tx " + txID);
}
+
+ tnp.commit(file);
+
+ hasData = true;
}
break;
@@ -1036,49 +1078,25 @@
long txID = bb.getLong();
maxTransactionID = Math.max(maxTransactionID, txID);
- byte end = bb.get();
- if (end != DONE)
- {
- repairFrom(pos, file);
- }
- else
- {
- TransactionHolder tx = transactions.remove(txID);
-
- if (tx != null)
- {
- TransactionNegPos tnp = transactionInfos.remove(txID);
-
- if (tnp == null)
- {
- throw new IllegalStateException("Cannot find tx " + txID);
- }
-
- tnp.rollback(file);
-
- hasData = true;
- }
- }
+ TransactionHolder tx = transactions.remove(txID);
- break;
- }
- case FILL_CHARACTER:
- {
- //End of records in file - we check the file only contains fill characters from this point
- while (bb.hasRemaining())
- {
- byte b = bb.get();
+ if (tx != null)
+ {
+ TransactionNegPos tnp = transactionInfos.remove(txID);
- if (b != FILL_CHARACTER)
+ if (tnp == null)
{
- throw new IllegalStateException("Corrupt file " + file.getFile().getFileName() +
- " contains non fill character at position " + pos);
+ throw new IllegalStateException("Cannot find tx " + txID);
}
+
+ tnp.rollback(file);
+
+ hasData = true;
}
- break;
- }
+ break;
+ }
default:
{
throw new IllegalStateException("Journal " + file.getFile().getFileName() +
@@ -1086,6 +1104,13 @@
}
}
+ checkSize = bb.getInt();
+
+ if (checkSize != variableSize + recordSize)
+ {
+ throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
+ }
+
bb.position(file.getFile().calculateBlockStart(bb.position()));
if (recordType != FILL_CHARACTER)
@@ -1296,6 +1321,8 @@
if (trace) log.trace("Reclaiming file " + file);
+ log.info("Reclaiming file " + file);
+
dataFiles.remove(file);
//FIXME - size() involves a scan!!!
@@ -1303,7 +1330,7 @@
{
//Re-initialise it
- long newOrderingID = generateOrderingID();
+ int newOrderingID = generateOrderingID();
SequentialFile sf = file.getFile();
@@ -1311,14 +1338,8 @@
ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG);
- bb.putLong(newOrderingID);
+ bb.putInt(newOrderingID);
- //Note we MUST re-fill it - otherwise we won't be able to detect corrupt records
-
- //TODO - if we can avoid this somehow would be good, since filling the file is a heavyweight
- //operation and can impact other IO operations on the disk
- sf.fill(0, fileSize, FILL_CHARACTER);
-
int bytesWritten = sf.write(bb, true);
JournalFile jf = new JournalFileImpl(sf, newOrderingID);
@@ -1337,6 +1358,8 @@
file.getFile().delete();
}
+
+ log.info("Done reclaiming");
}
}
}
@@ -1381,11 +1404,6 @@
return syncNonTransactional;
}
- public long getTaskPeriod()
- {
- return taskPeriod;
- }
-
public String getFilePrefix()
{
return filePrefix;
@@ -1422,7 +1440,13 @@
debugWait();
}
+
+ public void disableAutoReclaiming()
+ {
+ this.autoReclaim = false;
+ }
+
// MessagingComponent implementation ---------------------------------------------------
public synchronized boolean isStarted()
@@ -1491,10 +1515,6 @@
{
throw new IllegalStateException("Journal is stopped");
}
-
- reclaimerTask = new ReclaimerTask();
-
- timer.schedule(reclaimerTask, taskPeriod, taskPeriod);
}
public void stopReclaimer()
@@ -1503,11 +1523,6 @@
{
throw new IllegalStateException("Journal is already stopped");
}
-
- if (reclaimerTask != null)
- {
- reclaimerTask.cancel();
- }
}
// Public -----------------------------------------------------------------------------
@@ -1523,6 +1538,9 @@
try
{
checkFile(size);
+ bb.position(SIZE_BYTE);
+ bb.putInt(currentFile.getOrderingID());
+ bb.rewind();
if (callback != null)
{
currentFile.getFile().write(bb, callback);
@@ -1557,7 +1575,7 @@
private JournalFile createFile(boolean keepOpened) throws Exception
{
- long orderingID = generateOrderingID();
+ int orderingID = generateOrderingID();
String fileName = filePrefix + "-" + orderingID + "." + fileExtension;
@@ -1569,9 +1587,9 @@
sequentialFile.fill(0, fileSize, FILL_CHARACTER);
- ByteBuffer bb = fileFactory.newBuffer(SIZE_LONG);
+ ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
- bb.putLong(orderingID);
+ bb.putInt(orderingID);
bb.rewind();
@@ -1592,28 +1610,12 @@
private void openFile(JournalFile file) throws Exception
{
file.getFile().open();
- file.getFile().position(file.getFile().calculateBlockStart(SIZE_LONG));
+ file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
}
- private long generateOrderingID()
+ private int generateOrderingID()
{
- long orderingID = System.currentTimeMillis();
-
- while (orderingID == lastOrderingID)
- {
- //Ensure it's unique
- try
- {
- Thread.sleep(1);
- }
- catch (InterruptedException ignore)
- {
- }
- orderingID = System.currentTimeMillis();
- }
- lastOrderingID = orderingID;
-
- return orderingID;
+ return nextOrderingId.addAndGet(1);
}
// You need to guarantee lock.acquire() over currentFile before calling this method
@@ -1656,6 +1658,10 @@
try
{
pushOpenedFile();
+ if (autoReclaim)
+ {
+ checkAndReclaimFiles();
+ }
}
catch (Exception e)
{
@@ -1794,30 +1800,6 @@
}
- private class ReclaimerTask extends TimerTask
- {
- public synchronized boolean cancel()
- {
- timer.cancel();
-
- return super.cancel();
- }
-
- public synchronized void run()
- {
- try
- {
- checkAndReclaimFiles();
- }
- catch (Exception e)
- {
- log.error("Failure in running ReclaimerTask", e);
-
- cancel();
- }
- }
- }
-
private static class PosFiles
{
private final JournalFile addFile;
@@ -1981,5 +1963,5 @@
}
}
}
-
+
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -128,7 +128,7 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
- bindingsJournal = new JournalImpl(1024 * 1024, 2, true, true, bindingsFF, 10000, "jbm-bindings", "bindings", 1, 1);
+ bindingsJournal = new JournalImpl(1024 * 1024, 2, true, true, bindingsFF, "jbm-bindings", "bindings", 1, 1);
String journalDir = config.getJournalDirectory();
@@ -171,7 +171,7 @@
messageJournal = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(), config.isJournalSyncTransactional(),
config.isJournalSyncNonTransactional(), journalFF,
- config.getJournalTaskPeriod(), "jbm-data", "jbm", config.getJournalMaxAIO(), config.getJournalAIOTimeout());
+ "jbm-data", "jbm", config.getJournalMaxAIO(), config.getJournalAIOTimeout());
}
/* This constructor is only used for testing */
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -203,7 +203,7 @@
{
Journal journal =
new JournalImpl(10 * 1024 * 1024, 10, true, true, getFileFactory(),
- 5000, "jbm-data", "jbm", 5000, 120);
+ "jbm-data", "jbm", 5000, 120);
journal.start();
@@ -265,7 +265,7 @@
Journal journal =
new JournalImpl(10 * 1024 * 1024, numFiles, true, true, getFileFactory(),
- 5000, "jbm-data", "jbm", 5000, 120);
+ "jbm-data", "jbm", 5000, 120);
journal.start();
@@ -291,7 +291,7 @@
journal =
new JournalImpl(10 * 1024 * 1024, numFiles, true, true, getFileFactory(),
- 5000, "jbm-data", "jbm", 5000, 120);
+ "jbm-data", "jbm", 5000, 120);
journal.start();
journal.load(new ArrayList<RecordInfo>(), null);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -26,13 +26,17 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
import org.jboss.messaging.core.journal.RecordInfo;
import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.JournalImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.MessagingBuffer;
@@ -50,7 +54,7 @@
private int alignment = 0;
- private FakeSequentialFileFactory factory;
+ private SequentialFileFactory factory;
JournalImpl journalImpl = null;
@@ -220,12 +224,59 @@
}
+ /**
+ * Adds 50 records, moves to the next file, deletes records 10..49 and then
+ * calls setupJournal again, expecting the 10 undeleted records (ids 0..9)
+ * to be present and the file count to remain at 3.
+ */
+ public void testPartialDelete() throws Exception
+ {
+ final int JOURNAL_SIZE = 10000;
+
+ setupJournal(JOURNAL_SIZE, 100);
+
+ journalImpl.disableAutoReclaiming();
+
+ journalImpl.checkAndReclaimFiles();
+
+ journalImpl.debugWait();
+
+ assertEquals(2, factory.listFiles("tt").size());
+
+ log.debug("Initial:--> " + journalImpl.debug());
+
+ log.debug("_______________________________");
+
+ for (int i = 0; i < 50; i++)
+ {
+ journalImpl.appendAddRecord((long)i, (byte)1, new SimpleEncoding(1, (byte) 'x'));
+ }
+
+ journalImpl.forceMoveNextFile();
+
+ // as the request to a new file is asynchronous, we need to make sure the async requests are done
+ journalImpl.debugWait();
+
+ assertEquals(3, factory.listFiles("tt").size());
+
+ for (int i = 10; i < 50; i++)
+ {
+ journalImpl.appendDeleteRecord((long)i);
+ }
+
+ journalImpl.debugWait();
+
+ // presumably setupJournal re-creates and reloads the journal into this.records - TODO confirm
+ setupJournal(JOURNAL_SIZE, 100);
+
+ assertEquals(10, this.records.size());
+
+ assertEquals(3, factory.listFiles("tt").size());
+
+ }
+
public void testAddAndDeleteReclaimWithoutTransactions() throws Exception
{
- final int JOURNAL_SIZE = 51 * 1024;
+ final int JOURNAL_SIZE = 10000;
- setupJournal(JOURNAL_SIZE, 1024);
+ setupJournal(JOURNAL_SIZE, 100);
+ journalImpl.disableAutoReclaiming();
+
journalImpl.checkAndReclaimFiles();
journalImpl.debugWait();
@@ -238,7 +289,7 @@
for (int i = 0; i < 50; i++)
{
- journalImpl.appendAddRecord((long)i, (byte)1, new SimpleEncoding(200, (byte) 'x'));
+ journalImpl.appendAddRecord((long)i, (byte)1, new SimpleEncoding(1, (byte) 'x'));
}
// as the request to a new file is asynchronous, we need to make sure the async requests are done
@@ -251,16 +302,21 @@
journalImpl.appendDeleteRecord((long)i);
}
- journalImpl.appendAddRecord((long)1000, (byte)1, new SimpleEncoding(200, (byte) 'x'));
+ journalImpl.forceMoveNextFile();
+ journalImpl.appendAddRecord((long)1000, (byte)1, new SimpleEncoding(1, (byte) 'x'));
+
journalImpl.debugWait();
assertEquals(4, factory.listFiles("tt").size());
- journalImpl.checkReclaimStatus();
- log.debug(journalImpl.debug());
+ setupJournal(JOURNAL_SIZE, 100);
+ assertEquals(1, records.size());
+
+ assertEquals(1000, records.get(0).id);
+
journalImpl.checkAndReclaimFiles();
log.debug(journalImpl.debug());
@@ -311,28 +367,28 @@
}
- public void testReclaimWithInterruptedTransaction() throws Exception
+ public void testReloadWithInterruptedTransaction() throws Exception
{
final int JOURNAL_SIZE = 1100;
setupJournal(JOURNAL_SIZE, 100);
+ journalImpl.disableAutoReclaiming();
+
assertEquals(0, records.size());
assertEquals(0, transactions.size());
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1, 1, (byte) 1, new SimpleEncoding(1,(byte) 1));
+ journalImpl.appendAddRecordTransactional(77l, 1, (byte) 1, new SimpleEncoding(1,(byte) 1));
journalImpl.forceMoveNextFile();
}
journalImpl.debugWait();
- System.out.println("files = " + journalImpl.debug());
-
assertEquals(12, factory.listFiles("tt").size());
- journalImpl.appendAddRecordTransactional(2, 1, (byte) 1, new SimpleEncoding(1,(byte) 1));
+ journalImpl.appendAddRecordTransactional(78l, 1, (byte) 1, new SimpleEncoding(1,(byte) 1));
assertEquals(12, factory.listFiles("tt").size());
@@ -340,12 +396,10 @@
assertEquals(0, records.size());
assertEquals(0, transactions.size());
-
- //System.out.println("Journal - " + journalImpl.debug());
try
{
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(77l);
// This was supposed to throw an exception, as the transaction was forgotten (interrupted by a reload).
fail("Supposed to throw exception");
}
@@ -363,11 +417,13 @@
journalImpl.checkAndReclaimFiles();
+ System.out.println("Journal: " + journalImpl.debug());
+
assertEquals(2, factory.listFiles("tt").size());
}
- public void testReclaimWithCompletedTransaction() throws Exception
+ public void testReloadWithCompletedTransaction() throws Exception
{
final int JOURNAL_SIZE = 2000;
@@ -425,8 +481,31 @@
}
- public void testReclaimWithPreparedTransaction() throws Exception
+
+ /**
+ * Writes a single transactional add record whose body is sized as
+ * 1900 - JournalImpl.SIZE_ADD_RECORD_TX bytes into a journal set up with
+ * JOURNAL_SIZE 2000, commits it, calls setupJournal again and expects
+ * exactly one record.
+ */
+ public void testTotalSize() throws Exception
{
+ final int JOURNAL_SIZE = 2000;
+
+ setupJournal(JOURNAL_SIZE, 100);
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ // presumably SIZE_ADD_RECORD_TX is the fixed per-record overhead, making the whole record 1900 bytes - TODO confirm
+ journalImpl.appendAddRecordTransactional(1l, 2l, (byte)3, new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4));
+
+ journalImpl.appendCommitRecord(1l);
+
+ journalImpl.debugWait();
+
+ setupJournal(JOURNAL_SIZE, 100);
+
+ assertEquals(1, records.size());
+
+ }
+
+
+ public void testReloadWithPreparedTransaction() throws Exception
+ {
final int JOURNAL_SIZE = 3 * 1024;
setupJournal(JOURNAL_SIZE, 1);
@@ -495,6 +574,7 @@
}
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -540,8 +620,8 @@
journalImpl = new JournalImpl(journalSize, 2,
true, true,
- factory, 1000,
- "tt", "tt", 1000, 1000);
+ factory,
+ "tt", "tt", 1000, 10000);
journalImpl.start();
@@ -554,37 +634,5 @@
// Inner classes -------------------------------------------------
- private class SimpleEncoding implements EncodingSupport
- {
-
- private final int size;
- private final byte bytetosend;
-
- public SimpleEncoding(int size, byte bytetosend)
- {
- this.size = size;
- this.bytetosend = bytetosend;
- }
-
- public void decode(MessagingBuffer buffer)
- {
- throw new UnsupportedOperationException();
-
- }
-
- public void encode(MessagingBuffer buffer)
- {
- for (int i = 0; i < size; i++)
- {
- buffer.putByte(bytetosend);
- }
- }
-
- public int getEncodeSize()
- {
- return size;
- }
-
- }
}
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -0,0 +1,488 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.journal.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IArgumentMatcher;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+public class EasyMockJournalTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ SequentialFileFactory mockFactory = null;
+ SequentialFile file1 = null;
+ SequentialFile file2 = null;
+
+ // Static --------------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(EasyMockJournalTest.class);
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * Checks that appendAddRecord produces the same single write on file1 for
+ * both the byte[] overload and the SimpleEncoding overload: an ADD_RECORD
+ * for id 14, record type 33 and a one-byte body of value 10.
+ */
+ public void testAppendRecord() throws Exception
+ {
+ JournalImpl journalImpl = newJournal();
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
+ /*FileID*/1,
+ /*RecordLength*/1,
+ /* ID */14l,
+ /*RecordType*/(byte)33,
+ /* body */(byte)10,
+ JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
+
+
+ EasyMock.replay(mockFactory, file1, file2);
+
+ journalImpl.appendAddRecord(14l, (byte) 33, new byte[]{ (byte) 10 });
+
+ EasyMock.verify(mockFactory, file1, file2);
+
+ // reset drops the stubs as well as the expectations, so the stubs are re-recorded for the second phase
+ EasyMock.reset(mockFactory, file1, file2);
+
+ stubValues();
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
+ /*FileID*/1,
+ /*RecordLength*/1,
+ /* ID */14l,
+ /*RecordType*/(byte)33,
+ /* body */(byte)10,
+ JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
+
+ EasyMock.replay(mockFactory, file1, file2);
+
+ journalImpl.appendAddRecord(14l, (byte)33, new SimpleEncoding(1,(byte)10));
+
+ EasyMock.verify(mockFactory, file1, file2);
+
+ }
+
+
+ /**
+ * Checks that adding record 14 and then deleting it results in an
+ * ADD_RECORD write followed by a DELETE_RECORD write on file1, both with
+ * the second write argument equal to true.
+ */
+ public void testDeleteRecord() throws Exception
+ {
+ JournalImpl journalImpl = newJournal();
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
+ /*FileID*/1,
+ /*RecordLength*/1,
+ /* ID */14l,
+ /*RecordType*/(byte)33,
+ /* body */(byte)10,
+ JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
+
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.DELETE_RECORD,
+ /*FileID*/1,
+ /* ID */14l,
+ JournalImpl.SIZE_DELETE_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_DELETE_RECORD);
+
+ EasyMock.replay(mockFactory, file1, file2);
+
+ journalImpl.appendAddRecord(14l, (byte) 33, new byte[]{ (byte) 10 });
+
+ journalImpl.appendDeleteRecord(14l);
+
+ EasyMock.verify(mockFactory, file1, file2);
+ }
+
+ /**
+ * Checks the writes issued for a non-transactional add of record 15
+ * followed by a transactional delete (transaction 100) and its commit.
+ * The DELETE_RECORD_TX write is expected with the second argument false,
+ * the add and the commit with true (presumably the sync flag - TODO confirm).
+ */
+ public void testDeleteTransRecord() throws Exception
+ {
+ JournalImpl journalImpl = newJournal();
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
+ /*FileID*/1,
+ /*RecordLength*/1,
+ /* ID */15l,
+ /*RecordType*/(byte)33,
+ /* body */(byte)10,
+ JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
+
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.DELETE_RECORD_TX,
+ /*FileID*/1,
+ /* Transaction ID*/ 100l,
+ /* ID */15l,
+ JournalImpl.SIZE_DELETE_RECORD_TX)), EasyMock.eq(false))).andReturn(JournalImpl.SIZE_DELETE_RECORD_TX);
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.COMMIT_RECORD,
+ /*FileID*/1,
+ /* Transaction ID*/ 100l,
+ JournalImpl.SIZE_COMMIT_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_COMMIT_RECORD);
+
+ EasyMock.replay(mockFactory, file1, file2);
+
+ journalImpl.appendAddRecord(15l, (byte) 33, new byte[]{ (byte) 10 });
+
+ journalImpl.appendDeleteRecordTransactional(100l, 15l);
+
+ journalImpl.appendCommitRecord(100l);
+
+ EasyMock.verify(mockFactory, file1, file2);
+
+
+
+
+ }
+
+ /**
+ * Checks the writes issued for two transactional adds (ids 14 and 15 in
+ * transaction 3, one through SimpleEncoding and one through byte[])
+ * followed by a prepare and a commit of that transaction.
+ */
+ public void testAppendAndCommitRecord() throws Exception
+ {
+ JournalImpl journalImpl = newJournal();
+
+ EasyMock.expect(
+ file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD_TX,
+ /* FileID */1,
+ /* RecordLength */1,
+ /* TXID */3l,
+ /* RecordType */(byte) 33,
+ /* ID */14l,
+ /* body */(byte) 10, JournalImpl.SIZE_ADD_RECORD_TX + 1)),
+ EasyMock.eq(false))).andReturn(
+ JournalImpl.SIZE_ADD_RECORD_TX + 1);
+
+ EasyMock.expect(
+ file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD_TX,
+ /* FileID */1,
+ /* RecordLength */1,
+ /* TXID */3l,
+ /* RecordType */(byte) 33,
+ /* ID */15l,
+ /* body */(byte) 10, JournalImpl.SIZE_ADD_RECORD_TX + 1)),
+ EasyMock.eq(false))).andReturn(
+ JournalImpl.SIZE_ADD_RECORD_TX + 1);
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.PREPARE_RECORD,
+ /*FileID*/1,
+ /* TXID */ 3l,
+ JournalImpl.SIZE_PREPARE_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_PREPARE_RECORD);
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.COMMIT_RECORD,
+ /*FileID*/1,
+ /* TXID */ 3l,
+ JournalImpl.SIZE_COMMIT_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_COMMIT_RECORD);
+
+ EasyMock.replay(mockFactory, file1, file2);
+
+ journalImpl.appendAddRecordTransactional(3, 14l, (byte)33, new SimpleEncoding(1,(byte)10));
+
+ journalImpl.appendAddRecordTransactional(3, 15l, (byte) 33, new byte[]{ (byte) 10 });
+
+ journalImpl.appendPrepareRecord(3l);
+
+ journalImpl.appendCommitRecord(3l);
+
+ EasyMock.verify(mockFactory, file1, file2);
+ }
+
+ /**
+ * Checks the writes issued for one transactional add (id 14 in
+ * transaction 3) followed by a rollback of that transaction: an
+ * ADD_RECORD_TX write and then a ROLLBACK_RECORD write on file1.
+ */
+ public void testAppendAndRollbacktRecord() throws Exception
+ {
+ JournalImpl journalImpl = newJournal();
+
+ EasyMock.expect(
+ file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD_TX,
+ /* FileID */1,
+ /* RecordLength */1,
+ /* TXID */3l,
+ /* RecordType */(byte) 33,
+ /* ID */14l,
+ /* body */(byte) 10, JournalImpl.SIZE_ADD_RECORD_TX + 1)),
+ EasyMock.eq(false))).andReturn(
+ JournalImpl.SIZE_ADD_RECORD_TX + 1);
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ROLLBACK_RECORD,
+ /*FileID*/1,
+ /* TXID */ 3l,
+ JournalImpl.SIZE_ROLLBACK_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ROLLBACK_RECORD);
+
+ EasyMock.replay(mockFactory, file1, file2);
+
+ journalImpl.appendAddRecordTransactional(3, 14l, (byte)33, new SimpleEncoding(1,(byte)10));
+
+ journalImpl.appendRollbackRecord(3l);
+
+ EasyMock.verify(mockFactory, file1, file2);
+ }
+
+ /**
+ * Checks the writes issued for a non-transactional add of record 15
+ * followed by two non-transactional updates of it, one through
+ * SimpleEncoding (type 34, body 11) and one through byte[] (type 35, body 12).
+ */
+ public void testupdateRecordNonTrans() throws Exception
+ {
+ JournalImpl journalImpl = newJournal();
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
+ /* FileID */1,
+ /* RecordLength */1,
+ /* ID */15l,
+ /* RecordType */(byte)33,
+ /* body */(byte)10,
+ JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD,
+ /* FileID */1,
+ /* RecordLength */1,
+ /* ID */15l,
+ /* RecordType */(byte)34,
+ /* body */(byte)11,
+ JournalImpl.SIZE_UPDATE_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_UPDATE_RECORD + 1);
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD,
+ /* FileID */1,
+ /* RecordLength */1,
+ /* ID */15l,
+ /* RecordType */(byte)35,
+ /* body */(byte)12,
+ JournalImpl.SIZE_UPDATE_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_UPDATE_RECORD + 1);
+
+ EasyMock.replay(mockFactory, file1, file2);
+
+ journalImpl.appendAddRecord(15l, (byte) 33, new byte[]{ (byte) 10 });
+
+ journalImpl.appendUpdateRecord(15l, (byte)34, new SimpleEncoding(1, (byte)11));
+
+ journalImpl.appendUpdateRecord(15l, (byte)35, new byte[]{ (byte) 12});
+
+ EasyMock.verify(mockFactory, file1, file2);
+
+ }
+
+
+ /**
+ * Checks the writes issued for a non-transactional add of record 15
+ * followed by two transactional updates of it in transaction 33 (one
+ * through SimpleEncoding, one through byte[]) and the commit of that
+ * transaction. The UPDATE_RECORD_TX writes are expected with the second
+ * argument false, the add and the commit with true.
+ */
+ public void testupdateRecordTrans() throws Exception
+ {
+ JournalImpl journalImpl = newJournal();
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
+ /* FileID */1,
+ /* RecordLength */1,
+ /* ID */15l,
+ /* RecordType */(byte)33,
+ /* body */(byte)10,
+ JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD_TX,
+ /* FileID */1,
+ /* RecordLength */1,
+ /* TransactionID */33l,
+ /* RecordType */ (byte)34,
+ /* ID */15l,
+ /* body */(byte)11,
+ JournalImpl.SIZE_UPDATE_RECORD_TX + 1)), EasyMock.eq(false))).andReturn(JournalImpl.SIZE_UPDATE_RECORD_TX + 1);
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD_TX,
+ /* FileID */1,
+ /* RecordLength */1,
+ /* TransactionID */33l,
+ /* RecordType */ (byte)35,
+ /* ID */15l,
+ /* body */(byte)12,
+ JournalImpl.SIZE_UPDATE_RECORD_TX + 1)), EasyMock.eq(false))).andReturn(JournalImpl.SIZE_UPDATE_RECORD_TX + 1);
+
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.COMMIT_RECORD,
+ /*FileID*/1,
+ /* Transaction ID*/ 33l,
+ JournalImpl.SIZE_COMMIT_RECORD)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_COMMIT_RECORD);
+
+ EasyMock.replay(mockFactory, file1, file2);
+
+ journalImpl.appendAddRecord(15l, (byte) 33, new byte[]{ (byte) 10 });
+
+ journalImpl.appendUpdateRecordTransactional(33l, 15l, (byte)34, new SimpleEncoding(1, (byte)11));
+
+ journalImpl.appendUpdateRecordTransactional(33l, 15l, (byte)35, new byte[]{ (byte) 12});
+
+ journalImpl.appendCommitRecord(33l);
+
+ EasyMock.verify(mockFactory, file1, file2);
+
+ }
+
+ // Private -------------------------------------------------------
+
+ /**
+ * Creates fresh mocks for the file factory and two sequential files,
+ * records the calls expected while a JournalImpl with two 100 KiB files
+ * is started and loaded, runs start() and load() against them, verifies
+ * the expectations and then resets the mocks (re-recording the stubs) so
+ * the calling test can record its own expectations.
+ *
+ * @return the started and loaded journal, backed by the mocks
+ */
+ private JournalImpl newJournal() throws Exception
+ {
+ mockFactory = EasyMock.createMock(SequentialFileFactory.class);
+ file1 = EasyMock.createMock(SequentialFile.class);
+ file2 = EasyMock.createMock(SequentialFile.class);
+
+ stubValues();
+
+ // the factory hands out file1 on the first createSequentialFile call and file2 on the second
+ EasyMock.expect(mockFactory.createSequentialFile(EasyMock.isA(String.class), EasyMock.anyInt(), EasyMock.anyLong())).andReturn(file1);
+
+ EasyMock.expect(mockFactory.createSequentialFile(EasyMock.isA(String.class), EasyMock.anyInt(), EasyMock.anyLong())).andReturn(file2);
+
+ file1.open();
+
+ EasyMock.expectLastCall().anyTimes();
+
+ file2.open();
+
+ EasyMock.expectLastCall().anyTimes();
+
+ file1.close();
+
+ EasyMock.expectLastCall().anyTimes();
+
+ file2.close();
+
+ EasyMock.expectLastCall().anyTimes();
+
+ // each file is expected to be filled once over the whole 100 KiB with the byte 'J'
+ file1.fill(0, 100 * 1024, (byte) 'J');
+
+ file2.fill(0, 100 * 1024, (byte) 'J');
+
+ // each file is expected to receive one write of autoEncode(1) / autoEncode(2), reported as 4 bytes written
+ EasyMock.expect(file1.write(compareByteBuffer(autoEncode((int)1)), EasyMock.eq(true))).andReturn(4);
+ EasyMock.expect(file2.write(compareByteBuffer(autoEncode((int)2)), EasyMock.eq(true))).andReturn(4);
+
+ file1.position(4);
+
+ file2.position(4);
+
+ EasyMock.replay(mockFactory, file1, file2);
+
+ JournalImpl journalImpl = new JournalImpl(100 * 1024, 2,
+ true, true,
+ mockFactory,
+ "tt", "tt", 1000, 1000);
+
+ journalImpl.start();
+
+ journalImpl.load(new ArrayList(), new ArrayList());
+
+ EasyMock.verify(mockFactory, file1, file2);
+
+ // reset clears the stubs too, so they are recorded again for the calling test
+ EasyMock.reset(mockFactory, file1, file2);
+
+ stubValues();
+
+ return journalImpl;
+ }
+
+
+ /**
+ * Records the stubbed (unverified, any-number-of-calls) behaviour shared by
+ * every test: alignment of 1 on the factory and on both files, no callback
+ * support, an empty file listing for "tt", direct buffers of the requested
+ * size, and calculateBlockStart echoing its argument.
+ */
+ private void stubValues() throws Exception
+ {
+ // Echoes the first call argument back unchanged; shared by both file mocks.
+ final IAnswer<Integer> echoFirstArgument = new IAnswer<Integer>()
+ {
+ public Integer answer() throws Throwable
+ {
+ return (Integer) EasyMock.getCurrentArguments()[0];
+ }
+ };
+
+ // Allocates a direct buffer of whatever size the caller asked for.
+ final IAnswer<ByteBuffer> allocateRequestedSize = new IAnswer<ByteBuffer>()
+ {
+ public ByteBuffer answer() throws Throwable
+ {
+ Integer requestedSize = (Integer) EasyMock.getCurrentArguments()[0];
+
+ return ByteBuffer.allocateDirect(requestedSize);
+ }
+ };
+
+ EasyMock.expect(mockFactory.getAlignment()).andStubReturn(1);
+ EasyMock.expect(mockFactory.isSupportsCallbacks()).andStubReturn(false);
+ EasyMock.expect(mockFactory.listFiles("tt")).andStubReturn(new ArrayList<String>());
+ EasyMock.expect(mockFactory.newBuffer(EasyMock.anyInt())).andStubAnswer(allocateRequestedSize);
+
+ EasyMock.expect(file1.calculateBlockStart(EasyMock.anyInt())).andStubAnswer(echoFirstArgument);
+ EasyMock.expect(file2.calculateBlockStart(EasyMock.anyInt())).andStubAnswer(echoFirstArgument);
+
+ EasyMock.expect(file1.getAlignment()).andStubReturn(1);
+ EasyMock.expect(file2.getAlignment()).andStubReturn(1);
+ }
+
+
+ private ByteBuffer compareByteBuffer(final byte expectedArray[])
+ {
+
+ EasyMock.reportMatcher(new IArgumentMatcher()
+ {
+
+ public void appendTo(StringBuffer buffer)
+ {
+ }
+
+ public boolean matches(Object argument)
+ {
+ ByteBuffer buffer = (ByteBuffer) argument;
+
+ buffer.rewind();
+ byte[] compareArray = new byte[buffer.limit()];
+ buffer.get(compareArray);
+
+ if (compareArray.length != expectedArray.length)
+ {
+ return false;
+ }
+
+ for (int i = 0; i < expectedArray.length; i++)
+ {
+ if (expectedArray[i] != compareArray[i])
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ });
+
+ return null;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -141,7 +141,8 @@
public void createJournal() throws Exception
{
journal =
- new JournalImpl(fileSize, minFiles, sync, sync, fileFactory, 1000, filePrefix, fileExtension, maxAIO, 120000);
+ new JournalImpl(fileSize, minFiles, sync, sync, fileFactory, filePrefix, fileExtension, maxAIO, 120000);
+ journal.disableAutoReclaiming();
}
protected void startJournal() throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -125,7 +125,7 @@
{
try
{
- new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, true, true, fileFactory, 5000, filePrefix, fileExtension, 1, 120);
+ new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, true, true, fileFactory, filePrefix, fileExtension, 1, 120);
fail("Should throw exception");
}
@@ -136,7 +136,7 @@
try
{
- new JournalImpl(10 * 1024, 1, true, true, fileFactory, 5000, filePrefix, fileExtension, 1, 120);
+ new JournalImpl(10 * 1024, 1, true, true, fileFactory, filePrefix, fileExtension, 1, 120);
fail("Should throw exception");
}
@@ -147,7 +147,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, null, 5000, filePrefix, fileExtension, 1, 120);
+ new JournalImpl(10 * 1024, 10, true, true, null, filePrefix, fileExtension, 1, 120);
fail("Should throw exception");
}
@@ -158,21 +158,10 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, JournalImpl.MIN_TASK_PERIOD - 1, filePrefix, fileExtension, 1, 120);
+ new JournalImpl(10 * 1024, 10, true, true, fileFactory, null, fileExtension, 1, 120);
fail("Should throw exception");
}
- catch (IllegalArgumentException e)
- {
- //Ok
- }
-
- try
- {
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, 5000, null, fileExtension, 1, 120);
-
- fail("Should throw exception");
- }
catch (NullPointerException e)
{
//Ok
@@ -180,7 +169,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, 5000, filePrefix, null, 1, 120);
+ new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, null, 1, 120);
fail("Should throw exception");
}
@@ -191,7 +180,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, 5000, filePrefix, null, 0, 120);
+ new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, null, 0, 120);
fail("Should throw exception");
}
@@ -202,7 +191,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, 5000, filePrefix, fileExtension, 0, -1);
+ new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, fileExtension, 0, -1);
fail("Should throw exception");
}
@@ -943,104 +932,115 @@
assertEquals(1, journal.getOpenedFilesCount());
}
- public void testReclaimTransactionalSimple() throws Exception
- {
- setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) +
- calculateRecordSize(recordLength, getAlignment()), true);
- createJournal();
- startJournal();
- load();
-
- List<String> files1 = fileFactory.listFiles(fileExtension);
-
- assertEquals(2, files1.size());
-
- assertEquals(0, journal.getDataFilesCount());
- assertEquals(0, journal.getFreeFilesCount());
- assertEquals(1, journal.getOpenedFilesCount());
- assertEquals(0, journal.getIDMapSize());
-
- addTx(1, 1); // in file 0
-
- deleteTx(1, 1); // in file 1
-
- List<String> files2 = fileFactory.listFiles(fileExtension);
-
- assertEquals(3, files2.size());
- assertEquals(1, journal.getOpenedFilesCount());
-
- assertEquals(1, journal.getDataFilesCount());
- assertEquals(0, journal.getFreeFilesCount());
- assertEquals(0, journal.getIDMapSize());
-
- //Make sure we move on to the next file
-
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 2
-
- List<String> files3 = fileFactory.listFiles(fileExtension);
-
- assertEquals(4, files3.size());
- assertEquals(1, journal.getOpenedFilesCount());
-
- log.debug("data files count "+ journal.getDataFilesCount());
- log.debug("free files count "+ journal.getFreeFilesCount());
-
- assertEquals(2, journal.getDataFilesCount());
- assertEquals(0, journal.getFreeFilesCount());
- assertEquals(1, journal.getIDMapSize());
-
- commit(1); // in file 3
-
- List<String> files4 = fileFactory.listFiles(fileExtension);
-
- assertEquals(5, files4.size());
- assertEquals(1, journal.getOpenedFilesCount());
-
- assertEquals(3, journal.getDataFilesCount());
- assertEquals(0, journal.getFreeFilesCount());
- assertEquals(1, journal.getIDMapSize());
-
- //Make sure we move on to the next file
-
- addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 4
-
- List<String> files5 = fileFactory.listFiles(fileExtension);
-
- assertEquals(6, files5.size());
- assertEquals(1, journal.getOpenedFilesCount());
-
- assertEquals(4, journal.getDataFilesCount());
- assertEquals(0, journal.getFreeFilesCount());
- assertEquals(2, journal.getIDMapSize());
-
- checkAndReclaimFiles();
-
- List<String> files6 = fileFactory.listFiles(fileExtension);
-
- //Three should get deleted (files 0, 1, 3)
-
- assertEquals(3, files6.size());
- assertEquals(1, journal.getOpenedFilesCount());
-
- assertEquals(1, journal.getDataFilesCount());
- assertEquals(0, journal.getFreeFilesCount());
- assertEquals(2, journal.getIDMapSize());
+ public void testReclaimTransactionalSimple() throws Exception
+ {
+ setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) +
+ calculateRecordSize(recordLength, getAlignment()), true);
+ createJournal();
+ startJournal();
+ load();
+ List<String> files1 = fileFactory.listFiles(fileExtension);
+
+ assertEquals(2, files1.size());
+
+ assertEquals(0, journal.getDataFilesCount());
+ assertEquals(0, journal.getFreeFilesCount());
+ assertEquals(1, journal.getOpenedFilesCount());
+ assertEquals(0, journal.getIDMapSize());
+
+ addTx(1, 1); // in file 0
+
+ deleteTx(1, 1); // in file 1
+
+ journal.debugWait();
+
+ System.out.println("journal tmp :" + journal.debug());
+
+ List<String> files2 = fileFactory.listFiles(fileExtension);
+
+ assertEquals(3, files2.size());
+ assertEquals(1, journal.getOpenedFilesCount());
+
+ assertEquals(1, journal.getDataFilesCount());
+ assertEquals(0, journal.getFreeFilesCount());
+ assertEquals(0, journal.getIDMapSize());
+
+ //Make sure we move on to the next file
+
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 2); // in file 2
+
+ journal.debugWait();
+
+ System.out.println("journal tmp2 :" + journal.debug());
- //Now restart
-
- stopJournal();
- createJournal();
- startJournal();
- loadAndCheck();
-
- assertEquals(3, files6.size());
- assertEquals(1, journal.getOpenedFilesCount());
-
- assertEquals(1, journal.getDataFilesCount());
- assertEquals(0, journal.getFreeFilesCount());
- assertEquals(2, journal.getIDMapSize());
- }
-
+ List<String> files3 = fileFactory.listFiles(fileExtension);
+
+ assertEquals(4, files3.size());
+ assertEquals(1, journal.getOpenedFilesCount());
+
+ log.debug("data files count "+ journal.getDataFilesCount());
+ log.debug("free files count "+ journal.getFreeFilesCount());
+
+ assertEquals(2, journal.getDataFilesCount());
+ assertEquals(0, journal.getFreeFilesCount());
+ assertEquals(1, journal.getIDMapSize());
+
+ commit(1); // in file 3
+
+ List<String> files4 = fileFactory.listFiles(fileExtension);
+
+ assertEquals(5, files4.size());
+ assertEquals(1, journal.getOpenedFilesCount());
+
+ assertEquals(3, journal.getDataFilesCount());
+ assertEquals(0, journal.getFreeFilesCount());
+ assertEquals(1, journal.getIDMapSize());
+
+ //Make sure we move on to the next file
+
+ addWithSize(recordLength - JournalImpl.SIZE_ADD_RECORD, 3); // in file 4
+
+ List<String> files5 = fileFactory.listFiles(fileExtension);
+
+ assertEquals(6, files5.size());
+ assertEquals(1, journal.getOpenedFilesCount());
+
+ assertEquals(4, journal.getDataFilesCount());
+ assertEquals(0, journal.getFreeFilesCount());
+ assertEquals(2, journal.getIDMapSize());
+
+ checkAndReclaimFiles();
+
+ List<String> files6 = fileFactory.listFiles(fileExtension);
+
+ //Three should get deleted (files 0, 1, 3)
+
+ assertEquals(3, files6.size());
+ assertEquals(1, journal.getOpenedFilesCount());
+
+ assertEquals(1, journal.getDataFilesCount());
+ assertEquals(0, journal.getFreeFilesCount());
+ assertEquals(2, journal.getIDMapSize());
+
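+ //Now restart (NOTE(review): files6 was listed before this restart, so the files6 size assertion after the reload re-checks a stale list - consider re-listing the files)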
+
+ journal.checkAndReclaimFiles();
+
+ System.out.println("journal:" + journal.debug());
+
+ stopJournal(false);
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ assertEquals(3, files6.size());
+ assertEquals(1, journal.getOpenedFilesCount());
+
+ assertEquals(1, journal.getDataFilesCount());
+ assertEquals(0, journal.getFreeFilesCount());
+ assertEquals(2, journal.getIDMapSize());
+ }
+
public void testAddDeleteCommitTXIDMap1() throws Exception
{
setup(10, 10 * 1024, true);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -768,7 +768,7 @@
return 0;
}
- public long getOrderingID()
+ public int getOrderingID()
{
return 0;
}
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/SimpleEncoding.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/SimpleEncoding.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/SimpleEncoding.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.journal.impl.fakes;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.util.MessagingBuffer;
+
+/**
+ * Test fake: an {@link EncodingSupport} whose encoded form is a run of one
+ * repeated byte with a fixed, caller-chosen length.
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ */
+public class SimpleEncoding implements EncodingSupport
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   /** Payload length in bytes; also the value reported by {@link #getEncodeSize()}. */
+   private final int payloadLength;
+   /** Value written at every payload position. */
+   private final byte fillByte;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   /** Creates a fake payload of {@code size} bytes, each equal to {@code bytetosend}. */
+   public SimpleEncoding(int size, byte bytetosend)
+   {
+      payloadLength = size;
+      fillByte = bytetosend;
+   }
+
+   // Public --------------------------------------------------------
+   /** Not supported by this fake: always throws {@link UnsupportedOperationException}. */
+   public void decode(MessagingBuffer buffer)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   /** Writes the fill byte into {@code buffer} once per payload position. */
+   public void encode(MessagingBuffer buffer)
+   {
+      for (int remaining = payloadLength; remaining > 0; remaining--)
+      {
+         buffer.putByte(fillByte);
+      }
+   }
+
+   /** @return the payload length given to the constructor */
+   public int getEncodeSize()
+   {
+      return payloadLength;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -800,7 +800,6 @@
assertEquals(config.getJournalMinFiles(), messageJournal.getMinFiles());
assertEquals(config.isJournalSyncTransactional(), messageJournal.isSyncTransactional());
assertEquals(config.isJournalSyncNonTransactional(), messageJournal.isSyncNonTransactional());
- assertEquals(config.getJournalTaskPeriod(), messageJournal.getTaskPeriod());
assertEquals("jbm-data", messageJournal.getFilePrefix());
assertEquals("jbm", messageJournal.getFileExtension());
assertEquals(config.getJournalMaxAIO(), messageJournal.getMaxAIO());
@@ -814,7 +813,6 @@
assertEquals(2, bindingsJournal.getMinFiles());
assertEquals(true, bindingsJournal.isSyncTransactional());
assertEquals(true, bindingsJournal.isSyncNonTransactional());
- assertEquals(10000, bindingsJournal.getTaskPeriod());
assertEquals("jbm-bindings", bindingsJournal.getFilePrefix());
assertEquals("bindings", bindingsJournal.getFileExtension());
assertEquals(1, bindingsJournal.getMaxAIO());
Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-07-14 16:47:22 UTC (rev 4683)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-07-16 05:08:16 UTC (rev 4684)
@@ -155,6 +155,85 @@
// Protected -----------------------------------------------------
+   /**
+    * Packs the given boxed primitives, in argument order, into a byte array.
+    * Supported types: {@link Byte} and {@link Boolean} take 1 byte (booleans are
+    * written as 1 or 0), {@link Integer} and {@link Float} take 4 bytes,
+    * {@link Long} and {@link Double} take 8 bytes. Multi-byte values are written
+    * big-endian, the initial byte order of a newly allocated {@link ByteBuffer}.
+    *
+    * @param args values to encode; neither the array nor an element may be null
+    * @return a new array holding exactly the concatenated encodings
+    * @throws IllegalArgumentException if {@code args} or an element is null, or an element has an unsupported type
+    */
+   protected byte[] autoEncode(Object... args)
+   {
+      if (args == null)
+      {
+         throw new IllegalArgumentException("method autoEncode doesn't accept a null argument array");
+      }
+
+      // First pass: validate every argument and compute the exact buffer size before writing anything.
+      int size = 0;
+      for (Object arg : args)
+      {
+         size += autoEncodeSize(arg);
+      }
+
+      ByteBuffer buffer = ByteBuffer.allocate(size);
+
+      for (Object arg : args)
+      {
+         if (arg instanceof Byte)
+         {
+            buffer.put(((Byte) arg).byteValue());
+         }
+         else if (arg instanceof Boolean)
+         {
+            buffer.put((byte) (((Boolean) arg).booleanValue() ? 1 : 0));
+         }
+         else if (arg instanceof Integer)
+         {
+            buffer.putInt(((Integer) arg).intValue());
+         }
+         else if (arg instanceof Long)
+         {
+            buffer.putLong(((Long) arg).longValue());
+         }
+         else if (arg instanceof Float)
+         {
+            buffer.putFloat(((Float) arg).floatValue());
+         }
+         else
+         {
+            // The first pass rejected everything else, so this is a Double.
+            buffer.putDouble(((Double) arg).doubleValue());
+         }
+      }
+
+      return buffer.array();
+   }
+
+   /** Encoded width in bytes of one autoEncode argument; rejects null and unsupported types. */
+   private static int autoEncodeSize(Object arg)
+   {
+      if (arg instanceof Byte || arg instanceof Boolean)
+      {
+         return 1;
+      }
+      if (arg instanceof Integer || arg instanceof Float)
+      {
+         return 4;
+      }
+      if (arg instanceof Long || arg instanceof Double)
+      {
+         return 8;
+      }
+      // A null argument is reported as "null" rather than failing on getClass().
+      throw new IllegalArgumentException("method autoEncode doesn't know how to convert "
+            + (arg == null ? "null" : arg.getClass().toString()) + " yet");
+   }
+
protected boolean deleteDirectory(File directory)
{
More information about the jboss-cvs-commits
mailing list