[hornetq-commits] JBoss hornetq SVN: r11055 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jul 27 13:27:23 EDT 2011


Author: borges
Date: 2011-07-27 13:27:23 -0400 (Wed, 27 Jul 2011)
New Revision: 11055

Added:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
HORNETQ-720 Some replication synchronization code (turned off as it does not work).

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-07-27 17:27:23 UTC (rev 11055)
@@ -14,6 +14,7 @@
 package org.hornetq.core.persistence.impl.journal;
 
 import java.io.File;
+import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.security.AccessController;
@@ -157,6 +158,11 @@
 
    private ReplicationManager replicator;
 
+   public enum JournalContent
+   {
+      MESSAGES, BINDINGS;
+   }
+
    private Journal messageJournal;
 
    private Journal bindingsJournal;
@@ -321,17 +327,102 @@
       return replicator != null;
    }
 
-   public void setReplicator(ReplicationManager replicationManager)
+   /**
+    * Installs the replication manager and wraps both journals in a {@link ReplicatedJournal}.
+    * <p>
+    * XXX FIXME Method ignores the synchronization of LargeMessages and Paging.
+    * <p>
+    * XXX A second version improvement would be to allow new operations to be sent to the backup,
+    * while we synchronize the existing logs.
+    * @param replicationManager manager used to talk to the backup; must not be null
+    * @throws HornetQException if either journal is not a {@link JournalImpl}
+    */
+   public void setReplicator(ReplicationManager replicationManager) throws Exception
    {
       assert replicationManager != null;
       replicator = replicationManager;
-      Journal localMessageJournal = messageJournal;
-      Journal localBindingsJournal = bindingsJournal;
+
+      if (!(messageJournal instanceof JournalImpl) || !(bindingsJournal instanceof JournalImpl))
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR,
+                                    "journals here are not JournalImpl. You can't set a replicator!");
+      }
+      JournalImpl localMessageJournal = (JournalImpl)messageJournal;
+      JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
+      // Copying the existing journal files to the backup is switched off until it works.
+      if (false)
+      {
+         JournalFile[] messageFiles;
+         JournalFile[] bindingsFiles;
+         localMessageJournal.writeLock();
+         try
+         {
+            localBindingsJournal.writeLock();
+            try
+            {
+               messageFiles = prepareJournalForCopy(localMessageJournal);
+               bindingsFiles = prepareJournalForCopy(localBindingsJournal);
+            }
+            finally
+            {
+               localBindingsJournal.writeUnlock();
+            }
+         }
+         finally
+         {
+            // release the locks even when preparing a journal throws
+            localMessageJournal.writeUnlock();
+         }
+
+         sendJournalFile(messageFiles, JournalContent.MESSAGES);
+         sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
+      }
+      // XXX NEED to take a global lock on the StorageManager.
       bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
       messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
-      // XXX HORNETQ-720 obviously missing here is the synchronization step.
    }
 
