[jboss-cvs] JBoss Messaging SVN: r7460 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 25 00:07:44 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-25 00:07:44 -0400 (Thu, 25 Jun 2009)
New Revision: 7460
Added:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReader.java
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
changes
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-24 21:37:13 UTC (rev 7459)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-25 04:07:44 UTC (rev 7460)
@@ -262,7 +262,10 @@
*/
public void renameTo(String newFileName) throws Exception
{
- close();
+ if (isOpen())
+ {
+ close();
+ }
File newFile = new File(directory + "/" + newFileName);
file.renameTo(newFile);
file = newFile;
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-06-24 21:37:13 UTC (rev 7459)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-06-25 04:07:44 UTC (rev 7460)
@@ -56,7 +56,7 @@
boolean isCanReclaim();
long getOffset();
-
+
int getFileID();
int getOrderingID();
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-24 21:37:13 UTC (rev 7459)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-25 04:07:44 UTC (rev 7460)
@@ -286,20 +286,11 @@
try
{
+ int size = SIZE_ADD_RECORD + record.getEncodeSize();
- int recordLength = record.getEncodeSize();
-
- int size = SIZE_ADD_RECORD + recordLength;
-
ChannelBuffer bb = newBuffer(size);
- bb.writeByte(ADD_RECORD);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(id);
- bb.writeInt(recordLength);
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
+ writeAddRecord(id, -1, recordType, record, size, bb); // fileID will be filled later
callback = getSyncCallback(sync);
@@ -682,8 +673,11 @@
try
{
- ChannelBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
+ int size = SIZE_COMPLETE_TRANSACTION_RECORD + transactionData.getEncodeSize() + SIZE_INT;
+ ChannelBuffer bb = newBuffer(size);
+ writeTransaction(PREPARE_RECORD, txID, tx, transactionData, size, bb);
+
lock.acquire();
try
{
@@ -742,8 +736,10 @@
throw new IllegalStateException("Cannot find tx with id " + txID);
}
- ChannelBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
+ ChannelBuffer bb = newBuffer(SIZE_COMPLETE_TRANSACTION_RECORD);
+ writeTransaction(COMMIT_RECORD, txID, tx, null, SIZE_COMPLETE_TRANSACTION_RECORD, bb);
+
lock.acquire();
try
{
@@ -924,8 +920,10 @@
for (final JournalFile file : dataFilesToProcess)
{
readJournalFile(file, compactor);
-
}
+
+
+ compactor.flushBuffer();
writeLockCompact.lock();
try
@@ -947,7 +945,11 @@
SequentialFile sequentialFile;
+ int fileID;
+
ByteBuffer bufferWrite;
+
+ ChannelBuffer channelWrapper;
int nextOrderingID;
@@ -963,21 +965,18 @@
{
if (bufferWrite == null)
{
- flushFile();
+ openFile();
}
else
{
if (bufferWrite.position() + size > bufferWrite.limit())
{
- flushFile();
+ openFile();
}
}
}
- /**
- * @throws Exception
- */
- private void flushFile() throws Exception
+ public void flushBuffer() throws Exception
{
if (bufferWrite != null)
{
@@ -986,10 +985,27 @@
sequentialFile.close();
}
+ bufferWrite = null;
+ }
+
+ /**
+ * @throws Exception
+ */
+ private void openFile() throws Exception
+ {
+ flushBuffer();
+
bufferWrite = fileFactory.newBuffer(fileSize);
- currentOutputFile = openFile(false);
+ channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
+
+ currentOutputFile = getFile(false, false);
sequentialFile = currentOutputFile.getFile();
- bufferWrite.putInt(currentOutputFile.getFileID());
+ sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
+ sequentialFile.open(1);
+ currentOutputFile = new JournalFileImpl(sequentialFile, nextOrderingID, nextOrderingID);
+ fileID = nextOrderingID;
+ System.out.println("Next OrderingID = " + nextOrderingID);
+ bufferWrite.putInt(nextOrderingID);
bufferWrite.putInt(nextOrderingID++);
}
@@ -997,6 +1013,13 @@
{
if (recordsSnapshot.get(info.id) != null)
{
+ int size = SIZE_ADD_RECORD + info.data.length;
+
+ checkSize(size);
+
+ writeAddRecord(info.id, fileID, info.getUserRecordType(), new ByteArrayEncoding(info.data), size, channelWrapper);
+
+
System.out.println("Record " + info.id + " to be out on compacted file");
}
}
@@ -1318,10 +1341,7 @@
throw new IllegalStateException("Cannot find tx " + transactionID);
}
- boolean healthy = checkTransactionHealth(file,
- journalTransaction,
- orderedFiles,
- numberOfRecords);
+ boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);
if (healthy)
{
@@ -1800,7 +1820,7 @@
return jf;
}
- private int readJournalFile(JournalFile file, JournalReader reader) throws Exception
+ public int readJournalFile(JournalFile file, JournalReader reader) throws Exception
{
ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
@@ -1925,7 +1945,7 @@
wholeFileBuffer.get(record);
}
-
+
// Case this is a transaction, this will contain the number of records on a transaction, at the currentFile
int transactionCheckNumberOfRecords = 0;
@@ -2152,16 +2172,13 @@
* @return
* @throws Exception
*/
- private ChannelBuffer writeTransaction(final byte recordType,
- final long txID,
- final JournalTransaction tx,
- final EncodingSupport transactionData) throws Exception
+ private void writeTransaction(final byte recordType,
+ final long txID,
+ final JournalTransaction tx,
+ final EncodingSupport transactionData,
+ final int size,
+ final ChannelBuffer bb) throws Exception
{
- int size = SIZE_COMPLETE_TRANSACTION_RECORD + (transactionData != null ? transactionData.getEncodeSize() + SIZE_INT
- : 0);
-
- ChannelBuffer bb = newBuffer(size);
-
bb.writeByte(recordType);
bb.writeInt(-1); // skip ID part
bb.writeLong(txID);
@@ -2178,8 +2195,29 @@
}
bb.writeInt(size);
+ }
- return bb;
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ * @param bb
+ */
+ private void writeAddRecord(final long id,
+ final int fileId,
+ final byte recordType,
+ final EncodingSupport record,
+ int size,
+ ChannelBuffer bb)
+ {
+ bb.writeByte(ADD_RECORD);
+ bb.writeInt(fileId);
+ bb.writeLong(id);
+ bb.writeInt(record.getEncodeSize());
+ bb.writeByte(recordType);
+ record.encode(bb);
+ bb.writeInt(size);
}
private boolean isTransaction(final byte recordType)
@@ -2371,7 +2409,8 @@
if (sync)
{
// 99 % of the times this will be already synced, as previous files should be closed already.
- // This is to have 100% guarantee the transaction will be persisted and no loss of information would happen
+ // This is to have 100% guarantee the transaction will be persisted and no loss of information would
+ // happen
tx.syncPreviousFiles();
}
@@ -2382,7 +2421,6 @@
}
}
-
// Adding fileID
bb.writerIndex(SIZE_BYTE);
@@ -2550,7 +2588,7 @@
* */
private void pushOpenedFile() throws Exception
{
- JournalFile nextOpenedFile = openFile(true);
+ JournalFile nextOpenedFile = getFile(true, true);
openedFiles.offer(nextOpenedFile);
}
@@ -2559,7 +2597,7 @@
* @return
* @throws Exception
*/
- private JournalFile openFile(boolean multiAIO) throws Exception
+ private JournalFile getFile(boolean keepOpened, boolean multiAIO) throws Exception
{
JournalFile nextOpenedFile = null;
try
@@ -2572,11 +2610,14 @@
if (nextOpenedFile == null)
{
- nextOpenedFile = createFile(true, multiAIO);
+ nextOpenedFile = createFile(keepOpened, multiAIO);
}
else
{
- openFile(nextOpenedFile, multiAIO);
+ if (keepOpened)
+ {
+ openFile(nextOpenedFile, multiAIO);
+ }
}
return nextOpenedFile;
}
@@ -2765,9 +2806,9 @@
private TransactionCallback currentCallback;
private Map<JournalFile, TransactionCallback> callbackList;
-
+
private JournalFile lastFile = null;
-
+
private final AtomicInteger counter = new AtomicInteger();
private AtomicInteger internalgetCounter(final JournalFile file)
@@ -2779,7 +2820,7 @@
}
return counter;
}
-
+
public int getCounter(final JournalFile file)
{
return internalgetCounter(file).intValue();
@@ -2789,7 +2830,7 @@
{
internalgetCounter(file).incrementAndGet();
}
-
+
/**
* @param currentFile
* @param bb
@@ -3110,61 +3151,5 @@
}
}
- private static interface JournalReader
- {
- void addRecord(RecordInfo info) throws Exception;
- /**
- * @param recordInfo
- * @throws Exception
- */
- void updateRecord(RecordInfo recordInfo) throws Exception;
-
- /**
- * @param recordID
- */
- void deleteRecord(long recordID) throws Exception;
-
- /**
- * @param transactionID
- * @param recordInfo
- * @throws Exception
- */
- void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
-
- /**
- * @param transactionID
- * @param recordInfo
- * @throws Exception
- */
- void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
-
- /**
- * @param transactionID
- * @param recordInfo
- */
- void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
-
- /**
- * @param transactionID
- * @param extraData
- * @param summaryData
- */
- void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception;
-
- /**
- * @param transactionID
- * @param summaryData
- */
- void commitRecord(long transactionID, int numberOfRecords) throws Exception;
-
- /**
- * @param transactionID
- */
- void rollbackRecord(long transactionID) throws Exception;
-
- public void markAsDataFile(JournalFile file);
-
- }
-
}
Added: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReader.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReader.java (rev 0)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReader.java 2009-06-25 04:07:44 UTC (rev 7460)
@@ -0,0 +1,91 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.RecordInfo;
+
+/**
+ * A JournalReader
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface JournalReader
+{
+ void addRecord(RecordInfo info) throws Exception;
+
+ /**
+ * @param recordInfo
+ * @throws Exception
+ */
+ void updateRecord(RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param recordID
+ */
+ void deleteRecord(long recordID) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param recordInfo
+ * @throws Exception
+ */
+ void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param recordInfo
+ * @throws Exception
+ */
+ void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param recordInfo
+ */
+ void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param extraData
+ * @param summaryData
+ */
+ void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception;
+
+ /**
+ * @param transactionID
+ * @param summaryData
+ */
+ void commitRecord(long transactionID, int numberOfRecords) throws Exception;
+
+ /**
+ * @param transactionID
+ */
+ void rollbackRecord(long transactionID) throws Exception;
+
+ public void markAsDataFile(JournalFile file);
+
+
+}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-24 21:37:13 UTC (rev 7459)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-25 04:07:44 UTC (rev 7460)
@@ -499,7 +499,8 @@
byte[] record = new byte[length];
for (int i = 0; i < length; i++)
{
- record[i] = RandomUtil.randomByte();
+ //record[i] = RandomUtil.randomByte();
+ record[i] = getSamplebyte(i);
}
return record;
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-24 21:37:13 UTC (rev 7459)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-25 04:07:44 UTC (rev 7460)
@@ -3076,10 +3076,12 @@
createJournal();
startJournal();
load();
+
+ int NUMBER_OF_RECRODS = 100;
long transactionID = 0;
- for (int i = 0; i < 500; i++)
+ for (int i = 0; i < NUMBER_OF_RECRODS/2; i++)
{
add(i);
if (i % 10 == 0 && i > 0)
@@ -3089,7 +3091,7 @@
update(i);
}
- for (int i = 500; i < 1000; i++)
+ for (int i = NUMBER_OF_RECRODS/2; i < NUMBER_OF_RECRODS; i++)
{
addTx(transactionID, i);
@@ -3104,7 +3106,7 @@
System.out.println("Number of Files: " + journal.getDataFilesCount());
- for (int i = 0; i < 1000; i++)
+ for (int i = 0; i < NUMBER_OF_RECRODS; i++)
{
if (!(i % 10 == 0))
{
More information about the jboss-cvs-commits
mailing list