[hornetq-commits] JBoss hornetq SVN: r10887 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Jun 27 10:20:39 EDT 2011


Author: borges
Date: 2011-06-27 10:20:38 -0400 (Mon, 27 Jun 2011)
New Revision: 10887

Added:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Add a HaBackupRegistrationMessage (still not functional)

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-06-27 14:18:52 UTC (rev 10886)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-06-27 14:20:38 UTC (rev 10887)
@@ -30,6 +30,7 @@
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.Ping;
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
@@ -108,24 +109,24 @@
             else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
             {
                SubscribeClusterTopologyUpdatesMessage msg = (SubscribeClusterTopologyUpdatesMessage)packet;
-               
+
                final ClusterTopologyListener listener = new ClusterTopologyListener()
                {
                   public void nodeUP(String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean last)
                   {
                      channel0.send(new ClusterTopologyChangeMessage(nodeID, connectorPair, last));
                   }
-                  
+
                   public void nodeDown(String nodeID)
                   {
                      channel0.send(new ClusterTopologyChangeMessage(nodeID));
                   }
                };
-               
+
                final boolean isCC = msg.isClusterConnection();
-               
+
                server.getClusterManager().addClusterTopologyListener(listener, isCC);
-               
+
                rc.addCloseListener(new CloseListener()
                {
                   public void connectionClosed()
@@ -149,15 +150,21 @@
                }
                server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false, true);
             }
+            else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+            {
+               HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
+               System.out.println("HA_BACKUP_REGISTRATION: " + msg);
+               System.out.println("HA_BR: " + server.getIdentity() + ", toString=" + server);
+            }
          }
       });
-      
-      
 
+
+
       return entry;
    }
 
-   private Map<String, ServerSessionPacketHandler> sessionHandlers = new ConcurrentHashMap<String, ServerSessionPacketHandler>();
+   private final Map<String, ServerSessionPacketHandler> sessionHandlers = new ConcurrentHashMap<String, ServerSessionPacketHandler>();
 
    public ServerSessionPacketHandler getSessionHandler(final String sessionName)
    {
@@ -179,9 +186,15 @@
    }
 
    // This is never called using the core protocol, since we override the HornetQFrameDecoder with our core
-   // optimised version HornetQFrameDecoder2, which nevers calls this
+   // optimised version HornetQFrameDecoder2, which never calls this
    public int isReadyToHandle(HornetQBuffer buffer)
    {
       return -1;
    }
