[hornetq-commits] JBoss hornetq SVN: r11122 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Aug 4 11:30:21 EDT 2011


Author: borges
Date: 2011-08-04 11:30:21 -0400 (Thu, 04 Aug 2011)
New Revision: 11122

Added:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
Removed:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
Log:
HORNETQ-720 Avoid record counting in the journal during replication sync
- add new journal state "SYNC" as the Journal cannot be considered "LOADED".
- rename R*SendFileIdMessage to R*StartSyncMessage as it does more than fileID reservation

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-04 15:30:21 UTC (rev 11122)
@@ -447,7 +447,7 @@
       {
          jf.setCanReclaim(false);
       }
-      replicator.reserveFileIds(datafiles, contentType);
+      replicator.sendStartSyncMessage(datafiles, contentType);
       return datafiles;
    }
 
@@ -1674,7 +1674,6 @@
       JournalLoadInformation[] info = new JournalLoadInformation[2];
       info[0] = bindingsJournal.loadInternalOnly();
       info[1] = messageJournal.loadInternalOnly();
-
       return info;
    }
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-08-04 15:30:21 UTC (rev 11122)
@@ -106,7 +106,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -535,7 +535,7 @@
          }
          case PacketImpl.REPLICATION_FILE_ID:
          {
-            packet = new ReplicationFileIdMessage();
+            packet = new ReplicationStartSyncMessage();
             break;
          }
          case PacketImpl.REPLICATION_SYNC:

Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java	2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java	2011-08-04 15:30:21 UTC (rev 11122)
@@ -1,69 +0,0 @@
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Sends all fileIDs used in the live server to the backup. This is done so that we:
- * <ol>
- * <li>reserve those IDs in the backup;
- * <li>start replicating while the journal synchronization is taking place.
- * </ol>
- */
-public class ReplicationFileIdMessage extends PacketImpl
-{
-
-   private long[] ids;
-   private JournalContent journalType;
-
-   public ReplicationFileIdMessage()
-   {
-      super(REPLICATION_FILE_ID);
-   }
-
-   public ReplicationFileIdMessage(JournalFile[] datafiles, JournalContent contentType)
-   {
-      this();
-      ids = new long[datafiles.length];
-      for (int i = 0; i < datafiles.length; i++)
-      {
-         ids[i] = datafiles[i].getFileID();
-      }
-      journalType = contentType;
-   }
-
-   @Override
-   public void encodeRest(final HornetQBuffer buffer)
-   {
-      buffer.writeByte(journalType.typeByte);
-      buffer.writeInt(ids.length);
-      for (long id : ids)
-      {
-         buffer.writeLong(id);
-      }
-   }
-
-   @Override
-   public void decodeRest(final HornetQBuffer buffer)
-   {
-      journalType = JournalContent.getType(buffer.readByte());
-      int length = buffer.readInt();
-      ids = new long[length];
-      for (int i = 0; i < length; i++)
-      {
-         ids[i] = buffer.readLong();
-      }
-   }
-
-   public JournalContent getJournalContentType()
-   {
-      return journalType;
-   }
-
-   public long[] getFileIds()
-   {
-      return ids;
-   }
-}

Copied: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java (from rev 11121, branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java)
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java	2011-08-04 15:30:21 UTC (rev 11122)
@@ -0,0 +1,68 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Sends all fileIDs used in the live server to the backup. This is done so that we:
+ * <ol>
+ * <li>reserve those IDs in the backup;
+ * <li>start replicating while the journal synchronization is taking place.
+ * </ol>
+ */
+public class ReplicationStartSyncMessage extends PacketImpl
+{
+   private long[] ids;
+   private JournalContent journalType;
+
+   public ReplicationStartSyncMessage()
+   {
+      super(REPLICATION_FILE_ID);
+   }
+
+   public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent contentType)
+   {
+      this();
+      ids = new long[datafiles.length];
+      for (int i = 0; i < datafiles.length; i++)
+      {
+         ids[i] = datafiles[i].getFileID();
+      }
+      journalType = contentType;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeByte(journalType.typeByte);
+      buffer.writeInt(ids.length);
+      for (long id : ids)
+      {
+         buffer.writeLong(id);
+      }
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      journalType = JournalContent.getType(buffer.readByte());
+      int length = buffer.readInt();
+      ids = new long[length];
+      for (int i = 0; i < length; i++)
+      {
+         ids[i] = buffer.readLong();
+      }
+   }
+
+   public JournalContent getJournalContentType()
+   {
+      return journalType;
+   }
+
+   public long[] getFileIds()
+   {
+      return ids;
+   }
+}

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-08-04 15:30:21 UTC (rev 11122)
@@ -99,7 +99,7 @@
     * @param contentType
     * @throws HornetQException
     */
