[hornetq-commits] JBoss hornetq SVN: r8160 - in branches/Clebert_Sync: src/main/org/hornetq/core/journal/impl and 7 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Oct 28 18:58:47 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-28 18:58:47 -0400 (Wed, 28 Oct 2009)
New Revision: 8160
Modified:
branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Backup changes
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java 2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java 2009-10-28 22:58:47 UTC (rev 8160)
@@ -91,8 +91,11 @@
void perfBlast(int pages) throws Exception;
- /** Read the entire content of the journal and copy it to another Journal */
- void copyTo(Journal destJournal) throws Exception;
+ /** Read the entire content of the journal and copy it to another Journal.
+ * @param destJournal the journal where the records are being copied to
+ * @param afterCopy An action that should be executed right before the end of the copy while the journal is in lock mode
+ * */
+ void copyTo(Journal destJournal, Runnable afterCopy) throws Exception;
/** This method will flush everything and make a hard sync on the journal. Use it with caution. (on tests mainly) */
void flush() throws Exception;
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-28 22:58:47 UTC (rev 8160)
@@ -1463,7 +1463,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
*/
- public void copyTo(Journal journal) throws Exception
+ public void copyTo(Journal journal, Runnable afterCopy) throws Exception
{
if (state != STATE_LOADED)
{
@@ -1579,6 +1579,7 @@
}
+ afterCopy.run();
}
finally
{
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-28 22:58:47 UTC (rev 8160)
@@ -23,6 +23,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
@@ -115,6 +116,8 @@
/** This method is only useful at the backup side. We only load internal structures making the journals ready for
* append mode on the backup side. */
void loadInternalOnly() throws Exception;
+
+ void initiateReplication(ReplicationManager replication) throws Exception;
public void loadMessageJournal(final PostOffice postOffice,
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-28 22:58:47 UTC (rev 8160)
@@ -127,7 +127,7 @@
private final BatchingIDGenerator idGenerator;
- private final ReplicationManager replicator;
+ private volatile ReplicationManager replicator;
private final ReplicatedJournal messageJournal;
@@ -601,6 +601,16 @@
SequentialFile fileToRename = createFileForLargeMessage(message.getMessageID(), true);
message.getFile().renameTo(fileToRename.getFileName());
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#setReplicationNode(org.hornetq.core.replication.ReplicationManager)
+ */
+ public void initiateReplication(ReplicationManager replication) throws Exception
+ {
+ this.replicator = replication;
+ bindingsJournal.initiateReplication(replication);
+ messageJournal.initiateReplication(replication);
+ }
private static final class AddMessageRecord
{
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-28 22:58:47 UTC (rev 8160)
@@ -29,6 +29,7 @@
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -317,5 +318,12 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#initiateReplication(org.hornetq.core.replication.ReplicationManager)
+ */
+ public void initiateReplication(ReplicationManager replication) throws Exception
+ {
+ }
+
}
Modified: branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-28 22:58:47 UTC (rev 8160)
@@ -87,7 +87,46 @@
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+
/**
+ * @param replication
+ */
+ public void initiateReplication(final ReplicationManager replication) throws Exception
+ {
+ if (replicationManager != null)
+ {
+ journalLock.writeLock();
+ try
+ {
+ replicationManager = null;
+ }
+ finally
+ {
+ journalLock.writeUnLock();
+ }
+ }
+
+ // Instantiate a new Replicated Journal that won't have any local journal associated.
+ // This will happen in parallele while the current journal still being used.
+ ReplicatedJournal proxy = new ReplicatedJournal(this.journalID,
+ null,
+ null,
+ replication);
+
+ localJournal.copyTo(proxy, new Runnable()
+ {
+ public void run()
+ {
+ // This needs to be done right after the copy is finished
+ // But while the journal still locked on its final stage,
+ // so after this point all the journal appends are going to be replicated
+ ReplicatedJournal.this.replicationManager = replication;
+ }
+ });
+ }
+
+ // Journal implementation ----------------------------------------
+ /**
* @param id
* @param recordType
* @param record
@@ -601,7 +640,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
*/
- public void copyTo(final Journal destJournal) throws Exception
+ public void copyTo(final Journal destJournal, Runnable aferCopy) throws Exception
{
// This would be a nonsense operation. Only the real journal can copyTo
throw new IllegalStateException("Operation Not Implemeted!");
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-28 22:58:47 UTC (rev 8160)
@@ -763,5 +763,12 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal, java.lang.Runnable)
+ */
+ public void copyTo(Journal destJournal, Runnable afterCopy) throws Exception
+ {
+ }
+
}
}
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java 2009-10-28 22:58:47 UTC (rev 8160)
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -102,6 +103,8 @@
final Journal proxyJournal = (Journal)Proxy.newProxyInstance(this.getClass().getClassLoader(),
new Class[] { Journal.class },
handler);
+
+ final AtomicBoolean copied = new AtomicBoolean(false);
Thread copier = new Thread()
{
@@ -110,7 +113,13 @@
{
try
{
- journal.copyTo(proxyJournal);
+ journal.copyTo(proxyJournal, new Runnable()
+ {
+ public void run()
+ {
+ copied.set(true);
+ }
+ });
}
catch (Exception e)
{
@@ -149,6 +158,8 @@
handler.unlock();
copier.join();
+
+ assertTrue(copied.get());
stopJournal();
Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-28 22:58:47 UTC (rev 8160)
@@ -46,6 +46,7 @@
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -1177,6 +1178,13 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#initiateReplication(org.hornetq.core.replication.ReplicationManager)
+ */
+ public void initiateReplication(ReplicationManager replication) throws Exception
+ {
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
More information about the hornetq-commits
mailing list