[jboss-cvs] JBoss Messaging SVN: r7446 - in branches/clebert_temp_expirement: src/main/org/jboss/messaging/core/journal/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 23 18:48:10 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-23 18:48:09 -0400 (Tue, 23 Jun 2009)
New Revision: 7446
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/util/JournalExample.java
Log:
Individualizing callbacks (per file)
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java 2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/Journal.java 2009-06-23 22:48:09 UTC (rev 7446)
@@ -50,19 +50,19 @@
// Transactional operations
- void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record, boolean sync) throws Exception;
+ void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
- void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+ void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
- void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record, boolean sync) throws Exception;
+ void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
- void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+ void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
- void appendDeleteRecordTransactional(long txID, long id, byte[] record, boolean sync) throws Exception;
+ void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception;
- void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record, boolean sync) throws Exception;
+ void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception;
- void appendDeleteRecordTransactional(long txID, long id, boolean sync) throws Exception;
+ void appendDeleteRecordTransactional(long txID, long id) throws Exception;
void appendCommitRecord(long txID, boolean sync) throws Exception;
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-23 22:48:09 UTC (rev 7446)
@@ -305,7 +305,7 @@
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ JournalFile usedFile = appendRecord(bb, sync, null, callback);
recordsRelationshipMap.put(id, new RecordFilesRelationship(usedFile));
}
@@ -368,7 +368,7 @@
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ JournalFile usedFile = appendRecord(bb, sync, null, callback);
posFiles.addUpdateFile(usedFile);
}
@@ -423,7 +423,7 @@
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ JournalFile usedFile = appendRecord(bb, sync, null, callback);
posFiles.addDelete(usedFile);
}
@@ -446,18 +446,16 @@
public void appendAddRecordTransactional(final long txID,
final long id,
final byte recordType,
- final byte[] record,
- final boolean sync) throws Exception
+ final byte[] record) throws Exception
{
- appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record), sync);
+ appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
}
public void appendAddRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record,
- final boolean sync) throws Exception
+ final EncodingSupport record) throws Exception
{
if (state != STATE_LOADED)
{
@@ -489,7 +487,7 @@
{
JournalTransaction tx = getTransactionInfo(txID);
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+ JournalFile usedFile = appendRecord(bb, false, tx, null);
tx.addPositive(usedFile, id);
}
@@ -507,17 +505,15 @@
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
- final byte[] record,
- final boolean sync) throws Exception
+ final byte[] record) throws Exception
{
- appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record), sync);
+ appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
}
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record,
- final boolean sync) throws Exception
+ final EncodingSupport record) throws Exception
{
if (state != STATE_LOADED)
{
@@ -547,7 +543,7 @@
{
JournalTransaction tx = getTransactionInfo(txID);
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+ JournalFile usedFile = appendRecord(bb, false, tx, null);
tx.addPositive(usedFile, id);
}
@@ -562,15 +558,14 @@
}
}
- public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record, final boolean sync) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
{
- appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record), sync);
+ appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
}
public void appendDeleteRecordTransactional(final long txID,
final long id,
- final EncodingSupport record,
- final boolean sync) throws Exception
+ final EncodingSupport record) throws Exception
{
if (state != STATE_LOADED)
{
@@ -601,7 +596,7 @@
{
JournalTransaction tx = getTransactionInfo(txID);
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+ JournalFile usedFile = appendRecord(bb, false, tx, null);
tx.addNegative(usedFile, id);
}
@@ -616,7 +611,7 @@
}
}
- public void appendDeleteRecordTransactional(final long txID, final long id, final boolean sync) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
{
if (state != STATE_LOADED)
{
@@ -643,7 +638,7 @@
{
JournalTransaction tx = getTransactionInfo(txID);
- JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(tx, sync));
+ JournalFile usedFile = appendRecord(bb, false, tx, null);
tx.addNegative(usedFile, id);
}
@@ -679,22 +674,24 @@
throw new IllegalStateException("Journal must be loaded first");
}
+ JournalTransaction tx = getTransactionInfo(txID);
+
+ if (sync)
+ {
+ tx.syncPreviousFiles();
+ }
+
readLockCompact.lock();
- IOCallback callback = null;
-
try
{
- JournalTransaction tx = getTransactionInfo(txID);
ChannelBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
- callback = getTransactionCallback(tx, sync);
-
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ JournalFile usedFile = appendRecord(bb, sync, tx, null);
tx.prepare(usedFile);
}
@@ -710,11 +707,7 @@
}
// We should wait this outside of the lock, to increase throughput
- if (callback != null)
- {
- callback.waitCompletion();
- }
-
+ tx.waitCompletion();
}
/**
@@ -741,15 +734,18 @@
throw new IllegalStateException("Journal must be loaded first");
}
+ JournalTransaction tx = transactionInfos.remove(txID);
+
+ if (sync)
+ {
+ tx.syncPreviousFiles();
+ }
+
readLockCompact.lock();
- IOCallback callback = null;
-
try
{
- JournalTransaction tx = transactionInfos.remove(txID);
-
if (tx == null)
{
throw new IllegalStateException("Cannot find tx with id " + txID);
@@ -757,12 +753,10 @@
ChannelBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
- callback = getTransactionCallback(tx, sync);
-
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ JournalFile usedFile = appendRecord(bb, sync, tx, null);
tx.commit(usedFile);
}
@@ -777,10 +771,10 @@
readLockCompact.unlock();
}
- // We should wait this outside of the lock, to increase throuput
- if (callback != null)
+ if (sync)
{
- callback.waitCompletion();
+ // We should wait this outside of the lock, to increase throuput
+ tx.waitCompletion();
}
}
@@ -791,13 +785,13 @@
throw new IllegalStateException("Journal must be loaded first");
}
- IOCallback callback = null;
-
readLockCompact.lock();
+ JournalTransaction tx = null;
+
try
{
- JournalTransaction tx = transactionInfos.remove(txID);
+ tx = transactionInfos.remove(txID);
if (tx == null)
{
@@ -813,12 +807,10 @@
bb.writeLong(txID);
bb.writeInt(size);
- callback = getTransactionCallback(tx, sync);
-
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, callback);
+ JournalFile usedFile = appendRecord(bb, sync, tx, null);
tx.rollback(usedFile);
}
@@ -834,9 +826,10 @@
}
// We should wait this outside of the lock, to increase throuput
- if (callback != null)
+
+ if (sync)
{
- callback.waitCompletion();
+ tx.waitCompletion();
}
}
@@ -934,7 +927,7 @@
}
final ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot = recordsSnapshotList;
-
+
Compactor compactor = new Compactor(recordsSnapshot, dataFilesToProcess.get(0).getFileID());
for (final JournalFile file : dataFilesToProcess)
@@ -960,11 +953,11 @@
class Compactor implements JournalReader
{
JournalFile currentOutputFile;
-
+
SequentialFile sequentialFile;
ByteBuffer bufferWrite;
-
+
int nextOrderingID;
ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot;
@@ -1001,7 +994,7 @@
sequentialFile.write(bufferWrite, true);
sequentialFile.close();
}
-
+
bufferWrite = fileFactory.newBuffer(fileSize);
currentOutputFile = openFile(false);
sequentialFile = currentOutputFile.getFile();
@@ -1578,10 +1571,7 @@
for (JournalTransaction tx : transactionInfos.values())
{
- if (tx.getCallback() != null)
- {
- tx.getCallback().waitCompletion();
- }
+ tx.waitCallbacks();
}
if (filesExecutor != null && !filesExecutor.isShutdown())
@@ -2349,7 +2339,7 @@
file.read(bb);
int fileID = bb.getInt();
-
+
int orderingID = bb.getInt();
fileFactory.releaseBuffer(bb);
@@ -2375,7 +2365,7 @@
{
int oid1 = f1.getOrderingID();
int oid2 = f2.getOrderingID();
-
+
if (oid1 == oid2)
{
int id1 = f1.getFileID();
@@ -2397,7 +2387,10 @@
/**
* Note: You should aways guarantee locking the semaphore lock.
* */
- private JournalFile appendRecord(final MessagingBuffer bb, final boolean sync, final IOCallback callback) throws Exception
+ private JournalFile appendRecord(final MessagingBuffer bb,
+ final boolean sync,
+ final JournalTransaction tx,
+ IOCallback callback) throws Exception
{
try
{
@@ -2433,9 +2426,34 @@
if (currentFile == null)
{
- throw new IllegalStateException("Current file = null");
+ throw new NullPointerException("Current file = null");
}
+ if (tx != null)
+ {
+ if (callback != null)
+ {
+ // sanity check, it should not happen.
+ throw new IllegalArgumentException("Invalid callback parameter. Use of tx is mutually exclusive with the callback");
+ }
+
+ // The callback of a transaction has to be taken inside the lock,
+ // when we guarantee the currentFile will not be changed,
+ // since we individualize the callback per file
+ callback = tx.getCallback(currentFile);
+
+ if (sync)
+ {
+ // We already did sync previous files outside of the lock,
+ // but in a very rare occasion (maybe in a low speed disk)
+ // you could have a race where the currentFile changed between the last sync to the time the lock was acquired.
+ // So, we call the syncPreviousFiles again to guarantee data on disk.
+ // Even if there is data to be synced, this should be very fast since previous files were already scheduled to be closed.
+ // This is just verifying if previous files are already closed
+ tx.syncPreviousFiles();
+ }
+ }
+
bb.writerIndex(SIZE_BYTE);
bb.writeInt(currentFile.getFileID());
@@ -2691,34 +2709,6 @@
}
}
- private IOCallback getTransactionCallback(final JournalTransaction tx, final boolean sync) throws MessagingException
- {
- if (sync && fileFactory.isSupportsCallbacks())
- {
- TransactionCallback callback = tx.getCallback();
-
- if (callback == null)
- {
- callback = new TransactionCallback();
-
- tx.setCallback(callback);
- }
-
- if (callback.errorMessage != null)
- {
- throw new MessagingException(callback.errorCode, callback.errorMessage);
- }
-
- callback.countUp();
-
- return callback;
- }
- else
- {
- return null;
- }
- }
-
public ChannelBuffer newBuffer(final int size)
{
return ChannelBuffers.buffer(size);
@@ -2834,8 +2824,6 @@
private class JournalTransaction
{
- private TransactionCallback callback;
-
private List<Pair<JournalFile, Long>> pos;
private List<Pair<JournalFile, Long>> neg;
@@ -2844,30 +2832,87 @@
// We can't have those files being reclaimed or compacted if there is a pending transaction
private Set<JournalFile> pendingFiles;
+ private TransactionCallback currentCallback;
+
// Map of file id to number of elements participating on the transaction
// in that file
// Used to verify completion on reload
private final Map<Integer, AtomicInteger> numberOfElementsPerFile = new HashMap<Integer, AtomicInteger>();
+ private Map<JournalFile, TransactionCallback> callbackList;
+
public Map<Integer, AtomicInteger> getElementsSummary()
{
return numberOfElementsPerFile;
}
- /**
- * @param callback
- */
- public void setCallback(TransactionCallback callback)
+ /** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.
+ * Because of that, this operation should be almost very fast.*/
+ public void syncPreviousFiles() throws Exception
{
- this.callback = callback;
+ if (fileFactory.isSupportsCallbacks())
+ {
+ if (callbackList != null)
+ {
+ for (Map.Entry<JournalFile, TransactionCallback> entry : callbackList.entrySet())
+ {
+ if (entry.getKey() != currentFile)
+ {
+ entry.getValue().waitCompletion();
+ }
+ }
+ }
+ }
+ else
+ {
+ for (JournalFile file: pendingFiles)
+ {
+ if (file != currentFile)
+ {
+ file.getFile().waitForClose();
+ }
+ }
+ }
}
+ public TransactionCallback getCurrentCallback()
+ {
+ return currentCallback;
+ }
+
/**
* @return
*/
- public TransactionCallback getCallback()
+ public TransactionCallback getCallback(JournalFile file) throws Exception
{
- return this.callback;
+ if (fileFactory.isSupportsCallbacks())
+ {
+ if (callbackList == null)
+ {
+ callbackList = new HashMap<JournalFile, TransactionCallback>();
+ }
+
+ currentCallback = callbackList.get(file);
+
+ if (currentCallback == null)
+ {
+ currentCallback = new TransactionCallback();
+ callbackList.put(file, currentCallback);
+ }
+
+ if (currentCallback.errorMessage != null)
+ {
+ throw new MessagingException(currentCallback.errorCode, currentCallback.errorMessage);
+ }
+
+ currentCallback.countUp();
+
+ return currentCallback;
+ }
+ else
+ {
+ return null;
+ }
}
public void addPositive(final JournalFile file, final long id)
@@ -2945,6 +2990,26 @@
}
}
+ public void waitCallbacks() throws Exception
+ {
+ if (callbackList != null)
+ {
+ for (TransactionCallback callback : callbackList.values())
+ {
+ callback.waitCompletion();
+ }
+ }
+ }
+
+ /** Wait completion at the latest file only */
+ public void waitCompletion() throws Exception
+ {
+ if (currentCallback != null)
+ {
+ currentCallback.waitCompletion();
+ }
+ }
+
/**
* The caller of this method needs to guarantee lock.acquire before calling this method if being used outside of the lock context.
* or else potFilesMap could be affected
@@ -3096,7 +3161,7 @@
for (int i = 0; i < pages; i++)
{
- appendRecord(bb, false, null);
+ appendRecord(bb, false, null, null);
}
lock.release();
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-06-23 22:48:09 UTC (rev 7446)
@@ -340,16 +340,14 @@
messageJournal.appendAddRecordTransactional(txID,
message.getMessageID(),
ADD_LARGE_MESSAGE,
- new LargeMessageEncoding(((LargeServerMessage)message)),
- syncTransactional);
+ new LargeMessageEncoding(((LargeServerMessage)message)));
}
else
{
messageJournal.appendAddRecordTransactional(txID,
message.getMessageID(),
ADD_MESSAGE,
- message,
- syncTransactional);
+ message);
}
}
@@ -360,7 +358,7 @@
{
// Instead of updating the record, we delete the old one as that is
// better for reclaiming
- messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID(), syncTransactional);
+ messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID());
}
pageTransaction.setRecordID(generateUniqueID());
@@ -368,8 +366,7 @@
messageJournal.appendAddRecordTransactional(txID,
pageTransaction.getRecordID(),
PAGE_TRANSACTION,
- pageTransaction,
- syncTransactional);
+ pageTransaction);
}
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -377,8 +374,7 @@
messageJournal.appendUpdateRecordTransactional(txID,
messageID,
ADD_REF,
- new RefEncoding(queueID),
- syncTransactional);
+ new RefEncoding(queueID));
}
public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -386,13 +382,12 @@
messageJournal.appendUpdateRecordTransactional(txID,
messageID,
ACKNOWLEDGE_REF,
- new RefEncoding(queueID),
- syncTransactional);
+ new RefEncoding(queueID));
}
public void deletePageTransactional(final long txID, final long recordID) throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID, syncTransactional);
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
}
public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
@@ -403,13 +398,12 @@
messageJournal.appendUpdateRecordTransactional(txID,
ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
- encoding,
- syncTransactional);
+ encoding);
}
public void deleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID), syncTransactional);
+ messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
}
public void prepare(final long txID, final Xid xid) throws Exception
@@ -434,7 +428,7 @@
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendAddRecordTransactional(txID, recordID, DUPLICATE_ID, encoding, syncTransactional);
+ messageJournal.appendAddRecordTransactional(txID, recordID, DUPLICATE_ID, encoding);
}
public void updateDuplicateIDTransactional(final long txID,
@@ -444,12 +438,12 @@
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendUpdateRecordTransactional(txID, recordID, DUPLICATE_ID, encoding, syncTransactional);
+ messageJournal.appendUpdateRecordTransactional(txID, recordID, DUPLICATE_ID, encoding);
}
public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID, syncTransactional);
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
}
// Other operations
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java 2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java 2009-06-23 22:48:09 UTC (rev 7446)
@@ -221,7 +221,7 @@
long startTrans = System.currentTimeMillis();
for (int j = 0; j < 1000; j++)
{
- journal.appendAddRecordTransactional(i, count++, (byte)0, data, true);
+ journal.appendAddRecordTransactional(i, count++, (byte)0, data);
}
journal.appendCommitRecord(i, true);
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2009-06-23 22:48:09 UTC (rev 7446)
@@ -215,7 +215,7 @@
if (transactionSize != 0)
{
- journal.appendAddRecordTransactional(transactionId, id, (byte)99, buffer.array(), false);
+ journal.appendAddRecordTransactional(transactionId, id, (byte)99, buffer.array());
if (++transactionCounter == transactionSize)
{
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-06-23 22:48:09 UTC (rev 7446)
@@ -390,7 +390,7 @@
assertEquals(0, records.size());
assertEquals(0, transactions.size());
- journalImpl.appendAddRecordTransactional(1, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
+ journalImpl.appendAddRecordTransactional(1, 1, (byte)1, new SimpleEncoding(1, (byte)1));
setupJournal(JOURNAL_SIZE, 100);
@@ -429,7 +429,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(77l, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
+ journalImpl.appendAddRecordTransactional(77l, 1, (byte)1, new SimpleEncoding(1, (byte)1));
journalImpl.forceMoveNextFile();
}
@@ -437,7 +437,7 @@
assertEquals(12, factory.listFiles("tt").size());
- journalImpl.appendAddRecordTransactional(78l, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
+ journalImpl.appendAddRecordTransactional(78l, 1, (byte)1, new SimpleEncoding(1, (byte)1));
assertEquals(12, factory.listFiles("tt").size());
@@ -478,7 +478,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(1, (byte)1), false);
+ journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(1, (byte)1));
journalImpl.forceMoveNextFile();
}
@@ -501,7 +501,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendDeleteRecordTransactional(2l, i, false);
+ journalImpl.appendDeleteRecordTransactional(2l, i);
journalImpl.forceMoveNextFile();
}
@@ -536,7 +536,7 @@
journalImpl.appendAddRecordTransactional(1l,
2l,
(byte)3,
- new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4), false);
+ new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4));
journalImpl.appendCommitRecord(1l, false);
@@ -561,7 +561,7 @@
for (int i = 0; i < 20; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
journalImpl.forceMoveNextFile();
}
@@ -626,8 +626,8 @@
for (int i = 0; i < 20; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
- journalImpl.appendAddRecordTransactional(2l, i + 20l, (byte)0, new SimpleEncoding(1, (byte)15), false);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.appendAddRecordTransactional(2l, i + 20l, (byte)0, new SimpleEncoding(1, (byte)15));
journalImpl.forceMoveNextFile();
}
@@ -734,13 +734,13 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
journalImpl.forceMoveNextFile();
}
for (int i = 10; i < 20; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
journalImpl.forceMoveNextFile();
}
@@ -796,7 +796,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
}
journalImpl.forceMoveNextFile();
@@ -807,7 +807,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendDeleteRecordTransactional(2l, i, false);
+ journalImpl.appendDeleteRecordTransactional(2l, i);
}
journalImpl.appendCommitRecord(2l, false);
@@ -841,7 +841,7 @@
{
journalImpl.forceMoveNextFile();
}
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
}
journalImpl.appendCommitRecord(1l, false);
@@ -852,7 +852,7 @@
{
journalImpl.forceMoveNextFile();
}
- journalImpl.appendDeleteRecordTransactional(2l, i, false);
+ journalImpl.appendDeleteRecordTransactional(2l, i);
}
journalImpl.appendCommitRecord(2l, false);
@@ -878,7 +878,7 @@
journalImpl.appendAddRecord(10l, (byte)0, new SimpleEncoding(10, (byte)0), false);
- journalImpl.appendDeleteRecordTransactional(1l, 10l, new SimpleEncoding(100, (byte)'j'), false);
+ journalImpl.appendDeleteRecordTransactional(1l, 10l, new SimpleEncoding(100, (byte)'j'));
journalImpl.appendPrepareRecord(1, xid, false);
@@ -929,7 +929,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
+ journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1));
journalImpl.forceMoveNextFile();
}
@@ -968,7 +968,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendDeleteRecordTransactional(2l, i, false);
+ journalImpl.appendDeleteRecordTransactional(2l, i);
}
SimpleEncoding xid2 = new SimpleEncoding(15, (byte)2);
@@ -1018,7 +1018,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
+ journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1));
journalImpl.forceMoveNextFile();
}
@@ -1067,7 +1067,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
journalImpl.forceMoveNextFile();
}
@@ -1096,7 +1096,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
}
journalImpl.appendCommitRecord(1l, false);
@@ -1119,7 +1119,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
}
journalImpl.appendCommitRecord(1l, false);
@@ -1203,7 +1203,7 @@
latchStart.await();
for (int i = 0; i < NUMBER_OF_ELEMENTS; i++)
{
- journalImpl.appendAddRecordTransactional(i, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
+ journalImpl.appendAddRecordTransactional(i, i, (byte)1, new SimpleEncoding(50, (byte)1));
journalImpl.appendCommitRecord(i, false);
queueDelete.offer(i);
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-06-23 22:48:09 UTC (rev 7446)
@@ -85,7 +85,7 @@
{
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), true);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
}
latch.countDown();
@@ -147,7 +147,7 @@
{
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), true);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
}
journalImpl.appendRollbackRecord(1l, true);
@@ -211,7 +211,7 @@
{
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), true);
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
}
journalImpl.appendCommitRecord(1l, true);
@@ -268,7 +268,7 @@
factory.setHoldCallbacks(true, null);
factory.setGenerateErrors(true);
- journalImpl.appendAddRecordTransactional(1l, 1, (byte)1, new SimpleEncoding(1, (byte)0), true);
+ journalImpl.appendAddRecordTransactional(1l, 1, (byte)1, new SimpleEncoding(1, (byte)0));
factory.flushAllCallbacks();
@@ -277,7 +277,7 @@
try
{
- journalImpl.appendAddRecordTransactional(1l, 2, (byte)1, new SimpleEncoding(1, (byte)0), true);
+ journalImpl.appendAddRecordTransactional(1l, 2, (byte)1, new SimpleEncoding(1, (byte)0));
fail("Exception expected"); // An exception already happened in one
// of the elements on this transaction.
// We can't accept any more elements on
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-23 22:48:09 UTC (rev 7446)
@@ -270,7 +270,7 @@
// SIZE_BYTE
byte[] record = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
- journal.appendAddRecordTransactional(txID, element, (byte)0, record, sync);
+ journal.appendAddRecordTransactional(txID, element, (byte)0, record);
tx.records.add(new RecordInfo(element, (byte)0, record, false));
@@ -287,7 +287,7 @@
{
byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_UPDATE_RECORD_TX);
- journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord, sync);
+ journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true));
}
@@ -300,7 +300,7 @@
for (long element : arguments)
{
- journal.appendDeleteRecordTransactional(txID, element, sync);
+ journal.appendDeleteRecordTransactional(txID, element);
tx.deletes.add(new RecordInfo(element, (byte)0, null, true));
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/util/JournalExample.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/util/JournalExample.java 2009-06-23 19:12:39 UTC (rev 7445)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/util/JournalExample.java 2009-06-23 22:48:09 UTC (rev 7446)
@@ -147,7 +147,7 @@
0,
1,
2,
- 5 }, true);
+ 5 });
}
// After this is complete, you're sure the records are there
More information about the jboss-cvs-commits mailing list