[hornetq-commits] JBoss hornetq SVN: r9606 - branches/Branch_2_1/src/main/org/hornetq/core/journal/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Aug 27 12:44:18 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-27 12:44:17 -0400 (Fri, 27 Aug 2010)
New Revision: 9606
Removed:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
Log:
auto cleanup only
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -279,8 +279,8 @@
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
-
- public void writeInternal(ByteBuffer bytes) throws Exception
+
+ public void writeInternal(final ByteBuffer bytes) throws Exception
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
@@ -289,7 +289,6 @@
aioFile.writeInternal(positionToWrite, bytesToWrite, bytes);
}
-
// Protected methods
// -----------------------------------------------------------------------------------------------------
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -146,7 +146,8 @@
super.start();
pollerExecutor = Executors.newCachedThreadPool(new HornetQThreadFactory("HornetQ-AIO-poller-pool" + System.identityHashCode(this),
- true, getThisClassLoader()));
+ true,
+ AIOSequentialFileFactory.getThisClassLoader()));
}
@@ -295,18 +296,17 @@
}
}
}
-
+
private static ClassLoader getThisClassLoader()
{
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
-
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
+
}
-
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -54,7 +54,7 @@
protected JournalFile currentFile;
protected SequentialFile sequentialFile;
-
+
protected final JournalFilesRepository filesRepository;
protected long nextOrderingID;
@@ -154,16 +154,14 @@
new ByteArrayEncoding(filesToRename.toByteBuffer()
.array()));
-
-
HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(filesToRename.writerIndex());
controlRecord.setFileID(0);
-
+
controlRecord.encode(renameBuffer);
ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
-
+
writeBuffer.put(renameBuffer.toByteBuffer().array(), 0, renameBuffer.writerIndex());
writeBuffer.rewind();
@@ -184,10 +182,10 @@
if (writingChannel != null)
{
sequentialFile.position(0);
-
+
// To Fix the size of the file
writingChannel.writerIndex(writingChannel.capacity());
-
+
sequentialFile.writeInternal(writingChannel.toByteBuffer());
sequentialFile.close();
newDataFiles.add(currentFile);
@@ -217,13 +215,13 @@
writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
currentFile = filesRepository.takeFile(false, false, false, true);
-
+
sequentialFile = currentFile.getFile();
sequentialFile.open(1, false);
currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++, JournalImpl.FORMAT_VERSION);
-
+
JournalImpl.writeHeader(writingChannel, journal.getUserVersion(), currentFile.getFileID());
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -117,7 +117,8 @@
if (isSupportsCallbacks())
{
writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
- true, getThisClassLoader()));
+ true,
+ AbstractSequentialFileFactory.getThisClassLoader()));
}
}
@@ -193,14 +194,13 @@
private static ClassLoader getThisClassLoader()
{
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
- {
- public ClassLoader run()
- {
- return ClientSessionFactoryImpl.class.getClassLoader();
- }
- });
-
+ {
+ public ClassLoader run()
+ {
+ return ClientSessionFactoryImpl.class.getClassLoader();
+ }
+ });
+
}
-
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ExportJournal.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -19,8 +19,7 @@
import java.io.PrintStream;
import java.util.List;
-import org.hornetq.core.journal.*;
-
+import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.utils.Base64;
@@ -48,7 +47,7 @@
// Public --------------------------------------------------------
- public static void main(String arg[])
+ public static void main(final String arg[])
{
if (arg.length != 5)
{
@@ -58,7 +57,7 @@
try
{
- exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+ ExportJournal.exportJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
}
catch (Exception e)
{
@@ -67,32 +66,31 @@
}
- public static void exportJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- String fileOutput) throws Exception
+ public static void exportJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final String fileOutput) throws Exception
{
-
+
FileOutputStream fileOut = new FileOutputStream(new File(fileOutput));
BufferedOutputStream buffOut = new BufferedOutputStream(fileOut);
PrintStream out = new PrintStream(buffOut);
-
- exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
-
+
+ ExportJournal.exportJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, out);
+
out.close();
}
-
- public static void exportJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- PrintStream out) throws Exception
+ public static void exportJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final PrintStream out) throws Exception
{
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory);
@@ -104,7 +102,7 @@
{
out.println("#File," + file);
- exportJournalFile(out, nio, file);
+ ExportJournal.exportJournalFile(out, nio, file);
}
}
@@ -114,67 +112,71 @@
* @param file
* @throws Exception
*/
- public static void exportJournalFile(final PrintStream out, SequentialFileFactory fileFactory, JournalFile file) throws Exception
+ public static void exportJournalFile(final PrintStream out,
+ final SequentialFileFactory fileFactory,
+ final JournalFile file) throws Exception
{
JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
{
- public void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ public void onReadUpdateRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
{
- out.println("operation@UpdateTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ out.println("operation@UpdateTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
}
- public void onReadUpdateRecord(RecordInfo recordInfo) throws Exception
+ public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
{
- out.println("operation@Update," + describeRecord(recordInfo));
+ out.println("operation@Update," + ExportJournal.describeRecord(recordInfo));
}
- public void onReadRollbackRecord(long transactionID) throws Exception
+ public void onReadRollbackRecord(final long transactionID) throws Exception
{
out.println("operation@Rollback,txID@" + transactionID);
}
- public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
{
out.println("operation@Prepare,txID@" + transactionID +
",numberOfRecords@" +
numberOfRecords +
",extraData@" +
- encode(extraData));
+ ExportJournal.encode(extraData));
}
- public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
{
- out.println("operation@DeleteRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ out.println("operation@DeleteRecordTX,txID@" + transactionID +
+ "," +
+ ExportJournal.describeRecord(recordInfo));
}
- public void onReadDeleteRecord(long recordID) throws Exception
+ public void onReadDeleteRecord(final long recordID) throws Exception
{
out.println("operation@DeleteRecord,id@" + recordID);
}
- public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
+ public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
{
out.println("operation@Commit,txID@" + transactionID + ",numberOfRecords@" + numberOfRecords);
}
- public void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ public void onReadAddRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
{
- out.println("operation@AddRecordTX,txID@" + transactionID + "," + describeRecord(recordInfo));
+ out.println("operation@AddRecordTX,txID@" + transactionID + "," + ExportJournal.describeRecord(recordInfo));
}
- public void onReadAddRecord(RecordInfo recordInfo) throws Exception
+ public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
{
- out.println("operation@AddRecord," + describeRecord(recordInfo));
+ out.println("operation@AddRecord," + ExportJournal.describeRecord(recordInfo));
}
- public void markAsDataFile(JournalFile file)
+ public void markAsDataFile(final JournalFile file)
{
}
});
}
- private static String describeRecord(RecordInfo recordInfo)
+ private static String describeRecord(final RecordInfo recordInfo)
{
return "id@" + recordInfo.id +
",userRecordType@" +
@@ -186,10 +188,10 @@
",compactCount@" +
recordInfo.compactCount +
",data@" +
- encode(recordInfo.data);
+ ExportJournal.encode(recordInfo.data);
}
- private static String encode(byte[] data)
+ private static String encode(final byte[] data)
{
return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/ImportJournal.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -25,7 +25,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.impl.JournalRecord;
import org.hornetq.utils.Base64;
/**
@@ -52,7 +51,7 @@
// Public --------------------------------------------------------
- public static void main(String arg[])
+ public static void main(final String arg[])
{
if (arg.length != 5)
{
@@ -62,7 +61,7 @@
try
{
- importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
+ ImportJournal.importJournal(arg[0], arg[1], arg[2], 2, Integer.parseInt(arg[3]), arg[4]);
}
catch (Exception e)
{
@@ -71,34 +70,35 @@
}
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- String fileInput) throws Exception
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final String fileInput) throws Exception
{
FileInputStream fileInputStream = new FileInputStream(new File(fileInput));
- importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
+ ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, fileInputStream);
}
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- InputStream stream) throws Exception
+
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final InputStream stream) throws Exception
{
Reader reader = new InputStreamReader(stream);
- importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
+ ImportJournal.importJournal(directory, journalPrefix, journalSuffix, minFiles, fileSize, reader);
}
- public static void importJournal(String directory,
- String journalPrefix,
- String journalSuffix,
- int minFiles,
- int fileSize,
- Reader reader) throws Exception
+ public static void importJournal(final String directory,
+ final String journalPrefix,
+ final String journalSuffix,
+ final int minFiles,
+ final int fileSize,
+ final Reader reader) throws Exception
{
File journalDir = new File(directory);
@@ -139,7 +139,7 @@
continue;
}
- Properties lineProperties = parseLine(splitLine);
+ Properties lineProperties = ImportJournal.parseLine(splitLine);
String operation = null;
try
@@ -148,67 +148,67 @@
if (operation.equals("AddRecord"))
{
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendAddRecord(info.id, info.userRecordType, info.data, false);
}
else if (operation.equals("AddRecordTX"))
{
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
counter.incrementAndGet();
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
}
else if (operation.equals("AddRecordTX"))
{
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
counter.incrementAndGet();
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendAddRecordTransactional(txID, info.id, info.userRecordType, info.data);
}
else if (operation.equals("UpdateTX"))
{
- long txID = parseLong("txID", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
counter.incrementAndGet();
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendUpdateRecordTransactional(txID, info.id, info.userRecordType, info.data);
}
else if (operation.equals("Update"))
{
- RecordInfo info = parseRecord(lineProperties);
+ RecordInfo info = ImportJournal.parseRecord(lineProperties);
journal.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
}
else if (operation.equals("DeleteRecord"))
{
- long id = parseLong("id", lineProperties);
+ long id = ImportJournal.parseLong("id", lineProperties);
// If not found it means the append/update records were reclaimed already
- if (journalRecords.get((Long)id) != null)
+ if (journalRecords.get(id) != null)
{
journal.appendDeleteRecord(id, false);
}
}
else if (operation.equals("DeleteRecordTX"))
{
- long txID = parseLong("txID", lineProperties);
- long id = parseLong("id", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ long id = ImportJournal.parseLong("id", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
counter.incrementAndGet();
// If not found it means the append/update records were reclaimed already
- if (journalRecords.get((Long)id) != null)
+ if (journalRecords.get(id) != null)
{
journal.appendDeleteRecordTransactional(txID, id);
}
}
else if (operation.equals("Prepare"))
{
- long txID = parseLong("txID", lineProperties);
- int numberOfRecords = parseInt("numberOfRecords", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
- byte[] data = parseEncoding("extraData", lineProperties);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
+ byte[] data = ImportJournal.parseEncoding("extraData", lineProperties);
if (counter.get() == numberOfRecords)
{
@@ -227,9 +227,9 @@
}
else if (operation.equals("Commit"))
{
- long txID = parseLong("txID", lineProperties);
- int numberOfRecords = parseInt("numberOfRecords", lineProperties);
- AtomicInteger counter = getCounter(txID, txCounters);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
+ int numberOfRecords = ImportJournal.parseInt("numberOfRecords", lineProperties);
+ AtomicInteger counter = ImportJournal.getCounter(txID, txCounters);
if (counter.get() == numberOfRecords)
{
journal.appendCommitRecord(txID, false);
@@ -247,7 +247,7 @@
}
else if (operation.equals("Rollback"))
{
- long txID = parseLong("txID", lineProperties);
+ long txID = ImportJournal.parseLong("txID", lineProperties);
journal.appendRollbackRecord(txID, false);
}
else
@@ -264,7 +264,7 @@
journal.stop();
}
- protected static AtomicInteger getCounter(Long txID, Map<Long, AtomicInteger> txCounters)
+ protected static AtomicInteger getCounter(final Long txID, final Map<Long, AtomicInteger> txCounters)
{
AtomicInteger counter = txCounters.get(txID);
@@ -277,50 +277,50 @@
return counter;
}
- protected static RecordInfo parseRecord(Properties properties) throws Exception
+ protected static RecordInfo parseRecord(final Properties properties) throws Exception
{
- long id = parseLong("id", properties);
- byte userRecordType = parseByte("userRecordType", properties);
- boolean isUpdate = parseBoolean("isUpdate", properties);
- byte[] data = parseEncoding("data", properties);
+ long id = ImportJournal.parseLong("id", properties);
+ byte userRecordType = ImportJournal.parseByte("userRecordType", properties);
+ boolean isUpdate = ImportJournal.parseBoolean("isUpdate", properties);
+ byte[] data = ImportJournal.parseEncoding("data", properties);
return new RecordInfo(id, userRecordType, data, isUpdate, (short)0);
}
- private static byte[] parseEncoding(String name, Properties properties) throws Exception
+ private static byte[] parseEncoding(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
- return decode(value);
+ return ImportJournal.decode(value);
}
/**
* @param properties
* @return
*/
- private static int parseInt(String name, Properties properties) throws Exception
+ private static int parseInt(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Integer.parseInt(value);
}
- private static long parseLong(String name, Properties properties) throws Exception
+ private static long parseLong(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Long.parseLong(value);
}
- private static boolean parseBoolean(String name, Properties properties) throws Exception
+ private static boolean parseBoolean(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Boolean.parseBoolean(value);
}
- private static byte parseByte(String name, Properties properties) throws Exception
+ private static byte parseByte(final String name, final Properties properties) throws Exception
{
- String value = parseString(name, properties);
+ String value = ImportJournal.parseString(name, properties);
return Byte.parseByte(value);
}
@@ -331,7 +331,7 @@
* @return
* @throws Exception
*/
- private static String parseString(String name, Properties properties) throws Exception
+ private static String parseString(final String name, final Properties properties) throws Exception
{
String value = properties.getProperty(name);
@@ -342,7 +342,7 @@
return value;
}
- protected static Properties parseLine(String[] splitLine)
+ protected static Properties parseLine(final String[] splitLine)
{
Properties properties = new Properties();
@@ -362,7 +362,7 @@
return properties;
}
- private static byte[] decode(String data)
+ private static byte[] decode(final String data)
{
return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
}
Deleted: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -47,10 +47,10 @@
{
private static final Logger log = Logger.getLogger(JournalCompactor.class);
-
+
// We try to separate old record from new ones when doing the compacting
// this is a split line
- // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE
+ // We will force a moveNextFiles when the compactCount is bellow than COMPACT_SPLIT_LINE
private final short COMPACT_SPLIT_LINE = 2;
// Snapshot of transactions that were pending when the compactor started
@@ -217,7 +217,7 @@
{
checkSize(size, -1);
}
-
+
private void checkSize(final int size, final int compactCount) throws Exception
{
if (getWritingChannel() == null)
@@ -239,17 +239,19 @@
return;
}
}
-
+
if (getWritingChannel().writerIndex() + size > getWritingChannel().capacity())
{
openFile();
}
}
}
-
+
int currentCount;
+
// This means we will need to split when the compactCount is bellow the watermark
boolean willNeedToSplit = false;
+
boolean splitted = false;
private boolean checkCompact(final int compactCount) throws Exception
@@ -258,7 +260,7 @@
{
willNeedToSplit = true;
}
-
+
if (willNeedToSplit && compactCount < COMPACT_SPLIT_LINE)
{
willNeedToSplit = false;
@@ -271,8 +273,6 @@
return false;
}
}
-
-
/**
* Replay pending counts that happened during compacting
@@ -305,7 +305,7 @@
info.getUserRecordType(),
new ByteArrayEncoding(info.data));
addRecord.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(addRecord.getEncodeSize(), info.compactCount);
writeEncoder(addRecord);
@@ -327,7 +327,7 @@
new ByteArrayEncoding(info.data));
record.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(record.getEncodeSize(), info.compactCount);
newTransaction.addPositive(currentFile, info.id, record.getEncodeSize());
@@ -347,15 +347,15 @@
}
else
{
- JournalTransaction newTransaction = newTransactions.remove(transactionID);
+ JournalTransaction newTransaction = newTransactions.remove(transactionID);
if (newTransaction != null)
{
JournalInternalRecord commitRecord = new JournalCompleteRecordTX(true, transactionID, null);
-
+
checkSize(commitRecord.getEncodeSize());
-
+
writeEncoder(commitRecord, newTransaction.getCounter(currentFile));
-
+
newTransaction.commit(currentFile);
}
}
@@ -366,7 +366,8 @@
if (newRecords.get(recordID) != null)
{
// Sanity check, it should never happen
- throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record (id=" + recordID + ")");
+ throw new IllegalStateException("Inconsistency during compacting: Delete record being read on an existent record (id=" + recordID +
+ ")");
}
}
@@ -428,16 +429,16 @@
JournalTransaction newTransaction = newTransactions.remove(transactionID);
if (newTransaction != null)
{
-
+
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(transactionID);
-
+
checkSize(rollbackRecord.getEncodeSize());
writeEncoder(rollbackRecord);
-
+
newTransaction.rollback(currentFile);
}
-
+
}
}
@@ -451,7 +452,7 @@
new ByteArrayEncoding(info.data));
updateRecord.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(updateRecord.getEncodeSize(), info.compactCount);
JournalRecord newRecord = newRecords.get(info.id);
@@ -483,7 +484,7 @@
new ByteArrayEncoding(info.data));
updateRecordTX.setCompactCount((short)(info.compactCount + 1));
-
+
checkSize(updateRecordTX.getEncodeSize(), info.compactCount);
writeEncoder(updateRecordTX);
@@ -534,7 +535,7 @@
JournalRecord deleteRecord = journal.getRecords().remove(id);
if (deleteRecord == null)
{
- log.warn("Can't find record " + id + " during compact replay");
+ JournalCompactor.log.warn("Can't find record " + id + " during compact replay");
}
else
{
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -45,7 +45,7 @@
void decSize(int bytes);
int getLiveSize();
-
+
/** The total number of deletes this file has */
int getTotalNegativeToOthers();
@@ -58,9 +58,9 @@
/** This is a field to identify that records on this file actually belong to the current file.
* The possible implementation for this is fileID & Integer.MAX_VALUE */
int getRecordID();
-
+
long getFileID();
-
+
int getJournalVersion();
SequentialFile getFile();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -47,7 +47,7 @@
private boolean canReclaim;
- private AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
+ private final AtomicInteger totalNegativeToOthers = new AtomicInteger(0);
private final int version;
@@ -61,7 +61,7 @@
this.version = version;
- this.recordID = (int)(fileID & (long)Integer.MAX_VALUE);
+ recordID = (int)(fileID & Integer.MAX_VALUE);
}
public void clearCounts()
@@ -165,7 +165,7 @@
{
try
{
- return "JournalFileImpl: (" + file.getFileName() + " id = " + this.fileID + ", recordID = " + recordID + ")";
+ return "JournalFileImpl: (" + file.getFileName() + " id = " + fileID + ", recordID = " + recordID + ")";
}
catch (Exception e)
{
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -86,12 +86,12 @@
// Constructors --------------------------------------------------
public JournalFilesRepository(final SequentialFileFactory fileFactory,
- final String filePrefix,
- final String fileExtension,
- final int userVersion,
- final int maxAIO,
- final int fileSize,
- final int minFiles)
+ final String filePrefix,
+ final String fileExtension,
+ final int userVersion,
+ final int maxAIO,
+ final int fileSize,
+ final int minFiles)
{
this.fileFactory = fileFactory;
this.maxAIO = maxAIO;
@@ -269,7 +269,7 @@
if (file.getFile().size() != fileSize)
{
JournalFilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size",
- new Exception("trace"));
+ new Exception("trace"));
file.getFile().delete();
}
else
@@ -376,7 +376,7 @@
if (nextFile == null)
{
JournalFilesRepository.log.warn("Couldn't open a file in 60 Seconds",
- new Exception("Warning: Couldn't open a file in 60 Seconds"));
+ new Exception("Warning: Couldn't open a file in 60 Seconds"));
}
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -1857,7 +1857,7 @@
// just leaving some updates in this file
posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1); // +1 = compact
- // count
+ // count
}
}
@@ -1908,7 +1908,7 @@
}
tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1); // +1 = compact
- // count
+ // count
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecord.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -74,6 +74,7 @@
}
}
+ @Override
public String toString()
{
StringBuffer buffer = new StringBuffer();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalRecordProvider.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -29,6 +29,6 @@
public interface JournalRecordProvider
{
JournalCompactor getCompactor();
-
+
Map<Long, JournalRecord> getRecords();
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -61,10 +61,10 @@
this.id = id;
this.journal = journal;
}
-
- public void replaceRecordProvider(JournalRecordProvider provider)
+
+ public void replaceRecordProvider(final JournalRecordProvider provider)
{
- this.journal = provider;
+ journal = provider;
}
/**
@@ -329,7 +329,7 @@
else
{
JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
-
+
if (posFiles != null)
{
posFiles.delete(trDelete.file);
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -140,7 +140,7 @@
@Override
public synchronized void close() throws Exception
{
- super.close();
+ super.close();
if (maxIOSemaphore != null)
{
@@ -260,14 +260,12 @@
{
internalWrite(bytes, sync, null);
}
-
-
- public void writeInternal(ByteBuffer bytes) throws Exception
+
+ public void writeInternal(final ByteBuffer bytes) throws Exception
{
internalWrite(bytes, true, null);
}
-
@Override
protected ByteBuffer newBuffer(int size, final int limit)
{
@@ -292,7 +290,7 @@
}
return;
}
-
+
position.addAndGet(bytes.limit());
if (maxIOSemaphore == null)
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/Reclaimer.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -61,20 +61,24 @@
{
Reclaimer.trace("posCount on " + currentFile + " = " + posCount);
}
-
+
for (int j = i; j < files.length; j++)
{
if (Reclaimer.trace)
{
if (files[j].getNegCount(currentFile) != 0)
{
- Reclaimer.trace("Negative from " + files[j] + " into " + currentFile + " = " + files[j].getNegCount(currentFile));
+ Reclaimer.trace("Negative from " + files[j] +
+ " into " +
+ currentFile +
+ " = " +
+ files[j].getNegCount(currentFile));
}
}
totNeg += files[j].getNegCount(currentFile);
}
-
+
currentFile.setCanReclaim(true);
if (posCount <= totNeg)
@@ -99,7 +103,7 @@
{
Reclaimer.trace(currentFile + " Can't be reclaimed because " + file + " has negative values");
}
-
+
currentFile.setCanReclaim(false);
break;
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-08-27 16:17:39 UTC (rev 9605)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2010-08-27 16:44:17 UTC (rev 9606)
@@ -127,7 +127,6 @@
return;
}
-
// Need to start with the spin limiter acquired
try
{
@@ -207,7 +206,7 @@
{
throw new IllegalStateException("TimedBuffer is not started");
}
-
+
if (sizeChecked > bufferSize)
{
throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
@@ -259,7 +258,7 @@
{
throw new IllegalStateException("TimedBuffer is not started");
}
-
+
delayFlush = false;
bytes.encode(buffer);
@@ -306,7 +305,7 @@
{
throw new IllegalStateException("TimedBuffer is not started");
}
-
+
if ((force || !delayFlush) && buffer.writerIndex() > 0)
{
int pos = buffer.writerIndex();
More information about the hornetq-commits
mailing list