[hornetq-commits] JBoss hornetq SVN: r8159 - in branches/Clebert_Sync: src/main/org/hornetq/core/persistence/impl/journal and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Oct 28 18:09:59 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-28 18:09:58 -0400 (Wed, 28 Oct 2009)
New Revision: 8159

Modified:
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
   branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
some minor changes

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java	2009-10-28 18:32:19 UTC (rev 8158)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java	2009-10-28 22:09:58 UTC (rev 8159)
@@ -50,7 +50,7 @@
    private final Journal journalTo;
 
    /** Proxy mode means, everything will be copied over without any evaluations.
-    *  This is useful at the end of copying when everything needs to be copied. */
+    *  This is used at the end of copying when everything needs to be copied. */
    private boolean proxyMode = false;
 
    // Static --------------------------------------------------------

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-28 18:32:19 UTC (rev 8158)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-28 22:09:58 UTC (rev 8159)
@@ -129,9 +129,9 @@
 
    private final ReplicationManager replicator;
 
-   private final Journal messageJournal;
+   private final ReplicatedJournal messageJournal;
 
-   private final Journal bindingsJournal;
+   private final ReplicatedJournal bindingsJournal;
 
    private final SequentialFileFactory largeMessagesFactory;
 
@@ -193,14 +193,7 @@
                                               "bindings",
                                               1);
 
-      if (replicator != null)
-      {
-         this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings, localBindings, replicator);
-      }
-      else
-      {
-         this.bindingsJournal = localBindings;
-      }
+      this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings, localBindings, replicator);
 
       if (journalDir == null)
       {
@@ -261,14 +254,7 @@
                                              "hq",
                                              config.getJournalMaxAIO());
 
-      if (replicator != null)
-      {
-         this.messageJournal = new ReplicatedJournal((byte)1, localMessage, localMessage, replicator);
-      }
-      else
-      {
-         this.messageJournal = localMessage;
-      }
+      this.messageJournal = new ReplicatedJournal((byte)1, localMessage, localMessage, replicator);
 
       largeMessagesDirectory = config.getLargeMessagesDirectory();
 

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-28 18:32:19 UTC (rev 8158)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-28 22:09:58 UTC (rev 8159)
@@ -46,7 +46,7 @@
 
    private static final boolean trace = log.isTraceEnabled();
 
-   private final ReplicationManager replicationManager;
+   private volatile ReplicationManager replicationManager;
 
    private final Journal localJournal;
 
@@ -54,6 +54,7 @@
 
    private final byte journalID;
 
+   /** The server is started with the backup */
    public ReplicatedJournal(final byte journaID,
                             final JournalLock journalLock,
                             final Journal localJournal,
@@ -66,6 +67,7 @@
       this.journalLock = journalLock;
    }
 
