[hornetq-commits] JBoss hornetq SVN: r11304 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 8 11:06:11 EDT 2011


Author: borges
Date: 2011-09-08 11:06:11 -0400 (Thu, 08 Sep 2011)
New Revision: 11304

Added:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
Removed:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 Clean up of code related to the case (see "HORNETQ-720 XXX" problem markers)

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -76,7 +76,7 @@
 
    private final StaticConnector staticConnector = new StaticConnector();
 
-   private Topology topology = new Topology();
+   private final Topology topology = new Topology();
 
    private Pair<TransportConfiguration, TransportConfiguration>[] topologyArray;
 
@@ -1270,21 +1270,6 @@
       }
    }
 
-   public synchronized void factoryClosed(final ClientSessionFactory factory)
-   {
-      factories.remove(factory);
-
-      if (factories.isEmpty())
-      {
-         // Go back to using the broadcast or static list
-
-         receivedTopology = false;
-
-         topology = null;
-
-      }
-   }
-
    public Topology getTopology()
    {
       return topology;

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorInternal.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -31,8 +31,6 @@
 {
    void start(Executor executor) throws Exception;
 
-   void factoryClosed(final ClientSessionFactory factory);
-
    void setNodeID(String nodeID);
 
    String getNodeID();

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -347,7 +347,6 @@
    }
 
    /**
-    * XXX FIXME HORNETQ-720 Method ignores the synchronization of Paging.
     * @param replicationManager
     * @param pagingManager
     * @throws HornetQException
@@ -703,8 +702,6 @@
       try
       {
       // Note that we don't sync, the add reference that comes immediately after will sync if appropriate
-
-         // XXX HORNETQ-720
       if (message.isLargeMessage())
       {
          messageJournal.appendAddRecord(message.getMessageID(),

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -31,7 +31,7 @@
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
 import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
@@ -146,9 +146,9 @@
                server.getClusterManager().notifyNodeUp(msg.getNodeID(), getPair(msg.getConnector(), backup), backup,
                                                        true);
             }
-            else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+            else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
             {
-               HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
+               BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
                try
                {
                   server.addHaBackup(rc);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -90,7 +90,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -521,9 +521,9 @@
             packet = new SessionAddMetaDataMessageV2();
             break;
          }
-         case PacketImpl.HA_BACKUP_REGISTRATION:
+         case PacketImpl.BACKUP_REGISTRATION:
          {
-            packet = new HaBackupRegistrationMessage();
+            packet = new BackupRegistrationMessage();
             break;
          }
          case PacketImpl.REPLICATION_START_STOP_SYNC:

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -192,8 +192,7 @@
 
    public static final byte SUBSCRIBE_TOPOLOGY = 112;
 
-   /** XXX HORNETQ-720 "HA" is not really used anywhere else. Better name? */
-   public static final byte HA_BACKUP_REGISTRATION = 113;
+   public static final byte BACKUP_REGISTRATION = 113;
 
    public static final byte REPLICATION_START_STOP_SYNC = 120;
 

Copied: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java (from rev 11303, branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java)
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -0,0 +1,60 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Registers a backup node with its live server.
+ * <p>
+ * After registration the live server will initiate synchronization of its state with the new backup
+ * node.
+ */
+public class BackupRegistrationMessage extends PacketImpl
+{
+
+   private TransportConfiguration connector;
+
+   private String nodeID;
+
+   public BackupRegistrationMessage(String nodeId, TransportConfiguration tc)
+   {
+      this();
+      connector = tc;
+      nodeID = nodeId;
+   }
+
+   public BackupRegistrationMessage()
+   {
+      super(BACKUP_REGISTRATION);
+   }
+
+   public String getNodeID()
+   {
+      return nodeID;
+   }
+
+   public TransportConfiguration getConnector()
+   {
+      return connector;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeString(nodeID);
+      connector.encode(buffer);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      nodeID = buffer.readString();
+      connector = new TransportConfiguration();
+      connector.decode(buffer);
+   }
+
+}

Deleted: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -1,60 +0,0 @@
-/**
- *
- */
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Registers a backup node with its live server.
- * <p>
- * After registration the live server will initiate synchronization of its state with the new backup
- * node.
- */
-public class HaBackupRegistrationMessage extends PacketImpl
-{
-
-   private TransportConfiguration connector;
-
-   private String nodeID;
-
-   public HaBackupRegistrationMessage(String nodeId, TransportConfiguration tc)
-   {
-      this();
-      connector = tc;
-      nodeID = nodeId;
-   }
-
-   public HaBackupRegistrationMessage()
-   {
-      super(HA_BACKUP_REGISTRATION);
-   }
-
-   public String getNodeID()
-   {
-      return nodeID;
-   }
-
-   public TransportConfiguration getConnector()
-   {
-      return connector;
-   }
-
-   @Override
-   public void encodeRest(final HornetQBuffer buffer)
-   {
-      buffer.writeString(nodeID);
-      connector.encode(buffer);
-   }
-
-   @Override
-   public void decodeRest(final HornetQBuffer buffer)
-   {
-      nodeID = buffer.readString();
-      connector = new TransportConfiguration();
-      connector.decode(buffer);
-   }
-
-}

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -523,26 +523,14 @@
          return;
       }
 