-   void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException;
+   void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType) throws HornetQException;
 
    /**
     * Informs backup that data synchronization is done.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-04 15:30:21 UTC (rev 11122)
@@ -28,6 +28,7 @@
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.ReplicatingJournal;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagedMessage;
@@ -47,7 +48,6 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -56,6 +56,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.ServerMessage;
@@ -81,16 +82,18 @@
 
    private Journal[] journals;
    private JournalLoadInformation[] journalLoadInformation;
-   // Files reserved in each journal for synchronization of existing data from the 'live' server
+
+   /** Files reserved in each journal for synchronization of existing data from the 'live' server. */
    private final Map<JournalContent, Map<Long, JournalFile>> filesReservedForSync =
             new HashMap<JournalContent, Map<Long, JournalFile>>();
 
+   /** Used to hold the real Journals before the backup is synchronized. */
+   private final Map<JournalContent, Journal> journalsHolder = new HashMap<JournalContent, Journal>();
+
    private JournalStorageManager storage;
 
    private PagingManager pageManager;
 
-
-
    private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex =
             new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
    private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
@@ -191,7 +194,7 @@
          }
          else if (type == PacketImpl.REPLICATION_FILE_ID)
          {
-            handleJournalFileIdReservation((ReplicationFileIdMessage)packet);
+            handleStartReplicationSynchronization((ReplicationStartSyncMessage)packet);
          }
          else if (type == PacketImpl.REPLICATION_SYNC)
          {
@@ -236,11 +239,12 @@
 
       server.getManagementService().setStorageManager(storage);
 
-      registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
-      registerJournal(JournalContent.MESSAGES.typeByte, storage.getMessageJournal());
+      journalsHolder.put(JournalContent.BINDINGS, storage.getBindingsJournal());
+      journalsHolder.put(JournalContent.MESSAGES, storage.getMessageJournal());
 
       for (JournalContent jc : EnumSet.allOf(JournalContent.class))
       {
+         System.out.println("State? " + journalsHolder.get(jc));
          filesReservedForSync.put(jc, new HashMap<Long, JournalFile>());
       }
 
@@ -257,7 +261,7 @@
 
       pageManager.start();
 
-       started = true;
+      started = true;
 
    }
 
@@ -389,9 +393,9 @@
    {
       if (msg.isUpToDate())
       {
-         for (Journal journal2 : journals)
+         for (JournalContent jc : EnumSet.allOf(JournalContent.class))
          {
-            JournalImpl journal = (JournalImpl)journal2;
+            JournalImpl journal = (JournalImpl)journalsHolder.get(jc);
             journal.writeLock();
             try
             {
@@ -401,6 +405,7 @@
                }
                // files should be already in place.
                filesReservedForSync.remove(msg.getJournalContent());
+               registerJournal(jc.typeByte, journalsHolder.get(jc));
                // XXX HORNETQ-720 must reload journals
                // XXX HORNETQ-720 must start using real journals
             }
@@ -417,6 +422,7 @@
 
       long id = msg.getFileId();
       JournalFile journalFile = filesReservedForSync.get(msg.getJournalContent()).get(Long.valueOf(id));
+
       byte[] data = msg.getData();
       if (data == null)
       {
@@ -433,18 +439,27 @@
       }
    }
 
-   private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet) throws Exception
+   /**
+    * Reserves files (with the given fileID) in the specified journal, and places a
+    * {@link ReplicatingJournal} in place to store messages while synchronization is going on.
+    * @param packet
+    * @throws Exception
+    */
+   private void handleStartReplicationSynchronization(final ReplicationStartSyncMessage packet) throws Exception
    {
       if (server.isRemoteBackupUpToDate())
       {
          throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup can not be up-to-date!");
       }
-      final Journal journalIf = journals[packet.getJournalContentType().typeByte];
+      final Journal journalIf = journalsHolder.get(packet.getJournalContentType());
 
       JournalImpl journal = assertJournalImpl(journalIf);
-      journal.createFilesForRemoteSync(packet.getFileIds(), filesReservedForSync.get(packet.getJournalContentType()));
+      Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
+      JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(), mapToFill);
+      registerJournal(packet.getJournalContentType().typeByte, new ReplicatingJournal(current));
    }
 
