[hornetq-commits] JBoss hornetq SVN: r8153 - in branches/Clebert_Sync: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Oct 27 19:48:10 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-27 19:48:09 -0400 (Tue, 27 Oct 2009)
New Revision: 8153
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
Log:
Backup changes
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27 23:48:09 UTC (rev 8153)
@@ -36,7 +36,7 @@
/** enable some trace at development. */
private static final boolean DEV_TRACE = true;
- private static final boolean isTraceEnabled = log.isTraceEnabled();
+ private static final boolean isTrace = log.isTraceEnabled();
private static void trace(final String msg)
{
@@ -49,7 +49,8 @@
private final Journal journalTo;
- /** Proxy mode means, everything will be copied over without any evaluations such as */
+ /** Proxy mode means, everything will be copied over without any evaluations.
+ * This is useful at the end of copying when everything needs to be copied. */
private boolean proxyMode = false;
// Static --------------------------------------------------------
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27 23:48:09 UTC (rev 8153)
@@ -1553,6 +1553,8 @@
// Need to make sure everything is out of executors and on the disk before backing it up
flush();
+ copier.setProxyMode(true);
+
List<JournalFile> newDataFilesToProcess = getSnapshotFilesToProcess();
Collections.sort(newDataFilesToProcess, new JournalFileComparator());
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-27 23:48:09 UTC (rev 8153)
@@ -184,7 +184,7 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
- Journal localBindings = new JournalImpl(1024 * 1024,
+ JournalImpl localBindings = new JournalImpl(1024 * 1024,
2,
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
@@ -252,7 +252,7 @@
this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
}
- Journal localMessage = new JournalImpl(config.getJournalFileSize(),
+ JournalImpl localMessage = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(),
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-27 21:13:03 UTC (rev 8152)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-27 23:48:09 UTC (rev 8153)
@@ -17,6 +17,7 @@
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
+import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -44,50 +45,67 @@
// Attributes ----------------------------------------------------
final AtomicInteger sequence = new AtomicInteger(0);
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
-
+
public void testSimpleCopy() throws Exception
{
setup(2, 100 * 1024, false);
createJournal();
startJournal();
load();
-
-
- for (int i = 0 ; i < 10; i++)
+ ArrayList<Long> transactions = new ArrayList<Long>();
+
+ ArrayList<Long> toDelete = new ArrayList<Long>();
+
+ for (int i = 0; i < 40; i++)
{
- addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+ long iDelete = sequence.incrementAndGet();
+
+ toDelete.add(iDelete);
+
+ addWithSize(1024, iDelete);
}
- addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
- for (int i = 0 ; i < 10; i++)
+
+ long tx = sequence.incrementAndGet();
+ transactions.add(tx);
+ addTx(tx, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+
+ for (int i = 0; i < 10; i++)
{
- addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+ addWithSize(1024,
+ sequence.incrementAndGet(),
+ sequence.incrementAndGet(),
+ sequence.incrementAndGet(),
+ sequence.incrementAndGet());
}
-
- File destDir = new File(getTestDir()+"/dest");
-
+
+ File destDir = new File(getTestDir() + "/dest");
+
destDir.mkdirs();
-
+
SequentialFileFactory newFactory = new NIOSequentialFileFactory(destDir.getAbsolutePath());
-
+
Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, newFactory, filePrefix, fileExtension, 1);
destJournal.start();
destJournal.loadInternalOnly();
-
+
CountDownLatch locked = new CountDownLatch(1);
-
+
JournalHandler handler = new JournalHandler(destJournal, locked, 5);
-
- final Journal proxyJournal = (Journal)Proxy.newProxyInstance(this.getClass().getClassLoader(),new Class[]{Journal.class}, handler);
-
+
+ final Journal proxyJournal = (Journal)Proxy.newProxyInstance(this.getClass().getClassLoader(),
+ new Class[] { Journal.class },
+ handler);
+
Thread copier = new Thread()
{
+ @Override
public void run()
{
try
@@ -98,36 +116,50 @@
{
e.printStackTrace();
}
-
+
}
};
-
+
copier.start();
-
+
assertTrue(locked.await(10, TimeUnit.SECONDS));
-
+
sequence.set(5000);
-
- for (int i = 0 ; i < 10 ; i++)
+
+ for (int i = 0; i < 10; i++)
{
- addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+
+ tx = sequence.incrementAndGet();
+ transactions.add(tx);
+ addTx(tx, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+
journal.forceMoveNextFile();
}
-
-
-
+
+ for (Long txToCommit : transactions)
+ {
+ commit(txToCommit);
+ }
+
+ for (Long iDelete : toDelete)
+ {
+ delete(iDelete);
+ }
+
handler.unlock();
-
+
copier.join();
-
+
stopJournal();
-
+
destJournal.stop();
-
- this.fileFactory = newFactory;
-
+
+ fileFactory = newFactory;
+
+ createJournal();
+
startJournal();
-
+
loadAndCheck(true);
}
@@ -144,53 +176,47 @@
ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
1000000,
false,
- false
- );
+ false);
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
-
+
/** This handler will lock after N calls, until the Handler is opened again */
protected class JournalHandler implements InvocationHandler
{
-
-
+
final VariableLatch valve = new VariableLatch();
-
+
final CountDownLatch locked;
-
+
final int executionsBeforeLock;
-
+
int executions = 0;
-
-
+
private final Journal target;
-
- public JournalHandler(Journal journal, CountDownLatch locked, int executionsBeforeLock)
+
+ public JournalHandler(final Journal journal, final CountDownLatch locked, final int executionsBeforeLock)
{
- this.target = journal;
+ target = journal;
this.locked = locked;
this.executionsBeforeLock = executionsBeforeLock;
}
-
+
private void lock()
{
valve.up();
}
-
+
public void unlock()
{
valve.down();
}
-
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable
{
- System.out.println("Invoked " + method.getName());
- if (executions ++ == executionsBeforeLock)
+ if (executions++ == executionsBeforeLock)
{
lock();
locked.countDown();
@@ -201,7 +227,7 @@
}
return method.invoke(target, args);
}
-
+
}
// Private -------------------------------------------------------
More information about the hornetq-commits mailing list