[hornetq-commits] JBoss hornetq SVN: r8160 - in branches/Clebert_Sync: src/main/org/hornetq/core/journal/impl and 7 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 28 18:58:47 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-28 18:58:47 -0400 (Wed, 28 Oct 2009)
New Revision: 8160

Modified:
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Clebert_Sync/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Backup changes

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java	2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java	2009-10-28 22:58:47 UTC (rev 8160)
@@ -91,8 +91,11 @@
 
    void perfBlast(int pages) throws Exception;
    
-   /** Read the entire content of the journal and copy it to another Journal */
-   void copyTo(Journal destJournal) throws Exception;
+   /** Read the entire content of the journal and copy it to another Journal.
+    *  @param destJournal the journal where the records are being copied to
+    *  @param afterCopy An action that should be executed right before the end of the copy while the journal is in lock mode
+    *  */
+   void copyTo(Journal destJournal, Runnable afterCopy) throws Exception;
 
    /** This method will flush everything and make a hard sync on the journal. Use it with caution. (on tests mainly) */
    void flush() throws Exception;

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-28 22:58:47 UTC (rev 8160)
@@ -1463,7 +1463,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
     */
-   public void copyTo(Journal journal) throws Exception
+   public void copyTo(Journal journal, Runnable afterCopy) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -1579,6 +1579,7 @@
 
             }
             
+            afterCopy.run();
          }
          finally
          {

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-28 22:58:47 UTC (rev 8160)
@@ -23,6 +23,7 @@
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
@@ -115,6 +116,8 @@
    /** This method is only useful at the backup side. We only load internal structures making the journals ready for
     *  append mode on the backup side. */
    void loadInternalOnly() throws Exception;
+   
+   void initiateReplication(ReplicationManager replication) throws Exception;
 
    
    public void loadMessageJournal(final PostOffice postOffice,

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-28 22:58:47 UTC (rev 8160)
@@ -127,7 +127,7 @@
 
    private final BatchingIDGenerator idGenerator;
 
-   private final ReplicationManager replicator;
+   private volatile ReplicationManager replicator;
 
    private final ReplicatedJournal messageJournal;
 
@@ -601,6 +601,16 @@
       SequentialFile fileToRename = createFileForLargeMessage(message.getMessageID(), true);
       message.getFile().renameTo(fileToRename.getFileName());
    }
+   
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#initiateReplication(org.hornetq.core.replication.ReplicationManager)
+    */
+   public void initiateReplication(ReplicationManager replication) throws Exception
+   {
+      this.replicator = replication;
+      bindingsJournal.initiateReplication(replication);
+      messageJournal.initiateReplication(replication);
+   }
 
    private static final class AddMessageRecord
    {

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-10-28 22:58:47 UTC (rev 8160)
@@ -29,6 +29,7 @@
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
@@ -317,5 +318,12 @@
    {
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#initiateReplication(org.hornetq.core.replication.ReplicationManager)
+    */
+   public void initiateReplication(ReplicationManager replication) throws Exception
+   {
+   }
 
+
 }

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-28 22:58:47 UTC (rev 8160)
@@ -87,7 +87,46 @@
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------
+
    /**
+    * @param replication
+    */
+   public void initiateReplication(final ReplicationManager replication) throws Exception
+   {
+      if (replicationManager != null)
+      {
+         journalLock.writeLock();
+         try
+         {
+            replicationManager = null;
+         }
+         finally
+         {
+            journalLock.writeUnLock();
+         }
+      }
+      
+      // Instantiate a new Replicated Journal that won't have any local journal associated.
+      // This will happen in parallel while the current journal is still being used.
+      ReplicatedJournal proxy = new ReplicatedJournal(this.journalID,
+                                                  null,
+                                                  null,
+                                                  replication);
+      
+      localJournal.copyTo(proxy, new Runnable()
+      {
+         public void run()
+         {
+            // This needs to be done right after the copy is finished
+            // but while the journal is still locked in its final stage,
+            // so after this point all the journal appends are going to be replicated
+            ReplicatedJournal.this.replicationManager = replication;
+         }
+      });
+   }
+
+   // Journal implementation ----------------------------------------
+   /**
     * @param id
     * @param recordType
     * @param record
@@ -601,7 +640,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
     */
-   public void copyTo(final Journal destJournal) throws Exception
+   public void copyTo(final Journal destJournal, Runnable aferCopy) throws Exception
    {
       // This would be a nonsense operation. Only the real journal can copyTo
       throw new IllegalStateException("Operation Not Implemeted!");

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-28 22:58:47 UTC (rev 8160)
@@ -763,5 +763,12 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal, java.lang.Runnable)
+       */
+      public void copyTo(Journal destJournal, Runnable afterCopy) throws Exception
+      {
+      }
+
    }
 }

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java	2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java	2009-10-28 22:58:47 UTC (rev 8160)
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -102,6 +103,8 @@
       final Journal proxyJournal = (Journal)Proxy.newProxyInstance(this.getClass().getClassLoader(),
                                                                    new Class[] { Journal.class },
                                                                    handler);
+      
+      final AtomicBoolean copied = new AtomicBoolean(false);
 
       Thread copier = new Thread()
       {
@@ -110,7 +113,13 @@
          {
             try
             {
-               journal.copyTo(proxyJournal);
+               journal.copyTo(proxyJournal, new Runnable()
+               {
+                  public void run()
+                  {
+                     copied.set(true);
+                  }
+               });
             }
             catch (Exception e)
             {
@@ -149,6 +158,8 @@
       handler.unlock();
 
       copier.join();
+      
+      assertTrue(copied.get());
 
       stopJournal();
 

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-10-28 22:09:58 UTC (rev 8159)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-10-28 22:58:47 UTC (rev 8160)
@@ -46,6 +46,7 @@
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
@@ -1177,6 +1178,13 @@
          
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#initiateReplication(org.hornetq.core.replication.ReplicationManager)
+       */
+      public void initiateReplication(ReplicationManager replication) throws Exception
+      {
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory



More information about the hornetq-commits mailing list