[jboss-cvs] JBoss Messaging SVN: r4924 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Sep 9 18:41:34 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-09-09 18:41:33 -0400 (Tue, 09 Sep 2008)
New Revision: 4924
Modified:
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
Journal tweaks / doc changes
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-09-09 22:41:33 UTC (rev 4924)
@@ -22,10 +22,9 @@
package org.jboss.messaging.core.journal;
+import java.util.List;
+
import org.jboss.messaging.core.server.MessagingComponent;
-
-import javax.transaction.xa.Xid;
-import java.util.List;
/**
*
* A Journal
@@ -40,10 +39,6 @@
void appendAddRecord(long id, byte recordType, EncodingSupport record) throws Exception;
- void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
-
- void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
-
void appendUpdateRecord(long id, byte recordType, EncodingSupport record) throws Exception;
void appendDeleteRecord(long id) throws Exception;
@@ -54,17 +49,24 @@
void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
- void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
-
- void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
-
void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
void appendDeleteRecordTransactional(long txID, long id) throws Exception;
void appendCommitRecord(long txID) throws Exception;
- void appendPrepareRecord(long txID, EncodingSupport transactionIdentifier) throws Exception;
+ /**
+ *
+ * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
+ * back to a state in which it could be committed. </p>
+ *
+ * <p> transactionData is usually a safe space you could use to store things like XIDs or any other supporting user-data required to replay the transaction </p>
+ *
+ * @param txID
+ * @param transactionData Information to support the system replaying the transaction
+ * @throws Exception
+ */
+ void appendPrepareRecord(long txID, EncodingSupport transactionData) throws Exception;
void appendRollbackRecord(long txID) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-09-09 22:41:33 UTC (rev 4924)
@@ -71,4 +71,17 @@
void setAutoReclaim(boolean autoReclaim);
boolean isAutoReclaim();
+
+
+ // These add methods are only used by testCases
+
+ void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
+
+ void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
+
+ void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
+
+ void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-09-09 22:41:33 UTC (rev 4924)
@@ -22,19 +22,48 @@
package org.jboss.messaging.core.journal.impl;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.journal.*;
+import org.jboss.messaging.core.journal.BufferCallback;
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.LoadManager;
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.TestableJournal;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.util.Pair;
import org.jboss.messaging.util.VariableLatch;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
*
* <p>A JournalImpl</p
@@ -151,10 +180,9 @@
private final int reuseBufferSize;
- private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+ /** Object that will control buffer's callback and getting buffers from the queue */
+ private final ReuseBuffersController buffersControl = new ReuseBuffersController();
- private final BufferCallback bufferCallback = new LocalBufferCallback();
-
/*
* We use a semaphore rather than synchronized since it performs better when
* contended
@@ -279,81 +307,6 @@
}
}
- public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- int size = SIZE_ADD_RECORD + record.length;
-
- ByteBuffer bb = newBuffer(size);
-
- bb.put(ADD_RECORD);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
- bb.putLong(id);
- bb.putInt(record.length);
- bb.put(recordType);
- bb.put(record);
- bb.putInt(size);
- bb.rewind();
-
- try
- {
- lock.acquire();
-
- JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-
- posFilesMap.put(id, new PosFiles(usedFile));
- }
- finally
- {
- lock.release();
- }
-
- }
-
- public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- PosFiles posFiles = posFilesMap.get(id);
-
- if (posFiles == null)
- {
- throw new IllegalStateException("Cannot find add info " + id);
- }
-
- int size = SIZE_UPDATE_RECORD + record.length;
-
- ByteBuffer bb = newBuffer(size);
-
- bb.put(UPDATE_RECORD);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
- bb.putLong(id);
- bb.putInt(record.length);
- bb.put(recordType);
- bb.put(record);
- bb.putInt(size);
- bb.rewind();
-
- lock.acquire();
- try
- {
- JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
-
- posFiles.addUpdateFile(usedFile);
- }
- finally
- {
- lock.release();
- }
- }
-
public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
{
if (state != STATE_LOADED)
@@ -478,81 +431,6 @@
}
}
- public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- int size = SIZE_ADD_RECORD_TX + record.length;
-
- ByteBuffer bb = newBuffer(size);
-
- bb.put(ADD_RECORD_TX);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
- bb.putLong(txID);
- bb.putLong(id);
- bb.putInt(record.length);
- bb.put(recordType);
- bb.put(record);
- bb.putInt(size);
- bb.rewind();
-
- lock.acquire();
-
- try
- {
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-
- JournalTransaction tx = getTransactionInfo(txID);
-
- tx.addPositive(usedFile, id);
- }
- finally
- {
- lock.release();
- }
- }
-
- public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
- {
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("Journal must be loaded first");
- }
-
- int size = SIZE_UPDATE_RECORD_TX + record.length;
-
- ByteBuffer bb = newBuffer(size);
-
- bb.put(UPDATE_RECORD_TX);
- bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
- bb.putLong(txID);
- bb.putLong(id);
- bb.putInt(record.length);
- bb.put(recordType);
- bb.put(record);
- bb.putInt(size);
- bb.rewind();
-
- lock.acquire();
-
- try
- {
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID));
-
- JournalTransaction tx = getTransactionInfo(txID);
-
- tx.addPositive(usedFile, id);
- }
- finally
- {
- lock.release();
- }
- }
-
-
public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, EncodingSupport record) throws Exception
{
if (state != STATE_LOADED)
@@ -624,7 +502,18 @@
}
}
- public void appendPrepareRecord(final long txID, EncodingSupport xid) throws Exception
+ /**
+ *
+ * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
+ * back to a state in which it could be committed. </p>
+ *
+ * <p> transactionData is usually a safe space you could use to store things like XIDs or any other supporting user-data required to replay the transaction </p>
+ *
+ * @param txID
+ * @param transactionData Information to support the system replaying the transaction
+ * @throws Exception
+ */
+ public void appendPrepareRecord(final long txID, EncodingSupport transactionData) throws Exception
{
if (state != STATE_LOADED)
{
@@ -633,7 +522,7 @@
JournalTransaction tx = getTransactionInfo(txID);
- ByteBuffer bb = writePrepareTransaction(PREPARE_RECORD, txID, tx, xid);
+ ByteBuffer bb = writePrepareTransaction(PREPARE_RECORD, txID, tx, transactionData);
lock.acquire();
@@ -804,16 +693,21 @@
byte recordType = bb.get();
if (recordType < ADD_RECORD || recordType > ROLLBACK_RECORD)
{
+ // I - We scan for any valid record in the file. If a hole happened in the middle of the file we keep looking until all the possibilities are gone
continue;
}
if (bb.position() + SIZE_INT > fileSize)
{
+ // II - Ignore this record, lets keep looking
continue;
}
+ // III - Every record has the file-id.
+ // This is what lets us avoid re-filling the whole file
int readFileId = bb.getInt();
+ // IV - This record is from a previous file-usage. The file was reused and we need to ignore this record
if (readFileId != file.getOrderingID())
{
// If a file has damaged records, we make it a dataFile, and the next reclaiming will fix it
@@ -825,7 +719,7 @@
}
long transactionID = 0;
-
+
if (isTransaction(recordType))
{
if (bb.position() + SIZE_LONG > fileSize)
@@ -846,7 +740,11 @@
recordID = bb.getLong();
maxID = Math.max(maxID, recordID);
}
+
+ // We use the size of the record to validate the health of the record.
+ // (V) We verify the size of the record
+
// The variable record portion used on Updates and Appends
int variableSize = 0;
// Used to hold XIDs on PrepareTransactions
@@ -884,9 +782,12 @@
}
int recordSize = getRecordSize(recordType);
-
+
+ // VI - This completes V. We will validate the size at the end of the record,
+ // but first we avoid buffer overflows caused by damaged data
if (pos + recordSize + variableSize + preparedTransactionDataSize > fileSize)
{
+ // Avoid a buffer overflow caused by damaged data... continue scanning for more records...
continue;
}
@@ -896,6 +797,8 @@
int checkSize = bb.getInt();
+ // VII - The checkSize at the end has to match the size recorded at the beginning.
+ // This is like testing a hash for the record. (We could replace the checkSize by some sort of calculated hash)
if (checkSize != variableSize + recordSize + preparedTransactionDataSize)
{
log.warn("Record at position " + pos + " file:" + file.getFile().getFileName() + " is corrupted and it is being ignored");
@@ -907,6 +810,9 @@
bb.position(oldPos);
+
+ // At this point everything is already checked. So relax and just load the data now.
+
switch(recordType)
{
case ADD_RECORD:
@@ -1023,8 +929,8 @@
byte xidData[] = new byte[preparedTransactionDataSize];
bb.get(xidData);
- // Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
+ // Pair <FileID, NumberOfElements>
+ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
if (tx != null)
{
@@ -1040,7 +946,7 @@
transactionInfos.put(transactionID, journalTransaction);
}
- boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, values);
+ boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
if (healthy)
{
@@ -1063,7 +969,7 @@
// We need to read it even if transaction was not found, or the reading checks would fail
// Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] values = readReferencesOnTransaction(variableSize, bb);
+ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, bb);
if (tx != null)
{
@@ -1074,7 +980,7 @@
throw new IllegalStateException("Cannot find tx " + transactionID);
}
- boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, values);
+ boolean healthy = checkTransactionHealth(journalTransaction, orderedFiles, recordedSummary);
if (healthy)
{
@@ -1109,6 +1015,7 @@
}
case ROLLBACK_RECORD:
{
+
TransactionHolder tx = transactions.remove(transactionID);
if (tx != null)
@@ -1119,7 +1026,8 @@
{
throw new IllegalStateException("Cannot find tx " + transactionID);
}
-
+
+ // There is no need to validate summaries on Rollbacks.. We will ignore the data anyway.
tnp.rollback(file);
hasData = true;
@@ -1136,6 +1044,8 @@
checkSize = bb.getInt();
+ // This is a sanity check about the loading code itself.
+ // If this checkSize doesn't match, it means the reading method is not doing what it was supposed to do
if (checkSize != variableSize + recordSize + preparedTransactionDataSize)
{
throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize");
@@ -1193,7 +1103,7 @@
if (this.reuseBufferSize > 0)
{
- currentFile.getFile().setBufferCallback(bufferCallback);
+ currentFile.getFile().setBufferCallback(buffersControl.callback);
}
currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
@@ -1436,6 +1346,30 @@
debugWait();
}
+ // Add/update methods only used on testcases (using a byte[]). Those methods are now part of the Test interface ------------------------
+
+
+ public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
+ {
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record));
+ }
+
+ public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
+ {
+ appendUpdateRecord(id, recordType, new ByteArrayEncoding(record));
+ }
+
+ public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
+ {
+ appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+ }
+
+ public void appendUpdateRecordTransactional(final long txID, final long id, byte recordType, final byte[] record) throws Exception
+ {
+ appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+ }
+
+
// MessagingComponent implementation ---------------------------------------------------
public synchronized boolean isStarted()
@@ -1526,10 +1460,15 @@
return jf;
}
- private Pair<Integer, Integer>[] readReferencesOnTransaction(final int variableSize, final ByteBuffer bb)
+ /** It will read the elements-summary back from the commit/prepare transaction
+ * Pair<FileID, Counter> */
+ @SuppressWarnings("unchecked") // See comment on the method body
+ private Pair<Integer, Integer>[] readTransactionalElementsSummary(final int variableSize, final ByteBuffer bb)
{
int numberOfFiles = variableSize / (SIZE_INT * 2);
+ // This line always raises a compilation warning, even though it is a completely legal operation.
+ // We could remove the SuppressWarnings as soon as we find a better expression for this line:
Pair<Integer, Integer> values[] = (Pair<Integer, Integer> [])new Pair[numberOfFiles];
for (int i = 0; i < numberOfFiles; i++)
@@ -1540,27 +1479,48 @@
return values;
}
+
+ /**
+ * <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
+ * <p> We record a summary about the records on the journal file on COMMIT and PREPARE.
+ * When we load the records we build a new summary and we check the original summary to the current summary.
+ * This method is basically verifying if the entire transaction is being loaded </p>
+ * @param journalTransaction
+ * @param orderedFiles
+ * @param recordedSummary
+ * @return
+ */
private boolean checkTransactionHealth(final JournalTransaction journalTransaction,
final List<JournalFile> orderedFiles,
- final Pair<Integer, Integer>[] readReferences)
+ final Pair<Integer, Integer>[] recordedSummary)
{
boolean healthy = true;
- Map<Integer, AtomicInteger> refMap = journalTransaction.getElementsSummary();
- for (Pair<Integer, Integer> ref: readReferences)
+ // (I) First we get the summary of what we really have on the files now:
+
+ // FileID, NumberOfElements
+ Map<Integer, AtomicInteger> currentSummary = journalTransaction.getElementsSummary();
+
+ // (II) We compare the summary recorded on the commit against the reality on the files
+ for (Pair<Integer, Integer> ref: recordedSummary)
{
- AtomicInteger counter = refMap.get(ref.a);
+ AtomicInteger counter = currentSummary.get(ref.a);
if (counter == null)
{
- // Couldn't find the counter, but if part of the transaction was reclaimed it is ok!
+ // (III) One of the original files didn't show any record. This would still be okay if the file was reclaimed
boolean found = false;
for (JournalFile lookupFile: orderedFiles)
{
if (lookupFile.getOrderingID() == ref.a)
{
+ // (IV) oops, we were expecting at least one record on this file.
+ // The file still exists and no records were found.
+ // That means the transaction crashed before completing,
+ // so this transaction is broken and needs to be ignored.
+ // This is probably a hole caused by a crash during commit.
found = true;
}
}
@@ -1572,6 +1532,8 @@
}
else
{
+ // (V) Missing a record... Transaction was not completed as stated. we will ignore the whole transaction
+ // This is probably a hole caused by a crash during commit/prepare.
if (counter.get() != ref.b)
{
healthy = false;
@@ -1582,6 +1544,29 @@
return healthy;
}
+ /**
+ *
+ * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
+ * <p>For example, a transaction was spread along 3 journal files with 10 records on each file.
+ * (Which could happen if there are too many records, or if a user event delayed records from arriving in time to fit in a single file).</p>
+ * <p>The element-summary will then have</p>
+ * <p>FileID1, 10</p>
+ * <p>FileID2, 10</p>
+ * <p>FileID3, 10</p>
+ *
+ * <br>
+ * <p> During the load the transaction needs to have 30 records, spread across the files as originally written.</p>
+ * <p> If for any reason there are missing records, that means the transaction was not completed and we should ignore the whole transaction </p>
+ * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
+ * That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
+ *
+ * @param recordType
+ * @param txID
+ * @param tx
+ *
+ * @return
+ * @throws Exception
+ */
private ByteBuffer writeCommitTransaction(final byte recordType, final long txID, final JournalTransaction tx) throws Exception
{
int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() * SIZE_INT * 2;
@@ -1606,9 +1591,13 @@
return bb;
}
- private ByteBuffer writePrepareTransaction(final byte recordType, final long txID, final JournalTransaction tx, EncodingSupport xid) throws Exception
+ /**
+ * TODO: Merge this method back into writeCommitTransaction (as it was done originally).
+ * For more explanations read the javadoc on writeCommitTransaction
+ */
+ private ByteBuffer writePrepareTransaction(final byte recordType, final long txID, final JournalTransaction tx, EncodingSupport transactionData) throws Exception
{
- int xidSize = xid.getEncodeSize();
+ int xidSize = transactionData.getEncodeSize();
int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() * SIZE_INT * 2 + xidSize + SIZE_INT;
ByteBuffer bb = newBuffer(size);
@@ -1618,7 +1607,7 @@
bb.putLong(txID);
bb.putInt(xidSize);
bb.putInt(tx.getElementsSummary().size());
- xid.encode(new ByteBufferWrapper(bb));
+ transactionData.encode(new ByteBufferWrapper(bb));
for (Map.Entry<Integer, AtomicInteger> entry: tx.getElementsSummary().entrySet())
{
@@ -1765,6 +1754,12 @@
}
+ /**
+ * This method will create a new file on the file system and pre-fill it with FILL_CHARACTER
+ * @param keepOpened
+ * @return
+ * @throws Exception
+ */
private JournalFile createFile(final boolean keepOpened) throws Exception
{
int orderingID = generateOrderingID();
@@ -1806,7 +1801,7 @@
file.setOffset(file.getFile().calculateBlockStart(SIZE_HEADER));
if (this.reuseBufferSize > 0)
{
- file.getFile().setBufferCallback(bufferCallback);
+ file.getFile().setBufferCallback(buffersControl.callback);
}
}
@@ -1844,7 +1839,12 @@
currentFile = enqueueOpenFile();
}
- // You need to guarantee lock.acquire() before calling this method
+ /**
+ * This method will immediately return the opened file, and schedule opening and reclaiming.
+ * In case there are no cached opened files, this method will block until a file is opened (which would happen only if the system is under load).
+ *
+ * Warning: You need to guarantee lock.acquire() before calling this method
+ * */
private JournalFile enqueueOpenFile() throws InterruptedException
{
if (trace) trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
@@ -1987,69 +1987,20 @@
return null;
}
}
- // -- Area reserved for the reuse buffer logic -----------------------------------------
- private volatile long bufferReuseLastTime = System.currentTimeMillis();
- private ByteBuffer newBuffer(int size)
+ public ByteBuffer newBuffer(int size)
{
- // if a new buffer wasn't requested in 10 seconds, we clear the queue
- // This is being done this way as we don't need another Timeout Thread just to cleanup this
- if (reuseBufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
- {
- log.debug("Clearing reuse buffers queue with " + reuseBuffers.size() + " elements");
- bufferReuseLastTime = System.currentTimeMillis();
- reuseBuffers.clear();
- }
-
- if (size > reuseBufferSize)
- {
- return fileFactory.newBuffer(size);
- }
- else
- {
-
- int alignedSize = fileFactory.calculateBlockSize(size);
-
- ByteBuffer buffer = this.reuseBuffers.poll();
- if (buffer == null)
- {
- buffer = fileFactory.newBuffer(reuseBufferSize);
- buffer.limit(alignedSize);
- }
- else
- {
- buffer.limit(alignedSize);
-
- // we could gain some little performance if we could avoid clearing the buffer.
- // On AIO this is being done with just a memset, what should be fairly quick
- fileFactory.clearBuffer(buffer);
- }
-
- buffer.rewind();
-
- return buffer;
- }
+ return this.buffersControl.newBuffer(size);
}
-
- private class LocalBufferCallback implements BufferCallback
- {
- public void bufferDone(ByteBuffer buffer)
- {
- bufferReuseLastTime = System.currentTimeMillis();
- if (buffer.capacity() == reuseBufferSize)
- {
- reuseBuffers.offer(buffer);
- }
- }
-
- }
-
// ------------------------------------------------------------------------------------
// Inner classes ---------------------------------------------------------------------------
+
+ // Just encapsulates the VariableLatch waiting for transaction completions
+ // Used if the SequentialFile supports Callbacks
private static class TransactionCallback implements IOCallback
{
private final VariableLatch countLatch = new VariableLatch();
@@ -2087,6 +2038,7 @@
}
+ /** Used on the ref-count for reclaiming */
private static class PosFiles
{
private final JournalFile addFile;
@@ -2125,7 +2077,82 @@
}
}
}
+
+ /** Class that will control buffer-reuse */
+ class ReuseBuffersController
+ {
+ private volatile long bufferReuseLastTime = System.currentTimeMillis();
+
+ // This queue is fed by LocalBufferCallback, which is called directly by NIO or AIO.
+ // In the case of AIO this is called almost directly by the native layer as soon as the buffer is not being used any more
+ // and is ready for reuse or GC
+ private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
+
+ final BufferCallback callback = new LocalBufferCallback();
+
+ public ByteBuffer newBuffer(int size)
+ {
+ // if a new buffer wasn't requested in 10 seconds, we clear the queue
+ // This is being done this way as we don't need another Timeout Thread just to cleanup this
+ if (reuseBufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
+ {
+ log.debug("Clearing reuse buffers queue with " + reuseBuffers.size() + " elements");
+ bufferReuseLastTime = System.currentTimeMillis();
+ reuseBuffers.clear();
+ }
+
+ // if a buffer is bigger than the configured-size, we just create a new buffer.
+ if (size > reuseBufferSize)
+ {
+ return fileFactory.newBuffer(size);
+ }
+ else
+ {
+
+ // We need to allocate buffers following the rules of the storage being used (AIO/NIO)
+ int alignedSize = fileFactory.calculateBlockSize(size);
+
+ // Try getting a buffer from the queue...
+ ByteBuffer buffer = this.reuseBuffers.poll();
+ if (buffer == null)
+ {
+ // if empty create a new one.
+ buffer = fileFactory.newBuffer(reuseBufferSize);
+ buffer.limit(alignedSize);
+ }
+ else
+ {
+ // set the limit of the buffer to the size being required
+ buffer.limit(alignedSize);
+ fileFactory.clearBuffer(buffer);
+ }
+
+ buffer.rewind();
+
+ return buffer;
+ }
+ }
+
+ private class LocalBufferCallback implements BufferCallback
+ {
+
+ public void bufferDone(ByteBuffer buffer)
+ {
+ bufferReuseLastTime = System.currentTimeMillis();
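+ // If a buffer has any size other than the configured one, the buffer will just be left to the GC.
+ // This could happen if the requested size was bigger than reuseBufferSize (newBuffer then allocates a dedicated buffer for it).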
+ if (buffer.capacity() == reuseBufferSize)
+ {
+ reuseBuffers.offer(buffer);
+ }
+ }
+
+ }
+
+ }
+
+
private class JournalTransaction
{
private List<Pair<JournalFile, Long>> pos;
@@ -2275,4 +2302,34 @@
}
}
+
+ private static class ByteArrayEncoding implements EncodingSupport
+ {
+
+ final byte[] data;
+
+ ByteArrayEncoding(final byte[] data)
+ {
+ this.data = data;
+ }
+
+ public void decode(MessagingBuffer buffer)
+ {
+ throw new IllegalStateException("operation not supported");
+ }
+
+ public void encode(MessagingBuffer buffer)
+ {
+ buffer.putBytes(data);
+ }
+
+ public int getEncodeSize()
+ {
+ return data.length;
+ }
+
+ }
+
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2008-09-09 22:41:33 UTC (rev 4924)
@@ -91,7 +91,7 @@
*
* Duplication detection for paging processing
* */
- void loadLastPage(LastPageRecord lastPage) throws Exception;
+ void setLastPage(LastPageRecord lastPage) throws Exception;
/**
*
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-09-09 22:41:33 UTC (rev 4924)
@@ -246,7 +246,7 @@
return pagingStore.getAddressSize() < pagingStore.getMaxSizeBytes();
}
- public void loadLastPage(LastPageRecord lastPage) throws Exception
+ public void setLastPage(LastPageRecord lastPage) throws Exception
{
System.out.println("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
this.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2008-09-09 22:41:33 UTC (rev 4924)
@@ -179,7 +179,7 @@
/**
- * Depage one page-file, read it and send it to the pagingManager
+ * Depage one page-file, read it and send it to the pagingManager / postoffice
* @return
* @throws Exception
*/
@@ -197,10 +197,10 @@
}
page.open();
PageMessage messages[] = page.read();
- boolean needMorePages = pagingManager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
+ boolean addressFull = pagingManager.onDepage(page.getPageId(), PagingStoreImpl.this.storeName, PagingStoreImpl.this, messages);
page.delete();
- return needMorePages;
+ return addressFull;
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-09-09 22:41:33 UTC (rev 4924)
@@ -203,9 +203,7 @@
public void storeAcknowledge(final long queueID, final long messageID) throws Exception
{
- EncodingSupport record = ackBytes(queueID, messageID);
-
- messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, record);
+ messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new ACKEncoding(queueID));
}
public void storeDelete(final long messageID) throws Exception
@@ -244,9 +242,7 @@
public void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception
{
- EncodingSupport record = ackBytes(queueID, messageID);
-
- messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, record);
+ messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, new ACKEncoding(queueID));
}
public void storeDeleteTransactional(long txID, long messageID) throws Exception
@@ -273,19 +269,11 @@
public void updateDeliveryCount(final MessageReference ref) throws Exception
{
- byte[] bytes = new byte[SIZE_BYTE + SIZE_LONG + SIZE_LONG + SIZE_INT];
+ DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getPersistenceID(), ref.getDeliveryCount());
- ByteBuffer bb = ByteBuffer.wrap(bytes);
-
- bb.putLong(ref.getQueue().getPersistenceID());
-
- bb.putLong(ref.getMessage().getMessageID());
-
- bb.putInt(ref.getDeliveryCount());
-
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), UPDATE_DELIVERY_COUNT, bytes);
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), UPDATE_DELIVERY_COUNT, updateInfo);
}
-
+
public void loadMessages(final PostOffice postOffice, final Map<Long, Queue> queues, ResourceManager resourceManager) throws Exception
{
List<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -358,47 +346,15 @@
byte[] data = record.data;
ByteBuffer bb = ByteBuffer.wrap(data);
+
+ MessagingBuffer buff = new ByteBufferWrapper(bb);
byte recordType = record.getUserRecordType();
switch (recordType)
{
- case PAGE_TRANSACTION:
- {
- MessagingBuffer buff = new ByteBufferWrapper(bb);
-
- PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
-
- pageTransactionInfo.decode(buff);
-
- pageTransactionInfo.setRecordID(record.id);
-
- PagingManager pagingManager = postOffice.getPagingManager();
-
- pagingManager.addTransaction(pageTransactionInfo);
-
- break;
- }
- case LAST_PAGE:
- {
- MessagingBuffer buff = new ByteBufferWrapper(bb);
-
- LastPageRecordImpl recordImpl = new LastPageRecordImpl();
-
- recordImpl.setRecordId(record.id);
-
- recordImpl.decode(buff);
-
- PagingManager pagingManager = postOffice.getPagingManager();
-
- pagingManager.loadLastPage(recordImpl);
-
- break;
- }
case ADD_MESSAGE:
{
- MessagingBuffer buff = new ByteBufferWrapper(bb);
-
ServerMessage message = new ServerMessageImpl(record.id);
message.decode(buff);
@@ -414,15 +370,17 @@
}
case ACKNOWLEDGE_REF:
{
- long queueID = bb.getLong();
+ long messageID = record.id;
+
+ ACKEncoding encoding = new ACKEncoding();
+ encoding.decode(buff);
+
- long messageID = bb.getLong();
+ Queue queue = queues.get(encoding.queueID);
- Queue queue = queues.get(queueID);
-
if (queue == null)
{
- throw new IllegalStateException("Cannot find queue with id " + queueID);
+ throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
}
MessageReference removed = queue.removeReferenceWithID(messageID);
@@ -436,17 +394,17 @@
}
case UPDATE_DELIVERY_COUNT:
{
- long queueID = bb.getLong();
+ long messageID = record.id;
+
+ DeliveryCountUpdateEncoding deliveryUpdate = new DeliveryCountUpdateEncoding();
+
+ deliveryUpdate.decode(buff);
- long messageID = bb.getLong();
+ Queue queue = queues.get(deliveryUpdate.queueID);
- int deliveryCount = bb.getInt();
-
- Queue queue = queues.get(queueID);
-
if (queue == null)
{
- throw new IllegalStateException("Cannot find queue with id " + queueID);
+ throw new IllegalStateException("Cannot find queue with id " + deliveryUpdate.queueID);
}
MessageReference reference = queue.getReference(messageID);
@@ -456,11 +414,40 @@
throw new IllegalStateException("Failed to find reference for " + messageID);
}
- reference.setDeliveryCount(deliveryCount);
+ reference.setDeliveryCount(deliveryUpdate.count);
break;
}
+ case PAGE_TRANSACTION:
+ {
+
+ PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+ pageTransactionInfo.decode(buff);
+
+ pageTransactionInfo.setRecordID(record.id);
+
+ PagingManager pagingManager = postOffice.getPagingManager();
+
+ pagingManager.addTransaction(pageTransactionInfo);
+
+ break;
+ }
+ case LAST_PAGE:
+ {
+ LastPageRecordImpl recordImpl = new LastPageRecordImpl();
+
+ recordImpl.setRecordId(record.id);
+
+ recordImpl.decode(buff);
+
+ PagingManager pagingManager = postOffice.getPagingManager();
+
+ pagingManager.setLastPage(recordImpl);
+
+ break;
+ }
case SET_SCHEDULED_DELIVERY_TIME:
{
//TODO
@@ -477,19 +464,6 @@
public void addBinding(Binding binding) throws Exception
{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- DataOutputStream daos = new DataOutputStream(baos);
-
- /*
- We store:
- *
- * Queue name
- * Address string
- * All nodes?
- * Filter string
- */
-
Queue queue = binding.getQueue();
//We generate the queue id here
@@ -497,39 +471,25 @@
long queueID = bindingIDSequence.getAndIncrement();
queue.setPersistenceID(queueID);
-
- byte[] nameBytes = queue.getName().getData();
- daos.writeInt(nameBytes.length);
+ final SimpleString filterString;
- daos.write(nameBytes);
+ final Filter filter = queue.getFilter();
- byte[] addressBytes = binding.getAddress().getData();
-
- daos.writeInt(addressBytes.length);
-
- daos.write(addressBytes);
-
- Filter filter = queue.getFilter();
-
- daos.writeBoolean(filter != null);
-
if (filter != null)
{
- byte[] filterBytes = queue.getFilter().getFilterString().getData();
-
- daos.writeInt(filterBytes.length);
-
- daos.write(filterBytes);
+ filterString = filter.getFilterString();
}
-
- daos.flush();
-
- byte[] data = baos.toByteArray();
+ else
+ {
+ filterString = null;
+ }
- bindingsJournal.appendAddRecord(queueID, BINDING_RECORD, data);
+ BindingEncoding bindingEncoding = new BindingEncoding(binding.getQueue().getName(), binding.getAddress(), filterString);
+
+ bindingsJournal.appendAddRecord(queueID, BINDING_RECORD, bindingEncoding);
}
-
+
public void deleteBinding(Binding binding) throws Exception
{
long id = binding.getQueue().getPersistenceID();
@@ -553,22 +513,10 @@
}
else
{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
-
- DataOutputStream daos = new DataOutputStream(baos);
+ DestinationEncoding destinationEnc = new DestinationEncoding(destination);
- byte[] destBytes = destination.getData();
+ bindingsJournal.appendAddRecord(destinationID, DESTINATION_RECORD, destinationEnc);
- daos.writeInt(destBytes.length);
-
- daos.write(destBytes);
-
- daos.flush();
-
- byte[] data = baos.toByteArray();
-
- bindingsJournal.appendAddRecord(destinationID, DESTINATION_RECORD, data);
-
return true;
}
}
@@ -600,57 +548,38 @@
{
long id = record.id;
- byte[] data = record.data;
+ MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.wrap(record.data));
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
-
- DataInputStream dais = new DataInputStream(bais);
-
byte rec = record.getUserRecordType();
if (rec == BINDING_RECORD)
{
- int len = dais.readInt();
- byte[] queueNameBytes = new byte[len];
- dais.read(queueNameBytes);
- SimpleString queueName = new SimpleString(queueNameBytes);
-
- len = dais.readInt();
- byte[] addressBytes = new byte[len];
- dais.read(addressBytes);
- SimpleString address = new SimpleString(addressBytes);
-
- Filter filter = null;
-
- if (dais.readBoolean())
- {
- len = dais.readInt();
- byte[] filterBytes = new byte[len];
- dais.read(filterBytes);
- SimpleString filterString = new SimpleString(filterBytes);
-
- filter = new FilterImpl(filterString);
- }
-
- Queue queue = queueFactory.createQueue(id, queueName, filter, true);
+
+ BindingEncoding encodeBinding = new BindingEncoding();
+ encodeBinding.decode(buffer);
+
+ Filter filter = null;
+
+ if (encodeBinding.filter != null)
+ {
+ filter = new FilterImpl(encodeBinding.filter);
+ }
+
+ Queue queue = queueFactory.createQueue(id, encodeBinding.queueName, filter, true);
- Binding binding = new BindingImpl(address, queue);
+ Binding binding = new BindingImpl(encodeBinding.address, queue);
bindings.add(binding);
}
else if (rec == DESTINATION_RECORD)
{
- int len = dais.readInt();
+
+ DestinationEncoding destEnc = new DestinationEncoding();
+ destEnc.decode(buffer);
+
+ destinationIDMap.put(destEnc.destination, id);
- byte[] destData = new byte[len];
-
- dais.read(destData);
-
- SimpleString destinationName = new SimpleString(destData);
-
- destinationIDMap.put(destinationName, id);
-
- destinations.add(destinationName);
+ destinations.add(destEnc.destination);
}
else
{
@@ -661,6 +590,7 @@
bindingIDSequence.set(maxID + 1);
}
+
// MessagingComponent implementation ------------------------------------------------------
public synchronized void start() throws Exception
@@ -710,30 +640,6 @@
// Private ----------------------------------------------------------------------------------
- private EncodingSupport ackBytes(final long queueID, final long messageID)
- {
- // Using an EncodingSupport, to avoid some byteArrayCopy
- return new EncodingSupport()
- {
-
- public void decode(MessagingBuffer buffer)
- {
- throw new UnsupportedOperationException();
- }
-
- public void encode(MessagingBuffer buffer)
- {
- buffer.putLong(queueID);
- buffer.putLong(messageID);
- }
-
- public int getEncodeSize()
- {
- return SIZE_LONG * 2;
- }
-
- };
- }
private void checkAndCreateDir(String dir, boolean create)
{
@@ -800,4 +706,154 @@
}
+ private static class BindingEncoding implements EncodingSupport
+ {
+
+ SimpleString queueName;
+ SimpleString address;
+ SimpleString filter;
+
+ public BindingEncoding()
+ {
+
+ }
+
+ public BindingEncoding(SimpleString queueName,
+ SimpleString address, SimpleString filter)
+ {
+ super();
+ this.queueName = queueName;
+ this.address = address;
+ this.filter = filter;
+ }
+
+ public void decode(MessagingBuffer buffer)
+ {
+ queueName = buffer.getSimpleString();
+ address = buffer.getSimpleString();
+ filter = buffer.getNullableSimpleString();
+
+ }
+
+ public void encode(MessagingBuffer buffer)
+ {
+ buffer.putSimpleString(queueName);
+ buffer.putSimpleString(address);
+ buffer.putNullableSimpleString(filter);
+ }
+
+ public int getEncodeSize()
+ {
+ return SimpleString.sizeofString(queueName) +
+ SimpleString.sizeofString(address) +
+ 1 + // HasFilter?
+ ((filter != null) ? SimpleString.sizeofString(filter) : 0);
+ }
+ }
+
+ private static class DestinationEncoding implements EncodingSupport
+ {
+
+ SimpleString destination;
+
+ DestinationEncoding(SimpleString destination)
+ {
+ this.destination = destination;
+ }
+
+ DestinationEncoding()
+ {
+ }
+
+ public void decode(MessagingBuffer buffer)
+ {
+ this.destination = buffer.getSimpleString();
+ }
+
+ public void encode(MessagingBuffer buffer)
+ {
+ buffer.putSimpleString(destination);
+ }
+
+ public int getEncodeSize()
+ {
+ return SimpleString.sizeofString(destination);
+ }
+
+ }
+
+ private static class DeliveryCountUpdateEncoding implements EncodingSupport
+ {
+ long queueID;
+ int count;
+
+ public DeliveryCountUpdateEncoding()
+ {
+ super();
+ }
+
+ public DeliveryCountUpdateEncoding(long queueID, int count)
+ {
+ super();
+ this.queueID = queueID;
+ this.count = count;
+ }
+
+ public void decode(MessagingBuffer buffer)
+ {
+ queueID = buffer.getLong();
+ count = buffer.getInt();
+ }
+
+ public void encode(MessagingBuffer buffer)
+ {
+ buffer.putLong(queueID);
+ buffer.putInt(count);
+ }
+
+ public int getEncodeSize()
+ {
+ return 8 + 4;
+ }
+
+ }
+
+
+ private class ACKEncoding implements EncodingSupport
+ {
+ long queueID;
+
+
+
+ public ACKEncoding(long queueID)
+ {
+ super();
+ this.queueID = queueID;
+ }
+
+ public ACKEncoding()
+ {
+ super();
+ }
+
+ public void decode(MessagingBuffer buffer)
+ {
+ this.queueID = buffer.getLong();
+ }
+
+ public void encode(MessagingBuffer buffer)
+ {
+ buffer.putLong(queueID);
+ }
+
+ public int getEncodeSize()
+ {
+ return 8;
+ }
+
+ }
+
+
+
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java 2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java 2008-09-09 22:41:33 UTC (rev 4924)
@@ -31,6 +31,7 @@
import org.jboss.messaging.core.journal.impl.JournalImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.SimpleEncoding;
/**
*
@@ -211,7 +212,7 @@
{
final int numMessages = 50050;
- byte[] data = new byte[1024];
+ SimpleEncoding data = new SimpleEncoding(1024, (byte)'j');
long start = System.currentTimeMillis();
@@ -270,7 +271,7 @@
journal.load(new ArrayList<RecordInfo>(), null);
log.debug("Adding data");
- byte[] data = new byte[700];
+ SimpleEncoding data = new SimpleEncoding(700, (byte)'j');
long start = System.currentTimeMillis();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java 2008-09-09 22:41:33 UTC (rev 4924)
@@ -88,12 +88,7 @@
final long queueID = 1210981;
final long messageID = 101921092;
- byte[] record = new byte[16];
- ByteBuffer bb = ByteBuffer.wrap(record);
- bb.putLong(queueID);
- bb.putLong(messageID);
-
- messageJournal.appendUpdateRecord(EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(record));
+ messageJournal.appendUpdateRecord(EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(autoEncode(queueID)));
EasyMock.replay(messageJournal, bindingsJournal);
jsm.storeAcknowledge(queueID, messageID);
EasyMock.verify(messageJournal, bindingsJournal);
@@ -141,13 +136,8 @@
final long queueID = 1210981;
final long messageID = 101921092;
- byte[] record = new byte[16];
- ByteBuffer bb = ByteBuffer.wrap(record);
- bb.putLong(queueID);
- bb.putLong(messageID);
-
final long txID = 12091921;
- messageJournal.appendUpdateRecordTransactional(EasyMock.eq(txID), EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(record));
+ messageJournal.appendUpdateRecordTransactional(EasyMock.eq(txID), EasyMock.eq(messageID), EasyMock.eq(JournalStorageManager.ACKNOWLEDGE_REF), encodingMatch(autoEncode(queueID)));
EasyMock.replay(messageJournal, bindingsJournal);
jsm.storeAcknowledgeTransactional(txID, queueID, messageID);
EasyMock.verify(messageJournal, bindingsJournal);
@@ -228,12 +218,6 @@
final long queueID = 1283743;
final int deliveryCount = 4757;
- byte[] bytes = new byte[21];
- ByteBuffer bb = ByteBuffer.wrap(bytes);
- bb.putLong(queueID);
- bb.putLong(msgID);
- bb.putInt(deliveryCount);
-
MessageReference ref = EasyMock.createStrictMock(MessageReference.class);
ServerMessage msg = EasyMock.createStrictMock(ServerMessage.class);
Queue queue = EasyMock.createStrictMock(Queue.class);
@@ -243,7 +227,7 @@
EasyMock.expect(msg.getMessageID()).andStubReturn(msgID);
EasyMock.expect(ref.getDeliveryCount()).andReturn(deliveryCount);
- messageJournal.appendUpdateRecord(EasyMock.eq(msgID), EasyMock.eq(JournalStorageManager.UPDATE_DELIVERY_COUNT), EasyMock.aryEq(bytes));
+ messageJournal.appendUpdateRecord(EasyMock.eq(msgID), EasyMock.eq(JournalStorageManager.UPDATE_DELIVERY_COUNT), compareEncodingSupport(autoEncode(queueID, deliveryCount)));
EasyMock.replay(messageJournal, bindingsJournal, ref, msg, queue);
jsm.updateDeliveryCount(ref);
EasyMock.verify(messageJournal, bindingsJournal, ref, msg, queue);
@@ -319,10 +303,9 @@
RecordInfo record5 = new RecordInfo(msg2ID, JournalStorageManager.ACKNOWLEDGE_REF, ack3Bytes, true);
final int deliveryCount = 4757;
- byte[] updateBytes = new byte[21];
+ byte[] updateBytes = new byte[12];
ByteBuffer bb4 = ByteBuffer.wrap(updateBytes);
bb4.putLong(queue1ID);
- bb4.putLong(msg1ID);
bb4.putInt(deliveryCount);
RecordInfo record6 = new RecordInfo(msg1ID, JournalStorageManager.UPDATE_DELIVERY_COUNT, updateBytes, true);
@@ -459,7 +442,7 @@
log.debug("** data length is " + data.length);
log.debug(UnitTestCase.dumpBytes(data));
- bindingsJournal.appendAddRecord(EasyMock.eq(0L), EasyMock.eq(JournalStorageManager.BINDING_RECORD), EasyMock.aryEq(data));
+ bindingsJournal.appendAddRecord(EasyMock.eq(0L), EasyMock.eq(JournalStorageManager.BINDING_RECORD), compareEncodingSupport(data));
if (useFilter)
{
@@ -548,7 +531,7 @@
daos.write(destBytes);
daos.flush();
byte[] data = baos.toByteArray();
- bindingsJournal.appendAddRecord(EasyMock.eq(0L), EasyMock.eq(JournalStorageManager.DESTINATION_RECORD), EasyMock.aryEq(data));
+ bindingsJournal.appendAddRecord(EasyMock.eq(0L), EasyMock.eq(JournalStorageManager.DESTINATION_RECORD), compareEncodingSupport(data));
EasyMock.replay(messageJournal, bindingsJournal);
@@ -579,7 +562,7 @@
daos.write(destBytes);
daos.flush();
data = baos.toByteArray();
- bindingsJournal.appendAddRecord(EasyMock.eq(2L), EasyMock.eq(JournalStorageManager.DESTINATION_RECORD), EasyMock.aryEq(data));
+ bindingsJournal.appendAddRecord(EasyMock.eq(2L), EasyMock.eq(JournalStorageManager.DESTINATION_RECORD), compareEncodingSupport(data));
EasyMock.replay(messageJournal, bindingsJournal);
Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-09-09 09:37:56 UTC (rev 4923)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-09-09 22:41:33 UTC (rev 4924)
@@ -43,7 +43,9 @@
import org.easymock.EasyMock;
import org.easymock.IArgumentMatcher;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
@@ -275,6 +277,50 @@
return null;
}
+ protected EncodingSupport compareEncodingSupport(final byte expectedArray[])
+ {
+
+ EasyMock.reportMatcher(new IArgumentMatcher()
+ {
+
+ public void appendTo(StringBuffer buffer)
+ {
+ buffer.append("EncodingSupport buffer didn't match");
+ }
+
+ public boolean matches(Object argument)
+ {
+ EncodingSupport encoding = (EncodingSupport) argument;
+
+ final int size = encoding.getEncodeSize();
+
+ if (size != expectedArray.length)
+ {
+ System.out.println(size + " != " + expectedArray.length);
+ return false;
+ }
+
+ byte[] compareArray = new byte[size];
+
+ MessagingBuffer buffer = new ByteBufferWrapper(ByteBuffer.wrap(compareArray));
+ encoding.encode(buffer);
+
+ for (int i = 0; i < expectedArray.length; i++)
+ {
+ if (expectedArray[i] != compareArray[i])
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ });
+
+ return null;
+ }
+
protected boolean deleteDirectory(File directory)
More information about the jboss-cvs-commits
mailing list