[jboss-cvs] JBoss Messaging SVN: r7507 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jul 1 01:44:19 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-07-01 01:44:19 -0400 (Wed, 01 Jul 2009)
New Revision: 7507
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
Log:
Computing live size of records (1st commit)
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-07-01 05:44:19 UTC (rev 7507)
@@ -305,10 +305,10 @@
* @param id
* @param usedFile
*/
- public void addCommandUpdate(final long id, final JournalFile usedFile)
+ public void addCommandUpdate(final long id, final JournalFile usedFile, final int size)
{
log.info("Adding update command");
- pendingCommands.add(new UpdateCompactCommand(id, usedFile));
+ pendingCommands.add(new UpdateCompactCommand(id, usedFile, size));
}
public boolean lookupRecord(final long id)
@@ -383,7 +383,7 @@
size,
writingChannel);
- newRecords.put(info.id, new JournalRecord(currentFile));
+ newRecords.put(info.id, new JournalRecord(currentFile, size));
}
}
@@ -397,7 +397,7 @@
checkSize(size);
- newTransaction.addPositive(currentFile, info.id);
+ newTransaction.addPositive(currentFile, info.id, size);
JournalImpl.writeAddRecordTX(fileID,
transactionID,
@@ -512,7 +512,7 @@
}
else
{
- newRecord.addUpdateFile(currentFile);
+ newRecord.addUpdateFile(currentFile, size);
}
JournalImpl.writeUpdateRecord(fileID,
@@ -543,7 +543,7 @@
size,
writingChannel);
- newTransaction.addPositive(currentFile, info.id);
+ newTransaction.addPositive(currentFile, info.id, size);
}
else
{
@@ -626,21 +626,24 @@
private class UpdateCompactCommand extends CompactCommand
{
- long id;
+ private long id;
- JournalFile usedFile;
+ private JournalFile usedFile;
+
+ private final int size;
- public UpdateCompactCommand(final long id, final JournalFile usedFile)
+ public UpdateCompactCommand(final long id, final JournalFile usedFile, final int size)
{
this.id = id;
this.usedFile = usedFile;
+ this.size = size;
}
@Override
void execute() throws Exception
{
JournalRecord updateRecord = journal.getRecords().get(id);
- updateRecord.addUpdateFile(usedFile);
+ updateRecord.addUpdateFile(usedFile, size);
}
}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-07-01 05:44:19 UTC (rev 7507)
@@ -49,6 +49,12 @@
void decPosCount();
+ void addSize(int bytes);
+
+ void decSize(int bytes);
+
+ int getLiveSize();
+
void setCanReclaim(boolean canDelete);
boolean isCanReclaim();
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalFileImpl.java 2009-07-01 05:44:19 UTC (rev 7507)
@@ -51,6 +51,8 @@
private long offset;
private final AtomicInteger posCount = new AtomicInteger(0);
+
+ private final AtomicInteger liveBytes = new AtomicInteger(0);
private boolean canReclaim;
@@ -69,6 +71,7 @@
{
negCounts.clear();
posCount.set(0);
+ liveBytes.set(0);
}
public int getPosCount()
@@ -185,5 +188,29 @@
return count;
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#addSize(int)
+ */
+ public void addSize(int bytes)
+ {
+ liveBytes.addAndGet(bytes);
+ }
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#decSize(int)
+ */
+ public void decSize(int bytes)
+ {
+ liveBytes.addAndGet(-bytes);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#getSize()
+ */
+ public int getLiveSize()
+ {
+ return liveBytes.get();
+ }
+
+
}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-07-01 05:44:19 UTC (rev 7507)
@@ -62,6 +62,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.DataConstants;
+import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
/**
@@ -322,7 +323,7 @@
{
JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
- records.put(id, new JournalRecord(usedFile));
+ records.put(id, new JournalRecord(usedFile, record.getEncodeSize()));
}
finally
{
@@ -386,11 +387,11 @@
// compacting is done
if (posFiles == null)
{
- compactor.addCommandUpdate(id, usedFile);
+ compactor.addCommandUpdate(id, usedFile, size);
}
else
{
- posFiles.addUpdateFile(usedFile);
+ posFiles.addUpdateFile(usedFile, record.getEncodeSize());
}
}
finally
@@ -511,7 +512,7 @@
{
JournalFile usedFile = appendRecord(bb, false, false, tx, null);
- tx.addPositive(usedFile, id);
+ tx.addPositive(usedFile, id, size);
}
finally
{
@@ -560,7 +561,7 @@
{
JournalFile usedFile = appendRecord(bb, false, false, tx, null);
- tx.addPositive(usedFile, id);
+ tx.addPositive(usedFile, id, size);
}
finally
{
@@ -1137,7 +1138,7 @@
loadManager.addRecord(info);
- records.put(info.id, new JournalRecord(file));
+ records.put(info.id, new JournalRecord(file, info.data.length + SIZE_ADD_RECORD));
}
public void onReadUpdateRecord(RecordInfo info) throws Exception
@@ -1158,7 +1159,7 @@
// have been deleted
// just leaving some updates in this file
- posFiles.addUpdateFile(file);
+ posFiles.addUpdateFile(file, info.data.length + SIZE_UPDATE_RECORD);
}
}
@@ -1215,7 +1216,7 @@
transactions.put(transactionID, tnp);
}
- tnp.addPositive(file, info.id);
+ tnp.addPositive(file, info.id, info.data.length + SIZE_ADD_RECORD_TX);
}
public void onReadDeleteRecordTX(long transactionID, RecordInfo info) throws Exception
@@ -1582,6 +1583,8 @@
file.getPosCount() +
" reclaimStatus = " +
file.isCanReclaim() +
+ " live size = " +
+ file.getLiveSize() +
"\n");
if (file instanceof JournalFileImpl)
{
@@ -2847,37 +2850,46 @@
public static class JournalRecord
{
private final JournalFile addFile;
+ private final int size;
- private List<JournalFile> updateFiles;
+ private List<Pair<JournalFile, Integer>> updateFiles;
- JournalRecord(final JournalFile addFile)
+ JournalRecord(final JournalFile addFile, final int size)
{
this.addFile = addFile;
+
+ this.size = size;
addFile.incPosCount();
+
+ addFile.addSize(size);
}
- void addUpdateFile(final JournalFile updateFile)
+ void addUpdateFile(final JournalFile updateFile, final int size)
{
if (updateFiles == null)
{
- updateFiles = new ArrayList<JournalFile>();
+ updateFiles = new ArrayList<Pair<JournalFile, Integer>>();
}
- updateFiles.add(updateFile);
+ updateFiles.add(new Pair<JournalFile, Integer>(updateFile, size));
updateFile.incPosCount();
+
+ updateFile.addSize(size);
}
void delete(final JournalFile file)
{
file.incNegCount(addFile);
+ addFile.decSize(size);
if (updateFiles != null)
{
- for (JournalFile updFile : updateFiles)
+ for (Pair<JournalFile, Integer> updFile : updateFiles)
{
- file.incNegCount(updFile);
+ file.incNegCount(updFile.a);
+ file.decSize(updFile.b);
}
}
}
@@ -2890,9 +2902,9 @@
if (updateFiles != null)
{
- for (JournalFile update : updateFiles)
+ for (Pair<JournalFile, Integer> update : updateFiles)
{
- buffer.append(", update=" + update.getFile().getFileName());
+ buffer.append(", update=" + update.a.getFile().getFileName());
}
}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java 2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java 2009-07-01 05:44:19 UTC (rev 7507)
@@ -47,9 +47,9 @@
private final JournalImpl journal;
- private List<Pair<JournalFile, Long>> pos;
+ private List<JournalUpdate> pos;
- private List<Pair<JournalFile, Long>> neg;
+ private List<JournalUpdate> neg;
private final long id;
@@ -101,9 +101,9 @@
{
int i = 0;
long[] ids = new long[pos.size()];
- for (Pair<JournalFile, Long> el : pos)
+ for (JournalUpdate el : pos)
{
- ids[i++] = el.b;
+ ids[i++] = el.getId();
}
return ids;
}
@@ -125,7 +125,7 @@
{
if (pos == null)
{
- pos = new ArrayList<Pair<JournalFile, Long>>();
+ pos = new ArrayList<JournalUpdate>();
}
pos.addAll(other.pos);
@@ -135,7 +135,7 @@
{
if (neg == null)
{
- neg = new ArrayList<Pair<JournalFile, Long>>();
+ neg = new ArrayList<JournalUpdate>();
}
neg.addAll(other.neg);
@@ -259,7 +259,7 @@
return currentCallback;
}
- public void addPositive(final JournalFile file, final long id)
+ public void addPositive(final JournalFile file, final long id, final int size)
{
incCounter(file);
@@ -267,10 +267,10 @@
if (pos == null)
{
- pos = new ArrayList<Pair<JournalFile, Long>>();
+ pos = new ArrayList<JournalUpdate>();
}
- pos.add(new Pair<JournalFile, Long>(file, id));
+ pos.add(new JournalUpdate(file, id, size));
}
public void addNegative(final JournalFile file, final long id)
@@ -281,10 +281,10 @@
if (neg == null)
{
- neg = new ArrayList<Pair<JournalFile, Long>>();
+ neg = new ArrayList<JournalUpdate>();
}
- neg.add(new Pair<JournalFile, Long>(file, id));
+ neg.add(new JournalUpdate(file, id, 0));
}
/**
@@ -303,47 +303,46 @@
if (pos != null)
{
- for (Pair<JournalFile, Long> trUpdate : pos)
+ for (JournalUpdate trUpdate : pos)
{
- JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.b);
+ JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.id);
- if (compactor != null && compactor.lookupRecord(trUpdate.b))
+ if (compactor != null && compactor.lookupRecord(trUpdate.id))
{
// This is a case where the transaction was opened after compacting was started,
// but the commit arrived while compacting was working
// We need to cache the counter update, so compacting will take the correct files when it is done
- compactor.addCommandUpdate(trUpdate.b, trUpdate.a);
+ compactor.addCommandUpdate(trUpdate.id, trUpdate.file, trUpdate.size);
}
- else
- if (posFiles == null)
+ else if (posFiles == null)
{
- posFiles = new JournalImpl.JournalRecord(trUpdate.a);
+ posFiles = new JournalImpl.JournalRecord(trUpdate.file, trUpdate.size);
- journal.getRecords().put(trUpdate.b, posFiles);
+ journal.getRecords().put(trUpdate.id, posFiles);
}
else
{
- posFiles.addUpdateFile(trUpdate.a);
+ posFiles.addUpdateFile(trUpdate.file, trUpdate.size);
}
}
}
if (neg != null)
{
- for (Pair<JournalFile, Long> trDelete : neg)
+ for (JournalUpdate trDelete : neg)
{
- JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.b);
-
+ JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
+
if (posFiles != null)
{
- posFiles.delete(trDelete.a);
+ posFiles.delete(trDelete.file);
}
- else if (compactor != null && compactor.lookupRecord(trDelete.b))
+ else if (compactor != null && compactor.lookupRecord(trDelete.id))
{
// This is a case where the transaction was opened after compacting was started,
// but the commit arrived while compacting was working
// We need to cache the counter update, so compacting will take the correct files when it is done
- compactor.addCommandDelete(trDelete.b, trDelete.a);
+ compactor.addCommandDelete(trDelete.id, trDelete.file);
}
}
}
@@ -465,4 +464,52 @@
file.incPosCount();
}
}
+
+ static class JournalUpdate
+ {
+ JournalFile file;
+
+ long id;
+
+ int size;
+
+
+ /**
+ * @param file
+ * @param id
+ * @param size
+ */
+ public JournalUpdate(JournalFile file, long id, int size)
+ {
+ super();
+ this.file = file;
+ this.id = id;
+ this.size = size;
+ }
+
+ /**
+ * @return the file
+ */
+ public JournalFile getFile()
+ {
+ return file;
+ }
+
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+
+ /**
+ * @return the size
+ */
+ public int getSize()
+ {
+ return size;
+ }
+
+ }
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-07-01 05:44:19 UTC (rev 7507)
@@ -3023,15 +3023,29 @@
{
delete(i);
}
+
+ System.out.println("After delete ****************************");
+ System.out.println(journal.debug());
+ System.out.println("*****************************************");
+
for (int i = 100; i < 200; i++)
{
updateTx(transactionID, i);
}
+
+ System.out.println("After updatetx ****************************");
+ System.out.println(journal.debug());
+ System.out.println("*****************************************");
+
journal.forceMoveNextFile();
commit(transactionID++);
+
+ System.out.println("After commit ****************************");
+ System.out.println(journal.debug());
+ System.out.println("*****************************************");
for (int i = 100; i < 200; i++)
{
@@ -3039,9 +3053,14 @@
deleteTx(transactionID, i);
}
+ System.out.println("After delete ****************************");
+ System.out.println(journal.debug());
+ System.out.println("*****************************************");
+
+
commit(transactionID++);
- System.out.println("Before reclaim ****************************");
+ System.out.println("Before reclaim/after commit ****************************");
System.out.println(journal.debug());
System.out.println("*****************************************");
@@ -3067,27 +3086,6 @@
assertEquals(0, journal.getDataFilesCount());
}
- public void testCompactwithPendingPrepare() throws Exception
- {
- }
-
- public void testCompactwithConcurrentDeletes() throws Exception
- {
- }
-
- public void testCompactwithConcurrentAppend() throws Exception
- {
- }
-
- public void testCompactWithPendingTransactionAndDelete() throws Exception
- {
- }
-
- public void testCompactingWithPendingTransaction() throws Exception
- {
-
- }
-
protected abstract int getAlignment();
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-07-01 04:41:56 UTC (rev 7506)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/ReclaimerTest.java 2009-07-01 05:44:19 UTC (rev 7507)
@@ -900,5 +900,27 @@
public void clearCounts()
{
}
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#addSize(int)
+ */
+ public void addSize(int bytes)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#decSize(int)
+ */
+ public void decSize(int bytes)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.impl.JournalFile#getSize()
+ */
+ public int getLiveSize()
+ {
+ return 0;
+ }
}
}
More information about the jboss-cvs-commits
mailing list