[hornetq-commits] JBoss hornetq SVN: r11068 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jul 29 13:24:59 EDT 2011


Author: borges
Date: 2011-07-29 13:24:59 -0400 (Fri, 29 Jul 2011)
New Revision: 11068

Added:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
Removed:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Send fileID values used by live to backup

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-07-29 17:24:59 UTC (rev 11068)
@@ -160,7 +160,22 @@
 
    public enum JournalContent
    {
-      MESSAGES, BINDINGS;
+      BINDINGS((byte)0),      MESSAGES((byte)1);
+
+      public final byte typeByte;
+
+      JournalContent(byte b){
+         typeByte = b;
+      }
+
+      public static JournalContent getType(byte type)
+      {
+         if (MESSAGES.typeByte == type)
+            return MESSAGES;
+         if (BINDINGS.typeByte == type)
+            return BINDINGS;
+         throw new RuntimeException("invalid byte");
+      }
    }
 
    private Journal messageJournal;
@@ -337,6 +352,10 @@
     */
    public void setReplicator(ReplicationManager replicationManager) throws Exception
    {
+      if (!started)
+      {
+         throw new IllegalStateException("must be started...");
+      }
       assert replicationManager != null;
       replicator = replicationManager;
 
@@ -345,24 +364,28 @@
          throw new HornetQException(HornetQException.INTERNAL_ERROR,
                                     "journals here are not JournalImpl. You can't set a replicator!");
       }
-      JournalImpl localMessageJournal = (JournalImpl)messageJournal;
-      JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
-      if (false)
-      {
+      // XXX NEED to take a global lock on the StorageManager.
+
+      final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
+      final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
+
       localMessageJournal.writeLock();
       localBindingsJournal.writeLock();
 
-      JournalFile[] messageFiles = prepateJournalForCopy(localMessageJournal);
-      JournalFile[] bindingsFiles = prepateJournalForCopy(localBindingsJournal);
+      JournalFile[] messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+      JournalFile[] bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+
       localMessageJournal.writeUnlock();
       localBindingsJournal.writeUnlock();
 
+
+      bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
+      messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+
       sendJournalFile(messageFiles, JournalContent.MESSAGES);
       sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
-      }
-      // XXX NEED to take a global lock on the StorageManager.
-      bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
-      messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+
+      // TODO: send a "SYNC_DONE" message to the backup, telling it that it can become operational.
    }
 
    /**
@@ -381,12 +404,12 @@
       }
    }
 
-   private JournalFile[] prepateJournalForCopy(JournalImpl journal) throws Exception
+   private JournalFile[] prepareJournalForCopy(JournalImpl journal, JournalContent contentType) throws Exception
    {
       journal.setAutoReclaim(false);
       /*
-       * need to check whether it is safe to proceed if compacting is running (specially at the end
-       * of it)
+       * XXX need to check whether it is safe to proceed while compacting is running (especially at
+       * the end of it)
        */
       journal.forceMoveNextFile();
       JournalFile[] datafiles = journal.getDataFiles();
@@ -394,6 +417,7 @@
       {
          jf.setCanReclaim(false);
       }
+      replicator.reserveFileIds(datafiles, contentType);
       return datafiles;
    }
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-07-29 17:24:59 UTC (rev 11068)
@@ -106,6 +106,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -531,6 +532,11 @@
             packet = new HaBackupRegistrationMessage();
             break;
          }