+   // XXX HORNETQ-720 really need to do away with this once the method calls get stable.
    private static JournalImpl assertJournalImpl(final Journal journalIf) throws HornetQException
    {
       if (!(journalIf instanceof JournalImpl))

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-08-04 15:30:21 UTC (rev 11122)
@@ -44,7 +44,6 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
@@ -52,6 +51,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageEventMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPageWriteMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
 import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.utils.ExecutorFactory;
 
@@ -509,7 +509,7 @@
    public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
    {
       SequentialFile file = jf.getFile().copy();
-      log.info("Replication: sending " + jf + " to backup. " + file);
+      log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
       if (!file.isOpen())
       {
          file.open(1, false);
@@ -531,9 +531,9 @@
    }
 
    @Override
-   public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
+   public void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
    {
-      sendReplicatePacket(new ReplicationFileIdMessage(datafiles, contentType));
+      sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType));
    }
 
    @Override

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-04 15:30:21 UTC (rev 11122)
@@ -565,7 +565,7 @@
 
                         if (liveServerSessionFactory == null)
                         {
-                           // XXX
+                        // XXX HORNETQ-720
                            throw new RuntimeException("Need to retry...");
                         }
                         CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-08-04 15:30:21 UTC (rev 11122)
@@ -77,7 +77,9 @@
 
    private enum JournalState
    {
-      STOPPED, STARTED, LOADED;
+      STOPPED, STARTED,
+      /** When a replicating server is still not synchronized with its live. */
+      SYNCING, LOADED;
    }
    // Constants -----------------------------------------------------
 
@@ -100,7 +102,7 @@
    // Journal
    private static final void trace(final String message)
    {
-      JournalImpl.log.trace(message);
+      JournalImpl.log.info(message);
    }
 
    private static final void traceRecord(final String message)
@@ -304,6 +306,12 @@
       this.userVersion = userVersion;
    }
 
+   @Override
+   public String toString()
+   {
+      return super.toString() + " " + state;
+   }
+
    public void runDirectJournalBlast() throws Exception
    {
       final int numIts = 100000000;
@@ -411,7 +419,6 @@
       // since we can re-use dataFiles
 
       Collections.sort(orderedFiles, new JournalFileComparator());
-
       return orderedFiles;
    }
 
@@ -1425,6 +1432,7 @@
       }
    }
 
+   // XXX make it protected?
    public int getAlignment() throws Exception
    {
       return fileFactory.getAlignment();
@@ -1458,7 +1466,7 @@
          }
       };
 
-      return this.load(dummyLoader);
+      return this.load(dummyLoader, true, true);
    }
 
    public JournalLoadInformation load(final List<RecordInfo> committedRecords,
@@ -1765,9 +1773,9 @@
 
             localCompactor.replayPendingCommands();
 
-            // Merge transactions back after compacting
-            // This has to be done after the replay pending commands, as we need to delete committs that happened during
-            // the compacting
+            // Merge transactions back after compacting.
+            // This has to be done after the replay pending commands, as we need to delete commits
+            // that happened during the compacting
 
             for (JournalTransaction newTransaction : localCompactor.getNewTransactions().values())
             {
@@ -1868,11 +1876,20 @@
       return load(loadManager, true);
    }
 
-   public synchronized JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions) throws Exception
+   public JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions)
+            throws Exception
    {
+      return load(loadManager, fixFailingTransactions, false);
+   }
+
+   private synchronized JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions,
+            final boolean replicationSync) throws Exception
+   {
+
       if (state != JournalState.STARTED)
       {
-         throw new IllegalStateException("Journal " + this + " must be in started state, was " + state);
+         throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
+                  state);
       }
 
       checkControlFile();
@@ -2162,6 +2179,13 @@
          }
       }
 