+
+   @Override
+   public String toString()
+   {
+      return "CoreProtocolManager(server=" + server + ")";
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-06-27 14:18:52 UTC (rev 10886)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-06-27 14:20:38 UTC (rev 10887)
@@ -93,6 +93,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -163,7 +164,7 @@
 public class PacketDecoder
 {
    private static final Logger log = Logger.getLogger(PacketDecoder.class);
-   
+
    public Packet decode(final HornetQBuffer in)
    {
       final byte packetType = in.readByte();
@@ -524,6 +525,11 @@
             packet = new SessionAddMetaDataMessageV2();
             break;
          }
+         case PacketImpl.HA_BACKUP_REGISTRATION:
+         {
+            packet = new HaBackupRegistrationMessage();
+            break;
+         }
          default:
          {
             throw new IllegalArgumentException("Invalid type: " + packetType);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-06-27 14:18:52 UTC (rev 10886)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java	2011-06-27 14:20:38 UTC (rev 10887)
@@ -149,7 +149,7 @@
    public static final byte SESS_PRODUCER_REQUEST_CREDITS = 79;
 
    public static final byte SESS_PRODUCER_CREDITS = 80;
-   
+
    public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE = 81;
 
    // Replication
@@ -183,17 +183,19 @@
    public static final byte REPLICATION_SYNC = 103;
 
    // HA
-   
+
    public static final byte SESS_ADD_METADATA = 104;
-   
+
    public static final byte SESS_ADD_METADATA2 = 105;
-   
+
    public static final byte CLUSTER_TOPOLOGY = 110;
 
    public static final byte NODE_ANNOUNCE = 111;
 
    public static final byte SUBSCRIBE_TOPOLOGY = 112;
 
+   public static final byte HA_BACKUP_REGISTRATION = 113;
+
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)
@@ -276,7 +278,7 @@
    {
       return true;
    }
-   
+
    public boolean isAsyncExec()
    {
       return false;

Added: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java	                        (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java	2011-06-27 14:20:38 UTC (rev 10887)
@@ -0,0 +1,60 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Registers a backup node with its live server.
+ * <p>
+ * After registration the live server will initiate synchronization of its state with the new backup
+ * node.
+ */
+public class HaBackupRegistrationMessage extends PacketImpl
+{
+
+   private TransportConfiguration connector;
+
+   private String nodeID;
+
+   public HaBackupRegistrationMessage(String nodeId, TransportConfiguration tc)
+   {
+      this();
+      connector = tc;
+      nodeID = nodeId;
+   }
+
+   public HaBackupRegistrationMessage()
+   {
+      super(HA_BACKUP_REGISTRATION);
+   }
+
+   public String getNodeID()
+   {
+      return nodeID;
+   }
+
+   public TransportConfiguration getConnector()
+   {
+      return connector;
+   }
+
+   @Override
+   public void encodeRest(final HornetQBuffer buffer)
+   {
+      buffer.writeString(nodeID);
+      connector.encode(buffer);
+   }
+
+   @Override
+   public void decodeRest(final HornetQBuffer buffer)
+   {
+      nodeID = buffer.readString();
+      connector = new TransportConfiguration();
+      connector.decode(buffer);
+   }
+
+}

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-06-27 14:18:52 UTC (rev 10886)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-06-27 14:20:38 UTC (rev 10887)
@@ -80,7 +80,7 @@
 import org.hornetq.core.postoffice.impl.LocalQueueBinding;
 import org.hornetq.core.postoffice.impl.PostOfficeImpl;
 import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.replication.ReplicationEndpoint;
@@ -521,14 +521,14 @@
             initialisePart1();
 
             clusterManager.start();
-            // Try-Connect to live server using live-connector-ref
-            String liveConnectorsName = configuration.getLiveConnectorName();
-            if (liveConnectorsName == null)
+
+
+            String liveConnectorName = configuration.getLiveConnectorName();
+            if (liveConnectorName == null)
             {
                throw new IllegalArgumentException("Cannot have a replicated backup without configuring its live-server!");
             }
-            final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorsName);
-            log.info("config is " + config);
+            final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
             final ServerLocatorInternal serverLocator =
                      (ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
 
@@ -536,28 +536,30 @@
             // sit in loop and try and connect, if server is not live then it will return NOT_LIVE
             final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
 
-            if (liveServerSessionFactory != null)
+            if (liveServerSessionFactory == null)
             {
-               log.debug("announce backup to live-server");
-               liveServerSessionFactory.getConnection()
-                                       .getChannel(0, -1)
-                                       .send(new NodeAnnounceMessage(getNodeID().toString(), true, config));
-               log.info("backup announced");
+               // XXX
+               throw new RuntimeException("Need to retry...");
             }
+            log.info("announce backup to live-server (id=" + liveConnectorName + ")");
+            liveServerSessionFactory.getConnection()
+                                    .getChannel(0, -1)
+                                    .send(new HaBackupRegistrationMessage(getNodeID().toString(), config));
+            log.info("backup registered");
 
+
             started = true;
 
             log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
                      "] started, waiting live to fail before it gets active");
-
             nodeManager.awaitLiveNode();
+            // Server node (i.e. live node) is not running; now the backup takes over.
 
-            // XXX ???
+            // XXX does this really belong at this point?
+            initialisePart2();
+
             configuration.setBackup(false);
 
-            // XXX
-
-            initialisePart2();
          }
          catch (Exception e)
          {
@@ -1247,28 +1249,25 @@
 
    // private boolean startReplication() throws Exception
    // {
-   // String backupConnectorName = configuration.getBackupConnectorName();
+   // // get list of backup names!
    //
    // if (!configuration.isSharedStore() && backupConnectorName != null)
    // {
-   // TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
+   // TransportConfiguration backupConnector =
+   // configuration.getConnectorConfigurations().get(backupConnectorName);
    //
    // if (backupConnector == null)
    // {
    // HornetQServerImpl.log.warn("connector with name '" + backupConnectorName +
    // "' is not defined in the configuration.");
+   // return false;
    // }
-   // else
-   // {
-   //
    // replicationFailoverManager = createBackupConnectionFailoverManager(backupConnector,
-   // threadPool,
-   // scheduledPool);
+   // threadPool, scheduledPool);
    //
    // replicationManager = new ReplicationManagerImpl(replicationFailoverManager, executorFactory);
    // replicationManager.start();
    // }
-   // }
    //
    // return true;
    // }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-06-27 14:18:52 UTC (rev 10886)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java	2011-06-27 14:20:38 UTC (rev 10887)
@@ -165,6 +165,8 @@
       config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() + "_backup");
       config1.getAcceptorConfigurations().clear();
       config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+      config1.getConnectorConfigurations().put(LIVE_NODE_NAME, getConnectorTransportConfiguration(true));
+      //liveConfig.setBackupConnectorName("toBackup");
       config1.setSecurityEnabled(false);
       config1.setSharedStore(false);
       config1.setBackup(true);
@@ -177,8 +179,6 @@
       config0.getAcceptorConfigurations().clear();
       config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
       config0.setName(LIVE_NODE_NAME);
-      config0.getConnectorConfigurations().put("toBackup", getConnectorTransportConfiguration(false));
-      //liveConfig.setBackupConnectorName("toBackup");
       config0.setSecurityEnabled(false);
       config0.setSharedStore(false);
       liveConfig = config0;



More information about the hornetq-commits mailing list