Author: ataylor
Date: 2010-10-27 09:43:09 -0400 (Wed, 27 Oct 2010)
New Revision: 9816
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
Log:
added failback restart backup server support
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -1159,7 +1159,7 @@
// cause reconnect loop
public void run()
{
- conn.fail(new HornetQException(HornetQException.DISCONNECTED,
+ conn.fail(new HornetQException(msg.isFailoverOnServerShutdown()?HornetQException.NOT_CONNECTED:HornetQException.DISCONNECTED,
"The connection was disconnected because of server shutdown"));
if (msg.getNodeID() != null)
{
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -165,6 +165,11 @@
started = false;
}
+ synchronized (waitLock)
+ {
+ waitLock.notify();
+ }
+
try
{
thread.join();
@@ -173,6 +178,7 @@
{
}
+
socket.close();
socket = null;
@@ -222,7 +228,7 @@
long toWait = timeout;
- while (!received && (toWait > 0 || timeout == 0))
+ while (started && !received && (toWait > 0 || timeout == 0))
{
try
{
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -288,7 +288,7 @@
callClosingListeners();
}
- public void disconnect()
+ public void disconnect(boolean failoverOnServerShutdown)
{
Channel channel0 = getChannel(0, -1);
@@ -307,7 +307,7 @@
channel.flushConfirmations();
}
- Packet disconnect = new DisconnectMessage(nodeID);
+ Packet disconnect = new DisconnectMessage(nodeID, failoverOnServerShutdown);
channel0.sendAndFlush(disconnect);
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/core/impl/wireformat/DisconnectMessage.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -31,16 +31,20 @@
// Attributes ----------------------------------------------------
private SimpleString nodeID;
+
+ private boolean failoverOnServerShutdown;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public DisconnectMessage(final SimpleString nodeID)
+ public DisconnectMessage(final SimpleString nodeID, boolean failoverOnServerShutdown)
{
super(PacketImpl.DISCONNECT);
this.nodeID = nodeID;
+
+ this.failoverOnServerShutdown = failoverOnServerShutdown;
}
public DisconnectMessage()
@@ -55,16 +59,24 @@
return nodeID;
}
+ public boolean isFailoverOnServerShutdown()
+ {
+ return failoverOnServerShutdown;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeNullableSimpleString(nodeID);
+
+ buffer.writeBoolean(failoverOnServerShutdown);
}
@Override
public void decodeRest(final HornetQBuffer buffer)
{
nodeID = buffer.readNullableSimpleString();
+ failoverOnServerShutdown = buffer.readBoolean();
}
@Override
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -89,7 +89,7 @@
manager.cleanup(this);
}
- public void disconnect()
+ public void disconnect(boolean clientFailover)
{
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/RemotingService.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -45,4 +45,6 @@
void freeze();
RemotingConnection getServerSideReplicatingConnection();
+
+ void stop(boolean failoverOnServerShutdown) throws Exception;
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -30,9 +30,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.ServerSessionPacketHandler;
@@ -255,6 +253,11 @@
public void stop() throws Exception
{
+ stop(false);
+ }
+
+ public void stop(boolean clientFailover) throws Exception
+ {
if (!started)
{
return;
@@ -279,7 +282,7 @@
{
RemotingConnection conn = entry.connection;
- conn.disconnect();
+ conn.disconnect(clientFailover);
}
for (Acceptor acceptor : acceptors)
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/NodeManager.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -74,5 +74,5 @@
public abstract boolean isAwaitingFailback() throws Exception;
- public abstract void killServer();
+ public abstract boolean isBackupLive() throws Exception;
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -52,4 +52,6 @@
void notifyNodeUp(String nodeID, String sourceNodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup, int distance);
Topology getTopology();
+
+ void announceBackup() throws Exception;
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -210,7 +210,10 @@
}
catch (Exception e)
{
- log.warn("did not connect the cluster connection to other nodes", e);
+ if(started)
+ {
+ log.warn("did not connect the cluster connection to other nodes", e);
+ }
}
}
});
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -362,6 +362,7 @@
try
{
broadcastGroup.start();
+ broadcastGroup.activate();
}
catch (Exception e)
{
@@ -405,6 +406,29 @@
}
}
+ public void announceBackup() throws Exception
+ {
+ List<ClusterConnectionConfiguration> configs = this.configuration.getClusterConfigurations();
+ if(!configs.isEmpty())
+ {
+ ClusterConnectionConfiguration config = configs.get(0);
+
+ TransportConfiguration connector = configuration.getConnectorConfigurations().get(config.getConnectorName());
+
+ if (connector == null)
+ {
+ log.warn("No connecor with name '" + config.getConnectorName() +
+ "'. backup cannot be announced.");
+ return;
+ }
+ announceBackup(config, connector);
+ }
+ else
+ {
+ log.warn("no cluster connections defined, unable to announce backup");
+ }
+ }
+
private synchronized void announceNode()
{
// TODO does this really work with more than one cluster connection? I think not
@@ -784,6 +808,7 @@
{
return;
}
+ log.info("announcing backup");
backupSessionFactory = locator.connect();
backupSessionFactory.getConnection().getChannel(0, -1).send(new NodeAnnounceMessage(nodeUUID.toString(), nodeUUID.toString(), true, connector));
}
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/FileLockNodeManager.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -36,6 +36,14 @@
private final String SERVER_LOCK_NAME = "server.lock";
+ private static final String ACCESS_MODE = "rw";
+
+ private static final int LIVE_LOCK_POS = 1;
+
+ private static final int BACKUP_LOCK_POS = 2;
+
+ private static final int LOCK_LENGTH = 1;
+
private static final byte LIVE = 'L';
private static final byte FAILINGBACK = 'F';
@@ -52,6 +60,7 @@
private final String directory;
+
public FileLockNodeManager(final String directory)
{
this.directory = directory;
@@ -70,7 +79,7 @@
file.createNewFile();
}
- RandomAccessFile raFile = new RandomAccessFile(file, "rw");
+ RandomAccessFile raFile = new RandomAccessFile(file, ACCESS_MODE);
channel = raFile.getChannel();
@@ -92,12 +101,20 @@
return getState() == FAILINGBACK;
}
- @Override
- public void killServer()
+ public boolean isBackupLive() throws Exception
{
- System.exit(0);
+ FileLock liveAttemptLock;
+ liveAttemptLock = channel.tryLock(LIVE_LOCK_POS, LOCK_LENGTH, false);
+ if(liveAttemptLock == null)
+ {
+ return true;
+ }
+ else
+ {
+ liveAttemptLock.release();
+ return false;
+ }
}
-
@Override
public void releaseBackup() throws Exception
{
@@ -115,7 +132,7 @@
Thread.sleep(2000);
}
- liveLock = channel.lock(1, 1, false);
+ liveLock = channel.lock(LIVE_LOCK_POS, 1, false);
byte state = getState();
@@ -144,7 +161,7 @@
log.info("Waiting to become backup node");
- backupLock = channel.lock(2, 1, false);
+ backupLock = channel.lock(BACKUP_LOCK_POS, LOCK_LENGTH, false);
log.info("** got backup lock");
@@ -157,7 +174,7 @@
log.info("Waiting to obtain live lock");
- liveLock = channel.lock(1, 1, false);
+ liveLock = channel.lock(LIVE_LOCK_POS, LOCK_LENGTH, false);
log.info("Live Server Obtained live lock");
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -305,10 +305,19 @@
checkJournalDirectory();
+ initialisePart1();
+
+ if(nodeManager.isBackupLive())
+ {
+ //looks like we've failed over at some point need to inform that we are the backup so when the current live
+ // goes down they failover to us
+ clusterManager.announceBackup();
+ //
+ Thread.sleep(2000);
+ }
+
nodeManager.startLiveNode();
- initialisePart1();
-
initialisePart2();
log.info("Server is now live");
@@ -362,25 +371,43 @@
nodeManager.releaseBackup();
if(configuration.isAllowAutoFailBack())
{
- //todo dont hardcode schedule timings
- scheduledPool.scheduleAtFixedRate(new Runnable()
+ class FailbackChecker implements Runnable
{
+ boolean restarting = false;
public void run()
{
try
{
- if(nodeManager.isAwaitingFailback())
+ if(!restarting && nodeManager.isAwaitingFailback())
{
- log.info("live server wants to restart, killing server");
- nodeManager.killServer();
+ log.info("live server wants to restart, restarting server in backup");
+ restarting = true;
+ Thread t = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ stop(true);
+ configuration.setBackup(true);
+ start();
+ }
+ catch (Exception e)
+ {
+ log.info("unable to restart server, please kill and restart manually", e);
+ }
+ }
+ });
+ t.start();
}
}
catch (Exception e)
{
- log.warn("unable to kill server, please kill manually to force failback");
+ //hopefully it will work next call
}
}
- }, 1000l, 1000l, TimeUnit.MILLISECONDS);
+ }
+ scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l, TimeUnit.MILLISECONDS);
}
}
catch (InterruptedException e)
@@ -586,7 +613,7 @@
{
System.out.println("HornetQServerImpl.stop");
}
- remotingService.stop();
+ remotingService.stop(permanently);
synchronized (this)
{
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/impl/InVMNodeManager.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -123,9 +123,9 @@
}
@Override
- public void killServer()
+ public boolean isBackupLive() throws Exception
{
- //todo
+ return liveLock.availablePermits() == 0;
}
private void releaseBackupNode()
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-26 08:17:00 UTC (rev 9815)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-10-27 13:43:09 UTC (rev 9816)
@@ -137,8 +137,9 @@
/**
* Disconnect the connection, closing all channels
+ * @param clientFailover
*/
- void disconnect();
+ void disconnect(boolean clientFailover);
/**
* returns true if any data has been received since the last time this method was called.