[hornetq-commits] JBoss hornetq SVN: r8151 - in branches/Clebert_Sync: src/main/org/hornetq/core/journal/impl and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 27 14:08:06 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-27 14:08:05 -0400 (Tue, 27 Oct 2009)
New Revision: 8151

Added:
   branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
Modified:
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
   branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
Log:
Backup changes

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java	2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/Journal.java	2009-10-27 18:08:05 UTC (rev 8151)
@@ -78,6 +78,11 @@
    // Load
    
    long load(LoaderCallback reloadManager) throws Exception;
+   
+   /** Load internal data structures and not expose any data.
+    *  This is only useful if you're using the journal but not interested on the current data.
+    *  Useful in situations where the journal is being replicated, copied... etc. */
+   void loadInternalOnly() throws Exception;
 
 
    long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception;

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java	2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalCopier.java	2009-10-27 18:08:05 UTC (rev 8151)
@@ -23,7 +23,7 @@
 import org.hornetq.utils.DataConstants;
 
 /**
- * A JournalCopier
+ * This will read records 
  *
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  *
@@ -35,6 +35,17 @@
 
    private static final Logger log = Logger.getLogger(JournalCopier.class);
 
+   
+   /** enable some trace at development. */
+   private static final boolean DEV_TRACE = true;
+   
+   private static final boolean isTraceEnabled = log.isTraceEnabled();
+   
+   private static void trace(String msg)
+   {
+      System.out.println("JournalCopier::" + msg);
+   }
+
    // Attributes ----------------------------------------------------
 
    private final Set<Long> pendingTransactions;
@@ -72,14 +83,29 @@
    {
       if (lookupRecord(info.id))
       {
+         if (DEV_TRACE)
+         {
+            trace("Backing add ID = " + info.id);
+         }
          journalTo.appendAddRecord(info.id, info.userRecordType, info.data, false);
       }
+      else
+      {
+         if (DEV_TRACE)
+         {
+            trace("Ignoring add ID = " + info.id);
+         }
+      }
    }
 
    public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception
    {
       if (pendingTransactions.contains(transactionID))
       {
+         if (DEV_TRACE)
+         {
+            trace("Backing add TXID = " + transactionID + " id = " + info.id);
+         }
          journalTo.appendAddRecordTransactional(transactionID, info.id, info.userRecordType, info.data);
       }
       else
@@ -132,7 +158,7 @@
       if (pendingTransactions.contains(transactionID))
       {
          // Sanity check, this should never happen
-         log.warn("Inconsistency during compacting: RollbackRecord ID = " + transactionID +
+         log.warn("Inconsistency during copying: RollbackRecord ID = " + transactionID +
                   " for an already rolled back transaction during compacting");
       }
    }

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-27 18:08:05 UTC (rev 8151)
@@ -83,7 +83,7 @@
 
    private static final Logger log = Logger.getLogger(JournalImpl.class);
 
-   private static final boolean trace = false;
+   private static final boolean trace = log.isTraceEnabled();
 
    /** This is to be set to true at DEBUG & development only */
    private static final boolean LOAD_TRACE = false;
@@ -1358,7 +1358,36 @@
    {
       return fileFactory.getAlignment();
    }
+   
+   public synchronized void loadInternalOnly() throws Exception
+   {
+      LoaderCallback dummyLoader = new LoaderCallback()
+      {
 
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+         {
+         }
+
+         public void updateRecord(RecordInfo info)
+         {
+         }
+
+         public void deleteRecord(long id)
+         {
+         }
+
+         public void addRecord(RecordInfo info)
+         {
+         }
+
+         public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+         {
+         }
+      };
+      
+      this.load(dummyLoader);
+   }
+
    /**
     * @see JournalImpl#load(LoaderCallback)
     */
@@ -1450,6 +1479,7 @@
       flushExecutor();
 
       // Wait the compactor and cleanup to finish case they are running
+      // This will also set the compactorRunning, as clean up and compact can't happen at the same time
       while (!compactorRunning.compareAndSet(false, true))
       {
          final CountDownLatch latch = new CountDownLatch(1);
@@ -1515,7 +1545,7 @@
             readJournalFile(fileFactory, file, copier);
          }
 
-         compactor.flush();
+         copier.flush();
 
       }
       finally

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-27 18:08:05 UTC (rev 8151)
@@ -1263,32 +1263,8 @@
     */
    public void loadInternalOnly() throws Exception
    {
-      LoaderCallback dummyLoader = new LoaderCallback()
-      {
-
-         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
-         {
-         }
-
-         public void updateRecord(RecordInfo info)
-         {
-         }
-
-         public void deleteRecord(long id)
-         {
-         }
-
-         public void addRecord(RecordInfo info)
-         {
-         }
-
-         public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
-         {
-         }
-      };
-
-      bindingsJournal.load(dummyLoader);
-      messageJournal.load(dummyLoader);
+      bindingsJournal.loadInternalOnly();
+      messageJournal.loadInternalOnly();
    }
 
    // Public -----------------------------------------------------------------------------------

