[jboss-cvs] JBoss Messaging SVN: r7443 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 23 12:46:34 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-23 12:46:34 -0400 (Tue, 23 Jun 2009)
New Revision: 7443
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
tweaks on the temp branch
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-06-23 11:07:33 UTC (rev 7442)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-06-23 16:46:34 UTC (rev 7443)
@@ -57,6 +57,8 @@
long getOffset();
+ int getFileID();
+
int getOrderingID();
SequentialFile getFile();
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-06-23 11:07:33 UTC (rev 7442)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-06-23 16:46:34 UTC (rev 7443)
@@ -44,6 +44,8 @@
private final SequentialFile file;
+ private final int fileID;
+
private final int orderingID;
private long offset;
@@ -56,10 +58,12 @@
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
- public JournalFileImpl(final SequentialFile file, final int orderingID)
+ public JournalFileImpl(final SequentialFile file, final int fileID, final int orderingID)
{
this.file = file;
+ this.fileID = fileID;
+
this.orderingID = orderingID;
}
@@ -133,6 +137,11 @@
return offset;
}
+ public int getFileID()
+ {
+ return fileID;
+ }
+
public int getOrderingID()
{
return orderingID;
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-23 11:07:33 UTC (rev 7442)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-23 16:46:34 UTC (rev 7443)
@@ -934,56 +934,83 @@
}
final ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot = recordsSnapshotList;
+
+ Compactor compactor = new Compactor(recordsSnapshot, dataFilesToProcess.get(0).getFileID());
for (final JournalFile file : dataFilesToProcess)
{
- readJournalFile(file, new CompactorJournalReader(recordsSnapshot));
+ readJournalFile(file, compactor);
- writeLockCompact.lock();
- try
- {
- // Restore relationshipMap
- // Deal with updates and deletes that happened during the compacting
+ }
- }
- finally
- {
- writeLockCompact.unlock();
- }
+ writeLockCompact.lock();
+ try
+ {
+ // Restore relationshipMap
+ // Deal with updates and deletes that happened during the compacting
}
+ finally
+ {
+ writeLockCompact.unlock();
+ }
}
- class CompactorJournalReader implements JournalReader
+ class Compactor implements JournalReader
{
JournalFile currentOutputFile;
+
+ SequentialFile sequentialFile;
ByteBuffer bufferWrite;
+
+ int nextOrderingID;
ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot;
- public CompactorJournalReader(ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot)
+ public Compactor(ConcurrentMap<Long, RecordFilesRelationship> recordsSnapshot, int firstFileID)
{
this.recordsSnapshot = recordsSnapshot;
+ this.nextOrderingID = firstFileID;
}
private void checkSize(int size) throws Exception
{
if (bufferWrite == null)
{
- bufferWrite = fileFactory.newBuffer(size);
+ flushFile();
}
+ else
+ {
+ if (bufferWrite.position() + size > bufferWrite.limit())
+ {
+ flushFile();
+ }
+ }
+ }
- if (currentOutputFile == null)
+ /**
+ * @throws Exception
+ */
+ private void flushFile() throws Exception
+ {
+ if (bufferWrite != null)
{
- currentOutputFile = openFile(false);
+ sequentialFile.position(0);
+ sequentialFile.write(bufferWrite, true);
+ sequentialFile.close();
}
+
+ bufferWrite = fileFactory.newBuffer(fileSize);
+ currentOutputFile = openFile(false);
+ sequentialFile = currentOutputFile.getFile();
+ bufferWrite.putInt(currentOutputFile.getFileID());
+ bufferWrite.putInt(nextOrderingID++);
}
public void addRecord(RecordInfo info) throws Exception
{
-
if (recordsSnapshot.get(info.id) != null)
{
System.out.println("Record " + info.id + " to be out on compacted file");
@@ -1780,7 +1807,7 @@
sf.write(bb, true);
- JournalFile jf = new JournalFileImpl(sf, newFileID);
+ JournalFile jf = new JournalFileImpl(sf, newFileID, newFileID);
sf.position(bb.limit());
@@ -1985,7 +2012,7 @@
// This record is from a previous file-usage. The file was
// reused and we need to ignore this record
- if (readFileId != file.getOrderingID())
+ if (readFileId != file.getFileID())
{
// If a file has damaged records, we make it a dataFile, and the
// next reclaiming will fix it
@@ -2153,7 +2180,7 @@
{
for (JournalFile lookupFile : orderedFiles)
{
- if (lookupFile.getOrderingID() == ref.a)
+ if (lookupFile.getFileID() == ref.a)
{
// (III) oops, we were expecting at least one record on this
// file.
@@ -2321,18 +2348,20 @@
file.read(bb);
+ int fileID = bb.getInt();
+
int orderingID = bb.getInt();
fileFactory.releaseBuffer(bb);
bb = null;
- if (nextFileID.get() < orderingID)
+ if (nextFileID.get() < fileID)
{
- nextFileID.set(orderingID);
+ nextFileID.set(fileID);
}
- orderedFiles.add(new JournalFileImpl(file, orderingID));
+ orderedFiles.add(new JournalFileImpl(file, fileID, orderingID));
file.close();
}
@@ -2344,10 +2373,19 @@
{
public int compare(final JournalFile f1, final JournalFile f2)
{
- int id1 = f1.getOrderingID();
- int id2 = f2.getOrderingID();
-
- return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
+ int oid1 = f1.getOrderingID();
+ int oid2 = f2.getOrderingID();
+
+ if (oid1 == oid2)
+ {
+ int id1 = f1.getFileID();
+ int id2 = f2.getFileID();
+ return oid1 < id2 ? -1 : id1 == id2 ? 0 : 1;
+ }
+ else
+ {
+ return oid1 < oid2 ? -1 : oid1 == oid2 ? 0 : 1;
+ }
}
}
@@ -2400,7 +2438,7 @@
bb.writerIndex(SIZE_BYTE);
- bb.writeInt(currentFile.getOrderingID());
+ bb.writeInt(currentFile.getFileID());
if (callback != null)
{
@@ -2459,7 +2497,7 @@
sequentialFile.write(bb, true);
- JournalFile info = new JournalFileImpl(sequentialFile, fileID);
+ JournalFile info = new JournalFileImpl(sequentialFile, fileID, fileID);
if (!keepOpened)
{
@@ -2975,12 +3013,12 @@
private AtomicInteger getCounter(final JournalFile file)
{
- AtomicInteger value = numberOfElementsPerFile.get(file.getOrderingID());
+ AtomicInteger value = numberOfElementsPerFile.get(file.getFileID());
if (value == null)
{
value = new AtomicInteger();
- numberOfElementsPerFile.put(file.getOrderingID(), value);
+ numberOfElementsPerFile.put(file.getFileID(), value);
}
return value;
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-06-23 11:07:33 UTC (rev 7442)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-06-23 16:46:34 UTC (rev 7443)
@@ -762,7 +762,7 @@
return 0;
}
- public int getOrderingID()
+ public int getFileID()
{
return 0;
}
@@ -885,5 +885,13 @@
public void incPendingTransaction()
{
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#getOrderingID()
+ */
+ public int getOrderingID()
+ {
+ return 0;
+ }
}
}
More information about the jboss-cvs-commits
mailing list