+      if (replicationSync)
+      {
+         assert filesRepository.getDataFiles().isEmpty();
+         setJournalState(JournalState.SYNCING);
+         return new JournalLoadInformation(0, -1);
+      }
+
       // Create any more files we need
 
       filesRepository.ensureMinFiles();
@@ -3144,11 +3168,6 @@
       }
    }
 
-   private HornetQBuffer newBuffer(final int size)
-   {
-      return HornetQBuffers.fixedBuffer(size);
-   }
-
    // Inner classes
    // ---------------------------------------------------------------------------
 
@@ -3157,11 +3176,6 @@
 
       private static NullEncoding instance = new NullEncoding();
 
-      public static NullEncoding getInstance()
-      {
-         return NullEncoding.instance;
-      }
-
       public void decode(final HornetQBuffer buffer)
       {
       }
@@ -3271,9 +3285,10 @@
 
    /**
     * @param fileIds
+    * @return
     * @throws Exception
     */
-   public void createFilesForRemoteSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
+   public JournalFile createFilesForRemoteSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
    {
       writeLock();
       try
@@ -3285,14 +3300,18 @@
             maxID = Math.max(maxID, id);
             map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
          }
-         if (maxID > 0)
-         {
-            filesRepository.setNextFileID(maxID);
-         }
+         maxID += 1;
+         filesRepository.setNextFileID(maxID);
+         return filesRepository.createRemoteBackupSyncFile(maxID);
       }
       finally
       {
          writeUnlock();
       }
    }
+
+   public boolean getAutoReclaim()
+   {
+      return autoReclaim;
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java	2011-08-04 15:28:56 UTC (rev 11121)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java	2011-08-04 15:30:21 UTC (rev 11122)
@@ -1,29 +1,277 @@
 package org.hornetq.core.journal.impl;
 
-import org.hornetq.core.journal.SequentialFileFactory;
+import java.util.List;
 
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.TransactionFailureCallback;
+
 /**
  * Journal used at a replicating backup server during the synchronization of data with the 'live'
  * server.
+ * <p>
+ * Its main purpose is to store the data like a Journal would but without verifying records.
  */
-public class ReplicatingJournal extends JournalImpl
+public class ReplicatingJournal implements Journal
 {
 
+   private final JournalFile file;
+
    /**
-    * @param fileSize
-    * @param minFiles
-    * @param compactMinFiles
-    * @param compactPercentage
-    * @param fileFactory
-    * @param filePrefix
-    * @param fileExtension
-    * @param maxAIO
+    * @param file
     */
-   public ReplicatingJournal(int fileSize, int minFiles, int compactMinFiles, int compactPercentage,
-                             SequentialFileFactory fileFactory, String filePrefix, String fileExtension, int maxAIO)
+   public ReplicatingJournal(JournalFile file)
    {
-      super(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO);
+      this.file = file;
    }
 
+   @Override
+   public void start() throws Exception
+   {
+      // TODO Auto-generated method stub
 
+   }
+
+   @Override
+   public void stop() throws Exception
+   {
+      // TODO Auto-generated method stub
+
+   }
+
+   @Override
+   public boolean isStarted()
+   {
+      // TODO Auto-generated method stub
+      return false;
+   }
+
+   @Override
+   public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback)
+                                                                                                                   throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync,
+                               IOCompletion completionCallback) throws Exception
+   {
+      throw new UnsupportedOperationException();
+
+   }
+
+   @Override
+   public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void
+            appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback)
+                     throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync,
+                                  IOCompletion completionCallback) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendDeleteRecord(long id, boolean sync) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
+            throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record)
+            throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendDeleteRecordTransactional(long txID, long id) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendCommitRecord(long txID, boolean sync) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void
+            appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback)
+            throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void
+            appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion callback)
+                                                                                                       throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendRollbackRecord(long txID, boolean sync) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public JournalLoadInformation loadInternalOnly() throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void lineUpContex(IOCompletion callback)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public JournalLoadInformation load(List<RecordInfo> committedRecords,
+            List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure)
+            throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public int getAlignment() throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public int getNumberOfRecords()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public int getUserVersion()
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void perfBlast(int pages)
+   {
+      throw new UnsupportedOperationException();
+   }
+
+   @Override
+   public void runDirectJournalBlast() throws Exception
+   {
+      throw new UnsupportedOperationException();
+   }
 }



More information about the hornetq-commits mailing list