Author: borges
Date: 2011-06-27 10:20:38 -0400 (Mon, 27 Jun 2011)
New Revision: 10887
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Add a HaBackupRegistrationMessage (still not functional)
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-06-27
14:18:52 UTC (rev 10886)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-06-27
14:20:38 UTC (rev 10887)
@@ -30,6 +30,7 @@
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.Ping;
import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessage;
@@ -108,24 +109,24 @@
else if (packet.getType() == PacketImpl.SUBSCRIBE_TOPOLOGY)
{
SubscribeClusterTopologyUpdatesMessage msg =
(SubscribeClusterTopologyUpdatesMessage)packet;
-
+
final ClusterTopologyListener listener = new ClusterTopologyListener()
{
public void nodeUP(String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean last)
{
channel0.send(new ClusterTopologyChangeMessage(nodeID,
connectorPair, last));
}
-
+
public void nodeDown(String nodeID)
{
channel0.send(new ClusterTopologyChangeMessage(nodeID));
}
};
-
+
final boolean isCC = msg.isClusterConnection();
-
+
server.getClusterManager().addClusterTopologyListener(listener, isCC);
-
+
rc.addCloseListener(new CloseListener()
{
public void connectionClosed()
@@ -149,15 +150,21 @@
}
server.getClusterManager().notifyNodeUp(msg.getNodeID(), pair, false,
true);
}
+ else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
+ {
+ HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
+ System.out.println("HA_BACKUP_REGISTRATION: " + msg);
+ System.out.println("HA_BR: " + server.getIdentity() + ",
toString=" + server);
+ }
}
});
-
-
+
+
return entry;
}
- private Map<String, ServerSessionPacketHandler> sessionHandlers = new
ConcurrentHashMap<String, ServerSessionPacketHandler>();
+ private final Map<String, ServerSessionPacketHandler> sessionHandlers = new
ConcurrentHashMap<String, ServerSessionPacketHandler>();
public ServerSessionPacketHandler getSessionHandler(final String sessionName)
{
@@ -179,9 +186,15 @@
}
// This is never called using the core protocol, since we override the
HornetQFrameDecoder with our core
- // optimised version HornetQFrameDecoder2, which nevers calls this
+ // optimised version HornetQFrameDecoder2, which never calls this
public int isReadyToHandle(HornetQBuffer buffer)
{
return -1;
}
+
+ @Override
+ public String toString()
+ {
+ return "CoreProtocolManager(server=" + server + ")";
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-06-27
14:18:52 UTC (rev 10886)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-06-27
14:20:38 UTC (rev 10887)
@@ -93,6 +93,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.CreateSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.DisconnectMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
@@ -163,7 +164,7 @@
public class PacketDecoder
{
private static final Logger log = Logger.getLogger(PacketDecoder.class);
-
+
public Packet decode(final HornetQBuffer in)
{
final byte packetType = in.readByte();
@@ -524,6 +525,11 @@
packet = new SessionAddMetaDataMessageV2();
break;
}
+ case PacketImpl.HA_BACKUP_REGISTRATION:
+ {
+ packet = new HaBackupRegistrationMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-06-27
14:18:52 UTC (rev 10886)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-06-27
14:20:38 UTC (rev 10887)
@@ -149,7 +149,7 @@
public static final byte SESS_PRODUCER_REQUEST_CREDITS = 79;
public static final byte SESS_PRODUCER_CREDITS = 80;
-
+
public static final byte SESS_INDIVIDUAL_ACKNOWLEDGE = 81;
// Replication
@@ -183,17 +183,19 @@
public static final byte REPLICATION_SYNC = 103;
// HA
-
+
public static final byte SESS_ADD_METADATA = 104;
-
+
public static final byte SESS_ADD_METADATA2 = 105;
-
+
public static final byte CLUSTER_TOPOLOGY = 110;
public static final byte NODE_ANNOUNCE = 111;
public static final byte SUBSCRIBE_TOPOLOGY = 112;
+ public static final byte HA_BACKUP_REGISTRATION = 113;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
@@ -276,7 +278,7 @@
{
return true;
}
-
+
public boolean isAsyncExec()
{
return false;
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java
(rev 0)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/HaBackupRegistrationMessage.java 2011-06-27
14:20:38 UTC (rev 10887)
@@ -0,0 +1,60 @@
+/**
+ *
+ */
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Registers a backup node with its live server.
+ * <p>
+ * After registration the live server will initiate synchronization of its state with the
new backup
+ * node.
+ */
+public class HaBackupRegistrationMessage extends PacketImpl
+{
+
+ private TransportConfiguration connector;
+
+ private String nodeID;
+
+ public HaBackupRegistrationMessage(String nodeId, TransportConfiguration tc)
+ {
+ this();
+ connector = tc;
+ nodeID = nodeId;
+ }
+
+ public HaBackupRegistrationMessage()
+ {
+ super(HA_BACKUP_REGISTRATION);
+ }
+
+ public String getNodeID()
+ {
+ return nodeID;
+ }
+
+ public TransportConfiguration getConnector()
+ {
+ return connector;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(nodeID);
+ connector.encode(buffer);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ nodeID = buffer.readString();
+ connector = new TransportConfiguration();
+ connector.decode(buffer);
+ }
+
+}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-27
14:18:52 UTC (rev 10886)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-27
14:20:38 UTC (rev 10887)
@@ -80,7 +80,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.HaBackupRegistrationMessage;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
@@ -521,14 +521,14 @@
initialisePart1();
clusterManager.start();
- // Try-Connect to live server using live-connector-ref
- String liveConnectorsName = configuration.getLiveConnectorName();
- if (liveConnectorsName == null)
+
+
+ String liveConnectorName = configuration.getLiveConnectorName();
+ if (liveConnectorName == null)
{
throw new IllegalArgumentException("Cannot have a replicated backup
without configuring its live-server!");
}
- final TransportConfiguration config =
configuration.getConnectorConfigurations().get(liveConnectorsName);
- log.info("config is " + config);
+ final TransportConfiguration config =
configuration.getConnectorConfigurations().get(liveConnectorName);
final ServerLocatorInternal serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
@@ -536,28 +536,30 @@
// sit in loop and try and connect, if server is not live then it will return
NOT_LIVE
final ClientSessionFactory liveServerSessionFactory =
serverLocator.connect();
- if (liveServerSessionFactory != null)
+ if (liveServerSessionFactory == null)
{
- log.debug("announce backup to live-server");
- liveServerSessionFactory.getConnection()
- .getChannel(0, -1)
- .send(new
NodeAnnounceMessage(getNodeID().toString(), true, config));
- log.info("backup announced");
+ // XXX
+ throw new RuntimeException("Need to retry...");
}
+ log.info("announce backup to live-server (id=" + liveConnectorName
+ ")");
+ liveServerSessionFactory.getConnection()
+ .getChannel(0, -1)
+ .send(new
HaBackupRegistrationMessage(getNodeID().toString(), config));
+ log.info("backup registered");
+
started = true;
log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
"] started, waiting live to fail before it gets active");
-
nodeManager.awaitLiveNode();
+ // Server node (i.e. Life node) is not running, now the backup takes over.
- // XXX ???
+ // XXX this really belongs to this point?
+ initialisePart2();
+
configuration.setBackup(false);
- // XXX
-
- initialisePart2();
}
catch (Exception e)
{
@@ -1247,28 +1249,25 @@
// private boolean startReplication() throws Exception
// {
- // String backupConnectorName = configuration.getBackupConnectorName();
+ // // get list of backup names!
//
// if (!configuration.isSharedStore() && backupConnectorName != null)
// {
- // TransportConfiguration backupConnector =
configuration.getConnectorConfigurations().get(backupConnectorName);
+ // TransportConfiguration backupConnector =
+ // configuration.getConnectorConfigurations().get(backupConnectorName);
//
// if (backupConnector == null)
// {
// HornetQServerImpl.log.warn("connector with name '" +
backupConnectorName +
// "' is not defined in the configuration.");
+ // return false;
// }
- // else
- // {
- //
// replicationFailoverManager =
createBackupConnectionFailoverManager(backupConnector,
- // threadPool,
- // scheduledPool);
+ // threadPool, scheduledPool);
//
// replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
executorFactory);
// replicationManager.start();
// }
- // }
//
// return true;
// }
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-27
14:18:52 UTC (rev 10886)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-27
14:20:38 UTC (rev 10887)
@@ -165,6 +165,8 @@
config1.setLargeMessagesDirectory(config1.getLargeMessagesDirectory() +
"_backup");
config1.getAcceptorConfigurations().clear();
config1.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(false));
+ config1.getConnectorConfigurations().put(LIVE_NODE_NAME,
getConnectorTransportConfiguration(true));
+ //liveConfig.setBackupConnectorName("toBackup");
config1.setSecurityEnabled(false);
config1.setSharedStore(false);
config1.setBackup(true);
@@ -177,8 +179,6 @@
config0.getAcceptorConfigurations().clear();
config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
config0.setName(LIVE_NODE_NAME);
- config0.getConnectorConfigurations().put("toBackup",
getConnectorTransportConfiguration(false));
- //liveConfig.setBackupConnectorName("toBackup");
config0.setSecurityEnabled(false);
config0.setSharedStore(false);
liveConfig = config0;