[jboss-cvs] JBoss Messaging SVN: r4718 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 23 19:03:13 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-07-23 19:03:13 -0400 (Wed, 23 Jul 2008)
New Revision: 4718
Modified:
trunk/build-messaging.xml
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
More journal improvements and tests
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2008-07-23 14:53:11 UTC (rev 4717)
+++ trunk/build-messaging.xml 2008-07-23 23:03:13 UTC (rev 4718)
@@ -781,7 +781,7 @@
</junit>
</target>
- <target name="all-tests" depends="unit-tests, integration-tests, timing-tests, performance-tests, concurrent-tests, jms-tests"/>
+ <target name="all-tests" depends="unit-tests, integration-tests, timing-tests, performance-tests, concurrent-tests, stress-tests, jms-tests"/>
<target name="compile-reports">
<mkdir dir="${test.stylesheets.dir}"/>
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-23 14:53:11 UTC (rev 4717)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-23 23:03:13 UTC (rev 4718)
@@ -63,8 +63,10 @@
/**
*
- * A JournalImpl
+ * <p>A JournalImpl</p>
*
+ * <p>WIKI Page: <a href="http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal"> http://wiki.jboss.org/auth/wiki/JBossMessaging2Journal</a></p>
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
@@ -183,7 +185,6 @@
*/
//TODO - improve concurrency by allowing concurrent accesses if doesn't change current file
- // this locks access to currentFile
private final Semaphore lock = new Semaphore(1, true);
private volatile JournalFile currentFile ;
@@ -728,6 +729,9 @@
if (readFileId != file.getOrderingID())
{
+ // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+ hasData = true;
+
bb.position(pos + 1);
//log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
continue;
@@ -802,6 +806,8 @@
if (checkSize != variableSize + recordSize)
{
log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
+ // If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
+ hasData = true;
bb.position(pos + SIZE_BYTE);
continue;
}
@@ -992,7 +998,7 @@
else
{
log.warn("Transaction " + transactionID + " is missing elements so the transaction is being ignored");
- journalTransaction.rollback(file);
+ journalTransaction.forget();
}
hasData = true;
@@ -1062,10 +1068,13 @@
//FIXME - size() involves a scan
int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
- for (int i = 0; i < filesToCreate; i++)
+ if (filesToCreate > 0)
{
- // Keeping all files opened can be very costly (mainly on AIO)
- freeFiles.add(createFile(false));
+ for (int i = 0; i < filesToCreate; i++)
+ {
+ // Keeping all files opened can be very costly (mainly on AIO)
+ freeFiles.add(createFile(false));
+ }
}
//The current file is the last one
@@ -1167,12 +1176,19 @@
builder.append("FreeFile:" + file + "\n");
}
- builder.append("CurrentFile:" + currentFile+ " posCounter = " + currentFile.getPosCount() + "\n");
-
- if (currentFile instanceof JournalFileImpl)
+ if (currentFile != null)
{
- builder.append(((JournalFileImpl)currentFile).debug());
+ builder.append("CurrentFile:" + currentFile+ " posCounter = " + currentFile.getPosCount() + "\n");
+
+ if (currentFile instanceof JournalFileImpl)
+ {
+ builder.append(((JournalFileImpl)currentFile).debug());
+ }
}
+ else
+ {
+ builder.append("CurrentFile: No current file at this point!");
+ }
builder.append("#Opened Files:" + this.openedFiles.size());
@@ -1229,26 +1245,8 @@
{
//Re-initialise it
- int newOrderingID = generateOrderingID();
+ JournalFile jf = reinitializeFile(file);
- SequentialFile sf = file.getFile();
-
- sf.open();
-
- ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
-
- bb.putInt(newOrderingID);
-
- int bytesWritten = sf.write(bb, true);
-
- JournalFile jf = new JournalFileImpl(sf, newOrderingID);
-
- sf.position(bytesWritten);
-
- jf.setOffset(bytesWritten);
-
- sf.close();
-
freeFiles.add(jf);
}
else
@@ -1262,7 +1260,7 @@
}
}
}
-
+
public int getDataFilesCount()
{
return dataFiles.size();
@@ -1404,6 +1402,31 @@
// Private -----------------------------------------------------------------------------
+ // Discard the old JournalFile and set it with a new ID
+ private JournalFile reinitializeFile(JournalFile file) throws Exception
+ {
+ int newOrderingID = generateOrderingID();
+
+ SequentialFile sf = file.getFile();
+
+ sf.open();
+
+ ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
+
+ bb.putInt(newOrderingID);
+
+ int bytesWritten = sf.write(bb, true);
+
+ JournalFile jf = new JournalFileImpl(sf, newOrderingID);
+
+ sf.position(bytesWritten);
+
+ jf.setOffset(bytesWritten);
+
+ sf.close();
+ return jf;
+ }
+
@SuppressWarnings("unchecked")
private Pair<Integer, Integer>[] readReferencesOnTransaction(int variableSize, ByteBuffer bb)
{
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java 2008-07-23 14:53:11 UTC (rev 4717)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/Reclaimer.java 2008-07-23 23:03:13 UTC (rev 4718)
@@ -27,21 +27,23 @@
/**
*
- * A ReclaimerTest
+ * <p>A ReclaimerTest</p>
*
- * The journal consists of an ordered list of journal files Fn where 0 <= n <= N
+ * <p>The journal consists of an ordered list of journal files Fn where 0 <= n <= N</p>
*
- * A journal file can contain either positives (pos) or negatives (neg)
+ * <p>A journal file can contain either positives (pos) or negatives (neg)</p>
*
- * (Positives correspond either to adds or updates, and negatives correspond to deletes).
+ * <p>(Positives correspond either to adds or updates, and negatives correspond to deletes).</p>
*
- * A file Fn can be deleted if, and only if the following criteria are satisified
+ * <p>A file Fn can be deleted if, and only if, the following criteria are satisfied</p>
*
- * 1) All pos in a file Fn, must have corresponding neg in any file Fm where m >= n.
+ * <p>1) All pos in a file Fn, must have corresponding neg in any file Fm where m >= n.</p>
*
- * 2) All pos that correspond to any neg in file Fn, must all live in any file Fm where 0 <= m <= n
- * which are also marked for deletion in the same pass of the algorithm.
+ * <p>2) All pos that correspond to any neg in file Fn, must all live in any file Fm where 0 <= m <= n
+ * which are also marked for deletion in the same pass of the algorithm.</p>
*
+ * <p>WIKI Page: <a href="http://wiki.jboss.org/wiki/JBossMessaging2Reclaiming">http://wiki.jboss.org/wiki/JBossMessaging2Reclaiming</a></p>
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
*/
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2008-07-23 14:53:11 UTC (rev 4717)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2008-07-23 23:03:13 UTC (rev 4718)
@@ -25,6 +25,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.List;
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
@@ -70,8 +71,7 @@
public void testBasicAlignment() throws Exception
{
- FakeSequentialFileFactory factory = new FakeSequentialFileFactory(200,
- true, false);
+ FakeSequentialFileFactory factory = new FakeSequentialFileFactory(200, true);
SequentialFile file = factory.createSequentialFile("test1", 100, 10000);
@@ -438,8 +438,6 @@
journalImpl.checkAndReclaimFiles();
- System.out.println("Journal: " + journalImpl.debug());
-
assertEquals(2, factory.listFiles("tt").size());
}
@@ -570,19 +568,274 @@
}
- public void testReloadInvalidVariableSize() throws Exception
+ public void testReloadInvalidCheckSizeOnTransaction() throws Exception
{
- // Test to be written
+ final int JOURNAL_SIZE = 20000;
+
+ setupJournal(JOURNAL_SIZE, 100);
+
+ assertEquals(2, factory.listFiles("tt").size());
+
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ for (int i = 0; i < 20 ; i++)
+ {
+ journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.forceMoveNextFile();
+ }
+
+ journalImpl.forceMoveNextFile();
+
+ journalImpl.appendCommitRecord(1l);
+
+ SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000, 5000);
+
+ file.open();
+
+ ByteBuffer buffer = ByteBuffer.allocate(100);
+
+ // Messing up with the first record (removing the position)
+ file.position(100);
+
+ file.read(buffer);
+
+ // jumping RecordType, FileId, TransactionID, RecordID, VariableSize, RecordType, RecordBody (that we know it is 1 )
+ buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1);
+
+ int posCheckSize = buffer.position();
+
+ assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+
+ buffer.position(posCheckSize);
+
+ buffer.putInt(-1);
+
+ buffer.rewind();
+
+ // Changing the check size, so reload will ignore this record
+ file.position(100);
+
+ file.write(buffer, true);
+
+ file.close();
+
+ setupJournal(JOURNAL_SIZE, 100);
+
+ assertEquals(0, records.size());
+
+ journalImpl.checkAndReclaimFiles();
+
+ assertEquals(0, journalImpl.getDataFilesCount());
+
+ assertEquals(2, factory.listFiles("tt").size());
+
}
+
+ public void testPartiallyBrokenFile() throws Exception
+ {
+ final int JOURNAL_SIZE = 20000;
+
+ setupJournal(JOURNAL_SIZE, 100);
+
+ assertEquals(2, factory.listFiles("tt").size());
+
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ for (int i = 0; i < 20 ; i++)
+ {
+ journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.appendAddRecordTransactional(2l, (long)i + 20l, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.forceMoveNextFile();
+ }
+
+
+ journalImpl.forceMoveNextFile();
+
+ journalImpl.appendCommitRecord(1l);
+
+ journalImpl.appendCommitRecord(2l);
+
+ SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000, 5000);
+
+ file.open();
+
+ ByteBuffer buffer = ByteBuffer.allocate(100);
+
+ // Messing up with the first record (removing the position)
+ file.position(100);
+
+ file.read(buffer);
+
+ // jumping RecordType, FileId, TransactionID, RecordID, VariableSize, RecordType, RecordBody (that we know it is 1 )
+ buffer.position(1 + 4 + 8 + 8 + 4 + 1 + 1);
+
+ int posCheckSize = buffer.position();
+
+ assertEquals(JournalImpl.SIZE_ADD_RECORD_TX + 1, buffer.getInt());
+
+ buffer.position(posCheckSize);
+
+ buffer.putInt(-1);
+
+ buffer.rewind();
+
+ // Changing the check size, so reload will ignore this record
+ file.position(100);
+
+ file.write(buffer, true);
+
+ file.close();
+
+ setupJournal(JOURNAL_SIZE, 100);
+
+ assertEquals(20, records.size());
+
+ journalImpl.checkAndReclaimFiles();
+
+ assertEquals(20, journalImpl.getDataFilesCount());
+
+ assertEquals(22, factory.listFiles("tt").size());
+
+ }
+
+ public void testReduceFreeFiles() throws Exception
+ {
+ final int JOURNAL_SIZE = 20000;
+
+ setupJournal(JOURNAL_SIZE, 100, 10);
+
+ assertEquals(10, factory.listFiles("tt").size());
+
+ setupJournal(JOURNAL_SIZE, 100, 2);
+
+ assertEquals(10, factory.listFiles("tt").size());
+
+ for (int i = 0; i < 10; i++)
+ {
+ journalImpl.appendAddRecord(i, (byte)0, new SimpleEncoding(1,(byte)0));
+ journalImpl.forceMoveNextFile();
+ }
+
+ setupJournal(JOURNAL_SIZE, 100, 2);
+
+ assertEquals(10, records.size());
+
+ assertEquals(12, factory.listFiles("tt").size());
+
+ for (int i = 0; i < 10; i++)
+ {
+ journalImpl.appendDeleteRecord(i);
+ }
+
+ journalImpl.forceMoveNextFile();
+
+ journalImpl.checkAndReclaimFiles();
+
+ setupJournal(JOURNAL_SIZE, 100, 2);
+
+ assertEquals(0, records.size());
+
+ assertEquals(2, factory.listFiles("tt").size());
+ }
+
public void testReloadIncompleteTransaction() throws Exception
{
- // We should miss one record (hole) on the transaction
+ final int JOURNAL_SIZE = 20000;
+
+ setupJournal(JOURNAL_SIZE, 100);
+
+ assertEquals(2, factory.listFiles("tt").size());
+
+
+ assertEquals(0, records.size());
+ assertEquals(0, transactions.size());
+
+ for (int i = 0; i < 10 ; i++)
+ {
+ journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.forceMoveNextFile();
+ }
+
+
+ for (int i = 10; i < 20 ; i++)
+ {
+ journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.forceMoveNextFile();
+ }
+
+ journalImpl.forceMoveNextFile();
+
+ journalImpl.appendCommitRecord(1l);
+
+ SequentialFile file = factory.createSequentialFile("tt-1.tt", 10000, 5000);
+
+ file.open();
+
+ ByteBuffer buffer = ByteBuffer.allocate(100);
+
+ // Messing up with the first record (removing the position)
+ file.position(100);
+
+ file.read(buffer);
+
+ buffer.position(1);
+
+ buffer.putInt(-1);
+
+ buffer.rewind();
+
+ // Messing up with the first record (changing the fileID, so Journal reload will think the record came from a different journal usage)
+ file.position(100);
+
+ file.write(buffer, true);
+
+ file.close();
+
+ setupJournal(JOURNAL_SIZE, 100);
+
+ assertEquals(0, records.size());
+
+ journalImpl.checkAndReclaimFiles();
+
+ assertEquals(0, journalImpl.getDataFilesCount());
+
+ assertEquals(2, factory.listFiles("tt").size());
+
}
public void testAsynchronousCommit() throws Exception
{
- // We should miss one record (hole) on the transaction
+// final int JOURNAL_SIZE = 20000;
+//
+// setupJournal(JOURNAL_SIZE, 100, 5);
+//
+// assertEquals(2, factory.listFiles("tt").size());
+//
+// assertEquals(0, records.size());
+// assertEquals(0, transactions.size());
+//
+// for (int i = 0; i < 10 ; i++)
+// {
+// journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+// journalImpl.forceMoveNextFile();
+// }
+//
+//
+// for (int i = 10; i < 20 ; i++)
+// {
+// journalImpl.appendAddRecordTransactional(1l, (long)i, (byte)0, new SimpleEncoding(1, (byte)15));
+// journalImpl.forceMoveNextFile();
+// }
+//
+// journalImpl.forceMoveNextFile();
+//
+// journalImpl.appendCommitRecord(1l);
+//
}
public void testAsynchronousRollback() throws Exception
@@ -590,16 +843,11 @@
// We should miss one record (hole) on the transaction
}
- public void testGarbageBetweenRecords() throws Exception
- {
- // We should miss one record (hole) on the transaction
- }
-
public void testPrepareAloneOnSeparatedFile() throws Exception
{
final int JOURNAL_SIZE = 20000;
- setupJournal(JOURNAL_SIZE, 100, 5);
+ setupJournal(JOURNAL_SIZE, 100);
assertEquals(0, records.size());
assertEquals(0, transactions.size());
@@ -624,7 +872,7 @@
journalImpl.forceMoveNextFile();
journalImpl.checkAndReclaimFiles();
- setupJournal(JOURNAL_SIZE, 100, 5);
+ setupJournal(JOURNAL_SIZE, 100);
assertEquals(1, records.size());
}
@@ -633,7 +881,7 @@
{
final int JOURNAL_SIZE = 20000;
- setupJournal(JOURNAL_SIZE, 100, 5);
+ setupJournal(JOURNAL_SIZE, 100);
assertEquals(0, records.size());
assertEquals(0, transactions.size());
@@ -662,7 +910,7 @@
journalImpl.forceMoveNextFile();
journalImpl.checkAndReclaimFiles();
- setupJournal(JOURNAL_SIZE, 100, 5);
+ setupJournal(JOURNAL_SIZE, 100);
assertEquals(40, records.size());
@@ -687,8 +935,6 @@
journalImpl.debugWait();
- //System.out.println("files = " + journalImpl.debug());
-
journalImpl.appendPrepareRecord(1l);
assertEquals(12, factory.listFiles("tt").size());
@@ -733,6 +979,8 @@
assertEquals(0, transactions.size());
journalImpl.forceMoveNextFile();
+
+ // Reclaiming should still be able to reclaim a file if a transaction was ignored
journalImpl.checkAndReclaimFiles();
assertEquals(2, factory.listFiles("tt").size());
@@ -790,7 +1038,7 @@
if (factory == null)
{
factory = new FakeSequentialFileFactory(alignment,
- true, false);
+ true);
}
if (journalImpl != null)
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-07-23 14:53:11 UTC (rev 4717)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-07-23 23:03:13 UTC (rev 4718)
@@ -55,7 +55,7 @@
private final boolean supportsCallback;
- private final boolean holdCallbacks;
+ private volatile boolean holdCallbacks;
private final List<Runnable> callbacksInHold;
@@ -63,24 +63,16 @@
// Constructors --------------------------------------------------
- public FakeSequentialFileFactory(final int alignment, final boolean supportsCallback, final boolean holdCallback)
+ public FakeSequentialFileFactory(final int alignment, final boolean supportsCallback)
{
this.alignment = alignment;
this.supportsCallback = supportsCallback;
- this.holdCallbacks = holdCallback;
- if (holdCallbacks)
- {
- callbacksInHold = new ArrayList<Runnable>();
- }
- else
- {
- callbacksInHold = null;
- }
+ callbacksInHold = new ArrayList<Runnable>();
}
public FakeSequentialFileFactory()
{
- this(1, false, false);
+ this(1, false);
}
@@ -151,6 +143,16 @@
return ByteBuffer.wrap(bytes);
}
+ public boolean isHoldCallbacks()
+ {
+ return holdCallbacks;
+ }
+
+ public void setHoldCallbacks(boolean holdCallbacks)
+ {
+ this.holdCallbacks = holdCallbacks;
+ }
+
public void flushAllCallbacks()
{
for (Runnable action : callbacksInHold)
More information about the jboss-cvs-commits
mailing list