[jboss-cvs] JBoss Messaging SVN: r7498 - in branches/clebert_temp_expirement: tests/src/org/jboss/messaging/tests/integration/journal and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 29 19:47:11 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-29 19:47:11 -0400 (Mon, 29 Jun 2009)
New Revision: 7498
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
Log:
tweaks
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-06-29 18:36:59 UTC (rev 7497)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-06-29 23:47:11 UTC (rev 7498)
@@ -28,6 +28,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.jboss.messaging.core.buffers.ChannelBuffer;
import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -36,6 +37,7 @@
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.JournalImpl.JournalRecord;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.ConcurrentHashSet;
import org.jboss.messaging.utils.DataConstants;
import org.jboss.messaging.utils.Pair;
@@ -69,45 +71,48 @@
final Map<Long, JournalRecord> recordsSnapshot;
- final Map<Long, JournalTransaction> pendingTransactions;
+ // Snapshot of transactions that were pending when the compactor started
+ final Set<Long> pendingTransactions = new ConcurrentHashSet<Long>();
final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
- final LinkedList<Pair<Long, JournalFile>> pendingUpdates = new LinkedList<Pair<Long, JournalFile>>();
+ /** Commands that happened during compacting */
+ final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
- final LinkedList<Pair<Long, JournalFile>> pendingDeletes = new LinkedList<Pair<Long, JournalFile>>();
-
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
Map<Long, JournalRecord> recordsSnapshot,
- Map<Long, JournalTransaction> pendingTransactions,
int firstFileID)
{
this.fileFactory = fileFactory;
this.journal = journal;
this.recordsSnapshot = recordsSnapshot;
this.nextOrderingID = firstFileID;
- this.pendingTransactions = pendingTransactions;
}
+ public void addPendingTransaction(long transactionID)
+ {
+ pendingTransactions.add(transactionID);
+ }
+
/**
* @param id
* @param usedFile
*/
- public void addPendingDelete(long id, JournalFile usedFile)
+ public void addCommandDelete(long id, JournalFile usedFile)
{
- pendingDeletes.add(new Pair<Long, JournalFile>(id, usedFile));
+ pendingCommands.add(new DeleteCompactCommand(id, usedFile));
}
/**
* @param id
* @param usedFile
*/
- public void addPendingUpdate(long id, JournalFile usedFile)
+ public void addCommandUpdate(long id, JournalFile usedFile)
{
- pendingUpdates.add(new Pair<Long, JournalFile>(id, usedFile));
+ pendingCommands.add(new UpdateCompactCommand(id, usedFile));
}
public boolean lookupRecord(long id)
@@ -130,6 +135,7 @@
}
}
+ /** Write pending output into file */
public void flush() throws Exception
{
if (channelWrapper != null)
@@ -143,8 +149,28 @@
channelWrapper = null;
}
+ /**
+ * Replay pending counts that happened during compacting
+ */
+ public void replayPendingCommands()
+ {
+ for (CompactCommand command : pendingCommands)
+ {
+ try
+ {
+ command.execute();
+ }
+ catch (Exception e)
+ {
+ log.warn("Error replaying pending commands after compacting", e);
+ }
+ }
+
+ pendingCommands.clear();
+ }
+
// JournalReaderCallback implementation -------------------------------------------
-
+
public void addRecord(RecordInfo info) throws Exception
{
if (recordsSnapshot.get(info.id) != null)
@@ -166,7 +192,7 @@
public void addRecordTX(long transactionID, RecordInfo info) throws Exception
{
- if (pendingTransactions.get(transactionID) != null)
+ if (pendingTransactions.contains(transactionID))
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -193,9 +219,7 @@
public void commitRecord(long transactionID, int numberOfRecords) throws Exception
{
- JournalTransaction pendingTx = pendingTransactions.get(transactionID);
-
- if (pendingTx != null)
+ if (pendingTransactions.contains(transactionID))
{
// Sanity check, this should never happen
throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
@@ -215,7 +239,7 @@
public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
{
- if (pendingTransactions.get(transactionID) != null)
+ if (pendingTransactions.contains(transactionID))
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -232,6 +256,7 @@
newTransaction.addNegative(currentFile, info.id);
}
+ // else.. nothing to be done
}
public void markAsDataFile(JournalFile file)
@@ -241,7 +266,7 @@
public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
{
- if (pendingTransactions.get(transactionID) != null)
+ if (pendingTransactions.contains(transactionID))
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -266,7 +291,7 @@
public void rollbackRecord(long transactionID) throws Exception
{
- if (pendingTransactions.get(transactionID) != null)
+ if (pendingTransactions.contains(transactionID))
{
// Sanity check, this should never happen
throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
@@ -303,8 +328,7 @@
public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
{
-
- if (pendingTransactions.get(transactionID) != null)
+ if (pendingTransactions.contains(transactionID))
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -324,7 +348,6 @@
}
else
{
-
updateRecord(info);
}
}
@@ -343,8 +366,7 @@
}
return newTransaction;
}
-
-
+
/**
* @throws Exception
*/
@@ -366,6 +388,50 @@
channelWrapper.writeInt(fileID);
}
-
+ static abstract class CompactCommand
+ {
+ abstract void execute() throws Exception;
+ }
+ class DeleteCompactCommand extends CompactCommand
+ {
+ long id;
+
+ JournalFile usedFile;
+
+
+ public DeleteCompactCommand(long id, JournalFile usedFile)
+ {
+ this.id = id;
+ this.usedFile = usedFile;
+ }
+
+ @Override
+ void execute() throws Exception
+ {
+ JournalRecord deleteRecord = journal.getRecords().remove(id);
+ deleteRecord.delete(usedFile);
+ }
+ }
+
+ class UpdateCompactCommand extends CompactCommand
+ {
+ long id;
+
+ JournalFile usedFile;
+
+
+ public UpdateCompactCommand(long id, JournalFile usedFile)
+ {
+ this.id = id;
+ this.usedFile = usedFile;
+ }
+
+ @Override
+ void execute() throws Exception
+ {
+ JournalRecord updateRecord = journal.getRecords().get(id);
+ updateRecord.addUpdateFile(usedFile);
+ }
+ }
}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-29 18:36:59 UTC (rev 7497)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-29 23:47:11 UTC (rev 7498)
@@ -266,6 +266,12 @@
{
return records;
}
+
+
+ public JournalCompactor getCompactor()
+ {
+ return this.compactor;
+ }
// Journal implementation
// ----------------------------------------------------------------
@@ -365,7 +371,7 @@
// compacting is done
if (posFiles == null)
{
- compactor.addPendingUpdate(id, usedFile);
+ compactor.addCommandUpdate(id, usedFile);
}
else
{
@@ -432,7 +438,7 @@
// compacting is done
if (record == null)
{
- compactor.addPendingDelete(id, usedFile);
+ compactor.addCommandDelete(id, usedFile);
}
else
{
@@ -857,21 +863,21 @@
pendingTransactions = JournalImpl.this.pendingTransactions;
pendingTransactions.putAll(this.pendingTransactions);
- for (Map.Entry<Long, JournalTransaction> entry : pendingTransactions.entrySet())
- {
- System.out.println("TransactionID = " + entry.getKey());
- }
+ this.records = new ConcurrentHashMap<Long, JournalRecord>();
- JournalImpl.this.records = new ConcurrentHashMap<Long, JournalRecord>();
-
- records = new ConcurrentHashMap<Long, JournalRecord>();
-
dataFilesToProcess.addAll(dataFiles);
dataFiles.clear();
- this.compactor = new JournalCompactor(fileFactory, this, recordsSnapshot, pendingTransactions, dataFilesToProcess.get(0).getFileID());
+ this.compactor = new JournalCompactor(fileFactory, this, recordsSnapshot, dataFilesToProcess.get(0).getFileID());
+ for (Map.Entry<Long, JournalTransaction> entry : pendingTransactions.entrySet())
+ {
+ System.out.println("TransactionID = " + entry.getKey());
+ entry.getValue().setCompacting();
+ compactor.addPendingTransaction(entry.getKey());
+ }
+
}
finally
{
@@ -912,28 +918,22 @@
{
newDatafiles = compactor.newDataFiles;
+ // Restore newRecords created during compacting
for (Map.Entry<Long, JournalRecord> newRecordEntry : compactor.newRecords.entrySet())
{
records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
}
+ // Restore compacted dataFiles
for (int i = newDatafiles.size() - 1; i >= 0; i--)
{
dataFiles.addFirst(newDatafiles.get(i));
}
- for (Pair<Long, JournalFile> pendingRecord : compactor.pendingUpdates)
- {
- JournalRecord updateRecord = this.records.get(pendingRecord.a);
- updateRecord.addUpdateFile(pendingRecord.b);
- }
-
- for (Pair<Long, JournalFile> pendingRecord : compactor.pendingDeletes)
- {
- JournalRecord deleteRecord = this.records.remove(pendingRecord.a);
- deleteRecord.delete(pendingRecord.b);
- }
-
+ // Replay pending commands
+
+ compactor.replayPendingCommands();
+
// Restore relationshipMap
// Deal with transactions commits that happend during the compacting
// Deal with updates and deletes that happened during the compacting
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java 2009-06-29 18:36:59 UTC (rev 7497)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java 2009-06-29 23:47:11 UTC (rev 7498)
@@ -78,6 +78,39 @@
internalgetCounter(file).incrementAndGet();
}
+ public void setCompacting()
+ {
+ // / Compacting is recreating all the previous files and everything
+ // / so we just clear the list of previous files, previous pos and previous adds
+ // / The transaction may be working at the top from now
+
+ if (pendingFiles != null)
+ {
+ pendingFiles.clear();
+ }
+
+ if (callbackList != null)
+ {
+ callbackList.clear();
+ }
+
+ if (pos != null)
+ {
+ pos.clear();
+ }
+
+ if (neg != null)
+ {
+ neg.clear();
+ }
+
+ counter.set(0);
+
+ lastFile = null;
+
+ currentCallback = null;
+ }
+
/**
* @param currentFile
* @param bb
@@ -180,35 +213,50 @@
* */
public void commit(final JournalFile file)
{
+ JournalCompactor compactor = journal.getCompactor();
if (pos != null)
{
- for (Pair<JournalFile, Long> p : pos)
+ for (Pair<JournalFile, Long> trUpdate : pos)
{
- JournalImpl.JournalRecord posFiles = journal.getRecords().get(p.b);
+ JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.b);
if (posFiles == null)
{
- posFiles = new JournalImpl.JournalRecord(p.a);
+ posFiles = new JournalImpl.JournalRecord(trUpdate.a);
- journal.getRecords().put(p.b, posFiles);
+ journal.getRecords().put(trUpdate.b, posFiles);
}
+ else if (compactor != null && compactor.lookupRecord(trUpdate.b))
+ {
+ // This is a case where the transaction was opened after compacting was started,
+ // but the commit arrived while compacting was working
+ // We need to cache the counter update, so compacting will take the correct files when it is done
+ compactor.addCommandUpdate(trUpdate.b, trUpdate.a);
+ }
else
{
- posFiles.addUpdateFile(p.a);
+ posFiles.addUpdateFile(trUpdate.a);
}
}
}
if (neg != null)
{
- for (Pair<JournalFile, Long> n : neg)
+ for (Pair<JournalFile, Long> trDelete : neg)
{
- JournalImpl.JournalRecord posFiles = journal.getRecords().remove(n.b);
+ JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.b);
if (posFiles != null)
{
- posFiles.delete(n.a);
+ posFiles.delete(trDelete.a);
}
+ else if (compactor != null && compactor.lookupRecord(trDelete.b))
+ {
+ // This is a case where the transaction was opened after compacting was started,
+ // but the commit arrived while compacting was working
+ // We need to cache the counter update, so compacting will take the correct files when it is done
+ compactor.addCommandDelete(trDelete.b, trDelete.a);
+ }
}
}
Modified: branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java
===================================================================
--- branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java 2009-06-29 18:36:59 UTC (rev 7497)
+++ branches/clebert_temp_expirement/tests/src/org/jboss/messaging/tests/integration/journal/JournalCompactTest.java 2009-06-29 23:47:11 UTC (rev 7498)
@@ -54,6 +54,10 @@
// General tests
// =============
+ public void testCrashRenamingFiles() throws Exception
+ {
+ }
+
public void testCompactwithPendingXACommit() throws Exception
{
}
@@ -127,7 +131,6 @@
if (regularAdd)
{
-
for (int i = 0; i < NUMBER_OF_RECORDS / 2; i++)
{
add(i);
@@ -205,7 +208,7 @@
if (performAppend)
{
- for (int i = 0; i < 100; i++)
+ for (int i = 0; i < 50; i++)
{
add(nextID++);
if (i % 10 == 0)
@@ -213,21 +216,53 @@
journal.forceMoveNextFile();
}
}
+
+ for (int i = 0; i < 50; i++)
+ {
+ // A Total new transaction (that was created after the compact started) to add new record while compacting is still working
+ addTx(transactionID, nextID++);
+ commit(transactionID++);
+ if (i % 10 == 0)
+ {
+ journal.forceMoveNextFile();
+ }
+ }
}
if (performUpdate)
{
+ int count = 0;
for (Long liveID : liveIDs)
{
- update(liveID);
+ if (count++ % 2 == 0)
+ {
+ update(liveID);
+ }
+ else
+ {
+ // A Total new transaction (that was created after the compact started) to update a record that is being compacted
+ updateTx(transactionID, liveID);
+ commit(transactionID++);
+ }
}
}
if (performDelete)
{
+ int count = 0;
for (long liveID : liveIDs)
{
- delete(liveID);
+ if (count++ % 2 == 0)
+ {
+ delete(liveID);
+ }
+ else
+ {
+ // A Total new transaction (that was created after the compact started) to delete a record that is being compacted
+ deleteTx(transactionID, liveID);
+ commit(transactionID++);
+ }
+
}
}
@@ -267,7 +302,7 @@
add(idGenerator.generateID());
- //journal.compact();
+ // journal.compact();
stopJournal();
createJournal();
More information about the jboss-cvs-commits
mailing list