Modified: branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-27 18:08:05 UTC (rev 8151)
@@ -26,7 +26,6 @@
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.replication.ReplicationManager;
 
-
 /**
  * Used by the {@link JournalStorageManager} to replicate journal calls. 
  *
@@ -52,9 +51,7 @@
 
    private final byte journalID;
 
-   public ReplicatedJournal(final byte journaID,
-                                final Journal localJournal,
-                                final ReplicationManager replicationManager)
+   public ReplicatedJournal(final byte journaID, final Journal localJournal, final ReplicationManager replicationManager)
    {
       super();
       journalID = journaID;
@@ -62,9 +59,17 @@
       this.replicationManager = replicationManager;
    }
 
+   public ReplicatedJournal(final byte journaID, final ReplicationManager replicationManager)
+   {
+      super();
+      journalID = journaID;
+      localJournal = null;
+      this.replicationManager = replicationManager;
+   }
+
    // Static --------------------------------------------------------
-   
-   private static void trace(String message)
+
+   private static void trace(final String message)
    {
       log.trace(message);
    }
@@ -100,7 +105,10 @@
          trace("Append record id = " + id + " recordType = " + recordType);
       }
       replicationManager.appendAddRecord(journalID, id, recordType, record);
-      localJournal.appendAddRecord(id, recordType, record, sync);
+      if (localJournal != null)
+      {
+         localJournal.appendAddRecord(id, recordType, record, sync);
+      }
    }
 
    /**
@@ -134,7 +142,10 @@
          trace("Append record TXid = " + id + " recordType = " + recordType);
       }
       replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
-      localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+      if (localJournal != null)
+      {
+         localJournal.appendAddRecordTransactional(txID, id, recordType, record);
+      }
    }
 
    /**
@@ -150,7 +161,10 @@
          trace("AppendCommit " + txID);
       }
       replicationManager.appendCommitRecord(journalID, txID);
-      localJournal.appendCommitRecord(txID, sync);
+      if (localJournal != null)
+      {
+         localJournal.appendCommitRecord(txID, sync);
+      }
    }
 
    /**
@@ -166,7 +180,10 @@
          trace("AppendDelete " + id);
       }
       replicationManager.appendDeleteRecord(journalID, id);
-      localJournal.appendDeleteRecord(id, sync);
+      if (localJournal != null)
+      {
+         localJournal.appendDeleteRecord(id, sync);
+      }
    }
 
    /**
@@ -195,7 +212,10 @@
          trace("AppendDelete txID=" + txID + " id=" + id);
       }
       replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
-      localJournal.appendDeleteRecordTransactional(txID, id, record);
+      if (localJournal != null)
+      {
+         localJournal.appendDeleteRecordTransactional(txID, id, record);
+      }
    }
 
    /**
@@ -211,7 +231,10 @@
          trace("AppendDelete (noencoding) txID=" + txID + " id=" + id);
       }
       replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
-      localJournal.appendDeleteRecordTransactional(txID, id);
+      if (localJournal != null)
+      {
+         localJournal.appendDeleteRecordTransactional(txID, id);
+      }
    }
 
    /**
@@ -240,7 +263,10 @@
          trace("AppendPrepare txID=" + txID);
       }
       replicationManager.appendPrepareRecord(journalID, txID, transactionData);
-      localJournal.appendPrepareRecord(txID, transactionData, sync);
+      if (localJournal != null)
+      {
+         localJournal.appendPrepareRecord(txID, transactionData, sync);
+      }
    }
 
    /**
@@ -256,7 +282,10 @@
          trace("AppendRollback " + txID);
       }
       replicationManager.appendRollbackRecord(journalID, txID);
-      localJournal.appendRollbackRecord(txID, sync);
+      if (localJournal != null)
+      {
+         localJournal.appendRollbackRecord(txID, sync);
+      }
    }
 
    /**
@@ -287,7 +316,10 @@
          trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
       }
       replicationManager.appendUpdateRecord(journalID, id, recordType, record);
-      localJournal.appendUpdateRecord(id, recordType, record, sync);
+      if (localJournal != null)
+      {
+         localJournal.appendUpdateRecord(id, recordType, record, sync);
+      }
    }
 
    /**
@@ -324,7 +356,10 @@
          trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
       }
       replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
-      localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+      if (localJournal != null)
+      {
+         localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+      }
    }
 
    /**
@@ -339,7 +374,14 @@
                     final List<PreparedTransactionInfo> preparedTransactions,
                     final TransactionFailureCallback transactionFailure) throws Exception
    {
-      return localJournal.load(committedRecords, preparedTransactions, transactionFailure);
+      if (localJournal != null)
+      {
+         return localJournal.load(committedRecords, preparedTransactions, transactionFailure);
+      }
+      else
+      {
+         return -1;
+      }
    }
 
    /**
@@ -350,7 +392,14 @@
     */
    public long load(final LoaderCallback reloadManager) throws Exception
    {
-      return localJournal.load(reloadManager);
+      if (localJournal != null)
+      {
+         return localJournal.load(reloadManager);
+      }
+      else
+      {
+         return -1;
+      }
    }
 
    /**
@@ -360,7 +409,10 @@
     */
    public void perfBlast(final int pages) throws Exception
    {
-      localJournal.perfBlast(pages);
+      if (localJournal != null)
+      {
+         localJournal.perfBlast(pages);
+      }
    }
 
    /**
@@ -369,7 +421,10 @@
     */
    public void start() throws Exception
    {
-      localJournal.start();
+      if (localJournal != null)
+      {
+         localJournal.start();
+      }
    }
 
    /**
@@ -378,7 +433,10 @@
     */
    public void stop() throws Exception
    {
-      localJournal.stop();
+      if (localJournal != null)
+      {
+         localJournal.stop();
+      }
    }
 
    /* (non-Javadoc)
@@ -386,7 +444,14 @@
     */
    public int getAlignment() throws Exception
    {
-      return localJournal.getAlignment();
+      if (localJournal != null)
+      {
+         return localJournal.getAlignment();
+      }
+      else
+      {
+         return 1;
+      }
    }
 
    /* (non-Javadoc)
@@ -394,13 +459,20 @@
     */
    public boolean isStarted()
    {
-      return localJournal.isStarted();
+      if (localJournal != null)
+      {
+         return localJournal.isStarted();
+      }
+      else
+      {
+         return true;
+      }
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.Journal#copyTo(org.hornetq.core.journal.Journal)
     */
-   public void copyTo(Journal destJournal) throws Exception
+   public void copyTo(final Journal destJournal) throws Exception
    {
       // This would be a nonsense operation. Only the real journal can copyTo
       throw new IllegalStateException("Operation Not Implemeted!");
@@ -411,9 +483,19 @@
     */
    public void flush() throws Exception
    {
-      localJournal.flush();
+      if (localJournal != null)
+      {
+         localJournal.flush();
+      }
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#loadInternalOnly()
+    */
+   public void loadInternalOnly() throws Exception
+   {
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-27 18:08:05 UTC (rev 8151)
@@ -713,5 +713,12 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#loadInternalOnly()
+       */
+      public void loadInternalOnly() throws Exception
+      {
+      }
+
    }
 }

Added: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java	                        (rev 0)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/CopyJournalTest.java	2009-10-27 18:08:05 UTC (rev 8151)
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.journal.impl;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
+
+/**
+ * A CopyJournalTest
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class CopyJournalTest extends JournalImplTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   final AtomicInteger sequence = new AtomicInteger(0);
+   
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   
+   public void testSimpleCopy() throws Exception
+   {
+      setup(10, 10 * 1024, true);
+      createJournal();
+      startJournal();
+      load();
+      
+      
+
+      for (int i = 0 ; i < 10; i++)
+      {
+         addWithSize(1024, sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+      }
+      addTx(sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet(), sequence.incrementAndGet());
+      
+      File destDir = new File(getTestDir()+"/dest");
+      
+      destDir.mkdirs();
+      
+      SequentialFileFactory nioFactory = new NIOSequentialFileFactory(destDir.getAbsolutePath());
+      
+      Journal destJournal = new JournalImpl(10 * 1024, 2, 0, 0, nioFactory, filePrefix, fileExtension, 1);
+      destJournal.start();
+      destJournal.loadInternalOnly();
+      
+      journal.copyTo(destJournal);
+      
+      journal.flush();
+      
+      destJournal.flush();
+      
+      
+      
+      System.exit(1);
+   }
+
+   @Override
+   protected SequentialFileFactory getFileFactory() throws Exception
+   {
+      File file = new File(getTestDir());
+
+      deleteDirectory(file);
+
+      file.mkdir();
+
+      return new AIOSequentialFileFactory(getTestDir(),
+                                          ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE,
+                                          1000000,
+                                          true,
+                                          false      
+      );
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-10-27 16:16:44 UTC (rev 8150)
+++ branches/Clebert_Sync/tests/src/org/hornetq/tests/unit/core/journal/impl/JournalImplTestBase.java	2009-10-27 18:08:05 UTC (rev 8151)
@@ -13,6 +13,7 @@
 
 package org.hornetq.tests.unit.core.journal.impl;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -26,6 +27,7 @@
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.TestableJournal;
+import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
 import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.tests.util.UnitTestCase;
@@ -465,10 +467,13 @@
    protected void printJournalLists(final List<RecordInfo> expected, final List<RecordInfo> actual)
    {
       System.out.println("***********************************************");
-      System.out.println("Expected list:");
-      for (RecordInfo info : expected)
+      if (expected != null)
       {
-         System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+         System.out.println("Expected list:");
+         for (RecordInfo info : expected)
+         {
+            System.out.println("Record " + info.id + " isUpdate = " + info.isUpdate);
+         }
       }
       if (actual != null)
       {



More information about the hornetq-commits mailing list