[hornetq-commits] JBoss hornetq SVN: r8435 - in trunk: src/main/org/hornetq/core/journal/impl and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Nov 27 21:30:52 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-11-27 21:30:52 -0500 (Fri, 27 Nov 2009)
New Revision: 8435
Added:
trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
Modified:
trunk/src/main/org/hornetq/core/journal/SequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Optimization on journal - removing one unecessary buffer copy that was introduced after the TimedBuffer
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -60,6 +60,10 @@
void write(HornetQBuffer bytes, boolean sync) throws Exception;
+ void write(EncodingSupport bytes, boolean sync, IOAsyncTask callback) throws Exception;
+
+ void write(EncodingSupport bytes, boolean sync) throws Exception;
+
/** Write directly to the file without using any buffer */
void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -22,6 +22,7 @@
import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.Pair;
@@ -151,12 +152,16 @@
}
}
- JournalImpl.writeAddRecord(-1,
- 1,
- (byte)0,
- new JournalImpl.ByteArrayEncoding(filesToRename.toByteBuffer().array()),
- JournalImpl.SIZE_ADD_RECORD + filesToRename.toByteBuffer().array().length,
- renameBuffer);
+
+ InternalEncoder controlRecord = new JournalAddRecord(true,
+ 1,
+ (byte)0,
+ new JournalImpl.ByteArrayEncoding(filesToRename.toByteBuffer()
+ .array()));
+
+ controlRecord.setFileID(-1);
+
+ controlRecord.encode(renameBuffer);
ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
@@ -229,7 +234,21 @@
{
return writingChannel;
}
+
+ protected void writeEncoder(InternalEncoder record) throws Exception
+ {
+ record.setFileID(fileID);
+ record.encode(getWritingChannel());
+ }
+ protected void writeEncoder(InternalEncoder record, int txcounter) throws Exception
+ {
+ record.setNumberOfRecords(txcounter);
+ writeEncoder(record);
+ }
+
+
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -21,7 +21,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -223,7 +225,44 @@
write(bytes, false, DummyCallback.getInstance());
}
}
+
+ public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback) throws Exception
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.addBytes(bytes, sync, callback);
+ }
+ else
+ {
+ ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize());
+
+ // If not using the TimedBuffer, a final copy is necessary
+ // Because AIO will need a specific Buffer
+ // And NIO will also need a whole buffer to perform the write
+
+ HornetQBuffer outBuffer = HornetQBuffers.wrappedBuffer(buffer);
+ bytes.encode(outBuffer);
+ buffer.rewind();
+ writeDirect(buffer, sync, callback);
+ }
+ }
+ public void write(final EncodingSupport bytes, final boolean sync) throws Exception
+ {
+ if (sync)
+ {
+ SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
+
+ write(bytes, true, completion);
+
+ completion.waitCompletion();
+ }
+ else
+ {
+ write(bytes, false, DummyCallback.getInstance());
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Added: trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/InternalEncoder.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+
+/**
+ * A InternalEncoder
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class InternalEncoder implements EncodingSupport
+{
+
+ protected int fileID;
+
+ public int getFileID()
+ {
+ return fileID;
+ }
+
+ public void setFileID(int fileID)
+ {
+ this.fileID = fileID;
+ }
+
+ public void decode(HornetQBuffer buffer)
+ {
+ }
+
+ public void setNumberOfRecords(int records)
+ {
+ }
+
+ public int getNumberOfRecords()
+ {
+ return 0;
+ }
+
+ public abstract int getEncodeSize();
+}
+
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -20,7 +20,12 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.utils.DataConstants;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
/**
* A JournalCleaner
@@ -72,14 +77,10 @@
{
if (lookupRecord(info.id))
{
- int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
-
- JournalImpl.writeAddRecord(fileID,
- info.id,
- info.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
+ writeEncoder(new JournalAddRecord(true,
+ info.id,
+ info.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(info.data)));
}
}
@@ -92,15 +93,11 @@
{
incrementTransactionCounter(transactionID);
- int size = JournalImpl.SIZE_ADD_RECORD_TX + recordInfo.data.length;
-
- JournalImpl.writeAddRecordTX(fileID,
- transactionID,
- recordInfo.id,
- recordInfo.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(recordInfo.data),
- size,
- getWritingChannel());
+ writeEncoder(new JournalAddRecordTX(true,
+ transactionID,
+ recordInfo.id,
+ recordInfo.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(recordInfo.data)));
}
}
@@ -111,14 +108,7 @@
{
int txcounter = getTransactionCounter(transactionID);
- JournalImpl.writeTransaction(fileID,
- JournalImpl.COMMIT_RECORD,
- transactionID,
- null,
- JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD,
- txcounter,
- getWritingChannel());
-
+ writeEncoder(new JournalCompleteRecordTX(true, transactionID, null), txcounter);
}
/* (non-Javadoc)
@@ -126,7 +116,7 @@
*/
public void onReadDeleteRecord(final long recordID) throws Exception
{
- JournalImpl.writeDeleteRecord(fileID, recordID, JournalImpl.SIZE_DELETE_RECORD, getWritingChannel());
+ writeEncoder(new JournalDeleteRecord(recordID));
}
/* (non-Javadoc)
@@ -134,16 +124,11 @@
*/
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
{
- int size = JournalImpl.SIZE_DELETE_RECORD_TX + recordInfo.data.length;
-
incrementTransactionCounter(transactionID);
- JournalImpl.writeDeleteRecordTransactional(fileID,
- transactionID,
- recordInfo.id,
- new JournalImpl.ByteArrayEncoding(recordInfo.data),
- size,
- getWritingChannel());
+ writeEncoder(new JournalDeleteRecordTX(transactionID,
+ recordInfo.id,
+ new JournalImpl.ByteArrayEncoding(recordInfo.data)));
}
/* (non-Javadoc)
@@ -153,15 +138,8 @@
{
int txcounter = getTransactionCounter(transactionID);
- int size = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + DataConstants.SIZE_INT;
-
- JournalImpl.writeTransaction(fileID,
- JournalImpl.PREPARE_RECORD,
- transactionID,
- new JournalImpl.ByteArrayEncoding(extraData),
- size,
- txcounter,
- getWritingChannel());
+ writeEncoder(new JournalCompleteRecordTX(false, transactionID, new JournalImpl.ByteArrayEncoding(extraData)),
+ txcounter);
}
/* (non-Javadoc)
@@ -169,7 +147,7 @@
*/
public void onReadRollbackRecord(final long transactionID) throws Exception
{
- JournalImpl.writeRollback(fileID, transactionID, getWritingChannel());
+ writeEncoder(new JournalRollbackRecordTX(transactionID));
}
/* (non-Javadoc)
@@ -179,13 +157,10 @@
{
if (lookupRecord(recordInfo.id))
{
- int size = JournalImpl.SIZE_UPDATE_RECORD + recordInfo.data.length;
- JournalImpl.writeUpdateRecord(fileID,
- recordInfo.id,
- recordInfo.userRecordType,
- new JournalImpl.ByteArrayEncoding(recordInfo.data),
- size,
- getWritingChannel());
+ writeEncoder(new JournalAddRecord(false,
+ recordInfo.id,
+ recordInfo.userRecordType,
+ new JournalImpl.ByteArrayEncoding(recordInfo.data)));
}
}
@@ -197,14 +172,12 @@
if (lookupRecord(recordInfo.id))
{
incrementTransactionCounter(transactionID);
- int size = JournalImpl.SIZE_UPDATE_RECORD_TX + recordInfo.data.length;
- JournalImpl.writeUpdateRecordTX(fileID,
- transactionID,
- recordInfo.id,
- recordInfo.userRecordType,
- new JournalImpl.ByteArrayEncoding(recordInfo.data),
- size,
- getWritingChannel());
+
+ writeEncoder(new JournalAddRecordTX(false,
+ transactionID,
+ recordInfo.id,
+ recordInfo.userRecordType,
+ new JournalImpl.ByteArrayEncoding(recordInfo.data)));
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -27,6 +27,10 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.Pair;
@@ -246,18 +250,16 @@
{
if (lookupRecord(info.id))
{
- int size = JournalImpl.SIZE_ADD_RECORD + info.data.length;
+ InternalEncoder addRecord = new JournalAddRecord(true,
+ info.id,
+ info.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(info.data));
+
+ checkSize(addRecord.getEncodeSize());
- checkSize(size);
+ writeEncoder(addRecord);
- JournalImpl.writeAddRecord(fileID,
- info.id,
- info.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
-
- newRecords.put(info.id, new JournalRecord(currentFile, size));
+ newRecords.put(info.id, new JournalRecord(currentFile, addRecord.getEncodeSize()));
}
}
@@ -267,19 +269,17 @@
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- int size = JournalImpl.SIZE_ADD_RECORD_TX + info.data.length;
+ InternalEncoder record = new JournalAddRecordTX(true,
+ transactionID,
+ info.id,
+ info.getUserRecordType(),
+ new JournalImpl.ByteArrayEncoding(info.data));
+
+ checkSize(record.getEncodeSize());
- checkSize(size);
+ newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
- newTransaction.addPositive(currentFile, info.id, size);
-
- JournalImpl.writeAddRecordTX(fileID,
- transactionID,
- info.id,
- info.getUserRecordType(),
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
+ writeEncoder(record);
}
else
{
@@ -315,17 +315,14 @@
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- int size = JournalImpl.SIZE_DELETE_RECORD_TX + info.data.length;
+ InternalEncoder record = new JournalDeleteRecordTX(transactionID,
+ info.id,
+ new JournalImpl.ByteArrayEncoding(info.data));
- checkSize(size);
+ checkSize(record.getEncodeSize());
+
+ writeEncoder(record);
- JournalImpl.writeDeleteRecordTransactional(fileID,
- transactionID,
- info.id,
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
-
newTransaction.addNegative(currentFile, info.id);
}
// else.. nothing to be done
@@ -343,17 +340,13 @@
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- int size = JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD + extraData.length + DataConstants.SIZE_INT;
+ InternalEncoder prepareRecord = new JournalCompleteRecordTX(false,
+ transactionID,
+ new JournalImpl.ByteArrayEncoding(extraData));
- checkSize(size);
+ checkSize(prepareRecord.getEncodeSize());
- JournalImpl.writeTransaction(fileID,
- JournalImpl.PREPARE_RECORD,
- transactionID,
- new JournalImpl.ByteArrayEncoding(extraData),
- size,
- newTransaction.getCounter(currentFile),
- getWritingChannel());
+ writeEncoder(prepareRecord, newTransaction.getCounter(currentFile));
newTransaction.prepare(currentFile);
@@ -374,9 +367,12 @@
{
if (lookupRecord(info.id))
{
- int size = JournalImpl.SIZE_UPDATE_RECORD + info.data.length;
+ InternalEncoder updateRecord = new JournalAddRecord(false,
+ info.id,
+ info.userRecordType,
+ new JournalImpl.ByteArrayEncoding(info.data));
- checkSize(size);
+ checkSize(updateRecord.getEncodeSize());
JournalRecord newRecord = newRecords.get(info.id);
@@ -386,16 +382,10 @@
}
else
{
- newRecord.addUpdateFile(currentFile, size);
+ newRecord.addUpdateFile(currentFile, updateRecord.getEncodeSize());
}
-
- JournalImpl.writeUpdateRecord(fileID,
- info.id,
- info.userRecordType,
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
-
+
+ writeEncoder(updateRecord);
}
}
@@ -405,19 +395,18 @@
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- int size = JournalImpl.SIZE_UPDATE_RECORD_TX + info.data.length;
+ InternalEncoder updateRecordTX = new JournalAddRecordTX(false,
+ transactionID,
+ info.id,
+ info.userRecordType,
+ new JournalImpl.ByteArrayEncoding(info.data));
- checkSize(size);
+
+ checkSize(updateRecordTX.getEncodeSize());
- JournalImpl.writeUpdateRecordTX(fileID,
- transactionID,
- info.id,
- info.userRecordType,
- new JournalImpl.ByteArrayEncoding(info.data),
- size,
- getWritingChannel());
-
- newTransaction.addPositive(currentFile, info.id, size);
+ writeEncoder(updateRecordTX);
+
+ newTransaction.addPositive(currentFile, info.id, updateRecordTX.getEncodeSize());
}
else
{
@@ -425,6 +414,8 @@
}
}
+
+
/**
* @param transactionID
* @return
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -54,6 +54,12 @@
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TestableJournal;
import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalCompleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecord;
+import org.hornetq.core.journal.impl.dataformat.JournalDeleteRecordTX;
+import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.Pair;
@@ -114,10 +120,6 @@
public static final byte ADD_RECORD = 11;
- public static final byte SIZE_UPDATE_RECORD = BASIC_SIZE + DataConstants.SIZE_LONG +
- DataConstants.SIZE_BYTE +
- DataConstants.SIZE_INT /* + record.length */;
-
public static final byte UPDATE_RECORD = 12;
public static final int SIZE_ADD_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
@@ -127,11 +129,6 @@
public static final byte ADD_RECORD_TX = 13;
- public static final int SIZE_UPDATE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
- DataConstants.SIZE_BYTE +
- DataConstants.SIZE_LONG +
- DataConstants.SIZE_INT /* + record.length */;
-
public static final byte UPDATE_RECORD_TX = 14;
public static final int SIZE_DELETE_RECORD_TX = BASIC_SIZE + DataConstants.SIZE_LONG +
@@ -292,207 +289,6 @@
this.maxAIO = maxAIO;
}
- // Public methods (used by package members such as JournalCompactor) (these methods are not part of the JournalImpl
- // interface)
-
- /**
- * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
- * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
- * (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
- * <p>The element-summary will then have</p>
- * <p>FileID1, 10</p>
- * <p>FileID2, 10</p>
- * <p>FileID3, 10</p>
- *
- * <br>
- * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
- * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
- * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
- * That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
- *
- * @param recordType
- * @param txID
- * @param tx
- * @param transactionData
- * @return
- * @throws Exception
- */
- public static void writeTransaction(final int fileID,
- final byte recordType,
- final long txID,
- final EncodingSupport transactionData,
- final int size,
- final int numberOfRecords,
- final HornetQBuffer bb) throws Exception
- {
- bb.writeByte(recordType);
- bb.writeInt(fileID); // skip ID part
- bb.writeLong(txID);
- bb.writeInt(numberOfRecords);
-
- if (transactionData != null)
- {
- bb.writeInt(transactionData.getEncodeSize());
- }
-
- if (transactionData != null)
- {
- transactionData.encode(bb);
- }
-
- bb.writeInt(size);
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @param size
- * @param bb
- */
- public static void writeUpdateRecordTX(final int fileID,
- final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- final int size,
- final HornetQBuffer bb)
- {
- bb.writeByte(UPDATE_RECORD_TX);
- bb.writeInt(fileID);
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
- /**
- * @param txID
- * @param bb
- */
- public static void writeRollback(final int fileID, final long txID, HornetQBuffer bb)
- {
- bb.writeByte(ROLLBACK_RECORD);
- bb.writeInt(fileID);
- bb.writeLong(txID);
- bb.writeInt(SIZE_ROLLBACK_RECORD);
- }
-
- /**
- * @param id
- * @param recordType
- * @param record
- * @param size
- * @param bb
- */
- public static void writeUpdateRecord(final int fileId,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- final int size,
- final HornetQBuffer bb)
- {
- bb.writeByte(UPDATE_RECORD);
- bb.writeInt(fileId);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
- /**
- * @param id
- * @param recordType
- * @param record
- * @param size
- * @param bb
- */
- public static void writeAddRecord(final int fileId,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- final int size,
- final HornetQBuffer bb)
- {
- bb.writeByte(ADD_RECORD);
- bb.writeInt(fileId);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
- /**
- * @param id
- * @param size
- * @param bb
- */
- public static void writeDeleteRecord(final int fileId, final long id, int size, HornetQBuffer bb)
- {
- bb.writeByte(DELETE_RECORD);
- bb.writeInt(fileId);
- bb.writeLong(id);
- bb.writeInt(size);
- }
-
- /**
- * @param txID
- * @param id
- * @param record
- * @param size
- * @param bb
- */
- public static void writeDeleteRecordTransactional(final int fileID,
- final long txID,
- final long id,
- final EncodingSupport record,
- final int size,
- final HornetQBuffer bb)
- {
- bb.writeByte(DELETE_RECORD_TX);
- bb.writeInt(fileID);
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record != null ? record.getEncodeSize() : 0);
- if (record != null)
- {
- record.encode(bb);
- }
- bb.writeInt(size);
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @param recordLength
- * @param size
- * @param bb
- */
- public static void writeAddRecordTX(final int fileID,
- final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record,
- final int size,
- final HornetQBuffer bb)
- {
- bb.writeByte(ADD_RECORD_TX);
- bb.writeInt(fileID);
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
- }
-
public Map<Long, JournalRecord> getRecords()
{
return records;
@@ -843,7 +639,7 @@
{
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
-
+
public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync, final IOCompletion callback) throws Exception
{
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
@@ -876,12 +672,8 @@
try
{
- int size = SIZE_ADD_RECORD + record.getEncodeSize();
+ InternalEncoder addRecord = new JournalAddRecord(true, id, recordType, record);
- HornetQBuffer bb = newBuffer(size);
-
- writeAddRecord(-1, id, recordType, record, size, bb); // fileID will be filled later
-
if (callback != null)
{
callback.lineUp();
@@ -890,9 +682,9 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
+ JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
- records.put(id, new JournalRecord(usedFile, size));
+ records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
finally
{
@@ -952,12 +744,8 @@
}
}
- int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
+ InternalEncoder updateRecord = new JournalAddRecord(false, id, recordType, record);
- HornetQBuffer bb = newBuffer(size);
-
- writeUpdateRecord(-1, id, recordType, record, size, bb);
-
if (callback != null)
{
callback.lineUp();
@@ -966,17 +754,17 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
+ JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
if (jrnRecord == null)
{
- compactor.addCommandUpdate(id, usedFile, size);
+ compactor.addCommandUpdate(id, usedFile, updateRecord.getEncodeSize());
}
else
{
- jrnRecord.addUpdateFile(usedFile, size);
+ jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
}
}
finally
@@ -1029,12 +817,8 @@
}
}
- int size = SIZE_DELETE_RECORD;
+ InternalEncoder deleteRecord = new JournalDeleteRecord(id);
- HornetQBuffer bb = newBuffer(size);
-
- writeDeleteRecord(-1, id, size, bb);
-
if (callback != null)
{
callback.lineUp();
@@ -1043,7 +827,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
+ JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
// record== null here could only mean there is a compactor, and computing the delete should be done after
// compacting is done
@@ -1093,20 +877,16 @@
try
{
- int size = SIZE_ADD_RECORD_TX + record.getEncodeSize();
+ InternalEncoder addRecord = new JournalAddRecordTX(true, txID, id, recordType, record);
- HornetQBuffer bb = newBuffer(size);
-
- writeAddRecordTX(-1, txID, id, recordType, record, size, bb);
-
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, false, tx, null);
+ JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
- tx.addPositive(usedFile, id, size);
+ tx.addPositive(usedFile, id, addRecord.getEncodeSize());
}
finally
{
@@ -1146,20 +926,16 @@
try
{
- int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
+ InternalEncoder updateRecordTX = new JournalAddRecordTX(false, txID, id, recordType, record);
- HornetQBuffer bb = newBuffer(size);
-
- writeUpdateRecordTX(-1, txID, id, recordType, record, size, bb);
-
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, false, tx, null);
+ JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
- tx.addPositive(usedFile, id, size);
+ tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
finally
{
@@ -1193,18 +969,14 @@
try
{
- int size = SIZE_DELETE_RECORD_TX + record.getEncodeSize();
+ InternalEncoder deleteRecordTX = new JournalDeleteRecordTX(txID, id, record);
- HornetQBuffer bb = newBuffer(size);
-
- writeDeleteRecordTransactional(-1, txID, id, record, size, bb);
-
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, false, tx, null);
+ JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
tx.addNegative(usedFile, id);
}
@@ -1282,11 +1054,8 @@
try
{
- int size = SIZE_COMPLETE_TRANSACTION_RECORD + transactionData.getEncodeSize() + DataConstants.SIZE_INT;
- HornetQBuffer bb = newBuffer(size);
+ InternalEncoder prepareRecord = new JournalCompleteRecordTX(false, txID, transactionData);
- writeTransaction(-1, PREPARE_RECORD, txID, transactionData, size, -1, bb);
-
if (callback != null)
{
callback.lineUp();
@@ -1295,7 +1064,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
+ JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
tx.prepare(usedFile);
}
@@ -1365,16 +1134,8 @@
throw new IllegalStateException("Cannot find tx with id " + txID);
}
- HornetQBuffer bb = newBuffer(SIZE_COMPLETE_TRANSACTION_RECORD);
+ InternalEncoder commitRecord = new JournalCompleteRecordTX(true, txID, null);
- writeTransaction(-1,
- COMMIT_RECORD,
- txID,
- null,
- SIZE_COMPLETE_TRANSACTION_RECORD,
- -1 /* number of records on this transaction will be filled later inside append record */,
- bb);
-
if (callback != null)
{
callback.lineUp();
@@ -1383,7 +1144,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, true, sync, tx, callback);
+ JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
tx.commit(usedFile);
}
@@ -1433,10 +1194,8 @@
throw new IllegalStateException("Cannot find tx with id " + txID);
}
- HornetQBuffer bb = newBuffer(SIZE_ROLLBACK_RECORD);
+ InternalEncoder rollbackRecord = new JournalRollbackRecordTX(txID);
- writeRollback(-1, txID, bb);
-
if (callback != null)
{
callback.lineUp();
@@ -1445,7 +1204,7 @@
lockAppend.lock();
try
{
- JournalFile usedFile = appendRecord(bb, false, sync, tx, callback);
+ JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile);
}
@@ -1586,6 +1345,7 @@
try
{
+ trace("Starting compacting operation on journal");
log.debug("Starting compacting operation on journal");
// We need to guarantee that the journal is frozen for this short time
@@ -1867,7 +1627,7 @@
// have been deleted
// just leaving some updates in this file
- posFiles.addUpdateFile(file, info.data.length + SIZE_UPDATE_RECORD);
+ posFiles.addUpdateFile(file, info.data.length + SIZE_ADD_RECORD);
}
}
@@ -2337,6 +2097,10 @@
try
{
+ if (trace)
+ {
+ trace("Cleaning up file " + file);
+ }
log.debug("Cleaning up file " + file);
if (file.getPosCount() == 0)
@@ -2847,13 +2611,13 @@
recordSize = SIZE_ADD_RECORD;
break;
case UPDATE_RECORD:
- recordSize = SIZE_UPDATE_RECORD;
+ recordSize = SIZE_ADD_RECORD;
break;
case ADD_RECORD_TX:
recordSize = SIZE_ADD_RECORD_TX;
break;
case UPDATE_RECORD_TX:
- recordSize = SIZE_UPDATE_RECORD_TX;
+ recordSize = SIZE_ADD_RECORD_TX;
break;
case DELETE_RECORD:
recordSize = SIZE_DELETE_RECORD;
@@ -2933,7 +2697,7 @@
*
* @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
* */
- private JournalFile appendRecord(final HornetQBuffer bb,
+ private JournalFile appendRecord(final InternalEncoder encoder,
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
@@ -2948,7 +2712,7 @@
final IOAsyncTask callback;
- int size = bb.capacity();
+ int size = encoder.getEncodeSize();
// We take into account the fileID used on the Header
if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
@@ -3012,7 +2776,7 @@
if (completeTransaction)
{
// Filling the number of pendingTransactions at the current file
- tx.fillNumberOfRecords(currentFile, bb);
+ tx.fillNumberOfRecords(currentFile, encoder);
}
}
else
@@ -3021,16 +2785,15 @@
}
// Adding fileID
- bb.writerIndex(DataConstants.SIZE_BYTE);
- bb.writeInt(currentFile.getFileID());
+ encoder.setFileID(currentFile.getFileID());
if (callback != null)
{
- currentFile.getFile().write(bb, sync, callback);
+ currentFile.getFile().write(encoder, sync, callback);
}
else
{
- currentFile.getFile().write(bb, sync);
+ currentFile.getFile().write(encoder, sync);
}
return currentFile;
@@ -3615,7 +3378,7 @@
return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
}
}
-
+
private class PerfBlast extends Thread
{
private final int pages;
@@ -3633,13 +3396,13 @@
{
lockAppend.lock();
- HornetQBuffer bb = newBuffer(490 * 1024);
+// HornetQBuffer bb = newBuffer(128 * 1024);
+//
+// for (int i = 0; i < pages; i++)
+// {
+// appendRecord(bb, false, false, null, null);
+// }
- for (int i = 0; i < pages; i++)
- {
- appendRecord(bb, false, true, null, null);
- }
-
lockAppend.unlock();
}
catch (Exception e)
@@ -3648,5 +3411,11 @@
}
}
}
+
+
+
+
+
+
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -184,12 +184,9 @@
* @param currentFile
* @param bb
*/
- public void fillNumberOfRecords(final JournalFile currentFile, final HornetQBuffer bb)
+ public void fillNumberOfRecords(final JournalFile currentFile, final InternalEncoder data)
{
- bb.writerIndex(DataConstants.SIZE_BYTE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG);
-
- bb.writeInt(getCounter(currentFile));
-
+ data.setNumberOfRecords(getCounter(currentFile));
}
/** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -24,7 +24,9 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.buffers.HornetQBuffers;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.logging.Logger;
import org.hornetq.utils.VariableLatch;
@@ -231,13 +233,18 @@
public synchronized void addBytes(final HornetQBuffer bytes, final boolean sync, final IOAsyncTask callback)
{
+ addBytes(new JournalImpl.ByteArrayEncoding(bytes.toByteBuffer().array()), sync, callback);
+ }
+
+ public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
+ {
if (buffer.writerIndex() == 0)
{
// Resume latch
latchTimer.down();
}
- buffer.writeBytes(bytes, bytes.capacity());
+ bytes.encode(buffer);
callbacks.add(callback);
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecord.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalAddRecord
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalAddRecord extends InternalEncoder
+{
+
+ private final long id;
+
+ private final EncodingSupport record;
+
+ private final byte recordType;
+
+ private final boolean add;
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ */
+ public JournalAddRecord(final boolean add, final long id, final byte recordType, final EncodingSupport record)
+ {
+ this.id = id;
+
+ this.record = record;
+
+ this.recordType = recordType;
+
+ this.add = add;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ if (add)
+ {
+ buffer.writeByte(JournalImpl.ADD_RECORD);
+ }
+ else
+ {
+ buffer.writeByte(JournalImpl.UPDATE_RECORD);
+ }
+
+ buffer.writeInt(fileID);
+
+ buffer.writeLong(id);
+
+ buffer.writeInt(record.getEncodeSize());
+
+ buffer.writeByte(recordType);
+
+ record.encode(buffer);
+
+ buffer.writeInt(getEncodeSize());
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ return JournalImpl.SIZE_ADD_RECORD + record.getEncodeSize();
+ }
+}
\ No newline at end of file
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalAddRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalAddRecordTX
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalAddRecordTX extends InternalEncoder
+{
+
+ private final long txID;
+
+ private final long id;
+
+ private final EncodingSupport record;
+
+ private final byte recordType;
+
+ private final boolean add;
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ */
+ public JournalAddRecordTX(final boolean add,
+ final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record)
+ {
+
+ this.txID = txID;
+
+ this.id = id;
+
+ this.record = record;
+
+ this.recordType = recordType;
+
+ this.add = add;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ if (add)
+ {
+ buffer.writeByte(JournalImpl.ADD_RECORD_TX);
+ }
+ else
+ {
+ buffer.writeByte(JournalImpl.UPDATE_RECORD_TX);
+ }
+
+ buffer.writeInt(fileID);
+
+ buffer.writeLong(txID);
+
+ buffer.writeLong(id);
+
+ buffer.writeInt(record.getEncodeSize());
+
+ buffer.writeByte(recordType);
+
+ record.encode(buffer);
+
+ buffer.writeInt(getEncodeSize());
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ return JournalImpl.SIZE_ADD_RECORD_TX + record.getEncodeSize();
+ }
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalCompleteRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
+ * <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
+ * (What could happen if there are too many pendingTransactions, or if an user event delayed pendingTransactions to come in time to a single file).</p>
+ * <p>The element-summary will then have</p>
+ * <p>FileID1, 10</p>
+ * <p>FileID2, 10</p>
+ * <p>FileID3, 10</p>
+ *
+ * <br>
+ * <p> During the load, the transaction needs to have 30 pendingTransactions spread across the files as originally written.</p>
+ * <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
+ * <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
+ * That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalCompleteRecordTX extends InternalEncoder
+{
+ private final boolean isCommit;
+
+ private final long txID;
+
+ private final EncodingSupport transactionData;
+
+ private int numberOfRecords;
+
+ public JournalCompleteRecordTX(final boolean isCommit, final long txID, final EncodingSupport transactionData)
+ {
+ this.isCommit = isCommit;
+
+ this.txID = txID;
+
+ this.transactionData = transactionData;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ if (isCommit)
+ {
+ buffer.writeByte(JournalImpl.COMMIT_RECORD);
+ }
+ else
+ {
+ buffer.writeByte(JournalImpl.PREPARE_RECORD);
+ }
+
+ buffer.writeInt(fileID);
+
+ buffer.writeLong(txID);
+
+ buffer.writeInt(numberOfRecords);
+
+ if (transactionData != null)
+ {
+ buffer.writeInt(transactionData.getEncodeSize());
+ }
+
+ if (transactionData != null)
+ {
+ transactionData.encode(buffer);
+ }
+
+ buffer.writeInt(getEncodeSize());
+ }
+
+ @Override
+ public void setNumberOfRecords(final int records)
+ {
+ numberOfRecords = records;
+ }
+
+ @Override
+ public int getNumberOfRecords()
+ {
+ return numberOfRecords;
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ if (isCommit)
+ {
+ return JournalImpl.SIZE_COMPLETE_TRANSACTION_RECORD;
+ }
+ else
+ {
+ return JournalImpl.SIZE_PREPARE_RECORD + (transactionData != null ? transactionData.getEncodeSize() : 0);
+ }
+ }
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecord.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalDeleteRecord
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalDeleteRecord extends InternalEncoder
+{
+
+ private final long id;
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ */
+ public JournalDeleteRecord(final long id)
+ {
+ this.id = id;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(JournalImpl.DELETE_RECORD);
+
+ buffer.writeInt(fileID);
+
+ buffer.writeLong(id);
+
+ buffer.writeInt(getEncodeSize());
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ return JournalImpl.SIZE_DELETE_RECORD;
+ }
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalDeleteRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalDeleteRecordTX
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalDeleteRecordTX extends InternalEncoder
+{
+
+ private final long txID;
+
+ private final long id;
+
+ private final EncodingSupport record;
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ */
+ public JournalDeleteRecordTX(final long txID, final long id, final EncodingSupport record)
+ {
+ this.id = id;
+
+ this.txID = txID;
+
+ this.record = record;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(JournalImpl.DELETE_RECORD_TX);
+
+ buffer.writeInt(fileID);
+
+ buffer.writeLong(txID);
+
+ buffer.writeLong(id);
+
+ buffer.writeInt(record != null ? record.getEncodeSize() : 0);
+
+ if (record != null)
+ {
+ record.encode(buffer);
+ }
+
+ buffer.writeInt(getEncodeSize());
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ return JournalImpl.SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
+ }
+}
Added: trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/dataformat/JournalRollbackRecordTX.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl.dataformat;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.journal.impl.InternalEncoder;
+import org.hornetq.core.journal.impl.JournalImpl;
+
+/**
+ * A JournalRollbackRecordTX
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalRollbackRecordTX extends InternalEncoder
+{
+ private final long txID;
+
+ public JournalRollbackRecordTX(final long txID)
+ {
+ this.txID = txID;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.core.buffers.HornetQBuffer)
+ */
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(JournalImpl.ROLLBACK_RECORD);
+ buffer.writeInt(fileID);
+ buffer.writeLong(txID);
+ buffer.writeInt(JournalImpl.SIZE_ROLLBACK_RECORD);
+
+ }
+
+ @Override
+ public int getEncodeSize()
+ {
+ return JournalImpl.SIZE_ROLLBACK_RECORD;
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -616,7 +616,7 @@
journal.forceMoveNextFile();
update(id);
- expectedSizes.add(recordLength + JournalImpl.SIZE_UPDATE_RECORD);
+ expectedSizes.add(recordLength + JournalImpl.SIZE_ADD_RECORD);
journal.forceMoveNextFile();
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -719,7 +719,7 @@
{
final int JOURNAL_SIZE = 2000;
- setupAndLoadJournal(JOURNAL_SIZE, 100);
+ setupAndLoadJournal(JOURNAL_SIZE, 1);
assertEquals(2, factory.listFiles("tt").size());
@@ -759,6 +759,7 @@
// reload will think the record came from a different journal usage)
file.position(100);
+ buffer.rewind();
file.writeDirect(buffer, true);
file.close();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -279,7 +279,7 @@
for (long element : arguments)
{
- byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_UPDATE_RECORD_TX);
+ byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -3118,7 +3118,9 @@
addTx(1, 1);
+ addTx(1, 2);
updateTx(1, 1);
+ updateTx(1, 3);
commit(1);
update(1);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-28 01:26:06 UTC (rev 8434)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-11-28 02:30:52 UTC (rev 8435)
@@ -20,7 +20,9 @@
import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.core.asyncio.BufferCallback;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -622,8 +624,32 @@
bytes.readerIndex(0);
writeDirect(bytes.toByteBuffer(), sync);
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.journal.EncodingSupport, boolean, org.hornetq.core.journal.IOCompletion)
+ */
+ public void write(EncodingSupport bytes, boolean sync, IOAsyncTask callback) throws Exception
+ {
+ ByteBuffer buffer = newBuffer(bytes.getEncodeSize());
+ HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
+ bytes.encode(outbuffer);
+ write(outbuffer, sync, callback);
+ }
/* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFile#write(org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void write(EncodingSupport bytes, boolean sync) throws Exception
+ {
+ ByteBuffer buffer = newBuffer(bytes.getEncodeSize());
+ HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
+ bytes.encode(outbuffer);
+ write(outbuffer, sync);
+ }
+
+
+
+ /* (non-Javadoc)
* @see org.hornetq.core.journal.SequentialFile#exists()
*/
public boolean exists()
More information about the hornetq-commits
mailing list