[jboss-cvs] JBoss Messaging SVN: r7474 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/unit/core/journal/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jun 25 19:21:12 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-25 19:21:12 -0400 (Thu, 25 Jun 2009)
New Revision: 7474
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
fix on Bytebuffer + changes
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-25 21:42:56 UTC (rev 7473)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-25 23:21:12 UTC (rev 7474)
@@ -64,6 +64,7 @@
import org.jboss.messaging.core.journal.TestableJournal;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.VariableLatch;
@@ -346,13 +347,7 @@
ChannelBuffer bb = newBuffer(size);
- bb.writeByte(UPDATE_RECORD);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
+ writeUpdateRecord(-1, id, recordType, record, size, bb);
callback = getSyncCallback(sync);
@@ -508,14 +503,7 @@
ChannelBuffer bb = newBuffer(size);
- bb.writeByte(UPDATE_RECORD_TX);
- bb.writeInt(-1); // skip ID part
- bb.writeLong(txID);
- bb.writeLong(id);
- bb.writeInt(record.getEncodeSize());
- bb.writeByte(recordType);
- record.encode(bb);
- bb.writeInt(size);
+ writeUpdateRecordTX(-1, txID, id, recordType, record, size, bb);
JournalTransaction tx = getTransactionInfo(txID);
@@ -889,17 +877,19 @@
class Compactor implements JournalReader
{
- JournalFile currentOutputFile;
+
+
+ JournalFile currentFile;
SequentialFile sequentialFile;
int fileID;
- ByteBuffer bufferWrite;
-
ChannelBuffer channelWrapper;
int nextOrderingID;
+
+ final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
final Map<Long, JournalRecord> recordsSnapshot;
@@ -920,13 +910,13 @@
private void checkSize(int size) throws Exception
{
- if (bufferWrite == null)
+ if (channelWrapper == null)
{
openFile();
}
else
{
- if (bufferWrite.position() + size > bufferWrite.limit())
+ if (channelWrapper.writerIndex() + size > channelWrapper.capacity())
{
openFile();
}
@@ -935,14 +925,15 @@
public void flush() throws Exception
{
- if (bufferWrite != null)
+ if (channelWrapper != null)
{
sequentialFile.position(0);
- sequentialFile.write(bufferWrite, true);
+ sequentialFile.write(channelWrapper, true);
sequentialFile.close();
+ newDataFiles.add(currentFile);
}
- bufferWrite = null;
+ channelWrapper = null;
}
/**
@@ -952,18 +943,28 @@
{
flush();
- bufferWrite = fileFactory.newBuffer(fileSize);
+ ByteBuffer bufferWrite = fileFactory.newBuffer(fileSize);
channelWrapper = ChannelBuffers.wrappedBuffer(bufferWrite);
- currentOutputFile = getFile(false, false);
- sequentialFile = currentOutputFile.getFile();
+ currentFile = getFile(false, false);
+ sequentialFile = currentFile.getFile();
sequentialFile.renameTo(sequentialFile.getFileName() + ".cmp");
sequentialFile.open(1);
- currentOutputFile = new JournalFileImpl(sequentialFile, nextOrderingID, nextOrderingID);
- fileID = nextOrderingID;
+ currentFile = new JournalFileImpl(sequentialFile, nextOrderingID, nextOrderingID);
+ fileID = nextOrderingID++;
System.out.println("Next OrderingID = " + nextOrderingID);
- bufferWrite.putInt(nextOrderingID);
- bufferWrite.putInt(nextOrderingID++);
+
+
+ channelWrapper.writeInt(fileID);
+ channelWrapper.writeInt(fileID);
+
+
+
+ for (int i = 0 ; i < 1000; i++)
+ {
+ channelWrapper.writeByte(UnitTestCase.getSamplebyte(i));
+ }
+
}
public void addRecord(RecordInfo info) throws Exception
@@ -1005,10 +1006,9 @@
size,
channelWrapper);
}
- else if (recordsSnapshot.get(info.id) != null)
+ else
{
- // AddRecordTX for a committed record, just converting it as a regular record
- // The record is already confirmed. There is no need to keep the transaction information during compacting
+ // Will try it as a regular record, the method addRecord will validate if this is a live record or not
addRecord(info);
}
}
@@ -1027,8 +1027,6 @@
public void deleteRecord(long recordID) throws Exception
{
- // nothing to be done here, if it is a delete, the record is already gone.. so.. no worries
-
if (records.get(recordID) != null)
{
// Sanity check, it should never happen
@@ -1037,24 +1035,24 @@
}
- public void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
+ public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
{
if (pendingTransactions.get(transactionID) != null)
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
- int size = SIZE_DELETE_RECORD_TX + recordInfo.data.length;
-
+ int size = SIZE_DELETE_RECORD_TX + info.data.length;
+
checkSize(size);
-
+
writeDeleteRecordTransactional(fileID,
transactionID,
- recordInfo.id,
- new ByteArrayEncoding(recordInfo.data),
+ info.id,
+ new ByteArrayEncoding(info.data),
size,
channelWrapper);
-
- newTransaction.addNegative(currentFile, recordInfo.id);
+
+ newTransaction.addNegative(currentFile, info.id);
}
}
@@ -1102,16 +1100,58 @@
{
if (recordsSnapshot.get(info.id) != null)
{
- System.out.println("Update " + info.id + " to be out on compacted file");
+ System.out.println("UpdateRecord on compacting");
+ int size = SIZE_UPDATE_RECORD + info.data.length;
+
+ checkSize(size);
+
+ JournalRecord newRecord = newRecords.get(info.id);
+
+ if (newRecord == null)
+ {
+ log.warn("Couldn't find addRecord information for record " + info.id + " during compacting");
+ }
+
+
+ writeUpdateRecord(fileID,
+ info.id,
+ info.userRecordType,
+ new ByteArrayEncoding(info.data),
+ size,
+ channelWrapper);
+
+ newRecord.addUpdateFile(currentFile);
+
}
}
public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
{
- if (recordsSnapshot.get(info.id) != null)
+
+ if (pendingTransactions.get(transactionID) != null)
{
- System.out.println("UpdateTX " + info.id + " to be out on compacted file");
+ JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
+
+ int size = SIZE_UPDATE_RECORD_TX + info.data.length;
+
+ checkSize(size);
+
+ writeUpdateRecordTX(fileID,
+ transactionID,
+ info.id,
+ info.userRecordType,
+ new ByteArrayEncoding(info.data),
+ size,
+ channelWrapper);
+
+
+ newTransaction.addPositive(currentFile, info.id);
}
+ else
+ {
+
+ updateRecord(info);
+ }
}
/**
@@ -2258,12 +2298,61 @@
}
/**
+ * @param txID
* @param id
* @param recordType
* @param record
* @param size
* @param bb
*/
+ private void writeUpdateRecordTX(final int fileID,
+ final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ int size,
+ ChannelBuffer bb)
+ {
+ bb.writeByte(UPDATE_RECORD_TX);
+ bb.writeInt(fileID);
+ bb.writeLong(txID);
+ bb.writeLong(id);
+ bb.writeInt(record.getEncodeSize());
+ bb.writeByte(recordType);
+ record.encode(bb);
+ bb.writeInt(size);
+ }
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ * @param bb
+ */
+ private void writeUpdateRecord(final int fileId,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ int size,
+ ChannelBuffer bb)
+ {
+ bb.writeByte(UPDATE_RECORD);
+ bb.writeInt(fileId); // skip ID part
+ bb.writeLong(id);
+ bb.writeInt(record.getEncodeSize());
+ bb.writeByte(recordType);
+ record.encode(bb);
+ bb.writeInt(size);
+ }
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param size
+ * @param bb
+ */
private void writeAddRecord(final int fileId,
final long id,
final byte recordType,
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-25 21:42:56 UTC (rev 7473)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-25 23:21:12 UTC (rev 7474)
@@ -2353,7 +2353,6 @@
EncodingSupport xid = new SimpleEncoding(10, (byte)'p');
prepare(1, xid);
-
stopJournal();
createJournal();
@@ -3078,8 +3077,7 @@
assertEquals(0, journal.getDataFilesCount());
}
-
-
+
public void testCompactwithPendingPrepare() throws Exception
{
}
@@ -3091,14 +3089,14 @@
public void testCompactwithConcurrentAppend() throws Exception
{
}
-
+
public void testCompactWithPendingTransactionAndDelete() throws Exception
{
}
-
+
public void testCompactingWithPendingTransaction() throws Exception
{
-
+
}
public void testSimpleCompacting() throws Exception
@@ -3108,12 +3106,12 @@
createJournal();
startJournal();
load();
-
+
int NUMBER_OF_RECORDS = 100;
long transactionID = 0;
- for (int i = 0; i < NUMBER_OF_RECORDS/2; i++)
+ for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
{
add(i);
if (i % 10 == 0 && i > 0)
@@ -3123,7 +3121,7 @@
update(i);
}
- for (int i = NUMBER_OF_RECORDS/2; i < NUMBER_OF_RECORDS; i++)
+ for (int i = NUMBER_OF_RECORDS / 2; i < NUMBER_OF_RECORDS; i++)
{
addTx(transactionID, i);
@@ -3156,77 +3154,74 @@
journal.compact();
- for (String fileName: fileFactory.listFiles("cmp"))
+ for (String fileName : fileFactory.listFiles("cmp"))
{
System.out.println("File = " + fileName);
-
+
SequentialFile readFile = fileFactory.createSequentialFile(fileName, 1);
- ;
((JournalImpl)journal).readJournalFile(new JournalFileImpl(readFile, 1, 1), new JournalReader()
{
public void addRecord(RecordInfo info) throws Exception
{
- System.out.println("recordID = " + info.id);
+ System.out.println("AddrecordID = " + info.id);
}
public void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
{
- System.out.println("RecordIDTX = " + transactionID + ", recordID=" + recordInfo.id);
+ System.out.println("UpdRecordTX = " + transactionID + ", recordID=" + recordInfo.id);
}
public void commitRecord(long transactionID, int numberOfRecords) throws Exception
{
// TODO Auto-generated method stub
-
+
}
public void deleteRecord(long recordID) throws Exception
{
// TODO Auto-generated method stub
-
+
}
public void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
{
// TODO Auto-generated method stub
-
+
}
public void markAsDataFile(JournalFile file)
{
// TODO Auto-generated method stub
-
+
}
public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
{
// TODO Auto-generated method stub
-
+
}
public void rollbackRecord(long transactionID) throws Exception
{
// TODO Auto-generated method stub
-
+
}
public void updateRecord(RecordInfo recordInfo) throws Exception
{
- // TODO Auto-generated method stub
-
+ System.out.println("UpdRecordID : " + recordInfo.id);
+
}
public void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception
{
- // TODO Auto-generated method stub
-
+ System.out.println("UpdRecordID : " + recordInfo.id);
+
}
-
+
});
}
-
-
}
More information about the jboss-cvs-commits
mailing list