[jboss-cvs] JBoss Messaging SVN: r7501 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 30 01:52:22 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-30 01:52:22 -0400 (Tue, 30 Jun 2009)
New Revision: 7501
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallback.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
Log:
Concurrent transactions changes
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-06-30 03:37:06 UTC (rev 7500)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-06-30 05:52:22 UTC (rev 7501)
@@ -87,35 +87,79 @@
{
return newDataFiles;
}
-
+
public Map<Long, JournalRecord> getNewRecords()
{
return newRecords;
}
-
+
+ public Map<Long, JournalTransaction> getNewTransactions()
+ {
+ return newTransactions;
+ }
+
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
- Set<Long> recordsSnapshot,
- int firstFileID)
+ final Set<Long> recordsSnapshot,
+ final int firstFileID)
{
this.fileFactory = fileFactory;
this.journal = journal;
this.recordsSnapshot.addAll(recordsSnapshot);
- this.nextOrderingID = firstFileID;
+ nextOrderingID = firstFileID;
}
/** This method informs the compactor about the existence of a pending (uncommitted) transaction */
- public void addPendingTransaction(long transactionID, long ids[])
+ public void addPendingTransaction(final long transactionID, final long ids[])
{
pendingTransactions.put(transactionID, new PendingTransaction(transactionID, ids));
}
/**
* @param id
+ * @param journalTransaction
+ */
+ public void addCommandCommit(final JournalTransaction liveTransaction, final JournalFile currentFile)
+ {
+ // Queues a commit that arrived while compacting is in progress; the queued command is executed when the
+ // pending commands are replayed after compacting. liveTransaction is the transaction being committed and
+ // currentFile is handed to the CommitCompactCommand as the file containing the commit record.
+ log.info("Adding commit command");
+ pendingCommands.add(new CommitCompactCommand(liveTransaction, currentFile));
+
+ /** If a delete comes for these records while the compactor is still working, we need to be able to take
+ * them into account for later deletes instead of throwing exceptions about non existent records */
+ for (long id : liveTransaction.getPositiveArray())
+ {
+ recordsSnapshot.add(id);
+ }
+
+ // pendingTransactions is filled through addPendingTransaction, so a transaction that was never registered
+ // there has no entry. In that case there are no extra ids to add; iterating a null array here would
+ // throw a NullPointerException.
+ PendingTransaction oldTransaction = pendingTransactions.get(liveTransaction.getId());
+ if (oldTransaction != null && oldTransaction.pendingIDs != null)
+ {
+ for (long id : oldTransaction.pendingIDs)
+ {
+ recordsSnapshot.add(id);
+ }
+ }
+ }
+
+ public void addCommandRollback(final JournalTransaction liveTransaction, final JournalFile currentFile)
+ {
+ log.info("Adding rollback command");
+ pendingCommands.add(new RollbackCompactCommand(liveTransaction, currentFile));
+ }
+
+ /**
+ * @param id
* @param usedFile
*/
- public void addCommandDelete(long id, JournalFile usedFile)
+ public void addCommandDelete(final long id, final JournalFile usedFile)
{
+ log.info("Adding delete command");
pendingCommands.add(new DeleteCompactCommand(id, usedFile));
}
@@ -123,17 +167,18 @@
* @param id
* @param usedFile
*/
- public void addCommandUpdate(long id, JournalFile usedFile)
+ public void addCommandUpdate(final long id, final JournalFile usedFile)
{
+ log.info("Adding update command");
pendingCommands.add(new UpdateCompactCommand(id, usedFile));
}
- public boolean lookupRecord(long id)
+ public boolean lookupRecord(final long id)
{
return recordsSnapshot.contains(id);
}
- private void checkSize(int size) throws Exception
+ private void checkSize(final int size) throws Exception
{
if (channelWrapper == null)
{
@@ -169,6 +214,7 @@
{
for (CompactCommand command : pendingCommands)
{
+ log.info("Replaying " + command.getClass().getName());
try
{
command.execute();
@@ -178,13 +224,13 @@
log.warn("Error replaying pending commands after compacting", e);
}
}
-
+
pendingCommands.clear();
}
// JournalReaderCallback implementation -------------------------------------------
- public void addRecord(RecordInfo info) throws Exception
+ public void onReadAddRecord(final RecordInfo info) throws Exception
{
if (recordsSnapshot.contains(info.id))
{
@@ -203,7 +249,7 @@
}
}
- public void addRecordTX(long transactionID, RecordInfo info) throws Exception
+ public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
if (pendingTransactions.get(transactionID) != null)
{
@@ -226,11 +272,11 @@
else
{
// Will try it as a regular record, the method addRecord will validate if this is a live record or not
- addRecord(info);
+ onReadAddRecord(info);
}
}
- public void commitRecord(long transactionID, int numberOfRecords) throws Exception
+ public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
{
if (pendingTransactions.get(transactionID) != null)
{
@@ -240,7 +286,7 @@
}
}
- public void deleteRecord(long recordID) throws Exception
+ public void onReadDeleteRecord(final long recordID) throws Exception
{
if (newRecords.get(recordID) != null)
{
@@ -250,7 +296,7 @@
}
- public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
+ public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
if (pendingTransactions.get(transactionID) != null)
{
@@ -272,12 +318,12 @@
// else.. nothing to be done
}
- public void markAsDataFile(JournalFile file)
+ public void markAsDataFile(final JournalFile file)
{
// nothing to be done here
}
- public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
{
if (pendingTransactions.get(transactionID) != null)
{
@@ -302,7 +348,7 @@
}
}
- public void rollbackRecord(long transactionID) throws Exception
+ public void onReadRollbackRecord(final long transactionID) throws Exception
{
if (pendingTransactions.get(transactionID) != null)
{
@@ -312,7 +358,7 @@
}
}
- public void updateRecord(RecordInfo info) throws Exception
+ public void onReadUpdateRecord(final RecordInfo info) throws Exception
{
if (recordsSnapshot.contains(info.id))
{
@@ -339,7 +385,7 @@
}
}
- public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
+ public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
if (pendingTransactions.get(transactionID) != null)
{
@@ -361,7 +407,7 @@
}
else
{
- updateRecord(info);
+ onReadUpdateRecord(info);
}
}
@@ -369,7 +415,7 @@
* @param transactionID
* @return
*/
- private JournalTransaction getNewJournalTransaction(long transactionID)
+ private JournalTransaction getNewJournalTransaction(final long transactionID)
{
JournalTransaction newTransaction = newTransactions.get(transactionID);
if (newTransaction == null)
@@ -411,9 +457,8 @@
long id;
JournalFile usedFile;
-
-
- public DeleteCompactCommand(long id, JournalFile usedFile)
+
+ public DeleteCompactCommand(final long id, final JournalFile usedFile)
{
this.id = id;
this.usedFile = usedFile;
@@ -422,23 +467,24 @@
@Override
void execute() throws Exception
{
+ System.out.println("Deleting command " + id);
JournalRecord deleteRecord = journal.getRecords().remove(id);
deleteRecord.delete(usedFile);
}
}
-
+
private class PendingTransaction
{
long transactionID;
+
long pendingIDs[];
-
-
- PendingTransaction(long transactionID, long ids[])
+
+ PendingTransaction(final long transactionID, final long ids[])
{
this.transactionID = transactionID;
- this.pendingIDs = ids;
+ pendingIDs = ids;
}
-
+
}
private class UpdateCompactCommand extends CompactCommand
@@ -446,15 +492,13 @@
long id;
JournalFile usedFile;
-
-
- public UpdateCompactCommand(long id, JournalFile usedFile)
+
+ public UpdateCompactCommand(final long id, final JournalFile usedFile)
{
this.id = id;
this.usedFile = usedFile;
}
-
@Override
void execute() throws Exception
{
@@ -463,13 +507,56 @@
}
}
- /**
- * @param id
- * @param journalTransaction
- */
- public void addCommandCommit(long id, JournalTransaction journalTransaction)
+ private class CommitCompactCommand extends CompactCommand
{
- // TODO Auto-generated method stub
-
+ private final JournalTransaction liveTransaction;
+
+ /** File containing the commit record */
+ private final JournalFile commitFile;
+
+ public CommitCompactCommand(final JournalTransaction liveTransaction, final JournalFile commitFile)
+ {
+ this.liveTransaction = liveTransaction;
+ this.commitFile = commitFile;
+ }
+
+ @Override
+ void execute() throws Exception
+ {
+ JournalTransaction newTransaction = newTransactions.get(liveTransaction.getId());
+ if (newTransaction != null)
+ {
+ liveTransaction.merge(newTransaction);
+ liveTransaction.commit(commitFile);
+ }
+ newTransactions.remove(liveTransaction.getId());
+ }
}
+
+ /** Pending command created by addCommandRollback for a rollback that arrived while compacting was in progress;
+ * it is executed when the pending commands are replayed after compacting. */
+ private class RollbackCompactCommand extends CompactCommand
+ {
+ private final JournalTransaction liveTransaction;
+
+ /** File containing the rollback record */
+ private final JournalFile rollbackFile;
+
+ public RollbackCompactCommand(final JournalTransaction liveTransaction, final JournalFile rollbackFile)
+ {
+ this.liveTransaction = liveTransaction;
+ this.rollbackFile = rollbackFile;
+ }
+
+ @Override
+ void execute() throws Exception
+ {
+ // Entry held in newTransactions for the same transaction id, if any
+ JournalTransaction newTransaction = newTransactions.get(liveTransaction.getId());
+ if (newTransaction != null)
+ {
+ // Merge that entry into the live transaction before applying the rollback against rollbackFile
+ liveTransaction.merge(newTransaction);
+ liveTransaction.rollback(rollbackFile);
+ }
+ // NOTE(review): when no entry exists, rollback(rollbackFile) is not invoked at all - confirm this is intended
+ // The entry is removed either way, so it is no longer returned by getNewTransactions()
+ newTransactions.remove(liveTransaction.getId());
+ }
+ }
+
}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-30 03:37:06 UTC (rev 7500)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-30 05:52:22 UTC (rev 7501)
@@ -266,7 +266,11 @@
return records;
}
-
+ public JournalFile getCurrentFile()
+ {
+ return currentFile;
+ }
+
public JournalCompactor getCompactor()
{
return this.compactor;
@@ -864,8 +868,8 @@
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
{
+ compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
entry.getValue().setCompacting();
- compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
}
// We will calculate the new records during compacting, what will take the position the records will take after compacting
@@ -905,13 +909,18 @@
List<JournalFile> newDatafiles = null;
+ JournalCompactor localCompactor = compactor;
+
compactingLock.writeLock().lock();
try
{
- newDatafiles = compactor.getNewDataFiles();
+ // Need to clear the compactor here, or the replay commands will send commands back (infinite loop)
+ this.compactor = null;
+ newDatafiles = localCompactor.getNewDataFiles();
+
// Restore newRecords created during compacting
- for (Map.Entry<Long, JournalRecord> newRecordEntry : compactor.getNewRecords().entrySet())
+ for (Map.Entry<Long, JournalRecord> newRecordEntry : localCompactor.getNewRecords().entrySet())
{
records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
}
@@ -919,17 +928,43 @@
// Restore compacted dataFiles
for (int i = newDatafiles.size() - 1; i >= 0; i--)
{
- dataFiles.addFirst(newDatafiles.get(i));
+ JournalFile fileToAdd = newDatafiles.get(i);
+ if (trace)
+ {
+ trace("Adding file " + fileToAdd + " back as datafile");
+ }
+ dataFiles.addFirst(fileToAdd);
}
+
// Replay pending commands (including updates, deletes and commits)
- compactor.replayPendingCommands();
+ localCompactor.replayPendingCommands();
+
+ // Merge transactions back after compacting
- // Deal with transactions commits that happened during the compacting
+ for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
+ {
+ if (trace)
+ {
+ trace("Merging pending transaction " + newTransaction + " after compacting to the journal");
+ }
+ JournalTransaction liveTransaction = this.transactions.get(newTransaction.getId());
+ if (liveTransaction == null)
+ {
+ log.warn("Inconsistency: Can't merge transaction " + newTransaction.getId() + " back into JournalTransactions");
+ }
+ else
+ {
+ liveTransaction.merge(newTransaction);
+ }
+ }
+
+ for (Map.Entry<Long, JournalRecord> record : records.entrySet())
+ {
+ trace("We have " + record.getKey() + " on the list now");
+ }
- this.compactor = null;
-
}
finally
{
@@ -1107,7 +1142,7 @@
int resultLastPost = readJournalFile(file, new JournalReaderCallback()
{
- public void addRecord(RecordInfo info) throws Exception
+ public void onReadAddRecord(RecordInfo info) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1120,7 +1155,7 @@
records.put(info.id, new JournalRecord(file));
}
- public void updateRecord(RecordInfo info) throws Exception
+ public void onReadUpdateRecord(RecordInfo info) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1142,7 +1177,7 @@
}
}
- public void deleteRecord(long recordID) throws Exception
+ public void onReadDeleteRecord(long recordID) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1160,12 +1195,12 @@
}
}
- public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
+ public void onReadUpdateRecordTX(long transactionID, RecordInfo info) throws Exception
{
- addRecordTX(transactionID, info);
+ onReadAddRecordTX(transactionID, info);
}
- public void addRecordTX(long transactionID, RecordInfo info) throws Exception
+ public void onReadAddRecordTX(long transactionID, RecordInfo info) throws Exception
{
if (trace && LOAD_TRACE)
@@ -1198,7 +1233,7 @@
tnp.addPositive(file, info.id);
}
- public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
+ public void onReadDeleteRecordTX(long transactionID, RecordInfo info) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1231,7 +1266,7 @@
}
- public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
+ public void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1276,7 +1311,7 @@
}
}
- public void commitRecord(long transactionID, int numberOfRecords) throws Exception
+ public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -1337,7 +1372,7 @@
}
- public void rollbackRecord(long transactionID) throws Exception
+ public void onReadRollbackRecord(long transactionID) throws Exception
{
if (trace && LOAD_TRACE)
{
@@ -2006,37 +2041,37 @@
{
case ADD_RECORD:
{
- reader.addRecord(new RecordInfo(recordID, userRecordType, record, false));
+ reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false));
break;
}
case UPDATE_RECORD:
{
- reader.updateRecord(new RecordInfo(recordID, userRecordType, record, true));
+ reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true));
break;
}
case DELETE_RECORD:
{
- reader.deleteRecord(recordID);
+ reader.onReadDeleteRecord(recordID);
break;
}
case ADD_RECORD_TX:
{
- reader.addRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
+ reader.onReadAddRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, false));
break;
}
case UPDATE_RECORD_TX:
{
- reader.updateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
+ reader.onReadUpdateRecordTX(transactionID, new RecordInfo(recordID, userRecordType, record, true));
break;
}
case DELETE_RECORD_TX:
{
- reader.deleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
+ reader.onReadDeleteRecordTX(transactionID, new RecordInfo(recordID, (byte)0, record, true));
break;
}
@@ -2047,19 +2082,19 @@
wholeFileBuffer.get(extraData);
- reader.prepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
+ reader.onReadPrepareRecord(transactionID, extraData, transactionCheckNumberOfRecords);
break;
}
case COMMIT_RECORD:
{
- reader.commitRecord(transactionID, transactionCheckNumberOfRecords);
+ reader.onReadCommitRecord(transactionID, transactionCheckNumberOfRecords);
break;
}
case ROLLBACK_RECORD:
{
- reader.rollbackRecord(transactionID);
+ reader.onReadRollbackRecord(transactionID);
break;
}
default:
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallback.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallback.java 2009-06-30 03:37:06 UTC (rev 7500)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalReaderCallback.java 2009-06-30 05:52:22 UTC (rev 7501)
@@ -34,56 +34,56 @@
*/
public interface JournalReaderCallback
{
- void addRecord(RecordInfo info) throws Exception;
+ void onReadAddRecord(RecordInfo info) throws Exception;
/**
* @param recordInfo
* @throws Exception
*/
- void updateRecord(RecordInfo recordInfo) throws Exception;
+ void onReadUpdateRecord(RecordInfo recordInfo) throws Exception;
/**
* @param recordID
*/
- void deleteRecord(long recordID) throws Exception;
+ void onReadDeleteRecord(long recordID) throws Exception;
/**
* @param transactionID
* @param recordInfo
* @throws Exception
*/
- void addRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+ void onReadAddRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
/**
* @param transactionID
* @param recordInfo
* @throws Exception
*/
- void updateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+ void onReadUpdateRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
/**
* @param transactionID
* @param recordInfo
*/
- void deleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
+ void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception;
/**
* @param transactionID
* @param extraData
* @param summaryData
*/
- void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception;
+ void onReadPrepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception;
/**
* @param transactionID
* @param summaryData
*/
- void commitRecord(long transactionID, int numberOfRecords) throws Exception;
+ void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception;
/**
* @param transactionID
*/
- void rollbackRecord(long transactionID) throws Exception;
+ void onReadRollbackRecord(long transactionID) throws Exception;
public void markAsDataFile(JournalFile file);
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java 2009-06-30 03:37:06 UTC (rev 7500)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java 2009-06-30 05:52:22 UTC (rev 7501)
@@ -51,8 +51,8 @@
private List<Pair<JournalFile, Long>> neg;
- private long id;
-
+ private final long id;
+
// All the files this transaction is touching on.
// We can't have those files being reclaimed if there is a pending transaction
private Set<JournalFile> pendingFiles;
@@ -73,6 +73,14 @@
this.journal = journal;
}
+ /**
+ * @return the id
+ */
+ public long getId()
+ {
+ return id;
+ }
+
public int getCounter(final JournalFile file)
{
return internalgetCounter(file).intValue();
@@ -288,7 +296,7 @@
if (compacting)
{
- compactor.addCommandCommit(id, this);
+ compactor.addCommandCommit(this, file);
}
else
{
@@ -299,21 +307,22 @@
{
JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.b);
- if (posFiles == null)
+ if (compactor != null && compactor.lookupRecord(trUpdate.b))
{
- posFiles = new JournalImpl.JournalRecord(trUpdate.a);
-
- journal.getRecords().put(trUpdate.b, posFiles);
- }
- else if (compactor != null && compactor.lookupRecord(trUpdate.b))
- {
// This is a case where the transaction was opened after compacting was started,
// but the commit arrived while compacting was working
// We need to cache the counter update, so compacting will take the correct files when it is done
compactor.addCommandUpdate(trUpdate.b, trUpdate.a);
}
else
+ if (posFiles == null)
{
+ posFiles = new JournalImpl.JournalRecord(trUpdate.a);
+
+ journal.getRecords().put(trUpdate.b, posFiles);
+ }
+ else
+ {
posFiles.addUpdateFile(trUpdate.a);
}
}
@@ -325,6 +334,8 @@
{
JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.b);
+ System.out.println("Deleting record " + trDelete.b + " posFiles = " + posFiles);
+
if (posFiles != null)
{
posFiles.delete(trDelete.a);
@@ -375,20 +386,29 @@
* */
public void rollback(final JournalFile file)
{
- // Now add negs for the pos we added in each file in which there were
- // transactional operations
- // Note that we do this on rollback as we do on commit, since we need
- // to ensure the file containing
- // the rollback record doesn't get deleted before the files with the
- // transactional operations are deleted
- // Otherwise we may run into problems especially with XA where we are
- // just left with a prepare when the tx
- // has actually been rolled back
+ JournalCompactor compactor = journal.getCompactor();
- for (JournalFile jf : pendingFiles)
+ if (compacting && compactor != null)
{
- file.incNegCount(jf);
+ compactor.addCommandRollback(this, file);
}
+ else
+ {
+ // Now add negs for the pos we added in each file in which there were
+ // transactional operations
+ // Note that we do this on rollback as we do on commit, since we need
+ // to ensure the file containing
+ // the rollback record doesn't get deleted before the files with the
+ // transactional operations are deleted
+ // Otherwise we may run into problems especially with XA where we are
+ // just left with a prepare when the tx
+ // has actually been rolled back
+
+ for (JournalFile jf : pendingFiles)
+ {
+ file.incNegCount(jf);
+ }
+ }
}
/**
@@ -414,6 +434,11 @@
}
+ /**
+ * @return a short description containing the transaction id, e.g. for trace and log messages
+ */
+ @Override
+ public String toString()
+ {
+ return "JournalTransaction(" + this.id + ")";
+ }
+
private AtomicInteger internalgetCounter(final JournalFile file)
{
if (lastFile != file)
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java 2009-06-30 03:37:06 UTC (rev 7500)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/NIOJournalCompactTest.java 2009-06-30 05:52:22 UTC (rev 7501)
@@ -32,6 +32,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
import org.jboss.messaging.utils.IDGenerator;
+import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.TimeAndCounterIDGenerator;
/**
@@ -68,45 +69,52 @@
public void testCompactwithPendingCommit() throws Exception
{
- InternalCompactTest(false, false, false, false, true);
+ InternalCompactTest(false, false, false, false, true, false, false);
}
+ public void testCompactwithDelayedCommit() throws Exception
+ {
+ InternalCompactTest(false, false, false, false, true, false, true);
+ }
+
public void testCompactwithPendingCommitFollowedByDelete() throws Exception
{
+ InternalCompactTest(false, false, false, false, true, true, false);
}
-
-
+
public void testCompactwithConcurrentUpdateAndDeletes() throws Exception
{
- InternalCompactTest(true, false, true, true, false);
+ InternalCompactTest(true, false, true, true, false, false, false);
}
public void testCompactwithConcurrentDeletes() throws Exception
{
- InternalCompactTest(true, false, false, true, false);
+ InternalCompactTest(true, false, false, true, false, false, false);
}
public void testCompactwithConcurrentUpdates() throws Exception
{
- InternalCompactTest(true, false, true, false, false);
+ InternalCompactTest(true, false, true, false, false, false, false);
}
public void testCompactWithConcurrentAppend() throws Exception
{
- InternalCompactTest(true, true, false, false, false);
+ InternalCompactTest(true, true, false, false, false, false, false);
}
private void InternalCompactTest(final boolean regularAdd,
final boolean performAppend,
final boolean performUpdate,
final boolean performDelete,
- final boolean pendingTransactions) throws Exception
+ final boolean pendingTransactions,
+ final boolean deleteTransactRecords,
+ final boolean delayCommit) throws Exception
{
setup(50, 60 * 1024, true);
ArrayList<Long> liveIDs = new ArrayList<Long>();
- ArrayList<Long> listPendingTransactions = new ArrayList<Long>();
+ ArrayList<Pair<Long, Long>> transactedRecords = new ArrayList<Pair<Long, Long>>();
final CountDownLatch latchDone = new CountDownLatch(1);
final CountDownLatch latchWait = new CountDownLatch(1);
@@ -164,9 +172,10 @@
{
for (long i = 0; i < 100; i++)
{
- addTx(transactionID, idGenerator.generateID());
- updateTx(transactionID, idGenerator.generateID());
- listPendingTransactions.add(transactionID++);
+ long recordID = idGenerator.generateID();
+ addTx(transactionID, recordID);
+ updateTx(transactionID, recordID);
+ transactedRecords.add(new Pair<Long, Long>(transactionID++, recordID));
}
}
@@ -224,7 +233,8 @@
for (int i = 0; i < 50; i++)
{
- // A Total new transaction (that was created after the compact started) to add new record while compacting is still working
+ // A Total new transaction (that was created after the compact started) to add new record while compacting
+ // is still working
addTx(transactionID, nextID++);
commit(transactionID++);
if (i % 10 == 0)
@@ -245,7 +255,8 @@
}
else
{
- // A Total new transaction (that was created after the compact started) to update a record that is being compacted
+ // A Total new transaction (that was created after the compact started) to update a record that is being
+ // compacted
updateTx(transactionID, liveID);
commit(transactionID++);
}
@@ -259,55 +270,84 @@
{
if (count++ % 2 == 0)
{
+ System.out.println("Deleting no trans " + liveID );
delete(liveID);
}
else
{
- // A Total new transaction (that was created after the compact started) to delete a record that is being compacted
+ System.out.println("Deleting TX " + liveID );
+ // A Total new transaction (that was created after the compact started) to delete a record that is being
+ // compacted
deleteTx(transactionID, liveID);
commit(transactionID++);
}
+ System.out.println("Deletes are going into " + ((JournalImpl)journal).getCurrentFile());
}
}
- if (pendingTransactions)
+ if (pendingTransactions && !delayCommit)
{
- for (long tx : listPendingTransactions)
+ for (Pair<Long, Long> tx : transactedRecords)
{
- if (tx % 2 == 0)
+ if (tx.a % 2 == 0)
{
- commit(tx);
+ commit(tx.a);
+
+ if (deleteTransactRecords)
+ {
+ delete(tx.b);
+ }
}
else
{
- rollback(tx);
+ rollback(tx.a);
}
}
}
/** Some independent adds and updates */
- for (int i = 0; i < 1000; i++)
- {
- long id = idGenerator.generateID();
- add(id);
- delete(id);
+// for (int i = 0; i < 1000; i++)
+// {
+// long id = idGenerator.generateID();
+// add(id);
+// delete(id);
+//
+// if (i % 100 == 0)
+// {
+// journal.forceMoveNextFile();
+// }
+// }
- if (i % 100 == 0)
- {
- journal.forceMoveNextFile();
- }
- }
-
journal.forceMoveNextFile();
latchWait.countDown();
t.join();
+ if (pendingTransactions && delayCommit)
+ {
+ for (Pair<Long, Long> tx : transactedRecords)
+ {
+ if (tx.a % 2 == 0)
+ {
+ commit(tx.a);
+
+ if (deleteTransactRecords)
+ {
+ delete(tx.b);
+ }
+ }
+ else
+ {
+ rollback(tx.a);
+ }
+ }
+ }
+
add(idGenerator.generateID());
- // journal.compact();
+ journal.compact();
stopJournal();
createJournal();
More information about the jboss-cvs-commits
mailing list