Author: clebert.suconic(a)jboss.com
Date: 2009-10-27 14:08:05 -0400 (Tue, 27 Oct 2009)
New Revision: 8151
Added:
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
Log:
Backup changes
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java 2009-10-27
16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java 2009-10-27
18:08:05 UTC (rev 8151)
@@ -78,6 +78,11 @@
// Load
long load(LoaderCallback reloadManager) throws Exception;
+
+ /** Load internal data structures and not expose any data.
+ * This is only useful if you're using the journal but not interested on the
current data.
+ * Useful in situations where the journal is being replicated, copied... etc. */
+ void loadInternalOnly() throws Exception;
long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo>
preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception;
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
---
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27
16:16:44 UTC (rev 8150)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java 2009-10-27
18:08:05 UTC (rev 8151)
@@ -23,7 +23,7 @@
import org.hornetq.utils.DataConstants;
/**
- * A JournalCopier
+ * This will read records
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
@@ -35,6 +35,17 @@
private static final Logger log = Logger.getLogger(JournalCopier.class);
+
+ /** enable some trace at development. */
+ private static final boolean DEV_TRACE = true;
+
+ private static final boolean isTraceEnabled = log.isTraceEnabled();
+
+ private static void trace(String msg)
+ {
+ System.out.println("JournalCopier::" + msg);
+ }
+
// Attributes ----------------------------------------------------
private final Set<Long> pendingTransactions;
@@ -72,14 +83,29 @@
{
if (lookupRecord(info.id))
{
+ if (DEV_TRACE)
+ {
+ trace("Backing add ID = " + info.id);
+ }
journalTo.appendAddRecord(info.id, info.userRecordType, info.data, false);
}
+ else
+ {
+ if (DEV_TRACE)
+ {
+ trace("Ignoring add ID = " + info.id);
+ }
+ }
}
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws
Exception
{
if (pendingTransactions.contains(transactionID))
{
+ if (DEV_TRACE)
+ {
+ trace("Backing add TXID = " + transactionID + " id = " +
info.id);
+ }
journalTo.appendAddRecordTransactional(transactionID, info.id,
info.userRecordType, info.data);
}
else
@@ -132,7 +158,7 @@
if (pendingTransactions.contains(transactionID))
{
// Sanity check, this should never happen
- log.warn("Inconsistency during compacting: RollbackRecord ID = " +
transactionID +
+ log.warn("Inconsistency during copying: RollbackRecord ID = " +
transactionID +
" for an already rolled back transaction during
compacting");
}
}
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27
16:16:44 UTC (rev 8150)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-27
18:08:05 UTC (rev 8151)
@@ -83,7 +83,7 @@
private static final Logger log = Logger.getLogger(JournalImpl.class);
- private static final boolean trace = false;
+ private static final boolean trace = log.isTraceEnabled();
/** This is to be set to true at DEBUG & development only */
private static final boolean LOAD_TRACE = false;
@@ -1358,7 +1358,36 @@
{
return fileFactory.getAlignment();
}
+
+ public synchronized void loadInternalOnly() throws Exception
+ {
+ LoaderCallback dummyLoader = new LoaderCallback()
+ {
+ public void failedTransaction(long transactionID, List<RecordInfo>
records, List<RecordInfo> recordsToDelete)
+ {
+ }
+
+ public void updateRecord(RecordInfo info)
+ {
+ }
+
+ public void deleteRecord(long id)
+ {
+ }
+
+ public void addRecord(RecordInfo info)
+ {
+ }
+
+ public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+ {
+ }
+ };
+
+ this.load(dummyLoader);
+ }
+
/**
* @see JournalImpl#load(LoaderCallback)
*/
@@ -1450,6 +1479,7 @@
flushExecutor();
// Wait the compactor and cleanup to finish case they are running
+ // This will also set the compactorRunning, as clean up and compact can't
happen at the same time
while (!compactorRunning.compareAndSet(false, true))
{
final CountDownLatch latch = new CountDownLatch(1);
@@ -1515,7 +1545,7 @@
readJournalFile(fileFactory, file, copier);
}
- compactor.flush();
+ copier.flush();
}
finally
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-27
16:16:44 UTC (rev 8150)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-27
18:08:05 UTC (rev 8151)
@@ -1263,32 +1263,8 @@
*/
public void loadInternalOnly() throws Exception
{
- LoaderCallback dummyLoader = new LoaderCallback()
- {
-
- public void failedTransaction(long transactionID, List<RecordInfo>
records, List<RecordInfo> recordsToDelete)
- {
- }
-
- public void updateRecord(RecordInfo info)
- {
- }
-
- public void deleteRecord(long id)
- {
- }
-
- public void addRecord(RecordInfo info)
- {
- }
-
- public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
- {
- }
- };
-
- bindingsJournal.load(dummyLoader);
- messageJournal.load(dummyLoader);
+ bindingsJournal.loadInternalOnly();
+ messageJournal.loadInternalOnly();
}
// Public
-----------------------------------------------------------------------------------
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
---
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-27
16:16:44 UTC (rev 8150)
+++
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-27
18:08:05 UTC (rev 8151)
@@ -26,7 +26,6 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.replication.ReplicationManager;
-
/**
* Used by the {@link JournalStorageManager} to replicate journal calls.
*
@@ -52,9 +51,7 @@
private final byte journalID;
- public ReplicatedJournal(final byte journaID,
- final Journal localJournal,
- final ReplicationManager replicationManager)
+ public ReplicatedJournal(final byte journaID, final Journal localJournal, final
ReplicationManager replicationManager)
{
super();
journalID = journaID;
@@ -62,9 +59,17 @@
this.replicationManager = replicationManager;
}
+ public ReplicatedJournal(final byte journaID, final ReplicationManager
replicationManager)
+ {
+ super();
+ journalID = journaID;
+ localJournal = null;
+ this.replicationManager = replicationManager;
+ }
+
// Static --------------------------------------------------------
-
- private static void trace(String message)
+
+ private static void trace(final String message)
{
log.trace(message);
}
@@ -100,7 +105,10 @@
trace("Append record id = " + id + " recordType = " +
recordType);
}
replicationManager.appendAddRecord(journalID, id, recordType, record);
- localJournal.appendAddRecord(id, recordType, record, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendAddRecord(id, recordType, record, sync);
+ }
}
/**
@@ -134,7 +142,10 @@
trace("Append record TXid = " + id + " recordType = " +
recordType);
}
replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType,
record);
- localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ if (localJournal != null)
+ {
+ localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ }
}
/**
@@ -150,7 +161,10 @@
trace("AppendCommit " + txID);
}
replicationManager.appendCommitRecord(journalID, txID);
- localJournal.appendCommitRecord(txID, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendCommitRecord(txID, sync);
+ }
}
/**
@@ -166,7 +180,10 @@
trace("AppendDelete " + id);
}
replicationManager.appendDeleteRecord(journalID, id);
- localJournal.appendDeleteRecord(id, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecord(id, sync);
+ }
}
/**
@@ -195,7 +212,10 @@
trace("AppendDelete txID=" + txID + " id=" + id);
}
replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
- localJournal.appendDeleteRecordTransactional(txID, id, record);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecordTransactional(txID, id, record);
+ }
}
/**
@@ -211,7 +231,10 @@
trace("AppendDelete (noencoding) txID=" + txID + " id=" +
id);
}
replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
- localJournal.appendDeleteRecordTransactional(txID, id);
+ if (localJournal != null)
+ {
+ localJournal.appendDeleteRecordTransactional(txID, id);
+ }
}
/**
@@ -240,7 +263,10 @@
trace("AppendPrepare txID=" + txID);
}
replicationManager.appendPrepareRecord(journalID, txID, transactionData);
- localJournal.appendPrepareRecord(txID, transactionData, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendPrepareRecord(txID, transactionData, sync);
+ }
}
/**
@@ -256,7 +282,10 @@
trace("AppendRollback " + txID);
}
replicationManager.appendRollbackRecord(journalID, txID);
- localJournal.appendRollbackRecord(txID, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendRollbackRecord(txID, sync);
+ }
}
/**
@@ -287,7 +316,10 @@
trace("AppendUpdateRecord id = " + id + " , recordType = " +
recordType);
}
replicationManager.appendUpdateRecord(journalID, id, recordType, record);
- localJournal.appendUpdateRecord(id, recordType, record, sync);
+ if (localJournal != null)
+ {
+ localJournal.appendUpdateRecord(id, recordType, record, sync);
+ }
}
/**
@@ -324,7 +356,10 @@
trace("AppendUpdateRecord txid=" + txID + " id = " + id +
" , recordType = " + recordType);
}
replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType,
record);
- localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ if (localJournal != null)
+ {
+ localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ }
}
/**
@@ -339,7 +374,14 @@
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback transactionFailure) throws
Exception
{
- return localJournal.load(committedRecords, preparedTransactions,
transactionFailure);
+ if (localJournal != null)
+ {
+ return localJournal.load(committedRecords, preparedTransactions,
transactionFailure);
+ }
+ else
+ {
+ return -1;
+ }
}
/**
@@ -350,7 +392,14 @@
*/
public long load(final LoaderCallback reloadManager) throws Exception
{
- return localJournal.load(reloadManager);
+ if (localJournal != null)
+ {
+ return localJournal.load(reloadManager);
+ }
+ else
+ {
+ return -1;
+ }
}
/**
@@ -360,7 +409,10 @@
*/
public void perfBlast(final int pages) throws Exception
{
- localJournal.perfBlast(pages);
+ if (localJournal != null)
+ {
+ localJournal.perfBlast(pages);
+ }
}
/**
@@ -369,7 +421,10 @@
*/
public void start() throws Exception
{
- localJournal.start();
+ if (localJournal != null)
+ {
+ localJournal.start();
+ }
}
/**
@@ -378,7 +433,10 @@
*/
public void stop() throws Exception
{
- localJournal.stop();
+ if (localJournal != null)
+ {
+ localJournal.stop();
+ }
}
/* (non-Javadoc)
@@ -386,7 +444,14 @@
*/
public int getAlignment() throws Exception
{
- return localJournal.getAlignment();
+ if (localJournal != null)
+ {
+ return localJournal.getAlignment();
+ }
+ else
+ {
+ return 1;
+ }
}
/* (non-Javadoc)
@@ -394,13 +459,20 @@
*/
public boolean isStarted()
{
- return localJournal.isStarted();
+ if (localJournal != null)
+ {
+ return localJournal.isStarted();
+ }
+ else
+ {
+ return true;
+ }
}
/* (non-Javadoc)
* @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
*/
- public void copyTo(Journal destJournal) throws Exception
+ public void copyTo(final Journal destJournal) throws Exception
{
// This would be a nonsense operation. Only the real journal can copyTo
throw new IllegalStateException("Operation Not Implemeted!");
@@ -411,9 +483,19 @@
*/
public void flush() throws Exception
{
- localJournal.flush();
+ if (localJournal != null)
+ {
+ localJournal.flush();
+ }
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#loadInternalOnly()
+ */
+ public void loadInternalOnly() throws Exception
+ {
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-27
16:16:44 UTC (rev 8150)
+++
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-27
18:08:05 UTC (rev 8151)
@@ -713,5 +713,12 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#loadInternalOnly()
+ */
+ public void loadInternalOnly() throws Exception
+ {
+ }
+
}
}
Added:
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
===================================================================
---
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
(rev 0)
+++
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-27
18:08:05 UTC (rev 8151)
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.journal.impl;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+
+/**
+ * A CopyJournalTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CopyJournalTest extends JournalImplTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ final AtomicInteger sequence = new AtomicInteger(0);
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testSimpleCopy() throws Exception
+ {
+ setup(10, 10 * 1024, true);
+ createJournal();
+ startJournal();
+ load();
+
+
+
+ for (int i = 0 ; i < 10; i++)
+ {
+ addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(),
sequence.incrementAndGet(), sequence.incrementAndGet());
+ }
+ addTx(sequence.incrementAndGet(), sequence.incrementAndGet(),
sequence.incrementAndGet(), sequence.incrementAndGet());
+
+ File destDir = new File(getTestDir()+"/dest");
+
+ destDir.mkdirs();
+
+ SequentialFileFactory nioFactory = new
NIOSequentialFileFactory(destDir.getAbsolutePath());
+
+ Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, nioFactory, filePrefix,
fileExtension, 1);
+ destJournal.start();
+ destJournal.loadInternalOnly();
+
+ journal.copyTo(destJournal);
+
+ journal.flush();
+
+ destJournal.flush();
+
+
+
+ System.exit(1);
+ }
+
+ @Override
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new AIOSequentialFileFactory(getTestDir(),
+
ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+ 1000000,
+ true,
+ false
+ );
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
---
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-10-27
16:16:44 UTC (rev 8150)
+++
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-10-27
18:08:05 UTC (rev 8151)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.journal.impl;
+import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -26,6 +27,7 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TestableJournal;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.util.UnitTestCase;
@@ -465,10 +467,13 @@
protected void printJournalLists(final List<RecordInfo> expected, final
List<RecordInfo> actual)
{
System.out.println("***********************************************");
- System.out.println("Expected list:");
- for (RecordInfo info : expected)
+ if (expected != null)
{
- System.out.println("Record " + info.id + " isUpdate = " +
info.isUpdate);
+ System.out.println("Expected list:");
+ for (RecordInfo info : expected)
+ {
+ System.out.println("Record " + info.id + " isUpdate = " +
info.isUpdate);
+ }
}
if (actual != null)
{