[hornetq-commits] JBoss hornetq SVN: r8152 - in branches/Clebert_Sync: tests/src/org/hornetq/tests/unit/core/journal/impl and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Oct 27 17:13:04 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-27 17:13:03 -0400 (Tue, 27 Oct 2009)
New Revision: 8152
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
Log:
Backup again...
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27 18:08:05 UTC (rev 8151)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27 21:13:03 UTC (rev 8152)
@@ -18,9 +18,7 @@
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.journal.impl.JournalImpl.JournalRecord;
import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.DataConstants;
/**
* This will read records
@@ -35,13 +33,12 @@
private static final Logger log = Logger.getLogger(JournalCopier.class);
-
/** enable some trace at development. */
private static final boolean DEV_TRACE = true;
-
+
private static final boolean isTraceEnabled = log.isTraceEnabled();
-
- private static void trace(String msg)
+
+ private static void trace(final String msg)
{
System.out.println("JournalCopier::" + msg);
}
@@ -52,6 +49,9 @@
private final Journal journalTo;
+ /** Proxy mode means, everything will be copied over without any evaluations such as */
+ private boolean proxyMode = false;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -62,14 +62,14 @@
* @param recordsSnapshot
* @param nextOrderingID
*/
- public JournalCopier(SequentialFileFactory fileFactory,
- JournalImpl journalFrom,
- Journal journalTo,
- Set<Long> recordsSnapshot,
- Set<Long> pendingTransactionsSnapshot)
+ public JournalCopier(final SequentialFileFactory fileFactory,
+ final JournalImpl journalFrom,
+ final Journal journalTo,
+ final Set<Long> recordsSnapshot,
+ final Set<Long> pendingTransactionsSnapshot)
{
super(fileFactory, journalFrom, recordsSnapshot, -1);
- this.pendingTransactions = pendingTransactionsSnapshot;
+ pendingTransactions = pendingTransactionsSnapshot;
this.journalTo = journalTo;
}
@@ -81,7 +81,7 @@
public void onReadAddRecord(final RecordInfo info) throws Exception
{
- if (lookupRecord(info.id))
+ if (proxyMode || lookupRecord(info.id))
{
if (DEV_TRACE)
{
@@ -100,7 +100,7 @@
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (proxyMode || pendingTransactions.contains(transactionID))
{
if (DEV_TRACE)
{
@@ -118,8 +118,12 @@
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (proxyMode)
{
+ journalTo.appendCommitRecord(transactionID, false);
+ }
+ else if (pendingTransactions.contains(transactionID))
+ {
// Sanity check, this should never happen
log.warn("Inconsistency during compacting: CommitRecord ID = " + transactionID +
" for an already committed transaction during compacting");
@@ -128,12 +132,17 @@
public void onReadDeleteRecord(final long recordID) throws Exception
{
+ if (proxyMode)
+ {
+ journalTo.appendDeleteRecord(recordID, false);
+ }
+ // else....
// Nothing to be done here, we don't copy deleted records
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (proxyMode || pendingTransactions.contains(transactionID))
{
journalTo.appendDeleteRecordTransactional(transactionID, info.id, info.data);
}
@@ -147,7 +156,7 @@
public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (proxyMode || pendingTransactions.contains(transactionID))
{
journalTo.appendPrepareRecord(transactionID, extraData, false);
}
@@ -155,6 +164,10 @@
public void onReadRollbackRecord(final long transactionID) throws Exception
{
+ if (proxyMode)
+ {
+ journalTo.appendRollbackRecord(transactionID, false);
+ }
if (pendingTransactions.contains(transactionID))
{
// Sanity check, this should never happen
@@ -165,7 +178,7 @@
public void onReadUpdateRecord(final RecordInfo info) throws Exception
{
- if (lookupRecord(info.id))
+ if (proxyMode || lookupRecord(info.id))
{
journalTo.appendUpdateRecord(info.id, info.userRecordType, info.data, false);
}
@@ -173,7 +186,7 @@
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception
{
- if (pendingTransactions.contains(transactionID))
+ if (proxyMode || pendingTransactions.contains(transactionID))
{
journalTo.appendUpdateRecordTransactional(transactionID, info.id, info.userRecordType, info.data);
}
@@ -183,6 +196,11 @@
}
}
+ public void setProxyMode(final boolean proxyMode)
+ {
+ this.proxyMode = proxyMode;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 18:08:05 UTC (rev 8151)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 21:13:03 UTC (rev 8152)
@@ -1476,8 +1476,6 @@
compactMinFiles = 0;
autoReclaim = false;
- flushExecutor();
-
// Wait the compactor and cleanup to finish case they are running
// This will also set the compactorRunning, as clean up and compact can't happen at the same time
while (!compactorRunning.compareAndSet(false, true))
@@ -1517,8 +1515,6 @@
return;
}
- dataFiles.clear();
-
HashSet<Long> txSet = new HashSet<Long>();
for (Map.Entry<Long, JournalTransaction> entry : transactions.entrySet())
@@ -1535,6 +1531,9 @@
Collections.sort(dataFilesToProcess, new JournalFileComparator());
+ // Need to make sure everything is out of executors and on the disk before backing it up
+ flush();
+
// This is where most of the work is done, taking most of the time of the compacting routine.
// Notice there are no locks while this is being done.
@@ -1544,9 +1543,48 @@
{
readJournalFile(fileFactory, file, copier);
}
+
+
+ // Final Freeze.... Sending the left over files (including the next file)
+ globalLock.writeLock().lock();
+
+ try
+ {
+ // Need to make sure everything is out of executors and on the disk before backing it up
+ flush();
+
+ List<JournalFile> newDataFilesToProcess = getSnapshotFilesToProcess();
+ Collections.sort(newDataFilesToProcess, new JournalFileComparator());
+
+ Iterator<JournalFile> newDataIterator = newDataFilesToProcess.iterator();
+ for (JournalFile alreadyProcessed : dataFilesToProcess)
+ {
+ JournalFile newFile = newDataIterator.next();
+
+ if (newFile.getFileID() != alreadyProcessed.getFileID())
+ {
+ log.warn("Processed FileID " + alreadyProcessed.getFileID() + " inconsistent with previous processed " + newFile.getFileID());
+ }
+ }
- copier.flush();
+ while (newDataIterator.hasNext())
+ {
+ JournalFile newFile = newDataIterator.next();
+
+ log.info("processing " + newFile.getFileID());
+
+ readJournalFile(fileFactory, newFile, copier);
+ }
+
+ }
+ finally
+ {
+ globalLock.writeLock().unlock();
+ }
+
+
+
}
finally
{
@@ -1756,6 +1794,8 @@
*/
private List<JournalFile> getSnapshotFilesToProcess() throws Exception
{
+ flush();
+
// We need to move to the next file, as we need a clear start for negatives and positives counts
moveNextFile(true);
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-27 18:08:05 UTC (rev 8151)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-27 21:13:03 UTC (rev 8152)
@@ -14,6 +14,11 @@
package org.hornetq.tests.unit.core.journal.impl;
import java.io.File;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -22,6 +27,7 @@
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+import org.hornetq.utils.VariableLatch;
/**
* A CopyJournalTest
@@ -47,7 +53,7 @@
public void testSimpleCopy() throws Exception
{
- setup(10, 10 * 1024, true);
+ setup(2, 100 * 1024, false);
createJournal();
startJournal();
load();
@@ -59,26 +65,70 @@
addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
}
addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
-
+ for (int i = 0 ; i < 10; i++)
+ {
+ addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+ }
+
File destDir = new File(getTestDir()+"/dest");
destDir.mkdirs();
- SequentialFileFactory nioFactory = new NIOSequentialFileFactory(destDir.getAbsolutePath());
+ SequentialFileFactory newFactory = new NIOSequentialFileFactory(destDir.getAbsolutePath());
- Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, nioFactory, filePrefix, fileExtension, 1);
+ Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, newFactory, filePrefix, fileExtension, 1);
destJournal.start();
destJournal.loadInternalOnly();
- journal.copyTo(destJournal);
+ CountDownLatch locked = new CountDownLatch(1);
- journal.flush();
+ JournalHandler handler = new JournalHandler(destJournal, locked, 5);
- destJournal.flush();
+ final Journal proxyJournal = (Journal)Proxy.newProxyInstance(this.getClass().getClassLoader(),new Class[]{Journal.class}, handler);
+ Thread copier = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ journal.copyTo(proxyJournal);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+
+ }
+ };
+ copier.start();
- System.exit(1);
+ assertTrue(locked.await(10, TimeUnit.SECONDS));
+
+ sequence.set(5000);
+
+ for (int i = 0 ; i < 10 ; i++)
+ {
+ addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+ journal.forceMoveNextFile();
+ }
+
+
+
+ handler.unlock();
+
+ copier.join();
+
+ stopJournal();
+
+ destJournal.stop();
+
+ this.fileFactory = newFactory;
+
+ startJournal();
+
+ loadAndCheck(true);
}
@Override
@@ -93,7 +143,7 @@
return new AIOSequentialFileFactory(getTestDir(),
ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
1000000,
- true,
+ false,
false
);
}
@@ -101,7 +151,59 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+
+
+ /** This handler will lock after N calls, until the Handler is opened again */
+ protected class JournalHandler implements InvocationHandler
+ {
+
+
+ final VariableLatch valve = new VariableLatch();
+
+ final CountDownLatch locked;
+
+ final int executionsBeforeLock;
+
+ int executions = 0;
+
+
+ private final Journal target;
+
+ public JournalHandler(Journal journal, CountDownLatch locked, int executionsBeforeLock)
+ {
+ this.target = journal;
+ this.locked = locked;
+ this.executionsBeforeLock = executionsBeforeLock;
+ }
+
+ private void lock()
+ {
+ valve.up();
+ }
+
+ public void unlock()
+ {
+ valve.down();
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ System.out.println("Invoked " + method.getName());
+ if (executions ++ == executionsBeforeLock)
+ {
+ lock();
+ locked.countDown();
+ }
+ if (!valve.waitCompletion(10000))
+ {
+ throw new IllegalStateException("Timeout waiting for open valve");
+ }
+ return method.invoke(target, args);
+ }
+
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
More information about the hornetq-commits
mailing list