[hornetq-commits] JBoss hornetq SVN: r11753 - in trunk/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl/wireformat and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 24 07:24:35 EST 2011


Author: borges
Date: 2011-11-24 07:24:34 -0500 (Thu, 24 Nov 2011)
New Revision: 11753

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Send the live server's nodeID as soon as possible (in the start-sync message), as it is needed to detect failures.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-11-24 10:25:58 UTC (rev 11752)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-11-24 12:24:34 UTC (rev 11753)
@@ -396,8 +396,8 @@
             pagingManager.lock();
             try
             {
-               messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
-               bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+               messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES, nodeID);
+               bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS, nodeID);
                pageFilesToSync = getPageInformationForSync(pagingManager);
                largeMessageFilesToSync = getLargeMessageInformation();
             }
@@ -527,11 +527,12 @@
             }
     }
 
-    private JournalFile[] prepareJournalForCopy(Journal journal, JournalContent contentType) throws Exception
+   private JournalFile[]
+            prepareJournalForCopy(Journal journal, JournalContent contentType, String nodeID) throws Exception
     {
         journal.forceMoveNextFile();
         JournalFile[] datafiles = journal.getDataFiles();
-        replicator.sendStartSyncMessage(datafiles, contentType);
+      replicator.sendStartSyncMessage(datafiles, contentType, nodeID);
         return datafiles;
     }
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java	2011-11-24 10:25:58 UTC (rev 11752)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java	2011-11-24 12:24:34 UTC (rev 11753)
@@ -30,9 +30,10 @@
       this.nodeID = nodeID;
    }
 
-   public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent contentType)
+   public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent contentType, String nodeID)
    {
       this();
+      this.nodeID = nodeID;
       synchronizationIsFinished = false;
       ids = new long[datafiles.length];
       for (int i = 0; i < datafiles.length; i++)
@@ -46,11 +47,9 @@
    public void encodeRest(final HornetQBuffer buffer)
    {
       buffer.writeBoolean(synchronizationIsFinished);
+      buffer.writeString(nodeID);
       if (synchronizationIsFinished)
-      {
-         buffer.writeString(nodeID);
          return;
-      }
       buffer.writeByte(journalType.typeByte);
       buffer.writeInt(ids.length);
       for (long id : ids)
@@ -63,9 +62,9 @@
    public void decodeRest(final HornetQBuffer buffer)
    {
       synchronizationIsFinished = buffer.readBoolean();
+      nodeID = buffer.readString();
       if (synchronizationIsFinished)
       {
-         nodeID = buffer.readString();
          return;
       }
       journalType = JournalContent.getType(buffer.readByte());

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-11-24 10:25:58 UTC (rev 11752)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-11-24 12:24:34 UTC (rev 11753)
@@ -93,7 +93,8 @@
     * @param contentType
     * @throws HornetQException
     */
-   void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType) throws HornetQException;
+   void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType, String nodeID)
+                                                                                                throws HornetQException;
 
    /**
     * Informs backup that data synchronization is done.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-11-24 10:25:58 UTC (rev 11752)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-11-24 12:24:34 UTC (rev 11753)
@@ -131,6 +131,7 @@
 
    // Public --------------------------------------------------------
 
+   @Override
    public void registerJournal(final byte id, final Journal journal)
    {
       if (journals == null || id >= journals.length)
@@ -150,6 +151,7 @@
       journals[id] = journal;
    }
 
+   @Override
    public void handlePacket(final Packet packet)
    {
       PacketImpl response = new ReplicationResponseMessage();
@@ -341,17 +343,13 @@
       started = false;
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationEndpoint#getChannel()
-    */
+   @Override
    public Channel getChannel()
    {
       return channel;
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationEndpoint#setChannel(org.hornetq.core.remoting.Channel)
-    */
+   @Override
    public void setChannel(final Channel channel)
    {
       this.channel = channel;
@@ -568,7 +566,11 @@
       synchronized (this)
       {
          if (!started)
-               return;
+            return;
+         if (packet.getNodeID() != null)
+         {
+            quorumManager.setLiveID(packet.getNodeID());
+         }
          Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
          log.info("Journal " + packet.getJournalContentType() + ". Reserving fileIDs for synchronization: " +
                   Arrays.toString(packet.getFileIds()));
@@ -580,7 +582,7 @@
          FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
          registerJournal(packet.getJournalContentType().typeByte, syncJournal);
       }
-     }
+   }
 
    private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
    {

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-11-24 10:25:58 UTC (rev 11752)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-11-24 12:24:34 UTC (rev 11753)
@@ -110,9 +110,7 @@
       }
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
-    */
+   @Override
    public void appendUpdateRecord(final byte journalID,
                                   final long id,
                                   final byte recordType,
@@ -124,9 +122,7 @@
       }
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long, boolean)
-    */
+   @Override
    public void appendDeleteRecord(final byte journalID, final long id) throws Exception
    {
       if (enabled)
@@ -585,10 +581,13 @@
    }
 
    @Override
-   public void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType) throws HornetQException
+   public
+            void
+            sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType, String nodeID)
+                                                                                                    throws HornetQException
    {
       if (enabled)
-         sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType));
+         sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType, nodeID));
    }
 
    @Override



More information about the hornetq-commits mailing list