-      final Journal journalIf = journalsHolder.get(packet.getJournalContentType());
+      final Journal journal = journalsHolder.get(packet.getJournalContentType());
 
-      JournalImpl journal = assertJournalImpl(journalIf);
       Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
-      JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(), mapToFill);
+      JournalFile current = journal.createFilesForBackupSync(packet.getFileIds(), mapToFill);
       registerJournal(packet.getJournalContentType().typeByte,
                       new FileWrapperJournal(current, storage.hasCallbackSupport()));
      }
 
-   // XXX HORNETQ-720 really need to do away with this once the method calls get stable.
-   private static JournalImpl assertJournalImpl(final Journal journalIf) throws HornetQException
-   {
-      if (!(journalIf instanceof JournalImpl))
-      {
-         throw new HornetQException(HornetQException.INTERNAL_ERROR,
-                                    "Journals of backup server are expected to be JournalImpl");
-      }
-      return (JournalImpl)journalIf;
-   }
-
    private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
    {
       LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -577,7 +577,6 @@
    @Override
    public void sendSynchronizationDone()
    {
-      ReplicationStartSyncMessage msg = new ReplicationStartSyncMessage(null, null);
-      sendReplicatePacket(msg);
+      sendReplicatePacket(new ReplicationStartSyncMessage(null, null));
    }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -45,7 +45,7 @@
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
@@ -480,7 +480,7 @@
                      "'. backup cannot be announced.");
             return;
          }
-         liveChannel.send(new HaBackupRegistrationMessage(nodeUUID.toString(), connector));
+         liveChannel.send(new BackupRegistrationMessage(nodeUUID.toString(), connector));
       }
       else
       {

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -570,7 +570,6 @@
                         Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
                         Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
 
-                        replicationChannel.setHandler(replicationEndpoint);
                         connectToReplicationEndpoint(replicationChannel);
                         replicationEndpoint.start();
 
@@ -1064,10 +1063,6 @@
       return session;
    }
 
-   /**
-    * XXX FIXME to be made private, and method removed from Server interface once HORNETQ-720 is
-    * finished.
-    */
    private synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel channel) throws Exception
    {
       if (!configuration.isBackup())
@@ -1076,8 +1071,7 @@
                   getIdentity());
       }
 
-      if (replicationEndpoint == null)
-         System.err.println("endpoint is null!");
+      channel.setHandler(replicationEndpoint);
 
       if (replicationEndpoint.getChannel() != null)
       {

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -14,7 +14,9 @@
 package org.hornetq.core.journal;
 
 import java.util.List;
+import java.util.Map;
 
+import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.server.HornetQComponent;
 
 /**
@@ -139,4 +141,19 @@
 
    void runDirectJournalBlast() throws Exception;
 
+   /**
+    * Reserves journal file IDs, creates the necessary files for synchronization, and places
+    * references to these (reserved for sync) files in the map.
+    * <p>
+    * During the synchronization between a live server and backup, we reserve in the backup the
+    * journal file IDs used in the live server. This call also makes sure the files are created
+    * empty without any kind of headers added.
+    * @param fileIds ids to reserve for synchronization
+    * @param mapToFill map to be filled with id and journal file pairs for <b>synchronization</b>.
+    * @return a new {@link JournalFile} to be used for regular <b>replication</b> during
+    *         synchronization
+    * @throws Exception
+    */
+   JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> mapToFill) throws Exception;
+
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -3106,7 +3106,8 @@
     * @return
     * @throws Exception
     */
-   public JournalFile createFilesForRemoteSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
+   @Override
+   public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile> map) throws Exception
    {
       writeLock();
       try

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-09-07 16:10:18 UTC (rev 11303)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-09-08 15:06:11 UTC (rev 11304)
@@ -58,7 +58,7 @@
    @Override
    public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
    {
-      if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+      if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
       {
          try
          {



More information about the hornetq-commits mailing list