+         case PacketImpl.REPLICATION_FILE_ID:
+         {
+            packet = new ReplicationFileIdMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-07-29 17:24:59 UTC (rev 11068)
@@ -196,6 +196,8 @@
 
    public static final byte HA_BACKUP_REGISTRATION = 113;
 
+   public static final byte REPLICATION_FILE_ID = 120;
+
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Added: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java	2011-07-29 17:24:59 UTC (rev 11068)
@@ -0,0 +1,65 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Send all fileIDs used in the live server to the backup.
+ */
+public class ReplicationFileIdMessage extends PacketImpl
+{
+
+   private long[] ids;
+   private JournalContent journalType;
+
+   public ReplicationFileIdMessage()
+   {
+      super(REPLICATION_FILE_ID);
+   }
+
+   public ReplicationFileIdMessage(JournalFile[] datafiles, JournalContent contentType)
+   {
+      this();
+      ids = new long[datafiles.length];
+      for (int i = 0; i < datafiles.length; i++)
+      {
+         ids[i] = datafiles[i].getFileID();
+      }
+      journalType = contentType;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(journalType.typeByte);
+      buffer.writeInt(ids.length);
+      for (long id : ids)
+      {
+         buffer.writeLong(id);
+      }
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      journalType = JournalContent.getType(buffer.readByte());
+      int length = buffer.readInt();
+      ids = new long[length];
+      for (int i = 0; i < length; i++)
+      {
+         ids[i] = buffer.readLong();
+      }
+   }
+
+   public JournalContent getJournalContentType()
+   {
+      return journalType;
+   }
+
+   public long[] getFileIds()
+   {
+      return ids;
+   }
+}

Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java	2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java	2011-07-29 17:24:59 UTC (rev 11068)
@@ -1,29 +0,0 @@
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Used to copy JournalFile data over to the backup during synchronization.
- */
-public final class ReplicationJournalFile extends PacketImpl
-{
-
-   private byte[] data;
-   private int dataSize;
-   private JournalContent journalType;
-
-   public ReplicationJournalFile()
-   {
-      super(REPLICATION_SYNC);
-   }
-
-   public ReplicationJournalFile(int size, byte[] data, JournalContent content)
-   {
-      this();
-      this.dataSize = size;
-      this.data = data;
-      this.journalType = content;
-   }
-
-}

Added: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	2011-07-29 17:24:59 UTC (rev 11068)
@@ -0,0 +1,50 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Used to copy JournalFile data over to the backup during synchronization.
+ */
+public final class ReplicationJournalFileMessage extends PacketImpl
+{
+
+   private byte[] data;
+   private int dataSize;
+   private JournalContent journalType;
+   private long fileId;
+
+   public ReplicationJournalFileMessage()
+   {
+      super(REPLICATION_SYNC);
+   }
+
+   public ReplicationJournalFileMessage(int size, byte[] data, JournalContent content, long id)
+   {
+      this();
+      this.fileId = id;
+      this.dataSize = size;
+      this.data = data;
+      this.journalType = content;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeLong(fileId);
+      buffer.writeByte(journalType.typeByte);
+      buffer.writeInt(dataSize);
+      buffer.writeBytes(data, 0, dataSize);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      fileId = buffer.readLong();
+      journalType = JournalContent.getType(buffer.readByte());
+      int size = buffer.readInt();
+      data = new byte[size];
+      buffer.readBytes(data);
+   }
+}

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-07-29 17:24:59 UTC (rev 11068)
@@ -93,4 +93,12 @@
     */
    void sendJournalFile(JournalFile jf, JournalContent type) throws IOException, HornetQException;
 
+   /**
+    * Reserve the following fileIDs in the backup server.
+    * @param datafiles
+    * @param contentType
+    * @throws HornetQException
+    */
+   void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException;
+
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-07-29 17:24:59 UTC (rev 11068)
@@ -21,6 +21,7 @@
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.journal.Journal;
 import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.impl.JournalImpl;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagedMessage;
@@ -28,6 +29,7 @@
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -39,6 +41,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -80,10 +83,11 @@
 
    private JournalLoadInformation[] journalLoadInformation;
 
-   private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
+   private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex =
+            new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
+   private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
+            new ConcurrentHashMap<Long, LargeServerMessage>();
 
-   private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new ConcurrentHashMap<Long, LargeServerMessage>();
-
    // Used on tests, to simulate failures on delete pages
    private boolean deletePages = true;
 
@@ -176,6 +180,10 @@
             handleCompareDataMessage((ReplicationCompareDataMessage)packet);
             response = new NullResponseMessage();
          }
+         else if (type == PacketImpl.REPLICATION_FILE_ID)
+         {
+            handleJournalFileIdReservation((ReplicationFileIdMessage)packet);
+         }
          else
          {
             log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
@@ -215,8 +223,8 @@
 
       server.getManagementService().setStorageManager(storage);
 
-      registerJournal((byte)1, storage.getMessageJournal());
-      registerJournal((byte)0, storage.getBindingsJournal());
+      registerJournal(JournalContent.MESSAGES.typeByte, storage.getMessageJournal());
+      registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
 
       // We only need to load internal structures on the backup...
       journalLoadInformation = storage.loadInternalOnly();
@@ -353,9 +361,24 @@
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
-   /**
-    * @param packet
-    */
+
+   private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet) throws HornetQException
+   {
+      final Journal journalIf = journals[packet.getJournalContentType().typeByte];
+      if (journalIf.isStarted())
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "Journal can not be started!");
+      }
+
+      if (!(journalIf instanceof JournalImpl))
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR,
+                                    "Journals of backup server are expected to be JournalImpl");
+      }
+      JournalImpl journal = (JournalImpl)journalIf;
+
+   }
+
    private void handleLargeMessageEnd(final ReplicationLargemessageEndMessage packet)
    {
       LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-29 04:25:11 UTC (rev 11067)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-07-29 17:24:59 UTC (rev 11068)
@@ -44,7 +44,8 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFile;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -507,15 +508,23 @@
    public void sendJournalFile(JournalFile jf, JournalContent content) throws IOException, HornetQException
    {
       FileInputStream file = new FileInputStream(jf.getFile().getFileName());
-      byte[] data = new byte[1 << 17]; // about 130 kB
+      final long id = jf.getFileID();
+      final byte[] data = new byte[1 << 17]; // about 130 kB
       while (true)
       {
          int bytesRead = file.read(data);
          if (bytesRead == -1)
             break;
-         replicatingChannel.sendBlocking(new ReplicationJournalFile(bytesRead, data, content));
+         replicatingChannel.sendBlocking(new ReplicationJournalFileMessage(bytesRead, data, content, id));
       }
+      // XXX probably need to sync the JournalFile(?)
       throw new UnsupportedOperationException();
    }
 
+   @Override
+   public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
+   {
+      replicatingChannel.sendBlocking(new ReplicationFileIdMessage(datafiles, contentType));
+   }
+
 }



More information about the hornetq-commits mailing list