Author: clebert.suconic(a)jboss.com
Date: 2010-04-29 19:01:30 -0400 (Thu, 29 Apr 2010)
New Revision: 9186
Modified:
trunk/src/main/org/hornetq/core/journal/Journal.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-318 - using a long for the fileID - first
commit
Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java 2010-04-29 16:21:28 UTC (rev
9185)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java 2010-04-29 23:01:30 UTC (rev
9186)
@@ -114,6 +114,8 @@
int getAlignment() throws Exception;
int getNumberOfRecords();
+
+ int getUserVersion();
void perfBlast(int pages) throws Exception;
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-04-29
16:21:28 UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractJournalUpdateTask.java 2010-04-29
23:01:30 UTC (rev 9186)
@@ -55,10 +55,8 @@
protected SequentialFile sequentialFile;
- protected int fileID;
+ protected long nextOrderingID;
- protected int nextOrderingID;
-
private HornetQBuffer writingChannel;
private final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();
@@ -72,7 +70,7 @@
protected AbstractJournalUpdateTask(final SequentialFileFactory fileFactory,
final JournalImpl journal,
final Set<Long> recordsSnapshot,
- final int nextOrderingID)
+ final long nextOrderingID)
{
super();
this.journal = journal;
@@ -95,11 +93,8 @@
{
controlFile.open(1, false);
- HornetQBuffer renameBuffer = HornetQBuffers.dynamicBuffer(1);
+ JournalImpl.initFileHeader(fileFactory, controlFile, 0, 0);
- renameBuffer.writeInt(-1);
- renameBuffer.writeInt(-1);
-
HornetQBuffer filesToRename = HornetQBuffers.dynamicBuffer(1);
// DataFiles first
@@ -155,12 +150,16 @@
new
ByteArrayEncoding(filesToRename.toByteBuffer()
.array()));
- controlRecord.setFileID(-1);
+
+ HornetQBuffer renameBuffer =
HornetQBuffers.dynamicBuffer(filesToRename.writerIndex());
+
+ controlRecord.setFileID(0);
+
controlRecord.encode(renameBuffer);
ByteBuffer writeBuffer = fileFactory.newBuffer(renameBuffer.writerIndex());
-
+
writeBuffer.put(renameBuffer.toByteBuffer().array(), 0,
renameBuffer.writerIndex());
writeBuffer.rewind();
@@ -208,16 +207,18 @@
flush();
ByteBuffer bufferWrite = fileFactory.newBuffer(journal.getFileSize());
+
writingChannel = HornetQBuffers.wrappedBuffer(bufferWrite);
currentFile = journal.getFile(false, false, false, true);
+
sequentialFile = currentFile.getFile();
sequentialFile.open(1, false);
- fileID = nextOrderingID++;
- currentFile = new JournalFileImpl(sequentialFile, fileID);
- writingChannel.writeInt(fileID);
+ currentFile = new JournalFileImpl(sequentialFile, nextOrderingID++);
+
+ JournalImpl.writeHeader(writingChannel, journal.getUserVersion(),
currentFile.getFileID());
}
protected void addToRecordsSnaptshot(final long id)
@@ -235,7 +236,7 @@
protected void writeEncoder(final JournalInternalRecord record) throws Exception
{
- record.setFileID(fileID);
+ record.setFileID(currentFile.getRecordID());
record.encode(getWritingChannel());
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-04-29 16:21:28
UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCleaner.java 2010-04-29 23:01:30
UTC (rev 9186)
@@ -55,7 +55,7 @@
protected JournalCleaner(final SequentialFileFactory fileFactory,
final JournalImpl journal,
final Set<Long> recordsSnapshot,
- final int nextOrderingID) throws Exception
+ final long nextOrderingID) throws Exception
{
super(fileFactory, journal, recordsSnapshot, nextOrderingID);
openFile();
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-04-29 16:21:28
UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalCompactor.java 2010-04-29 23:01:30
UTC (rev 9186)
@@ -69,7 +69,7 @@
if (controlFile.exists())
{
- JournalFile file = new JournalFileImpl(controlFile, -1);
+ JournalFile file = new JournalFileImpl(controlFile, 0);
final ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
@@ -140,7 +140,7 @@
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
final Set<Long> recordsSnapshot,
- final int firstFileID)
+ final long firstFileID)
{
super(fileFactory, journal, recordsSnapshot, firstFileID);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-04-29 16:21:28 UTC
(rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFile.java 2010-04-29 23:01:30 UTC
(rev 9186)
@@ -56,7 +56,11 @@
long getOffset();
- int getFileID();
+ /** This is a field to identify that records on this file actually belong to the
current file.
+ * The possible implementation for this is fileID & Integer.MAX_VALUE */
+ int getRecordID();
+
+ long getFileID();
SequentialFile getFile();
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-04-29 16:21:28
UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFileImpl.java 2010-04-29 23:01:30
UTC (rev 9186)
@@ -35,7 +35,9 @@
private final SequentialFile file;
- private final int fileID;
+ private final long fileID;
+
+ private final int recordID;
private long offset;
@@ -49,11 +51,13 @@
private final Map<JournalFile, AtomicInteger> negCounts = new
ConcurrentHashMap<JournalFile, AtomicInteger>();
- public JournalFileImpl(final SequentialFile file, final int fileID)
+ public JournalFileImpl(final SequentialFile file, final long fileID)
{
this.file = file;
this.fileID = fileID;
+
+ this.recordID = (int)(fileID & (long)Integer.MAX_VALUE);
}
public void clearCounts()
@@ -132,10 +136,15 @@
return offset;
}
- public int getFileID()
+ public long getFileID()
{
return fileID;
}
+
+ public int getRecordID()
+ {
+ return recordID;
+ }
public void setOffset(final long offset)
{
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-04-29 16:21:28 UTC
(rev 9185)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-04-29 23:01:30 UTC
(rev 9186)
@@ -69,7 +69,7 @@
/**
*
- * <p>A JournalImpl</p
+ * <p>A circular log implementation.</p
*
* <p>Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
*
@@ -88,6 +88,8 @@
private static final int STATE_LOADED = 2;
+ private static final int FORMAT_VERSION = 1;
+
// Static --------------------------------------------------------
private static final Logger log = Logger.getLogger(JournalImpl.class);
@@ -109,7 +111,8 @@
public static final int MIN_FILE_SIZE = 1024;
- public static final int SIZE_HEADER = DataConstants.SIZE_INT;
+ // FileID(Long) + JournalVersion + UserVersion
+ public static final int SIZE_HEADER = DataConstants.SIZE_LONG + DataConstants.SIZE_INT
+ DataConstants.SIZE_INT;
public static final int BASIC_SIZE = DataConstants.SIZE_BYTE + DataConstants.SIZE_INT
+ DataConstants.SIZE_INT;
@@ -163,8 +166,10 @@
private volatile boolean autoReclaim = true;
- private final AtomicInteger nextFileID = new AtomicInteger(0);
+ private final AtomicLong nextFileID = new AtomicLong(0);
+ private final int userVersion;
+
private final int maxAIO;
private final int fileSize;
@@ -220,65 +225,16 @@
// Constructors --------------------------------------------------
- public void runDirectJournalBlast() throws Exception
+ public JournalImpl(final int fileSize,
+ final int minFiles,
+ final int compactMinFiles,
+ final int compactPercentage,
+ final SequentialFileFactory fileFactory,
+ final String filePrefix,
+ final String fileExtension,
+ final int maxAIO)
{
- final int numIts = 100000000;
-
- JournalImpl.log.info("*** running direct journal blast: " + numIts);
-
- final CountDownLatch latch = new CountDownLatch(numIts * 2);
-
- class MyIOAsyncTask implements IOCompletion
- {
- public void done()
- {
- latch.countDown();
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
-
- }
-
- public void storeLineUp()
- {
- }
- }
-
- final MyIOAsyncTask task = new MyIOAsyncTask();
-
- final int recordSize = 1024;
-
- final byte[] bytes = new byte[recordSize];
-
- class MyRecord implements EncodingSupport
- {
-
- public void decode(final HornetQBuffer buffer)
- {
- }
-
- public void encode(final HornetQBuffer buffer)
- {
- buffer.writeBytes(bytes);
- }
-
- public int getEncodeSize()
- {
- return recordSize;
- }
-
- }
-
- MyRecord record = new MyRecord();
-
- for (int i = 0; i < numIts; i++)
- {
- appendAddRecord(i, (byte)1, record, true, task);
- appendDeleteRecord(i, true, task);
- }
-
- latch.await();
+ this(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory,
filePrefix, fileExtension, maxAIO, 0);
}
public JournalImpl(final int fileSize,
@@ -288,7 +244,8 @@
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
- final int maxIO)
+ final int maxAIO,
+ final int userVersion)
{
if (fileFactory == null)
{
@@ -316,7 +273,7 @@
{
throw new NullPointerException("fileExtension is null");
}
- if (maxIO <= 0)
+ if (maxAIO <= 0)
{
throw new IllegalStateException("maxAIO should aways be a positive
number");
}
@@ -347,9 +304,72 @@
this.fileExtension = fileExtension;
- maxAIO = maxIO;
+ this.maxAIO = maxAIO;
+
+ this.userVersion = userVersion;
}
+ public void runDirectJournalBlast() throws Exception
+ {
+ final int numIts = 100000000;
+
+ JournalImpl.log.info("*** running direct journal blast: " + numIts);
+
+ final CountDownLatch latch = new CountDownLatch(numIts * 2);
+
+ class MyIOAsyncTask implements IOCompletion
+ {
+ public void done()
+ {
+ latch.countDown();
+ }
+
+ public void onError(final int errorCode, final String errorMessage)
+ {
+
+ }
+
+ public void storeLineUp()
+ {
+ }
+ }
+
+ final MyIOAsyncTask task = new MyIOAsyncTask();
+
+ final int recordSize = 1024;
+
+ final byte[] bytes = new byte[recordSize];
+
+ class MyRecord implements EncodingSupport
+ {
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ }
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ buffer.writeBytes(bytes);
+ }
+
+ public int getEncodeSize()
+ {
+ return recordSize;
+ }
+
+ }
+
+ MyRecord record = new MyRecord();
+
+ for (int i = 0; i < numIts; i++)
+ {
+ appendAddRecord(i, (byte)1, record, true, task);
+ appendDeleteRecord(i, true, task);
+ }
+
+ latch.await();
+ }
+
public Map<Long, JournalRecord> getRecords()
{
return records;
@@ -576,7 +596,7 @@
// This record is from a previous file-usage. The file was
// reused and we need to ignore this record
- if (readFileId != file.getFileID())
+ if (readFileId != file.getRecordID())
{
// If a file has damaged pendingTransactions, we make it a dataFile, and
the
// next reclaiming will fix it
@@ -2442,6 +2462,11 @@
{
return maxAIO;
}
+
+ public int getUserVersion()
+ {
+ return userVersion;
+ }
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
@@ -2631,25 +2656,17 @@
// Discard the old JournalFile and set it with a new ID
private JournalFile reinitializeFile(final JournalFile file) throws Exception
{
- int newFileID = generateFileID();
+ long newFileID = generateFileID();
SequentialFile sf = file.getFile();
sf.open(1, false);
- sf.position(0);
+ int position = initFileHeader(this.fileFactory, sf, userVersion, newFileID);
- ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
-
- bb.putInt(newFileID);
-
- bb.rewind();
-
- sf.writeDirect(bb, true);
-
JournalFile jf = new JournalFileImpl(sf, newFileID);
- sf.position(bb.limit());
+ sf.position(position);
sf.close();
@@ -2747,16 +2764,8 @@
file.open(1, false);
- ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
+ long fileID = readFileHeader(file);
- file.read(bb);
-
- int fileID = bb.getInt();
-
- fileFactory.releaseBuffer(bb);
-
- bb = null;
-
if (nextFileID.get() < fileID)
{
nextFileID.set(fileID);
@@ -2784,6 +2793,68 @@
return orderedFiles;
}
+ /**
+ * @param file
+ * @return
+ * @throws Exception
+ */
+ private long readFileHeader(SequentialFile file) throws Exception
+ {
+ ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
+
+ file.read(bb);
+
+ int journalVersion = bb.getInt();
+
+ int userVersion = bb.getInt();
+
+ long fileID = bb.getLong();
+
+ fileFactory.releaseBuffer(bb);
+
+ bb = null;
+ return fileID;
+ }
+
+ /**
+ * @param fileID
+ * @param sequentialFile
+ * @throws Exception
+ */
+ static int initFileHeader(final SequentialFileFactory fileFactory, final
SequentialFile sequentialFile, final int userVersion, final long fileID) throws Exception
+ {
+ // We don't need to release buffers while writing.
+ ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
+
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bb);
+
+ writeHeader(buffer, userVersion, fileID);
+
+ bb.rewind();
+
+ int bufferSize = bb.limit();
+
+ sequentialFile.position(0);
+
+ sequentialFile.writeDirect(bb, true);
+
+ return bufferSize;
+ }
+
+ /**
+ * @param buffer
+ * @param userVersion
+ * @param fileID
+ */
+ static void writeHeader(HornetQBuffer buffer, final int userVersion, final long
fileID)
+ {
+ buffer.writeInt(FORMAT_VERSION);
+
+ buffer.writeInt(userVersion);
+
+ buffer.writeLong(fileID);
+ }
+
/**
*
* @param completeTransaction If the appendRecord is for a prepare or commit, where we
should update the number of pendingTransactions on the current file
@@ -2867,7 +2938,7 @@
}
// Adding fileID
- encoder.setFileID(currentFile.getFileID());
+ encoder.setFileID(currentFile.getRecordID());
if (callback != null)
{
@@ -2903,10 +2974,10 @@
*/
private JournalFile createFile(final boolean keepOpened,
final boolean multiAIO,
- final boolean fill,
+ final boolean init,
final boolean tmpCompact) throws Exception
{
- int fileID = generateFileID();
+ long fileID = generateFileID();
String fileName;
@@ -2930,17 +3001,11 @@
sequentialFile.open(1, false);
- if (fill)
+ if (init)
{
sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
- ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
-
- bb.putInt(fileID);
-
- bb.rewind();
-
- sequentialFile.writeDirect(bb, true);
+ initFileHeader(this.fileFactory, sequentialFile, userVersion, fileID);
}
long position = sequentialFile.position();
@@ -2979,7 +3044,7 @@
file.getFile().position(file.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER));
}
- private int generateFileID()
+ private long generateFileID()
{
return nextFileID.incrementAndGet();
}
@@ -3098,7 +3163,7 @@
*/
JournalFile getFile(final boolean keepOpened,
final boolean multiAIO,
- final boolean fill,
+ final boolean initFile,
final boolean tmpCompactExtension) throws Exception
{
JournalFile nextOpenedFile = null;
@@ -3117,7 +3182,7 @@
if (nextOpenedFile == null)
{
- nextOpenedFile = createFile(keepOpened, multiAIO, fill, tmpCompactExtension);
+ nextOpenedFile = createFile(keepOpened, multiAIO, initFile,
tmpCompactExtension);
}
else
{
@@ -3435,8 +3500,8 @@
{
public int compare(final JournalFile f1, final JournalFile f2)
{
- int id1 = f1.getFileID();
- int id2 = f2.getFileID();
+ long id1 = f1.getFileID();
+ long id2 = f2.getFileID();
return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2010-04-29
16:21:28 UTC (rev 9185)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2010-04-29
23:01:30 UTC (rev 9186)
@@ -533,8 +533,15 @@
public void runDirectJournalBlast() throws Exception
{
- // TODO Auto-generated method stub
+ localJournal.runDirectJournalBlast();
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#getUserVersion()
+ */
+ public int getUserVersion()
+ {
+ return localJournal.getUserVersion();
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-04-29
16:21:28 UTC (rev 9185)
+++
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2010-04-29
23:01:30 UTC (rev 9186)
@@ -1078,8 +1078,14 @@
public void runDirectJournalBlast() throws Exception
{
- // TODO Auto-generated method stub
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#getUserVersion()
+ */
+ public int getUserVersion()
+ {
+ return 0;
}
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-04-29
16:21:28 UTC (rev 9185)
+++
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestUnit.java 2010-04-29
23:01:30 UTC (rev 9186)
@@ -662,7 +662,7 @@
public void testReclaimAddUpdateDeleteDifferentFiles1() throws Exception
{
// Make sure there is one record per file
- setup(2, calculateRecordSize(8, getAlignment()) +
calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+ setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) +
calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
getAlignment()), true);
createJournal();
startJournal();
@@ -701,7 +701,7 @@
public void testReclaimAddUpdateDeleteDifferentFiles2() throws Exception
{
// Make sure there is one record per file
- setup(2, calculateRecordSize(8, getAlignment()) +
calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
+ setup(2, calculateRecordSize(JournalImpl.SIZE_HEADER, getAlignment()) +
calculateRecordSize(JournalImpl.SIZE_ADD_RECORD + recordLength,
getAlignment()), true);
createJournal();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-04-29
16:21:28 UTC (rev 9185)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/ReclaimerTest.java 2010-04-29
23:01:30 UTC (rev 9186)
@@ -792,7 +792,7 @@
return 0;
}
- public int getFileID()
+ public long getFileID()
{
return 0;
}
@@ -977,5 +977,13 @@
this.needCleanup = needCleanup;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.impl.JournalFile#getRecordID()
+ */
+ public int getRecordID()
+ {
+ return 0;
+ }
}
}