+   /**
+    * Sends each journal file to the replicating (backup) server and marks it reclaimable afterwards.
+    * @param journalFiles files to send
+    * @param type which journal the files belong to
+    * @throws IOException
+    * @throws HornetQException
+    */
+   private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws IOException, HornetQException
+   {
+      for (JournalFile jf : journalFiles)
+      {
+         replicator.sendJournalFile(jf, type);
+         jf.setCanReclaim(true);
+      }
+   }
+
+   /**
+    * Gets a journal ready to be copied: turns auto-reclaim off, forces a move to the next file and
+    * marks every data file as non-reclaimable.
+    * <p>
+    * NOTE(review): auto-reclaim is not switched back on by this method.
+    * @param journal journal to prepare; callers hold its write lock
+    * @return the journal's data files at the time of the call
+    * @throws Exception
+    */
+   private JournalFile[] prepareJournalForCopy(JournalImpl journal) throws Exception
+   {
+      journal.setAutoReclaim(false);
+      /*
+       * need to check whether it is safe to proceed if compacting is running (specially at the end
+       * of it)
+       */
+      journal.forceMoveNextFile();
+      JournalFile[] datafiles = journal.getDataFiles();
+      for (JournalFile jf : datafiles)
+      {
+         jf.setCanReclaim(false);
+      }
+      return datafiles;
+   }
+
    public void waitOnOperations() throws Exception
    {
       if (!started)

Added: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java	2011-07-27 17:27:23 UTC (rev 11055)
@@ -0,0 +1,29 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Used to copy JournalFile data over to the backup during synchronization.
+ */
+public final class ReplicationJournalFile extends PacketImpl
+{
+
+   private byte[] data;
+   private int dataSize;
+   private JournalContent journalType;
+
+   public ReplicationJournalFile()
+   {
+      super(REPLICATION_SYNC);
+   }
+
+   public ReplicationJournalFile(int size, byte[] data, JournalContent content)
+   {
+      this();
+      this.dataSize = size;
+      this.data = data;
+      this.journalType = content;
+   }
+
+}

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-07-27 17:27:23 UTC (rev 11055)
@@ -13,20 +13,23 @@
 
 package org.hornetq.core.replication;
 
+import java.io.IOException;
 import java.util.Set;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
 import org.hornetq.core.server.HornetQComponent;
 
 /**
  * Used by the {@link JournalStorageManager} to update the replicated journal.
- * 
+ *
  * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
  */
 public interface ReplicationManager extends HornetQComponent
@@ -84,4 +87,10 @@
     */
    void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
 
+   /**
+    * Sends the whole content of the file to be duplicated.
+    * @throws HornetQException
+    */
+   void sendJournalFile(JournalFile jf, JournalContent type) throws IOException, HornetQException;
+
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-27 17:27:23 UTC (rev 11055)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.replication.impl;
 
+import java.io.FileInputStream;
+import java.io.IOException;
 import java.util.LinkedHashSet;
 import java.util.Queue;
 import java.util.Set;
@@ -24,9 +26,11 @@
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
 import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.ChannelHandler;
@@ -40,6 +44,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFile;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -463,8 +468,10 @@
 
    private class ResponseHandler implements ChannelHandler
    {
-      /* (non-Javadoc)
-       * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
+      /*
+       * (non-Javadoc)
+       * @see
+       * org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
        */
       public void handlePacket(final Packet packet)
       {
@@ -496,4 +503,36 @@
 
    }
 
+   /**
+    * Reads the given journal file in chunks of up to 128 KiB and sends each chunk to the backup.
+    * <p>
+    * Unfinished: after the whole file has been sent this still throws
+    * {@link UnsupportedOperationException}.
+    * <p>
+    * NOTE(review): one buffer is reused for every packet, which is only safe if sendBlocking() is
+    * done with the packet's data before it returns -- confirm.
+    */
+   @Override
+   public void sendJournalFile(JournalFile jf, JournalContent content) throws IOException, HornetQException
+   {
+      FileInputStream file = new FileInputStream(jf.getFile().getFileName());
+      try
+      {
+         byte[] data = new byte[1 << 17]; // about 130 kB
+         while (true)
+         {
+            int bytesRead = file.read(data);
+            if (bytesRead == -1)
+               break;
+            replicatingChannel.sendBlocking(new ReplicationJournalFile(bytesRead, data, content));
+         }
+      }
+      finally
+      {
+         // always release the file handle, including when sendBlocking() fails
+         file.close();
+      }
+      throw new UnsupportedOperationException();
+   }
+
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-07-27 17:27:23 UTC (rev 11055)
@@ -2003,10 +2003,12 @@
    public void addHaBackup(CoreRemotingConnection rc) throws Exception
    {
       if (!(storageManager instanceof JournalStorageManager))
-         return;
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "unknown implementation of JournalStorageManager!");
+      }
+
       JournalStorageManager journalStorageManager = (JournalStorageManager)storageManager;
 
-
       replicationManager = new ReplicationManagerImpl(rc, executorFactory);
       replicationManager.start();
 

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java	2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java	2011-07-27 17:27:23 UTC (rev 11055)
@@ -18,11 +18,11 @@
 import org.hornetq.core.server.HornetQComponent;
 
 /**
- * 
+ *
  * Most methods on the journal provide a blocking version where you select the sync mode and a non blocking mode where you pass a completion callback as a parameter.
- * 
+ *
  * Notice also that even on the callback methods it's possible to pass the sync mode. That will only make sense on the NIO operations.
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
  *
@@ -84,13 +84,13 @@
     */
    void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception;
 
-   /** 
-    * 
-    * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction 
+   /**
+    *
+    * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
     *     back to a state it could be committed. </p>
-    * 
+    *
     * <p> transactionData allows you to store any other supporting user-data related to the transaction</p>
-    * 
+    *
     * @param txID
     * @param transactionData - extra user data for the prepare
     * @throws Exception
@@ -115,7 +115,7 @@
     *  This is only useful if you're using the journal but not interested on the current data.
     *  Useful in situations where the journal is being replicated, copied... etc. */
    JournalLoadInformation loadInternalOnly() throws Exception;
-   
+
    void lineUpContex(IOCompletion callback);
 
    JournalLoadInformation load(List<RecordInfo> committedRecords,

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java	2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java	2011-07-27 17:27:23 UTC (rev 11055)
@@ -51,12 +51,9 @@
 
    void setAutoReclaim(boolean autoReclaim);
 
-   boolean isAutoReclaim();
-
    void testCompact() throws Exception;
    
    JournalFile getCurrentFile();
-   
 
    /** This method is called automatically when a new file is opened.
     * @return true if it needs to re-check due to cleanup or other factors  */

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java	2011-07-27 17:27:23 UTC (rev 11055)
@@ -104,16 +104,16 @@
 
       file.delete();
    }
-   
+
    public void copyTo(SequentialFile newFileName) throws Exception
    {
       log.debug("Copying "  + this + " as " + newFileName);
       newFileName.open();
       this.open();
-      
-      
+
+
       ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
-      
+
       for (;;)
       {
          buffer.rewind();

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-07-27 17:27:23 UTC (rev 11055)
@@ -321,10 +321,10 @@
       return openedFiles.size();
    }
 
-   /** 
+   /**
     * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
     * <p>In case there are no cached opened files, this method will block until the file was opened,
-    * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p> 
+    * what would happen only if the system is under heavy load by another system (like a backup system, or a DB sharing the same box as HornetQ).</p>
     * */
    public JournalFile openFile() throws InterruptedException
    {
@@ -377,8 +377,8 @@
       return nextFile;
    }
 
-   /** 
-    * 
+   /**
+    *
     * Open a file and place it into the openedFiles queue
     * */
    public void pushOpenedFile() throws Exception

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-07-27 17:25:49 UTC (rev 11054)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-07-27 17:27:23 UTC (rev 11055)
@@ -65,15 +65,11 @@
 import org.hornetq.utils.DataConstants;
 
 /**
- *
- * <p>A circular log implementation.</p
- *
- * <p>Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
- *
+ * A circular log implementation.
+ * <p>
+ * Look at {@link JournalImpl#load(LoaderCallback)} for the file layout
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
  * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
- *
  */
 public class JournalImpl implements TestableJournal, JournalRecordProvider
 {
@@ -1824,7 +1820,7 @@
 
             compactor = null;
          }
-         autoReclaim = previousReclaimValue;
+         setAutoReclaim(previousReclaimValue);
       }
 
    }
@@ -2352,11 +2348,6 @@
       this.autoReclaim = autoReclaim;
    }
 
-   public boolean isAutoReclaim()
-   {
-      return autoReclaim;
-   }
-
    public String debug() throws Exception
    {
       reclaimer.scan(getDataFiles());
@@ -3266,4 +3257,13 @@
       }
    }
 
+   public void writeLock()
+   {
+      journalLock.writeLock().lock();
+   }
+
+   public void writeUnlock()
+   {
+      journalLock.writeLock().unlock();
+   }
 }



More information about the hornetq-commits mailing list