[jboss-cvs] JBoss Messaging SVN: r7475 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 25 20:02:15 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-25 20:02:15 -0400 (Thu, 25 Jun 2009)
New Revision: 7475
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
tweaks
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-06-25 23:21:12 UTC (rev 7474)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-06-26 00:02:15 UTC (rev 7475)
@@ -35,6 +35,10 @@
*/
public interface JournalFile
{
+
+ /** Used during compacting (clearing counters) */
+ void clearCounts();
+
int getNegCount(JournalFile file);
void incNegCount(JournalFile file);
@@ -45,12 +49,6 @@
void decPosCount();
- void incPendingTransaction();
-
- void decPendingTransaction();
-
- int getPendingTransactions();
-
void setCanReclaim(boolean canDelete);
boolean isCanReclaim();
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-06-25 23:21:12 UTC (rev 7474)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-06-26 00:02:15 UTC (rev 7475)
@@ -49,15 +49,13 @@
private final int orderingID;
private long offset;
-
- private final AtomicInteger pendingTransactions = new AtomicInteger(0);
private final AtomicInteger posCount = new AtomicInteger(0);
private boolean canReclaim;
private final Map<JournalFile, AtomicInteger> negCounts = new ConcurrentHashMap<JournalFile, AtomicInteger>();
-
+
public JournalFileImpl(final SequentialFile file, final int fileID, final int orderingID)
{
this.file = file;
@@ -67,6 +65,12 @@
this.orderingID = orderingID;
}
+ public void clearCounts()
+ {
+ negCounts.clear();
+ posCount.set(0);
+ }
+
public int getPosCount()
{
return posCount.intValue();
@@ -111,22 +115,6 @@
posCount.decrementAndGet();
}
- public void incPendingTransaction()
- {
- pendingTransactions.incrementAndGet();
- }
-
- public void decPendingTransaction()
- {
- pendingTransactions.decrementAndGet();
- }
-
- public int getPendingTransactions()
- {
- return pendingTransactions.get();
- }
-
-
public void extendOffset(final int delta)
{
offset += delta;
@@ -197,4 +185,5 @@
return count;
}
+
}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-25 23:21:12 UTC (rev 7474)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-26 00:02:15 UTC (rev 7475)
@@ -817,6 +817,7 @@
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
compactingLock.writeLock().lock();
+ currentFile.clearCounts();
try
{
autoReclaim = false;
@@ -828,16 +829,8 @@
for (JournalFile file : dataFiles)
{
- if (file.getPendingTransactions() == 0)
- {
- trace("Adding " + file + " to compact list");
- dataFilesToProcess.add(file);
- }
- else
- {
- trace(file + " will not be compacted as it has pending transactions");
- break;
- }
+ file.clearCounts();
+ dataFilesToProcess.add(file);
}
this.compactor = new Compactor(recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
@@ -877,8 +870,7 @@
class Compactor implements JournalReader
{
-
-
+
JournalFile currentFile;
SequentialFile sequentialFile;
@@ -888,7 +880,7 @@
ChannelBuffer channelWrapper;
int nextOrderingID;
-
+
final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
final Map<Long, JournalRecord> recordsSnapshot;
@@ -928,7 +920,7 @@
if (channelWrapper != null)
{
sequentialFile.position(0);
- sequentialFile.write(channelWrapper, true);
+ sequentialFile.write(channelWrapper.toByteBuffer(), true);
sequentialFile.close();
newDataFiles.add(currentFile);
}
@@ -954,17 +946,14 @@
fileID = nextOrderingID++;
System.out.println("Next OrderingID = " + nextOrderingID);
-
channelWrapper.writeInt(fileID);
channelWrapper.writeInt(fileID);
-
-
-
- for (int i = 0 ; i < 1000; i++)
+
+ for (int i = 0; i < 1000; i++)
{
channelWrapper.writeByte(UnitTestCase.getSamplebyte(i));
}
-
+
}
public void addRecord(RecordInfo info) throws Exception
@@ -1106,12 +1095,11 @@
checkSize(size);
JournalRecord newRecord = newRecords.get(info.id);
-
+
if (newRecord == null)
{
log.warn("Couldn't find addRecord information for record " + info.id + " during compacting");
}
-
writeUpdateRecord(fileID,
info.id,
@@ -1119,9 +1107,9 @@
new ByteArrayEncoding(info.data),
size,
channelWrapper);
-
+
newRecord.addUpdateFile(currentFile);
-
+
}
}
@@ -1131,7 +1119,7 @@
if (pendingTransactions.get(transactionID) != null)
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
-
+
int size = SIZE_UPDATE_RECORD_TX + info.data.length;
checkSize(size);
@@ -1143,13 +1131,12 @@
new ByteArrayEncoding(info.data),
size,
channelWrapper);
-
-
+
newTransaction.addPositive(currentFile, info.id);
}
else
{
-
+
updateRecord(info);
}
}
@@ -1919,24 +1906,16 @@
public int readJournalFile(JournalFile file, JournalReader reader) throws Exception
{
- ByteBuffer wholeFileBuffer = fileFactory.newBuffer(fileSize);
-
+
file.getFile().open(1);
- int bytesRead = file.getFile().read(wholeFileBuffer);
+ ByteBuffer wholeFileBuffer = fileFactory.newBuffer((int)file.getFile().size());
- if (bytesRead != fileSize)
+ int bytesRead = file.getFile().read(wholeFileBuffer);
+
+ if (bytesRead != file.getFile().size())
{
- // FIXME - We should extract everything we can from this file
- // and then we shouldn't ever reuse this file on reclaiming (instead
- // reclaim on different size files would aways throw the file away)
- // rather than throw ISE!
- // We don't want to leave the user with an unusable system
- throw new IllegalStateException("File is wrong size " + bytesRead +
- " expected " +
- fileSize +
- " : " +
- file.getFile().getFileName());
+ throw new RuntimeException("Invalid read! The system couldn't read the entire file into memory");
}
wholeFileBuffer.position(0);
@@ -2227,9 +2206,6 @@
/**
* <p> Check for holes on the transaction (a commit written but with an incomplete transaction) </p>
* <p>This method will validate if the transaction (PREPARE/COMMIT) is complete as stated on the COMMIT-RECORD.</p>
- * <p> We record a summary about the pendingTransactions on the journal file on COMMIT and PREPARE.
- * When we load the pendingTransactions we build a new summary and we check the original summary to the current summary.
- * This method is basically verifying if the entire transaction is being loaded </p>
*
* <p>Look at the javadoc on {@link JournalImpl#appendCommitRecord(long)} about how the transaction-summary is recorded</p>
*
@@ -2243,8 +2219,6 @@
final List<JournalFile> orderedFiles,
final int numberOfRecords)
{
- // (I) First we get the summary of what we really have on the files now:
-
return journalTransaction.getCounter(currentFile) == numberOfRecords;
}
@@ -2338,7 +2312,7 @@
ChannelBuffer bb)
{
bb.writeByte(UPDATE_RECORD);
- bb.writeInt(fileId); // skip ID part
+ bb.writeInt(fileId);
bb.writeLong(id);
bb.writeInt(record.getEncodeSize());
bb.writeByte(recordType);
@@ -3181,7 +3155,6 @@
for (JournalFile jf : pendingFiles)
{
file.incNegCount(jf);
- jf.decPendingTransaction();
}
}
@@ -3224,7 +3197,6 @@
for (JournalFile jf : pendingFiles)
{
file.incNegCount(jf);
- jf.decPendingTransaction();
}
}
@@ -3247,7 +3219,6 @@
for (JournalFile jf : pendingFiles)
{
jf.decPosCount();
- jf.decPendingTransaction();
}
}
@@ -3267,8 +3238,6 @@
// prevents any transactional operations
// being deleted before a commit or rollback is written
file.incPosCount();
-
- file.incPendingTransaction();
}
}
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-25 23:21:12 UTC (rev 7474)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-26 00:02:15 UTC (rev 7475)
@@ -3145,7 +3145,7 @@
}
journal.forceMoveNextFile();
-
+
System.out.println("Number of Files: " + journal.getDataFilesCount());
System.out.println("Before compact ****************************");
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-06-25 23:21:12 UTC (rev 7474)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-06-26 00:02:15 UTC (rev 7475)
@@ -893,5 +893,12 @@
{
return 0;
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#clearCounts()
+ */
+ public void clearCounts()
+ {
+ }
}
}
More information about the jboss-cvs-commits
mailing list