[jboss-cvs] JBoss Messaging SVN: r7500 - branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 29 23:37:06 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-29 23:37:06 -0400 (Mon, 29 Jun 2009)
New Revision: 7500
Modified:
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
Log:
tweaks
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-30 01:12:59 UTC (rev 7499)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-30 03:37:06 UTC (rev 7500)
@@ -346,7 +346,7 @@
}
else
{
- write(bytes, false, DummyCallback.instance);
+ write(bytes, false, DummyCallback.getInstance());
}
}
@@ -376,7 +376,7 @@
}
else
{
- write(bytes, false, DummyCallback.instance);
+ write(bytes, false, DummyCallback.getInstance());
}
}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java 2009-06-30 01:12:59 UTC (rev 7499)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java 2009-06-30 03:37:06 UTC (rev 7500)
@@ -35,7 +35,7 @@
*/
public class DummyCallback implements IOCallback
{
- static DummyCallback instance = new DummyCallback();
+ private static DummyCallback instance = new DummyCallback();
private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-06-30 01:12:59 UTC (rev 7499)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalCompactor.java 2009-06-30 03:37:06 UTC (rev 7500)
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.jboss.messaging.core.buffers.ChannelBuffer;
import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -52,34 +53,46 @@
private static final Logger log = Logger.getLogger(JournalCompactor.class);
- final JournalImpl journal;
+ private final JournalImpl journal;
- final SequentialFileFactory fileFactory;
+ private final SequentialFileFactory fileFactory;
- JournalFile currentFile;
+ private JournalFile currentFile;
- SequentialFile sequentialFile;
+ private SequentialFile sequentialFile;
- int fileID;
+ private int fileID;
- ChannelBuffer channelWrapper;
+ private ChannelBuffer channelWrapper;
- int nextOrderingID;
+ private int nextOrderingID;
- final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
+ private final List<JournalFile> newDataFiles = new ArrayList<JournalFile>();
- final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();;
+ private final Set<Long> recordsSnapshot = new ConcurrentHashSet<Long>();;
// Snapshot of transactions that were pending when the compactor started
- final Set<Long> pendingTransactions = new ConcurrentHashSet<Long>();
+ private final Map<Long, PendingTransaction> pendingTransactions = new ConcurrentHashMap<Long, PendingTransaction>();
- final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
+ private final Map<Long, JournalRecord> newRecords = new HashMap<Long, JournalRecord>();
- final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
+ private final Map<Long, JournalTransaction> newTransactions = new HashMap<Long, JournalTransaction>();
- /** Commands that happened during compacting */
- final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
+ /** Commands that happened during compacting
+ * We can't take any counts during compaction, as we won't know in what files the records are taking place, so
+ * we cache those updates during compacting. As soon as we are done we apply the correct accounting. */
+ private final LinkedList<CompactCommand> pendingCommands = new LinkedList<CompactCommand>();
+ public List<JournalFile> getNewDataFiles()
+ {
+ return newDataFiles;
+ }
+
+ public Map<Long, JournalRecord> getNewRecords()
+ {
+ return newRecords;
+ }
+
public JournalCompactor(final SequentialFileFactory fileFactory,
final JournalImpl journal,
Set<Long> recordsSnapshot,
@@ -92,9 +105,9 @@
}
/** This methods informs the Compactor about the existence of a pending (non committed) transaction */
- public void addPendingTransaction(long transactionID)
+ public void addPendingTransaction(long transactionID, long ids[])
{
- pendingTransactions.add(transactionID);
+ pendingTransactions.put(transactionID, new PendingTransaction(transactionID, ids));
}
/**
@@ -192,7 +205,7 @@
public void addRecordTX(long transactionID, RecordInfo info) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (pendingTransactions.get(transactionID) != null)
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -219,7 +232,7 @@
public void commitRecord(long transactionID, int numberOfRecords) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (pendingTransactions.get(transactionID) != null)
{
// Sanity check, this should never happen
throw new IllegalStateException("Inconsistency during compacting: CommitRecord ID = " + transactionID +
@@ -239,7 +252,7 @@
public void deleteRecordTX(long transactionID, RecordInfo info) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (pendingTransactions.get(transactionID) != null)
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -266,7 +279,7 @@
public void prepareRecord(long transactionID, byte[] extraData, int numberOfRecords) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (pendingTransactions.get(transactionID) != null)
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -291,7 +304,7 @@
public void rollbackRecord(long transactionID) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (pendingTransactions.get(transactionID) != null)
{
// Sanity check, this should never happen
throw new IllegalStateException("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
@@ -328,7 +341,7 @@
public void updateRecordTX(long transactionID, RecordInfo info) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (pendingTransactions.get(transactionID) != null)
{
JournalTransaction newTransaction = getNewJournalTransaction(transactionID);
@@ -361,7 +374,7 @@
JournalTransaction newTransaction = newTransactions.get(transactionID);
if (newTransaction == null)
{
- newTransaction = new JournalTransaction(journal);
+ newTransaction = new JournalTransaction(transactionID, journal);
newTransactions.put(transactionID, newTransaction);
}
return newTransaction;
@@ -413,6 +426,20 @@
deleteRecord.delete(usedFile);
}
}
+
+ private class PendingTransaction
+ {
+ long transactionID;
+ long pendingIDs[];
+
+
+ PendingTransaction(long transactionID, long ids[])
+ {
+ this.transactionID = transactionID;
+ this.pendingIDs = ids;
+ }
+
+ }
private class UpdateCompactCommand extends CompactCommand
{
@@ -435,4 +462,14 @@
updateRecord.addUpdateFile(usedFile);
}
}
+
+ /**
+ * @param id
+ * @param journalTransaction
+ */
+ public void addCommandCommit(long id, JournalTransaction journalTransaction)
+ {
+ // TODO Auto-generated method stub
+
+ }
}
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-30 01:12:59 UTC (rev 7499)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-30 03:37:06 UTC (rev 7500)
@@ -61,9 +61,7 @@
import org.jboss.messaging.core.journal.TestableJournal;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.utils.ConcurrentHashSet;
import org.jboss.messaging.utils.DataConstants;
-import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.concurrent.LinkedBlockingDeque;
/**
@@ -866,9 +864,8 @@
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
{
- System.out.println("TransactionID = " + entry.getKey());
entry.getValue().setCompacting();
- compactor.addPendingTransaction(entry.getKey());
+ compactor.addPendingTransaction(entry.getKey(), entry.getValue().getPositiveArray());
}
// We will calculate the new records during compacting, what will take the position the records will take after compacting
@@ -904,17 +901,17 @@
// Usually tests will use this to hold the compacting while other structures are being updated.
onCompactDone();
- SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.newDataFiles);
+ SequentialFile controlFile = createControlFile(dataFilesToProcess, compactor.getNewDataFiles());
List<JournalFile> newDatafiles = null;
compactingLock.writeLock().lock();
try
{
- newDatafiles = compactor.newDataFiles;
+ newDatafiles = compactor.getNewDataFiles();
// Restore newRecords created during compacting
- for (Map.Entry<Long, JournalRecord> newRecordEntry : compactor.newRecords.entrySet())
+ for (Map.Entry<Long, JournalRecord> newRecordEntry : compactor.getNewRecords().entrySet())
{
records.put(newRecordEntry.getKey(), newRecordEntry.getValue());
}
@@ -929,7 +926,7 @@
compactor.replayPendingCommands();
- // Deal with transactions commits that happend during the compacting
+ // Deal with transactions commits that happened during the compacting
this.compactor = null;
@@ -956,17 +953,6 @@
{
}
- if (compactor.sequentialFile != null)
- {
- try
- {
- compactor.sequentialFile = null;
- }
- catch (Throwable ignored)
- {
- }
- }
-
compactor = null;
}
autoReclaim = previousReclaimValue;
@@ -1204,7 +1190,7 @@
if (tnp == null)
{
- tnp = new JournalTransaction(JournalImpl.this);
+ tnp = new JournalTransaction(info.id, JournalImpl.this);
transactions.put(transactionID, tnp);
}
@@ -1236,7 +1222,7 @@
if (tnp == null)
{
- tnp = new JournalTransaction(JournalImpl.this);
+ tnp = new JournalTransaction(transactionID, JournalImpl.this);
transactions.put(transactionID, tnp);
}
@@ -1272,7 +1258,7 @@
if (journalTransaction == null)
{
- journalTransaction = new JournalTransaction(JournalImpl.this);
+ journalTransaction = new JournalTransaction(transactionID, JournalImpl.this);
transactions.put(transactionID, journalTransaction);
}
@@ -2730,7 +2716,7 @@
if (tx == null)
{
- tx = new JournalTransaction(this);
+ tx = new JournalTransaction(txID, this);
JournalTransaction trans = transactions.putIfAbsent(txID, tx);
Modified: branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java
===================================================================
--- branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java 2009-06-30 01:12:59 UTC (rev 7499)
+++ branches/clebert_temp_expirement/src/main/org/jboss/messaging/core/journal/impl/JournalTransaction.java 2009-06-30 03:37:06 UTC (rev 7500)
@@ -51,12 +51,14 @@
private List<Pair<JournalFile, Long>> neg;
+ private long id;
+
// All the files this transaction is touching on.
// We can't have those files being reclaimed if there is a pending transaction
private Set<JournalFile> pendingFiles;
private TransactionCallback currentCallback;
-
+
private boolean compacting = false;
private Map<JournalFile, TransactionCallback> callbackList;
@@ -65,8 +67,9 @@
private final AtomicInteger counter = new AtomicInteger();
- public JournalTransaction(final JournalImpl journal)
+ public JournalTransaction(final long id, final JournalImpl journal)
{
+ this.id = id;
this.journal = journal;
}
@@ -80,10 +83,74 @@
internalgetCounter(file).incrementAndGet();
}
+ public long[] getPositiveArray()
+ {
+ if (pos == null)
+ {
+ return new long[0];
+ }
+ else
+ {
+ int i = 0;
+ long[] ids = new long[pos.size()];
+ for (Pair<JournalFile, Long> el : pos)
+ {
+ ids[i++] = el.b;
+ }
+ return ids;
+ }
+ }
+
public void setCompacting()
{
compacting = true;
-
+
+ // Everything is cleared on the transaction...
+ // since we are compacting, everything is at the compactor's level
+ clear();
+ }
+
+ /** This is used to merge transactions from compacting */
+ public void merge(JournalTransaction other)
+ {
+ if (other.pos != null)
+ {
+ if (pos == null)
+ {
+ pos = new ArrayList<Pair<JournalFile, Long>>();
+ }
+
+ pos.addAll(other.pos);
+ }
+
+ if (other.neg != null)
+ {
+ if (neg == null)
+ {
+ neg = new ArrayList<Pair<JournalFile, Long>>();
+ }
+
+ neg.addAll(other.neg);
+ }
+
+ if (other.pendingFiles != null)
+ {
+ if (pendingFiles == null)
+ {
+ pendingFiles = new HashSet<JournalFile>();
+ }
+
+ pendingFiles.addAll(other.pendingFiles);
+ }
+
+ this.compacting = false;
+ }
+
+ /**
+ *
+ */
+ public void clear()
+ {
// / Compacting is recreating all the previous files and everything
// / so we just clear the list of previous files, previous pos and previous adds
// / The transaction may be working at the top from now
@@ -213,64 +280,72 @@
}
/**
- * The caller of this method needs to guarantee lock.acquire. (unless this is being called from load what is a single thread process).
+ * The caller of this method needs to guarantee lock.acquire at the journal. (unless this is being called from load what is a single thread process).
* */
public void commit(final JournalFile file)
{
JournalCompactor compactor = journal.getCompactor();
-
- if (pos != null)
+
+ if (compacting)
{
- for (Pair<JournalFile, Long> trUpdate : pos)
+ compactor.addCommandCommit(id, this);
+ }
+ else
+ {
+
+ if (pos != null)
{
- JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.b);
-
- if (posFiles == null)
+ for (Pair<JournalFile, Long> trUpdate : pos)
{
- posFiles = new JournalImpl.JournalRecord(trUpdate.a);
+ JournalImpl.JournalRecord posFiles = journal.getRecords().get(trUpdate.b);
- journal.getRecords().put(trUpdate.b, posFiles);
+ if (posFiles == null)
+ {
+ posFiles = new JournalImpl.JournalRecord(trUpdate.a);
+
+ journal.getRecords().put(trUpdate.b, posFiles);
+ }
+ else if (compactor != null && compactor.lookupRecord(trUpdate.b))
+ {
+ // This is a case where the transaction was opened after compacting was started,
+ // but the commit arrived while compacting was working
+ // We need to cache the counter update, so compacting will take the correct files when it is done
+ compactor.addCommandUpdate(trUpdate.b, trUpdate.a);
+ }
+ else
+ {
+ posFiles.addUpdateFile(trUpdate.a);
+ }
}
- else if (compactor != null && compactor.lookupRecord(trUpdate.b))
- {
- // This is a case where the transaction was opened after compacting was started,
- // but the commit arrived while compacting was working
- // We need to cache the counter update, so compacting will take the correct files when it is done
- compactor.addCommandUpdate(trUpdate.b, trUpdate.a);
- }
- else
- {
- posFiles.addUpdateFile(trUpdate.a);
- }
}
- }
- if (neg != null)
- {
- for (Pair<JournalFile, Long> trDelete : neg)
+ if (neg != null)
{
- JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.b);
+ for (Pair<JournalFile, Long> trDelete : neg)
+ {
+ JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.b);
- if (posFiles != null)
- {
- posFiles.delete(trDelete.a);
+ if (posFiles != null)
+ {
+ posFiles.delete(trDelete.a);
+ }
+ else if (compactor != null && compactor.lookupRecord(trDelete.b))
+ {
+ // This is a case where the transaction was opened after compacting was started,
+ // but the commit arrived while compacting was working
+ // We need to cache the counter update, so compacting will take the correct files when it is done
+ compactor.addCommandDelete(trDelete.b, trDelete.a);
+ }
}
- else if (compactor != null && compactor.lookupRecord(trDelete.b))
- {
- // This is a case where the transaction was opened after compacting was started,
- // but the commit arrived while compacting was working
- // We need to cache the counter update, so compacting will take the correct files when it is done
- compactor.addCommandDelete(trDelete.b, trDelete.a);
- }
}
- }
- // Now add negs for the pos we added in each file in which there were
- // transactional operations
+ // Now add negs for the pos we added in each file in which there were
+ // transactional operations
- for (JournalFile jf : pendingFiles)
- {
- file.incNegCount(jf);
+ for (JournalFile jf : pendingFiles)
+ {
+ file.incNegCount(jf);
+ }
}
}
More information about the jboss-cvs-commits
mailing list