[jboss-cvs] JBoss Messaging SVN: r4691 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 17 16:47:54 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-07-17 16:47:53 -0400 (Thu, 17 Jul 2008)
New Revision: 4691
Added:
trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java
Modified:
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
Log:
More Journal work
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-07-17 16:02:03 UTC (rev 4690)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-07-17 20:47:53 UTC (rev 4691)
@@ -70,8 +70,11 @@
// Load
- long load(List<RecordInfo> committedRecords,
- List<PreparedTransactionInfo> preparedTransactions) throws Exception;
+ long load(List<RecordInfo> committedRecords,
+ List<PreparedTransactionInfo> preparedTransactions) throws Exception;
+
+ long load(LoadManager reloadManager) throws Exception;
+
// Start and stop reclaimer
Added: trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/LoadManager.java 2008-07-17 20:47:53 UTC (rev 4691)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.journal;
+
+/**
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
+public interface LoadManager
+{
+ void addRecord(RecordInfo info);
+
+ void deleteRecord(long id);
+
+ void updateRecord(RecordInfo info);
+
+ void addPreparedTransaction(PreparedTransactionInfo preparedTransaction);
+}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-17 16:02:03 UTC (rev 4690)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-07-17 20:47:53 UTC (rev 4691)
@@ -49,6 +49,7 @@
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.LoadManager;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
import org.jboss.messaging.core.journal.RecordInfo;
import org.jboss.messaging.core.journal.SequentialFile;
@@ -276,8 +277,8 @@
bb.putByte(ADD_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putLong(id);
bb.putInt(recordLength);
- bb.putLong(id);
bb.putByte(recordType);
record.encode(bb);
bb.putInt(size);
@@ -302,8 +303,8 @@
bb.put(ADD_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putLong(id);
bb.putInt(record.length);
- bb.putLong(id);
bb.put(recordType);
bb.put(record);
bb.putInt(size);
@@ -334,8 +335,8 @@
bb.put(UPDATE_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putLong(id);
bb.putInt(record.length);
- bb.putLong(id);
bb.put(recordType);
bb.put(record);
bb.putInt(size);
@@ -366,8 +367,8 @@
bb.putByte(UPDATE_RECORD);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putLong(id);
bb.putInt(record.getEncodeSize());
- bb.putLong(id);
bb.putByte(recordType);
record.encode(bb);
bb.putInt(size);
@@ -427,10 +428,10 @@
bb.putByte(ADD_RECORD_TX);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putLong(txID);
+ bb.putLong(id);
bb.putInt(recordLength);
- bb.putLong(txID);
bb.putByte(recordType);
- bb.putLong(id);
record.encode(bb);
bb.putInt(size);
bb.rewind();
@@ -455,10 +456,10 @@
bb.put(ADD_RECORD_TX);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putLong(txID);
+ bb.putLong(id);
bb.putInt(record.length);
- bb.putLong(txID);
bb.put(recordType);
- bb.putLong(id);
bb.put(record);
bb.putInt(size);
bb.rewind();
@@ -483,10 +484,10 @@
bb.put(UPDATE_RECORD_TX);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putLong(txID);
+ bb.putLong(id);
bb.putInt(record.length);
- bb.putLong(txID);
bb.put(recordType);
- bb.putLong(id);
bb.put(record);
bb.putInt(size);
bb.rewind();
@@ -512,10 +513,10 @@
bb.putByte(UPDATE_RECORD_TX);
bb.position(SIZE_BYTE + SIZE_INT); // skip ID part
+ bb.putLong(txID);
+ bb.putLong(id);
bb.putInt(record.getEncodeSize());
- bb.putLong(txID);
bb.putByte(recordType);
- bb.putLong(id);
record.encode(bb);
bb.putInt(size);
bb.rewind();
@@ -649,61 +650,64 @@
public synchronized long load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions) throws Exception
- {
- if (state != STATE_STARTED)
- {
- throw new IllegalStateException("Journal must be in started state");
- }
+ {
- addShutdownHook();
+ final Set<Long> recordsToDelete = new HashSet<Long>();
+ final List<RecordInfo> records = new ArrayList<RecordInfo>();
+
- Set<Long> recordsToDelete = new HashSet<Long>();
-
- Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
-
- List<RecordInfo> records = new ArrayList<RecordInfo>();
-
- List<String> fileNames = fileFactory.listFiles(fileExtension);
-
- List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
-
- for (String fileName: fileNames)
+ long maxID = load (new LoadManager()
{
- SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO, aioTimeout);
-
- file.open();
-
- ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
-
- file.read(bb);
-
- int orderingID = bb.getInt();
-
- if (nextOrderingId.get() < orderingID)
+
+ public void addPreparedTransaction(
+ PreparedTransactionInfo preparedTransaction)
{
- nextOrderingId.set(orderingID);
+ preparedTransactions.add(preparedTransaction);
}
+
+ public void addRecord(RecordInfo info)
+ {
+ records.add(info);
+ }
+
+ public void updateRecord(RecordInfo info)
+ {
+ records.add(info);
+ }
+
+ public void deleteRecord(long id)
+ {
+ recordsToDelete.add(id);
+ }
- orderedFiles.add(new JournalFileImpl(file, orderingID));
-
- file.close();
- }
+ });
- //Now order them by ordering id - we can't use the file name for ordering since we can re-use dataFiles
- class JournalFileComparator implements Comparator<JournalFile>
+ for (RecordInfo record: records)
{
- public int compare(JournalFile f1, JournalFile f2)
+ if (!recordsToDelete.contains(record.id))
{
- int id1 = f1.getOrderingID();
- int id2 = f2.getOrderingID();
-
- return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
+ committedRecords.add(record);
}
}
- Collections.sort(orderedFiles, new JournalFileComparator());
+ return maxID;
+ }
+
+ public synchronized long load (LoadManager loadManager) throws Exception
+ {
+ if (state != STATE_STARTED)
+ {
+ throw new IllegalStateException("Journal must be in started state");
+ }
+
+ addShutdownHook();
+
+ Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
+
+ List<JournalFile> orderedFiles = orderFiles();
+
int lastDataPos = -1;
long maxTransactionID = -1;
@@ -753,12 +757,45 @@
int readFileId = bb.getInt();
+ if (readFileId != file.getOrderingID())
+ {
+ bb.position(pos + 1);
+ //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
+ continue;
+ }
+
+ long transactionID = 0;
+
+ if (isTransaction(recordType))
+ {
+ if (bb.position() + SIZE_LONG > fileSize)
+ {
+ continue;
+ }
+ transactionID = bb.getLong();
+ maxTransactionID = Math.max(maxTransactionID, transactionID);
+
+ }
+
+ long recordID = 0;
+ if (!isCompleteTransaction(recordType))
+ {
+ if (bb.position() + SIZE_LONG > fileSize)
+ {
+ continue;
+ }
+ recordID = bb.getLong();
+ maxMessageID = Math.max(maxMessageID, recordID);
+
+ }
+
+
// The variable record portion used on Updates and Appends
int variableSize = 0;
- // The record size (without the variable portion)
- int recordSize = 0;
+ byte userRecordType = 0;
+ byte record[] = null;
- if (recordType >= ADD_RECORD && recordType <= UPDATE_RECORD_TX)
+ if (isContainsBody(recordType))
{
if (bb.position() + SIZE_INT > fileSize)
{
@@ -766,43 +803,20 @@
}
variableSize = bb.getInt();
- }
-
- switch(recordType)
- {
- case ADD_RECORD:
- recordSize = SIZE_ADD_RECORD;
- break;
- case UPDATE_RECORD:
- recordSize = SIZE_UPDATE_RECORD;
- break;
- case ADD_RECORD_TX:
- recordSize = SIZE_ADD_RECORD_TX;
- break;
- case UPDATE_RECORD_TX:
- recordSize = SIZE_UPDATE_RECORD_TX;
- break;
- case DELETE_RECORD:
- recordSize = SIZE_DELETE_RECORD;
- break;
- case DELETE_RECORD_TX:
- recordSize = SIZE_DELETE_RECORD_TX;
- break;
- case PREPARE_RECORD:
- recordSize = SIZE_PREPARE_RECORD;
- break;
- case COMMIT_RECORD:
- recordSize = SIZE_COMMIT_RECORD;
- break;
- case ROLLBACK_RECORD:
- recordSize = SIZE_ROLLBACK_RECORD;
- break;
- default:
- // Sanity check, this was previously tested, nothing different should be on this switch
- throw new IllegalStateException("Record other than expected");
+ if (bb.position() + variableSize > fileSize)
+ {
+ continue;
+ }
+
+ userRecordType = bb.get();
+
+ record = new byte[variableSize];
+ bb.get(record);
}
+ int recordSize = getRecordSize(recordType);
+
if (pos + recordSize + variableSize > fileSize)
{
continue;
@@ -821,53 +835,27 @@
continue;
}
- if (readFileId != file.getOrderingID())
- {
- //log.info("Record read at position " + pos + " doesn't belong to this current journal file, ignoring it!");
- continue;
- }
-
bb.position(oldPos);
-
-
switch(recordType)
{
case ADD_RECORD:
{
- long id = bb.getLong();
+ loadManager.addRecord(new RecordInfo(recordID, userRecordType, record, false));
- maxMessageID = Math.max(maxMessageID, id);
+ posFilesMap.put(recordID, new PosFiles(file));
- byte userRecordType = bb.get();
-
- byte[] record = new byte[variableSize];
-
- bb.get(record);
+ hasData = true;
- records.add(new RecordInfo(id, userRecordType, record, false));
- hasData = true;
-
- posFilesMap.put(id, new PosFiles(file));
-
break;
}
case UPDATE_RECORD:
{
- long id = bb.getLong();
-
- maxMessageID = Math.max(maxMessageID, id);
-
- byte userRecordType = bb.get();
-
- byte[] record = new byte[variableSize];
- bb.get(record);
-
- records.add(new RecordInfo(id, userRecordType, record, true));
+ loadManager.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
hasData = true;
file.incPosCount();
- PosFiles posFiles = posFilesMap.get(id);
+ PosFiles posFiles = posFilesMap.get(recordID);
if (posFiles != null)
{
@@ -881,14 +869,10 @@
}
case DELETE_RECORD:
{
- long id = bb.getLong();
-
- maxMessageID = Math.max(maxMessageID, id);
-
- recordsToDelete.add(id);
+ loadManager.deleteRecord(recordID);
hasData = true;
- PosFiles posFiles = posFilesMap.remove(id);
+ PosFiles posFiles = posFilesMap.remove(recordID);
if (posFiles != null)
{
@@ -898,108 +882,55 @@
break;
}
case ADD_RECORD_TX:
+ case UPDATE_RECORD_TX:
{
- long txID = bb.getLong();
- maxTransactionID = Math.max(maxTransactionID, txID);
+ TransactionHolder tx = transactions.get(transactionID);
- byte userRecordType = bb.get();
-
- long id = bb.getLong();
- maxMessageID = Math.max(maxMessageID, id);
-
- byte[] record = new byte[variableSize];
- bb.get(record);
-
- TransactionHolder tx = transactions.get(txID);
-
if (tx == null)
{
- tx = new TransactionHolder(txID);
- transactions.put(txID, tx);
+ tx = new TransactionHolder(transactionID);
+ transactions.put(transactionID, tx);
}
- tx.recordInfos.add(new RecordInfo(id, userRecordType, record, false));
+ tx.recordInfos.add(new RecordInfo(recordID, userRecordType, record, recordType==UPDATE_RECORD_TX?true:false));
- JournalTransaction tnp = transactionInfos.get(txID);
+ JournalTransaction tnp = transactionInfos.get(transactionID);
if (tnp == null)
{
tnp = new JournalTransaction();
- transactionInfos.put(txID, tnp);
+ transactionInfos.put(transactionID, tnp);
}
- tnp.addPositive(file, id);
+ tnp.addPositive(file, recordID);
hasData = true;
break;
}
- case UPDATE_RECORD_TX:
- {
- long txID = bb.getLong();
- maxTransactionID = Math.max(maxTransactionID, txID);
-
- byte userRecordType = bb.get();
-
- long id = bb.getLong();
- maxMessageID = Math.max(maxMessageID, id);
-
- byte[] record = new byte[variableSize];
- bb.get(record);
-
- TransactionHolder tx = transactions.get(txID);
-
- if (tx == null)
- {
- tx = new TransactionHolder(txID);
- transactions.put(txID, tx);
- }
-
- tx.recordInfos.add(new RecordInfo(id, userRecordType, record, true));
-
- JournalTransaction tnp = transactionInfos.get(txID);
-
- if (tnp == null)
- {
- tnp = new JournalTransaction();
-
- transactionInfos.put(txID, tnp);
- }
-
- tnp.addPositive(file, id);
-
- hasData = true;
-
- break;
- }
case DELETE_RECORD_TX:
{
- long txID = bb.getLong();
- maxTransactionID = Math.max(maxTransactionID, txID);
- long id = bb.getLong();
- maxMessageID = Math.max(maxMessageID, id);
+ TransactionHolder tx = transactions.get(transactionID);
- TransactionHolder tx = transactions.get(txID);
-
if (tx == null)
{
- tx = new TransactionHolder(txID);
- transactions.put(txID, tx);
+ tx = new TransactionHolder(transactionID);
+ transactions.put(transactionID, tx);
}
- tx.recordsToDelete.add(id);
+ tx.recordsToDelete.add(recordID);
- JournalTransaction tnp = transactionInfos.get(txID);
+ JournalTransaction tnp = transactionInfos.get(transactionID);
if (tnp == null)
{
tnp = new JournalTransaction();
- transactionInfos.put(txID, tnp);
+ transactionInfos.put(transactionID, tnp);
}
- tnp.addNegative(file, id);
+ tnp.addNegative(file, recordID);
hasData = true;
@@ -1007,25 +938,22 @@
}
case PREPARE_RECORD:
{
- long txID = bb.getLong();
int numberOfElements = bb.getInt();
- maxTransactionID = Math.max(maxTransactionID, txID);
-
- TransactionHolder tx = transactions.get(txID);
+ TransactionHolder tx = transactions.get(transactionID);
if (tx == null)
{
- throw new IllegalStateException("Cannot find tx with id " + txID);
+ throw new IllegalStateException("Cannot find tx with id " + transactionID);
}
tx.prepared = true;
- JournalTransaction journalTransaction = transactionInfos.get(txID);
+ JournalTransaction journalTransaction = transactionInfos.get(transactionID);
if (journalTransaction == null)
{
- throw new IllegalStateException("Cannot find tx " + txID);
+ throw new IllegalStateException("Cannot find tx " + transactionID);
}
if (numberOfElements == journalTransaction.getNumberOfElements())
@@ -1044,31 +972,43 @@
}
case COMMIT_RECORD:
{
- long txID = bb.getLong();
int numberOfElements = bb.getInt();
- maxTransactionID = Math.max(maxTransactionID, txID);
- TransactionHolder tx = transactions.remove(txID);
+ TransactionHolder tx = transactions.remove(transactionID);
if (tx != null)
{
- JournalTransaction tnp = transactionInfos.remove(txID);
+ JournalTransaction tnp = transactionInfos.remove(transactionID);
if (tnp == null)
{
- throw new IllegalStateException("Cannot find tx " + txID);
+ throw new IllegalStateException("Cannot find tx " + transactionID);
}
if (numberOfElements == tnp.getNumberOfElements())
{
- records.addAll(tx.recordInfos);
- recordsToDelete.addAll(tx.recordsToDelete);
+ for (RecordInfo txRecord: tx.recordInfos)
+ {
+ if (txRecord.isUpdate)
+ {
+ loadManager.updateRecord(txRecord);
+ }
+ else
+ {
+ loadManager.addRecord(txRecord);
+ }
+ }
+
+ for (Long deleteValue: tx.recordsToDelete)
+ {
+ loadManager.deleteRecord(deleteValue);
+ }
tnp.commit(file);
}
else
{
- log.warn("Transaction " + txID + " is missing " + (numberOfElements - tnp.getNumberOfElements()) + " so the transaction is being ignored");
+ log.warn("Transaction " + transactionID + " is missing " + (numberOfElements - tnp.getNumberOfElements()) + " so the transaction is being ignored");
tnp.rollback(file);
}
@@ -1079,20 +1019,17 @@
}
case ROLLBACK_RECORD:
{
- long txID = bb.getLong();
/* int numberOfElements = */ bb.getInt(); // Not being currently used
- maxTransactionID = Math.max(maxTransactionID, txID);
+ TransactionHolder tx = transactions.remove(transactionID);
- TransactionHolder tx = transactions.remove(txID);
-
if (tx != null)
{
- JournalTransaction tnp = transactionInfos.remove(txID);
+ JournalTransaction tnp = transactionInfos.remove(transactionID);
if (tnp == null)
{
- throw new IllegalStateException("Cannot find tx " + txID);
+ throw new IllegalStateException("Cannot find tx " + transactionID);
}
tnp.rollback(file);
@@ -1180,14 +1117,6 @@
pushOpenedFile();
- for (RecordInfo record: records)
- {
- if (!recordsToDelete.contains(record.id))
- {
- committedRecords.add(record);
- }
- }
-
for (TransactionHolder transaction: transactions.values())
{
if (!transaction.prepared || transaction.invalid)
@@ -1215,15 +1144,15 @@
info.recordsToDelete.addAll(transaction.recordsToDelete);
- preparedTransactions.add(info);
+ loadManager.addPreparedTransaction(info);
}
}
state = STATE_LOADED;
return maxMessageID;
- }
-
+ }
+
public int getAlignment() throws Exception
{
return this.fileFactory.getAlignment();
@@ -1537,6 +1466,108 @@
// Private -----------------------------------------------------------------------------
+ private boolean isTransaction(final byte recordType)
+ {
+ return recordType == ADD_RECORD_TX || recordType == UPDATE_RECORD_TX ||
+ recordType == DELETE_RECORD_TX || isCompleteTransaction(recordType);
+ }
+
+ private boolean isCompleteTransaction(final byte recordType)
+ {
+ return recordType == COMMIT_RECORD || recordType == PREPARE_RECORD || recordType == ROLLBACK_RECORD;
+ }
+
+ private boolean isContainsBody(final byte recordType)
+ {
+ return recordType >= ADD_RECORD && recordType <= UPDATE_RECORD_TX;
+ }
+
+ private int getRecordSize(byte recordType)
+ {
+ // The record size (without the variable portion)
+ int recordSize = 0;
+ switch(recordType)
+ {
+ case ADD_RECORD:
+ recordSize = SIZE_ADD_RECORD;
+ break;
+ case UPDATE_RECORD:
+ recordSize = SIZE_UPDATE_RECORD;
+ break;
+ case ADD_RECORD_TX:
+ recordSize = SIZE_ADD_RECORD_TX;
+ break;
+ case UPDATE_RECORD_TX:
+ recordSize = SIZE_UPDATE_RECORD_TX;
+ break;
+ case DELETE_RECORD:
+ recordSize = SIZE_DELETE_RECORD;
+ break;
+ case DELETE_RECORD_TX:
+ recordSize = SIZE_DELETE_RECORD_TX;
+ break;
+ case PREPARE_RECORD:
+ recordSize = SIZE_PREPARE_RECORD;
+ break;
+ case COMMIT_RECORD:
+ recordSize = SIZE_COMMIT_RECORD;
+ break;
+ case ROLLBACK_RECORD:
+ recordSize = SIZE_ROLLBACK_RECORD;
+ break;
+ default:
+ // Sanity check, this was previously tested, nothing different should be on this switch
+ throw new IllegalStateException("Record other than expected");
+
+ }
+ return recordSize;
+ }
+
+ private List<JournalFile> orderFiles() throws Exception
+ {
+ List<String> fileNames = fileFactory.listFiles(fileExtension);
+
+ List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
+
+ for (String fileName: fileNames)
+ {
+ SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO, aioTimeout);
+
+ file.open();
+
+ ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
+
+ file.read(bb);
+
+ int orderingID = bb.getInt();
+
+ if (nextOrderingId.get() < orderingID)
+ {
+ nextOrderingId.set(orderingID);
+ }
+
+ orderedFiles.add(new JournalFileImpl(file, orderingID));
+
+ file.close();
+ }
+
+ //Now order them by ordering id - we can't use the file name for ordering since we can re-use dataFiles
+
+ class JournalFileComparator implements Comparator<JournalFile>
+ {
+ public int compare(JournalFile f1, JournalFile f2)
+ {
+ int id1 = f1.getOrderingID();
+ int id2 = f2.getOrderingID();
+
+ return (id1 < id2 ? -1 : (id1 == id2 ? 0 : 1));
+ }
+ }
+
+ Collections.sort(orderedFiles, new JournalFileComparator());
+ return orderedFiles;
+ }
+
private void clearShutdownHook()
{
if (shutdownHook != null)
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-07-17 16:02:03 UTC (rev 4690)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-07-17 20:47:53 UTC (rev 4691)
@@ -272,6 +272,7 @@
messageIDSequence.set(maxMessageID + 1);
//TODO - recover prepared transactions
+ //TODO - Use load(ReloadManager) instead of Load(lists)
for (RecordInfo record: records)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java 2008-07-17 16:02:03 UTC (rev 4690)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/EasyMockJournalTest.java 2008-07-17 20:47:53 UTC (rev 4691)
@@ -60,8 +60,8 @@
{
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
/*FileID*/1,
+ /* ID */14l,
/*RecordLength*/1,
- /* ID */14l,
/*RecordType*/(byte)33,
/* body */(byte)10,
JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
@@ -79,8 +79,8 @@
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
/*FileID*/1,
+ /* ID */14l,
/*RecordLength*/1,
- /* ID */14l,
/*RecordType*/(byte)33,
/* body */(byte)10,
JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
@@ -98,8 +98,8 @@
{
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
/*FileID*/1,
+ /* ID */14l,
/*RecordLength*/1,
- /* ID */14l,
/*RecordType*/(byte)33,
/* body */(byte)10,
JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
@@ -123,8 +123,8 @@
{
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
/*FileID*/1,
+ /* ID */15l,
/*RecordLength*/1,
- /* ID */15l,
/*RecordType*/(byte)33,
/* body */(byte)10,
JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
@@ -166,10 +166,10 @@
EasyMock.expect(
file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD_TX,
/* FileID */1,
+ /* TXID */3l,
+ /* ID */14l,
/* RecordLength */1,
- /* TXID */3l,
/* RecordType */(byte) 33,
- /* ID */14l,
/* body */(byte) 10, JournalImpl.SIZE_ADD_RECORD_TX + 1)),
EasyMock.eq(false))).andReturn(
JournalImpl.SIZE_ADD_RECORD_TX + 1);
@@ -177,10 +177,10 @@
EasyMock.expect(
file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD_TX,
/* FileID */1,
+ /* TXID */3l,
+ /* ID */15l,
/* RecordLength */1,
- /* TXID */3l,
/* RecordType */(byte) 33,
- /* ID */15l,
/* body */(byte) 10, JournalImpl.SIZE_ADD_RECORD_TX + 1)),
EasyMock.eq(false))).andReturn(
JournalImpl.SIZE_ADD_RECORD_TX + 1);
@@ -215,10 +215,10 @@
EasyMock.expect(
file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD_TX,
/* FileID */1,
+ /* TXID */3l,
+ /* ID */14l,
/* RecordLength */1,
- /* TXID */3l,
/* RecordType */(byte) 33,
- /* ID */14l,
/* body */(byte) 10, JournalImpl.SIZE_ADD_RECORD_TX + 1)),
EasyMock.eq(false))).andReturn(
JournalImpl.SIZE_ADD_RECORD_TX + 1);
@@ -242,24 +242,24 @@
{
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
/* FileID */1,
+ /* ID */15l,
/* RecordLength */1,
- /* ID */15l,
/* RecordType */(byte)33,
/* body */(byte)10,
JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD,
/* FileID */1,
+ /* ID */15l,
/* RecordLength */1,
- /* ID */15l,
/* RecordType */(byte)34,
/* body */(byte)11,
JournalImpl.SIZE_UPDATE_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_UPDATE_RECORD + 1);
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD,
/* FileID */1,
+ /* ID */15l,
/* RecordLength */1,
- /* ID */15l,
/* RecordType */(byte)35,
/* body */(byte)12,
JournalImpl.SIZE_UPDATE_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_UPDATE_RECORD + 1);
@@ -281,27 +281,27 @@
{
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.ADD_RECORD,
/* FileID */1,
+ /* ID */15l,
/* RecordLength */1,
- /* ID */15l,
/* RecordType */(byte)33,
/* body */(byte)10,
JournalImpl.SIZE_ADD_RECORD + 1)), EasyMock.eq(true))).andReturn(JournalImpl.SIZE_ADD_RECORD + 1);
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD_TX,
/* FileID */1,
+ /* TransactionID */33l,
+ /* ID */15l,
/* RecordLength */1,
- /* TransactionID */33l,
/* RecordType */ (byte)34,
- /* ID */15l,
/* body */(byte)11,
JournalImpl.SIZE_UPDATE_RECORD_TX + 1)), EasyMock.eq(false))).andReturn(JournalImpl.SIZE_UPDATE_RECORD_TX + 1);
EasyMock.expect(file1.write(compareByteBuffer(autoEncode(JournalImpl.UPDATE_RECORD_TX,
/* FileID */1,
+ /* TransactionID */33l,
+ /* ID */15l,
/* RecordLength */1,
- /* TransactionID */33l,
/* RecordType */ (byte)35,
- /* ID */15l,
/* body */(byte)12,
JournalImpl.SIZE_UPDATE_RECORD_TX + 1)), EasyMock.eq(false))).andReturn(JournalImpl.SIZE_UPDATE_RECORD_TX + 1);
More information about the jboss-cvs-commits
mailing list