[jboss-cvs] JBoss Messaging SVN: r7447 - in branches/clebert_temp_expirement: src/main/org/jboss/messaging/core/journal/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 23 19:51:36 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-23 19:51:36 -0400 (Tue, 23 Jun 2009)
New Revision: 7447
Modified:
branches/clebert_temp_expirement/build-messaging.xml
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
Log:
This commit is just a backup. Note for anyone trying to run the current revision on this branch: load is not working at the moment.
Modified: branches/clebert_temp_expirement/build-messaging.xml
===================================================================
--- branches/clebert_temp_expirement/build-messaging.xml 2009-06-23 22:48:09 UTC (rev 7446)
+++ branches/clebert_temp_expirement/build-messaging.xml 2009-06-23 23:51:36 UTC (rev 7447)
@@ -591,6 +591,7 @@
<zipfileset src="./thirdparty/org/jboss/lib/jboss-common-core.jar"/>
<zipfileset src="./thirdparty/org/jboss/lib/jboss-mdr.jar"/>
<zipfileset src="./thirdparty/org/jboss/lib/jbossxb.jar"/>
+ <zipfileset src="./thirdparty/sun-jaxb/lib/jaxb-api.jar"/>
</jar>
</target>
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-23 22:48:09 UTC (rev 7446)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-23 23:51:36 UTC (rev 7447)
@@ -95,16 +95,17 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
- // private static final boolean trace = log.isTraceEnabled();
- private static final boolean trace = true;
+ private static final boolean trace = log.isTraceEnabled();
+ // private static final boolean trace = true;
+
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
// Journal
private static final void trace(final String message)
{
- System.out.println(message);
- // log.trace(message);
+ // System.out.println(message);
+ log.trace(message);
}
// The sizes of primitive types
@@ -147,7 +148,7 @@
public static final byte DELETE_RECORD = 16;
- public static final int SIZE_COMPLETE_TRANSACTION_RECORD = BASIC_SIZE + SIZE_INT + SIZE_LONG /* + NumerOfElements*SIZE_INT*2 */;
+ public static final int SIZE_COMPLETE_TRANSACTION_RECORD = BASIC_SIZE + SIZE_INT + SIZE_LONG + SIZE_INT;
public static final int SIZE_PREPARE_RECORD = SIZE_COMPLETE_TRANSACTION_RECORD + SIZE_INT;
@@ -305,7 +306,7 @@
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, null, callback);
+ JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
recordsRelationshipMap.put(id, new RecordFilesRelationship(usedFile));
}
@@ -368,7 +369,7 @@
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, null, callback);
+ JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
posFiles.addUpdateFile(usedFile);
}
@@ -423,7 +424,7 @@
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, null, callback);
+ JournalFile usedFile = appendRecord(bb, false, sync, null, callback);
posFiles.addDelete(usedFile);
}
@@ -443,10 +444,7 @@
}
}
- public void appendAddRecordTransactional(final long txID,
- final long id,
- final byte recordType,
- final byte[] record) throws Exception
+ public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
{
appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
@@ -487,7 +485,7 @@
{
JournalTransaction tx = getTransactionInfo(txID);
- JournalFile usedFile = appendRecord(bb, false, tx, null);
+ JournalFile usedFile = appendRecord(bb, false, false, tx, null);
tx.addPositive(usedFile, id);
}
@@ -543,7 +541,7 @@
{
JournalTransaction tx = getTransactionInfo(txID);
- JournalFile usedFile = appendRecord(bb, false, tx, null);
+ JournalFile usedFile = appendRecord(bb, false, false, tx, null);
tx.addPositive(usedFile, id);
}
@@ -563,9 +561,7 @@
appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
}
- public void appendDeleteRecordTransactional(final long txID,
- final long id,
- final EncodingSupport record) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
{
if (state != STATE_LOADED)
{
@@ -596,7 +592,7 @@
{
JournalTransaction tx = getTransactionInfo(txID);
- JournalFile usedFile = appendRecord(bb, false, tx, null);
+ JournalFile usedFile = appendRecord(bb, false, false, tx, null);
tx.addNegative(usedFile, id);
}
@@ -638,7 +634,7 @@
{
JournalTransaction tx = getTransactionInfo(txID);
- JournalFile usedFile = appendRecord(bb, false, tx, null);
+ JournalFile usedFile = appendRecord(bb, false, false, tx, null);
tx.addNegative(usedFile, id);
}
@@ -691,7 +687,7 @@
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
tx.prepare(usedFile);
}
@@ -756,7 +752,7 @@
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, true, sync, tx, null);
tx.commit(usedFile);
}
@@ -810,7 +806,7 @@
lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb, sync, tx, null);
+ JournalFile usedFile = appendRecord(bb, false, sync, tx, null);
tx.rollback(usedFile);
}
@@ -2225,35 +2221,26 @@
final JournalTransaction tx,
final EncodingSupport transactionData) throws Exception
{
- int size = SIZE_COMPLETE_TRANSACTION_RECORD + tx.getElementsSummary().size() *
- SIZE_INT *
- 2 +
- (transactionData != null ? transactionData.getEncodeSize() + SIZE_INT : 0);
+ int size = SIZE_COMPLETE_TRANSACTION_RECORD + (transactionData != null ? transactionData.getEncodeSize() + SIZE_INT
+ : 0);
ChannelBuffer bb = newBuffer(size);
bb.writeByte(recordType);
bb.writeInt(-1); // skip ID part
bb.writeLong(txID);
+ bb.writeInt(-1); // skip number of records part
if (transactionData != null)
{
bb.writeInt(transactionData.getEncodeSize());
}
- bb.writeInt(tx.getElementsSummary().size());
-
if (transactionData != null)
{
transactionData.encode(bb);
}
- for (Map.Entry<Integer, AtomicInteger> entry : tx.getElementsSummary().entrySet())
- {
- bb.writeInt(entry.getKey());
- bb.writeInt(entry.getValue().get());
- }
-
bb.writeInt(size);
return bb;
@@ -2386,8 +2373,11 @@
/**
* Note: You should aways guarantee locking the semaphore lock.
+ *
+ * @param completeTransaction true if the appendRecord is for a prepare or commit, where we should update the number of records on the current file
* */
private JournalFile appendRecord(final MessagingBuffer bb,
+ final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
IOCallback callback) throws Exception
@@ -2441,17 +2431,24 @@
// when we guarantee the currentFile will not be changed,
// since we individualize the callback per file
callback = tx.getCallback(currentFile);
-
+
if (sync)
{
// We already did sync previous files outside of the lock,
// but in a very rare occasion (maybe in a low speed disk)
- // you could have a race where the currentFile changed between the last sync to the time the lock was acquired.
+ // you could have a race where the currentFile changed between the last sync to the time the lock was
+ // acquired.
// So, we call the syncPreviousFiles again to guarantee data on disk.
- // Even if there is data to be synced, this should be very fast since previous files were already scheduled to be closed.
+ // Even if there is data to be synced, this should be very fast since previous files were already
+ // scheduled to be closed.
// This is just verifying if previous files are already closed
tx.syncPreviousFiles();
}
+
+ if (completeTransaction)
+ {
+ tx.fillNumberOfRecords(currentFile, bb);
+ }
}
bb.writerIndex(SIZE_BYTE);
@@ -2846,6 +2843,18 @@
return numberOfElementsPerFile;
}
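+ /**
+ * @param currentFile the file whose counter (as returned by getCounter) is written into the buffer
+ * @param bb the buffer to update; the counter is written as an int at offset SIZE_BYTE + SIZE_INT + SIZE_LONG
+ */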
+ public void fillNumberOfRecords(JournalFile currentFile, MessagingBuffer bb)
+ {
+ bb.writerIndex(SIZE_BYTE + SIZE_INT + SIZE_LONG);
+
+ bb.writeInt(getCounter(currentFile).intValue());
+
+ }
+
/** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.
* Because of that, this operation should be almost very fast.*/
public void syncPreviousFiles() throws Exception
@@ -2865,11 +2874,11 @@
}
else
{
- for (JournalFile file: pendingFiles)
+ for (JournalFile file : pendingFiles)
{
if (file != currentFile)
{
- file.getFile().waitForClose();
+ file.getFile().waitForClose();
}
}
}
@@ -3161,7 +3170,7 @@
for (int i = 0; i < pages; i++)
{
- appendRecord(bb, false, null, null);
+ appendRecord(bb, false, false, null, null);
}
lock.release();
More information about the jboss-cvs-commits
mailing list