+   /** Used by the backup process, to send the backup over */
    public ReplicatedJournal(final byte journaID, final ReplicationManager replicationManager)
    {
       super();
@@ -115,7 +117,10 @@
       preAppend();
       try
       {
-         replicationManager.appendAddRecord(journalID, id, recordType, record);
+         if (replicationManager != null)
+         {
+            replicationManager.appendAddRecord(journalID, id, recordType, record);
+         }
          if (localJournal != null)
          {
             localJournal.appendAddRecord(id, recordType, record, sync);
@@ -160,7 +165,10 @@
       preAppend();
       try
       {
-         replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
+         if (replicationManager != null)
+         {
+            replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
+         }
          if (localJournal != null)
          {
             localJournal.appendAddRecordTransactional(txID, id, recordType, record);
@@ -188,7 +196,10 @@
       preAppend();
       try
       {
-         replicationManager.appendCommitRecord(journalID, txID);
+         if (replicationManager != null)
+         {
+            replicationManager.appendCommitRecord(journalID, txID);
+         }
          if (localJournal != null)
          {
             localJournal.appendCommitRecord(txID, sync);
@@ -216,7 +227,10 @@
       preAppend();
       try
       {
-         replicationManager.appendDeleteRecord(journalID, id);
+         if (replicationManager != null)
+         {
+            replicationManager.appendDeleteRecord(journalID, id);
+         }
          if (localJournal != null)
          {
             localJournal.appendDeleteRecord(id, sync);
@@ -257,7 +271,10 @@
       preAppend();
       try
       {
-         replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
+         if (replicationManager != null)
+         {
+            replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
+         }
          if (localJournal != null)
          {
             localJournal.appendDeleteRecordTransactional(txID, id, record);
@@ -285,7 +302,10 @@
       preAppend();
       try
       {
-         replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
+         if (replicationManager != null)
+         {
+            replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
+         }
          if (localJournal != null)
          {
             localJournal.appendDeleteRecordTransactional(txID, id);
@@ -326,7 +346,10 @@
       preAppend();
       try
       {
-         replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+         if (replicationManager != null)
+         {
+            replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+         }
          if (localJournal != null)
          {
             localJournal.appendPrepareRecord(txID, transactionData, sync);
@@ -354,7 +377,10 @@
       preAppend();
       try
       {
-         replicationManager.appendRollbackRecord(journalID, txID);
+         if (replicationManager != null)
+         {
+            replicationManager.appendRollbackRecord(journalID, txID);
+         }
          if (localJournal != null)
          {
             localJournal.appendRollbackRecord(txID, sync);
@@ -397,7 +423,10 @@
       preAppend();
       try
       {
-         replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+         if (replicationManager != null)
+         {
+            replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+         }
          if (localJournal != null)
          {
             localJournal.appendUpdateRecord(id, recordType, record, sync);
@@ -446,7 +475,10 @@
       preAppend();
       try
       {
-         replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
+         if (replicationManager != null)
+         {
+            replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
+         }
          if (localJournal != null)
          {
             localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
@@ -591,6 +623,10 @@
     */
    public void loadInternalOnly() throws Exception
    {
+      if (localJournal != null)
+      {
+         localJournal.loadInternalOnly();
+      }
    }
 
    // Package protected ---------------------------------------------

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-28 18:32:19 UTC (rev 8158)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-28 22:09:58 UTC (rev 8159)
@@ -317,6 +317,13 @@
    {
       connection = failoverManager.getConnection();
 
+      if (connection == null)
+      {
+         log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
+         throw new HornetQException(HornetQException.ILLEGAL_STATE,
+                                    "Backup server MUST be started before live server. Initialisation will not proceed.");
+      }
+
       long channelID = connection.generateChannelID();
 
       Channel mainChannel = connection.getChannel(1, -1, false);
@@ -357,15 +364,15 @@
    public void stop() throws Exception
    {
       enabled = false;
-      
+
       for (ReplicationContext ctx : activeContexts)
       {
          ctx.complete();
          ctx.flush();
       }
-      
+
       activeContexts.clear();
-      
+
       if (replicatingChannel != null)
       {
          replicatingChannel.close();

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-28 18:32:19 UTC (rev 8158)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-28 22:09:58 UTC (rev 8159)
@@ -291,7 +291,6 @@
 
          Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), new FakeJournal(), manager);
 
-         Thread.sleep(100);
          TestInterceptor.value.set(false);
 
          for (int i = 0; i < 500; i++)
@@ -320,24 +319,20 @@
       }
    }
 
-   /**
-    * @param manager
-    * @return
-    */
-   private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
+   public void testNoServer() throws Exception
    {
-      final CountDownLatch latch = new CountDownLatch(1);
-      manager.afterReplicated(new Runnable()
+      FailoverManager failoverManager = createFailoverManager();
+
+      try
       {
-
-         public void run()
-         {
-            latch.countDown();
-         }
-
-      });
-
-      assertTrue(latch.await(30, TimeUnit.SECONDS));
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+         manager.start();
+         fail("Exception expected");
+      }
+      catch (HornetQException expected)
+      {
+         assertEquals(HornetQException.ILLEGAL_STATE , expected.getCode());
+      }
    }
 
    public void testNoActions() throws Exception
@@ -461,6 +456,26 @@
                                      interceptors);
    }
 
+   /**
+    * @param manager
+    * @return
+    */
+   private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
+   {
+      final CountDownLatch latch = new CountDownLatch(1);
+      manager.afterReplicated(new Runnable()
+      {
+
+         public void run()
+         {
+            latch.countDown();
+         }
+
+      });
+
+      assertTrue(latch.await(30, TimeUnit.SECONDS));
+   }
+
    protected void tearDown() throws Exception
    {
 



More information about the hornetq-commits mailing list