Author: clebert.suconic(a)jboss.com
Date: 2011-09-16 23:00:35 -0400 (Fri, 16 Sep 2011)
New Revision: 11359
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
cluster cleanup
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-17
02:31:55 UTC (rev 11358)
+++
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-17
03:00:35 UTC (rev 11359)
@@ -1111,6 +1111,11 @@
}
}
+ public String getIdentity()
+ {
+ return identity;
+ }
+
public void setIdentity(String identity)
{
this.identity = identity;
@@ -1282,8 +1287,8 @@
{
log.debug("nodeDown " + this + " nodeID=" + nodeID + "
as being down", new Exception("trace"));
}
-
- if (topology.removeMember(eventTime, nodeID))
+
+ if (!(isClusterConnection() && nodeID.equals(this.getNodeID())) &&
topology.removeMember(eventTime, nodeID))
{
if (topology.isEmpty())
{
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
---
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-17
02:31:55 UTC (rev 11358)
+++
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-17
03:00:35 UTC (rev 11359)
@@ -39,8 +39,12 @@
void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
- /** Used to better identify Cluster Connection Locators on logs while debugging logs
*/
+ /** Used to better identify Cluster Connection Locators in logs, to facilitate
eventual debugging.
+ *
+ * This method used to be on the tests-only interface; it is now part of the
public interface. */
void setIdentity(String identity);
+
+ String getIdentity();
void setNodeID(String nodeID);
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
---
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-17
02:31:55 UTC (rev 11358)
+++
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-17
03:00:35 UTC (rev 11359)
@@ -108,7 +108,7 @@
{
if (log.isDebugEnabled())
{
- log.info(this + "::Live node " + nodeId + "=" +
memberInput);
+ log.debug(this + "::node " + nodeId + "=" +
memberInput);
}
memberInput.setUniqueEventID(System.currentTimeMillis());
mapTopology.remove(nodeId);
@@ -212,7 +212,7 @@
currentMember +
", memberInput=" +
memberInput +
- "newMember=" + newMember);
+ "newMember=" + newMember, new Exception
("trace"));
}
@@ -301,7 +301,7 @@
{
if (member.getUniqueEventID() > uniqueEventID)
{
- log.info("The removeMember was issued before the node " + nodeId
+ " was started, ignoring call");
+ log.debug("The removeMember was issued before the node " +
nodeId + " was started, ignoring call");
member = null;
}
else
@@ -482,22 +482,17 @@
public synchronized String describe(final String text)
{
- String desc = text + "\n";
+ String desc = text + "topology on " + this + ":\n";
for (Entry<String, TopologyMember> entry : new HashMap<String,
TopologyMember>(mapTopology).entrySet())
{
desc += "\t" + entry.getKey() + " => " + entry.getValue()
+ "\n";
}
desc += "\t" + "nodes=" + nodes() + "\t" +
"members=" + members();
- return desc;
- }
-
- public void clear()
- {
- if (Topology.log.isDebugEnabled())
+ if (mapTopology.isEmpty())
{
- Topology.log.debug(this + "::clear", new
Exception("trace"));
+ desc += "\tEmpty";
}
- mapTopology.clear();
+ return desc;
}
public int members()
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-17
02:31:55 UTC (rev 11358)
+++
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-17
03:00:35 UTC (rev 11359)
@@ -178,16 +178,18 @@
};
final boolean isCC = msg.isClusterConnection();
-
- acceptorUsed.getClusterConnection().addClusterTopologyListener(listener,
isCC);
-
- rc.addCloseListener(new CloseListener()
+ if (acceptorUsed.getClusterConnection() != null)
{
- public void connectionClosed()
+
acceptorUsed.getClusterConnection().addClusterTopologyListener(listener, isCC);
+
+ rc.addCloseListener(new CloseListener()
{
-
acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
- }
- });
+ public void connectionClosed()
+ {
+
acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
+ }
+ });
+ }
}
else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
{
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
---
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-17
02:31:55 UTC (rev 11358)
+++
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-17
03:00:35 UTC (rev 11359)
@@ -62,7 +62,7 @@
// for debug
String describe();
- void announceNode();
+ void informTopology();
void announceBackup();
}
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
---
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-17
02:31:55 UTC (rev 11358)
+++
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-17
03:00:35 UTC (rev 11359)
@@ -49,6 +49,8 @@
void flushExecutor();
void announceBackup() throws Exception;
+
+ void deploy() throws Exception;
void deployBridge(BridgeConfiguration config) throws Exception;
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-17
02:31:55 UTC (rev 11358)
+++
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-17
03:00:35 UTC (rev 11359)
@@ -449,7 +449,7 @@
public void announceBackup()
{
- this.backupServerLocator = clusterConnector.createServerLocator();
+ this.backupServerLocator = clusterConnector.createServerLocator(false);
backupServerLocator.setReconnectAttempts(-1);
backupServerLocator.setInitialConnectAttempts(-1);
@@ -475,9 +475,9 @@
true,
connector,
null));
+ backupSessionFactory.close();
log.info("backup announced");
}
- //backupSessionFactory.close();
}
catch (Exception e)
{
@@ -619,7 +619,7 @@
- serverLocator = clusterConnector.createServerLocator();
+ serverLocator = clusterConnector.createServerLocator(true);
if (serverLocator != null)
{
@@ -674,6 +674,7 @@
log.debug("sending notification: " + notification);
managementService.sendNotification(notification);
}
+
}
public TransportConfiguration getConnector()
@@ -780,7 +781,6 @@
{
log.debug(this + "::Creating record for nodeID=" + nodeID +
", connectorPair=" + connectorPair);
}
- log.info(this + "::Creating record for nodeID=" + nodeID +
", connectorPair=" + connectorPair);
// New node - create a new flow record
@@ -822,7 +822,7 @@
}
}
- public synchronized void announceNode()
+ public synchronized void informTopology()
{
String nodeID = server.getNodeID().toString();
@@ -1504,7 +1504,8 @@
@Override
public String toString()
{
- return "ClusterConnectionImpl [nodeUUID=" + nodeUUID +
+ return "ClusterConnectionImpl@" + System.identityHashCode(this) +
+ "[nodeUUID=" + nodeUUID +
", connector=" +
connector +
", address=" +
@@ -1534,7 +1535,7 @@
interface ClusterConnector
{
- ServerLocatorInternal createServerLocator();
+ ServerLocatorInternal createServerLocator(boolean includeTopology);
}
private class StaticClusterConnector implements ClusterConnector
@@ -1546,7 +1547,7 @@
this.tcConfigs = tcConfigs;
}
- public ServerLocatorInternal createServerLocator()
+ public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
if (tcConfigs != null && tcConfigs.length > 0)
{
@@ -1554,7 +1555,9 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for
" + Arrays.toString(tcConfigs));
}
- return new ServerLocatorImpl(topology, true, tcConfigs);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology
: null, true, tcConfigs);
+ locator.setClusterConnection(true);
+ return locator;
}
else
{
@@ -1582,9 +1585,11 @@
this.dg = dg;
}
- public ServerLocatorInternal createServerLocator()
+ public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
- return new ServerLocatorImpl(topology, true, dg);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology ? topology :
null, true, dg);
+ return locator;
+
}
}
}
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-17
02:31:55 UTC (rev 11358)
+++
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-17
03:00:35 UTC (rev 11359)
@@ -168,13 +168,8 @@
return nodeUUID.toString();
}
- public synchronized void start() throws Exception
+ public synchronized void deploy() throws Exception
{
- if (started)
- {
- return;
- }
-
if (clustered)
{
for (BroadcastGroupConfiguration config :
configuration.getBroadcastGroupConfigurations())
@@ -186,22 +181,48 @@
{
deployClusterConnection(config);
}
+ }
- for (ClusterConnection conn : clusterConnections.values())
+ for (BridgeConfiguration config : configuration.getBridgeConfigurations())
+ {
+ deployBridge(config);
+ }
+ }
+
+ public synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ for (BroadcastGroup group: broadcastGroups.values())
+ {
+ if (!backup)
{
- conn.announceNode();
- if (backup)
- {
- conn.announceBackup();
- }
+ group.start();
}
}
+
+ for (ClusterConnection conn : clusterConnections.values())
+ {
+ conn.start();
+ if (backup)
+ {
+ conn.informTopology();
+ conn.announceBackup();
+ }
+ }
- for (BridgeConfiguration config : configuration.getBridgeConfigurations())
+ for (Bridge bridge : bridges.values())
{
- deployBridge(config);
+ if (!backup)
+ {
+ bridge.start();
+ }
}
+
started = true;
}
@@ -255,7 +276,7 @@
clusterLocators.clear();
started = false;
- clusterConnections.clear();
+ clearClusterConnections();
}
public void flushExecutor()
@@ -487,10 +508,6 @@
managementService.registerBridge(bridge, config);
- if (!backup)
- {
- bridge.start();
- }
}
public void destroyBridge(final String name) throws Exception
@@ -536,11 +553,18 @@
e.printStackTrace();
}
}
- clusterConnections.clear();
+ clearClusterConnections();
}
// Private methods
----------------------------------------------------------------------------------------------------
+
+ private void clearClusterConnections()
+ {
+ clusterConnections.clear();
+ this.defaultClusterConnection = null;
+ }
+
private void deployClusterConnection(final ClusterConnectionConfiguration config)
throws Exception
{
if (config.getName() == null)
@@ -673,7 +697,6 @@
{
log.debug("ClusterConnection.start at " + clusterConnection, new
Exception("trace"));
}
- clusterConnection.start();
}
private Transformer instantiateTransformer(final String transformerClassName)
@@ -748,11 +771,6 @@
broadcastGroups.put(config.getName(), group);
managementService.registerBroadcastGroup(group, config);
-
- if (!backup)
- {
- group.start();
- }
}
private void logWarnNoConnector(final String connectorName, final String bgName)
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-17
02:31:55 UTC (rev 11358)
+++
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-17
03:00:35 UTC (rev 11359)
@@ -227,7 +227,12 @@
// Used to identify the server on tests... useful on debugging testcases
private String identity;
+
+ private Thread backupActivationThread;
+ private Activation activation;
+
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -289,11 +294,6 @@
// lifecycle methods
// ----------------------------------------------------------------
- private interface Activation extends Runnable
- {
- void close(boolean permanently) throws Exception;
- }
-
/*
* Can be overridden for tests
*/
@@ -309,259 +309,6 @@
}
}
- private class NoSharedStoreLiveActivation implements Activation
- {
- public void run()
- {
- try
- {
- initialisePart1();
-
- initialisePart2();
-
- if (identity != null)
- {
- log.info("Server " + identity + " is now live");
- }
- else
- {
- log.info("Server is now live");
- }
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
-
- }
- }
-
- private class SharedStoreLiveActivation implements Activation
- {
- public void run()
- {
- try
- {
- log.info("Waiting to obtain live lock");
-
- checkJournalDirectory();
-
- initialisePart1();
-
- if(nodeManager.isBackupLive())
- {
- //looks like we've failed over at some point need to inform that we
are the backup so when the current live
- // goes down they failover to us
- clusterManager.announceBackup();
- Thread.sleep(configuration.getFailbackDelay());
- }
-
- nodeManager.startLiveNode();
-
- if (stopped)
- {
- return;
- }
-
- initialisePart2();
-
- log.info("Server is now live");
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- if(permanently)
- {
- nodeManager.crashLiveServer();
- }
- else
- {
- nodeManager.pauseLiveServer();
- }
- }
- }
-
-
- private class SharedStoreBackupActivation implements Activation
- {
-
- volatile boolean closed = false;
- public void run()
- {
- try
- {
- nodeManager.startBackup();
-
- initialisePart1();
-
- clusterManager.start();
-
- started = true;
-
- log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "]
started, waiting live to fail before it gets active");
-
- nodeManager.awaitLiveNode();
-
- configuration.setBackup(false);
-
- if (stopped)
- {
- return;
- }
-
- initialisePart2();
-
- clusterManager.activate();
-
- log.info("Backup Server is now live");
-
- nodeManager.releaseBackup();
- if(configuration.isAllowAutoFailBack())
- {
- class FailbackChecker implements Runnable
- {
- boolean restarting = false;
- public void run()
- {
- try
- {
- if(!restarting && nodeManager.isAwaitingFailback())
- {
- log.info("live server wants to restart, restarting server
in backup");
- restarting = true;
- Thread t = new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- log.debug(HornetQServerImpl.this + "::Stopping
live node in favor of failback");
- stop(true);
- // We need to wait some time before we start the
backup again
- // otherwise we may eventually start before the live
had a chance to get it
- Thread.sleep(configuration.getFailbackDelay());
- configuration.setBackup(true);
- log.debug(HornetQServerImpl.this + "::Starting
backup node now after failback");
- start();
- }
- catch (Exception e)
- {
- log.warn("unable to restart server, please kill
and restart manually", e);
- }
- }
- });
- t.start();
- }
- }
- catch (Exception e)
- {
- log.debug(e.getMessage(), e);
- //hopefully it will work next call
- }
- }
- }
- scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l,
TimeUnit.MILLISECONDS);
- }
- }
- catch (InterruptedException e)
- {
- //this is ok, we are being stopped
- }
- catch (ClosedChannelException e)
- {
- //this is ok too, we are being stopped
- }
- catch (Exception e)
- {
- if(!(e.getCause() instanceof InterruptedException))
- {
- log.error("Failure in initialisation", e);
- }
- }
- catch(Throwable e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- if (configuration.isBackup())
- {
- long timeout = 30000;
-
- long start = System.currentTimeMillis();
-
- while (backupActivationThread.isAlive() && System.currentTimeMillis()
- start < timeout)
- {
- nodeManager.interrupt();
-
- backupActivationThread.interrupt();
-
- backupActivationThread.join(1000);
-
- }
-
- if (System.currentTimeMillis() - start >= timeout)
- {
- threadDump("Timed out waiting for backup activation to exit");
- }
-
- nodeManager.stopBackup();
- }
- else
- {
- //if we are now live, behave as live
- // We need to delete the file too, otherwise the backup will failover when we
shutdown or if the backup is
- // started before the live
- if(permanently)
- {
- nodeManager.crashLiveServer();
- }
- else
- {
- nodeManager.pauseLiveServer();
- }
- }
- }
- }
-
- private class SharedNothingBackupActivation implements Activation
- {
- public void run()
- {
- try
- {
- // TODO
-
- // Try-Connect to live server using live-connector-ref
-
- // sit in loop and try and connect, if server is not live then it will return
NOT_LIVE
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- }
- }
-
- private Thread backupActivationThread;
-
- private Activation activation;
-
public synchronized void start() throws Exception
{
stopped = false;
@@ -611,6 +358,7 @@
}
+ // Activation on fail-back may change the value of isBackup; for that reason we
do not use an else branch here
if (configuration.isBackup())
{
if (configuration.isSharedStore())
@@ -1069,7 +817,6 @@
return new HashSet<ServerSession>(sessions.values());
}
- // TODO - should this really be here?? It's only used in tests
public boolean isInitialised()
{
synchronized (initialiseLock)
@@ -1232,9 +979,145 @@
return connectorsService;
}
- // Public
- //
---------------------------------------------------------------------------------------
+
+ public synchronized boolean checkActivate() throws Exception
+ {
+ if (configuration.isBackup())
+ {
+ // Handle backup server activation
+ if (!configuration.isSharedStore())
+ {
+ if (replicationEndpoint == null)
+ {
+ HornetQServerImpl.log.warn("There is no replication endpoint,
can't activate this backup server");
+
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
"Can't activate the server");
+ }
+
+ replicationEndpoint.stop();
+ }
+
+ // Complete the startup procedure
+
+ HornetQServerImpl.log.info("Activating backup server");
+
+ configuration.setBackup(false);
+
+ initialisePart2();
+ }
+
+ return true;
+ }
+
+ public void deployDivert(DivertConfiguration config) throws Exception
+ {
+ if (config.getName() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify a name for each divert. This one
will not be deployed.");
+
+ return;
+ }
+
+ if (config.getAddress() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify an address for each divert. This
one will not be deployed.");
+
+ return;
+ }
+
+ if (config.getForwardingAddress() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify an forwarding address for each
divert. This one will not be deployed.");
+
+ return;
+ }
+
+ SimpleString sName = new SimpleString(config.getName());
+
+ if (postOffice.getBinding(sName) != null)
+ {
+ HornetQServerImpl.log.warn("Binding already exists with name " + sName
+ ", divert will not be deployed");
+
+ return;
+ }
+
+ SimpleString sAddress = new SimpleString(config.getAddress());
+
+ Transformer transformer =
instantiateTransformer(config.getTransformerClassName());
+
+ Filter filter = FilterImpl.createFilter(config.getFilterString());
+
+ Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
+ sName,
+ new SimpleString(config.getRoutingName()),
+ config.isExclusive(),
+ filter,
+ transformer,
+ postOffice,
+ storageManager);
+
+ Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress,
divert);
+
+ postOffice.addBinding(binding);
+
+ managementService.registerDivert(divert, config);
+ }
+
+ public void destroyDivert(SimpleString name) throws Exception
+ {
+ Binding binding = postOffice.getBinding(name);
+ if (binding == null)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for
divert " + name);
+ }
+ if (!(binding instanceof DivertBinding))
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding "
+ name + " is not a divert");
+ }
+
+ postOffice.removeBinding(name);
+ }
+
+
+
+ public void deployBridge(BridgeConfiguration config) throws Exception
+ {
+ if (clusterManager != null)
+ {
+ clusterManager.deployBridge(config);
+ }
+ }
+
+ public void destroyBridge(String name) throws Exception
+ {
+ if (clusterManager != null)
+ {
+ clusterManager.destroyBridge(name);
+ }
+ }
+
+ public ServerSession getSessionByID(String sessionName)
+ {
+ return sessions.get(sessionName);
+ }
+
+ // PUBLIC -------
+
+ public String toString()
+ {
+ if (identity != null)
+ {
+ return "HornetQServerImpl::" + identity;
+ }
+ else
+ {
+ return "HornetQServerImpl::" + (nodeManager != null ?
"serverUUID=" + nodeManager.getUUID() : "");
+ }
+ }
+
+
+
// Package protected
// ----------------------------------------------------------------------------
@@ -1296,34 +1179,6 @@
// Private
//
--------------------------------------------------------------------------------------
- // private boolean startReplication() throws Exception
- // {
- // String backupConnectorName = configuration.getBackupConnectorName();
- //
- // if (!configuration.isSharedStore() && backupConnectorName != null)
- // {
- // TransportConfiguration backupConnector =
configuration.getConnectorConfigurations().get(backupConnectorName);
- //
- // if (backupConnector == null)
- // {
- // HornetQServerImpl.log.warn("connector with name '" +
backupConnectorName +
- // "' is not defined in the configuration.");
- // }
- // else
- // {
- //
- // replicationFailoverManager =
createBackupConnectionFailoverManager(backupConnector,
- // threadPool,
- // scheduledPool);
- //
- // replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
executorFactory);
- // replicationManager.start();
- // }
- // }
- //
- // return true;
- // }
-
private void callActivateCallbacks()
{
for (ActivateCallback callback : activateCallbacks)
@@ -1340,44 +1195,6 @@
}
}
- public synchronized boolean checkActivate() throws Exception
- {
- if (configuration.isBackup())
- {
- // Handle backup server activation
-
- if (!configuration.isSharedStore())
- {
- if (replicationEndpoint == null)
- {
- HornetQServerImpl.log.warn("There is no replication endpoint,
can't activate this backup server");
-
- throw new HornetQException(HornetQException.INTERNAL_ERROR,
"Can't activate the server");
- }
-
- replicationEndpoint.stop();
- }
-
- // Complete the startup procedure
-
- HornetQServerImpl.log.info("Activating backup server");
-
- configuration.setBackup(false);
-
- initialisePart2();
- }
-
- return true;
- }
-
- private class FileActivateRunner implements Runnable
- {
- public void run()
- {
-
- }
- }
-
private void initialiseLogging()
{
LogDelegateFactory logDelegateFactory =
(LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
@@ -1478,8 +1295,11 @@
nodeManager.getUUID(),
configuration.isBackup(),
configuration.isClustered());
+
+ clusterManager.deploy();
+
remotingService = new RemotingServiceImpl(clusterManager, configuration, this,
managementService, scheduledPool);
messagingServerControl = managementService.registerServer(postOffice,
@@ -1826,76 +1646,6 @@
}
}
- public void deployDivert(DivertConfiguration config) throws Exception
- {
- if (config.getName() == null)
- {
- HornetQServerImpl.log.warn("Must specify a name for each divert. This one
will not be deployed.");
-
- return;
- }
-
- if (config.getAddress() == null)
- {
- HornetQServerImpl.log.warn("Must specify an address for each divert. This
one will not be deployed.");
-
- return;
- }
-
- if (config.getForwardingAddress() == null)
- {
- HornetQServerImpl.log.warn("Must specify an forwarding address for each
divert. This one will not be deployed.");
-
- return;
- }
-
- SimpleString sName = new SimpleString(config.getName());
-
- if (postOffice.getBinding(sName) != null)
- {
- HornetQServerImpl.log.warn("Binding already exists with name " + sName
+ ", divert will not be deployed");
-
- return;
- }
-
- SimpleString sAddress = new SimpleString(config.getAddress());
-
- Transformer transformer =
instantiateTransformer(config.getTransformerClassName());
-
- Filter filter = FilterImpl.createFilter(config.getFilterString());
-
- Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
- sName,
- new SimpleString(config.getRoutingName()),
- config.isExclusive(),
- filter,
- transformer,
- postOffice,
- storageManager);
-
- Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress,
divert);
-
- postOffice.addBinding(binding);
-
- managementService.registerDivert(divert, config);
- }
-
- public void destroyDivert(SimpleString name) throws Exception
- {
- Binding binding = postOffice.getBinding(name);
- if (binding == null)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for
divert " + name);
- }
- if (!(binding instanceof DivertBinding))
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding "
+ name + " is not a divert");
- }
-
- postOffice.removeBinding(name);
- }
-
-
private synchronized void deployGroupingHandlerConfiguration(final
GroupingHandlerConfiguration config) throws Exception
{
if (config != null)
@@ -1922,22 +1672,6 @@
managementService.addNotificationListener(groupingHandler);
}
}
-
- public void deployBridge(BridgeConfiguration config) throws Exception
- {
- if (clusterManager != null)
- {
- clusterManager.deployBridge(config);
- }
- }
-
- public void destroyBridge(String name) throws Exception
- {
- if (clusterManager != null)
- {
- clusterManager.destroyBridge(name);
- }
- }
private Transformer instantiateTransformer(final String transformerClassName)
{
@@ -1979,11 +1713,6 @@
}
- public ServerSession getSessionByID(String sessionName)
- {
- return sessions.get(sessionName);
- }
-
/**
* Check if journal directory exists or create it (if configured to do so)
*/
@@ -2005,18 +1734,284 @@
}
}
- public String toString()
+ /**
+ * To be called by backup trying to fail back the server
+ */
+ private void startFailbackChecker()
{
- if (identity != null)
+ scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l,
TimeUnit.MILLISECONDS);
+ }
+
+
+ // Inner classes
+ // --------------------------------------------------------------------------------
+
+ class FailbackChecker implements Runnable
+ {
+ boolean restarting = false;
+ public void run()
{
- return "HornetQServerImpl::" + identity;
+ try
+ {
+ if(!restarting && nodeManager.isAwaitingFailback())
+ {
+ log.info("live server wants to restart, restarting server in
backup");
+ restarting = true;
+ Thread t = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ log.debug(HornetQServerImpl.this + "::Stopping live node in
favor of failback");
+ stop(true);
+ // We need to wait some time before we start the backup again,
+ // otherwise we might start before the live had a chance
to get it
+ Thread.sleep(configuration.getFailbackDelay());
+ configuration.setBackup(true);
+ log.debug(HornetQServerImpl.this + "::Starting backup node
now after failback");
+ start();
+ }
+ catch (Exception e)
+ {
+ log.warn("unable to restart server, please kill and restart
manually", e);
+ }
+ }
+ });
+ t.start();
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
- else
+ }
+
+
+
+ private class SharedStoreLiveActivation implements Activation
+ {
+ public void run()
{
- return "HornetQServerImpl::" + (nodeManager != null ?
"serverUUID=" + nodeManager.getUUID() : "");
+ try
+ {
+ log.info("Waiting to obtain live lock");
+
+ checkJournalDirectory();
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("First part initialization on " + this);
+ }
+
+ initialisePart1();
+
+ if(nodeManager.isBackupLive())
+ {
+ // Looks like we failed over at some point: we need to announce that we
are the backup, so that when the current live
+ // goes down they fail over to us
+ if (log.isDebugEnabled())
+ {
+ log.debug("announcing backup to the former live" + this);
+ }
+
+ clusterManager.announceBackup();
+ Thread.sleep(configuration.getFailbackDelay());
+ }
+
+ nodeManager.startLiveNode();
+
+ if (stopped)
+ {
+ return;
+ }
+
+ initialisePart2();
+
+ log.info("Server is now live");
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
}
+
+ public void close(boolean permanently) throws Exception
+ {
+ if(permanently)
+ {
+ nodeManager.crashLiveServer();
+ }
+ else
+ {
+ nodeManager.pauseLiveServer();
+ }
+ }
}
- // Inner classes
- // --------------------------------------------------------------------------------
+
+ private class SharedStoreBackupActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ nodeManager.startBackup();
+
+ initialisePart1();
+
+ clusterManager.start();
+
+ started = true;
+
+ log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "]
started, waiting live to fail before it gets active");
+
+ nodeManager.awaitLiveNode();
+
+ configuration.setBackup(false);
+
+ if (stopped)
+ {
+ return;
+ }
+
+ initialisePart2();
+
+ clusterManager.activate();
+
+ log.info("Backup Server is now live");
+
+ nodeManager.releaseBackup();
+ if(configuration.isAllowAutoFailBack())
+ {
+ startFailbackChecker();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ //this is ok, we are being stopped
+ }
+ catch (ClosedChannelException e)
+ {
+ //this is ok too, we are being stopped
+ }
+ catch (Exception e)
+ {
+ if(!(e.getCause() instanceof InterruptedException))
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+ catch(Throwable e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
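+ /**
+ * Closes this activation. While still a backup, interrupts the activation
+ * thread (waiting up to 30 seconds) and then stops the backup node; once live,
+ * crashes the live server if {@code permanently} is true, otherwise pauses it.
+ */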
+ public void close(boolean permanently) throws Exception
+ {
+ if (configuration.isBackup())
+ {
+ long timeout = 30000;
+
+ long start = System.currentTimeMillis();
+
+ while (backupActivationThread.isAlive() && System.currentTimeMillis()
- start < timeout)
+ {
+ nodeManager.interrupt();
+
+ backupActivationThread.interrupt();
+
+ backupActivationThread.join(1000);
+
+ }
+
+ if (System.currentTimeMillis() - start >= timeout)
+ {
+ threadDump("Timed out waiting for backup activation to exit");
+ }
+
+ nodeManager.stopBackup();
+ }
+ else
+ {
+ //if we are now live, behave as live
+ // We need to delete the file too, otherwise the backup will fail over when we
shut down, or if the backup is
+ // started before the live
+ if(permanently)
+ {
+ nodeManager.crashLiveServer();
+ }
+ else
+ {
+ nodeManager.pauseLiveServer();
+ }
+ }
+ }
+ }
+
+ private interface Activation extends Runnable
+ {
+ void close(boolean permanently) throws Exception;
+ }
+
+ private class SharedNothingBackupActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ // TODO
+
+ // Try-Connect to live server using live-connector-ref
+
+ // sit in loop and try and connect, if server is not live then it will return
NOT_LIVE
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
+ public void close(boolean permanently) throws Exception
+ {
+ }
+ }
+
+ private class NoSharedStoreLiveActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ initialisePart1();
+
+ initialisePart2();
+
+ if (identity != null)
+ {
+ log.info("Server " + identity + " is now live");
+ }
+ else
+ {
+ log.info("Server is now live");
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
+ public void close(boolean permanently) throws Exception
+ {
+
+ }
+ }
+
+
}
Modified:
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-17
02:31:55 UTC (rev 11358)
+++
branches/Branch_2_2_EAP_cluster4/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-17
03:00:35 UTC (rev 11359)
@@ -1062,7 +1062,7 @@
return;
}
-
+
consumer.receiveCredits(credits);
}
Modified:
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -76,18 +77,28 @@
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
+ ((ServerLocatorInternal)locator).setIdentity("testAutoFailback");
+
ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
final CountDownLatch latch = new CountDownLatch(1);
ClientSession session = sendAndConsume(sf, true);
+
+ System.out.println(locator.getTopology().describe());
MyListener listener = new MyListener(latch);
session.addFailureListener(listener);
+
+ System.out.println(locator.getTopology().describe());
liveServer.crash();
-
+
assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ log.info("backup (nowLive) topology = " +
backupServer.getServer().getClusterManager().getDefaultConnection().getTopology().describe());
+
+ log.info("Server Crash!!!");
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -97,6 +108,11 @@
producer.send(message);
+ verifyMessageOnServer(1, 1);
+
+ System.out.println(locator.getTopology().describe());
+
+
session.removeFailureListener(listener);
final CountDownLatch latch2 = new CountDownLatch(1);
@@ -107,6 +123,10 @@
log.info("******* starting live server back");
liveServer.start();
+
+ Thread.sleep(1000);
+
+ System.out.println("After failback: " +
locator.getTopology().describe());
assertTrue(latch2.await(5, TimeUnit.SECONDS));
@@ -118,6 +138,8 @@
session.close();
+ verifyMessageOnServer(0, 1);
+
sf.close();
Assert.assertEquals(0, sf.numSessions());
@@ -125,6 +147,29 @@
Assert.assertEquals(0, sf.numConnections());
}
+ /**
+  * Connects directly to an in-VM server and consumes messages from {@code ADDRESS}.
+  * Each message is acknowledged and committed. The method fails if any of the expected
+  * messages is not received within one second. It does not check that the queue is
+  * empty afterwards.
+  *
+  * @param server           id of the in-VM server to connect to (passed to {@link #createInVMLocator(int)})
+  * @param numberOfMessages how many messages must be received
+  * @throws Exception        if the locator, session factory, session or consumer cannot be created
+  * @throws HornetQException if receiving, acknowledging or committing fails
+  */
+ private void verifyMessageOnServer(final int server, final int numberOfMessages) throws Exception, HornetQException
+ {
+    ServerLocator serverLocator = createInVMLocator(server);
+    // Nested try/finally so every resource is closed even when an assertion fails or a
+    // client call throws; otherwise the connection would leak into later tests.
+    try
+    {
+       ClientSessionFactory verifyFactory = serverLocator.createSessionFactory();
+       try
+       {
+          ClientSession verifySession = verifyFactory.createSession(false, false);
+          try
+          {
+             verifySession.start();
+             ClientConsumer consumer = verifySession.createConsumer(ADDRESS);
+             for (int i = 0; i < numberOfMessages; i++)
+             {
+                ClientMessage msg = consumer.receive(1000);
+                assertNotNull(msg);
+                msg.acknowledge();
+                // Acks are not auto-committed (createSession(false, false)), so commit explicitly.
+                verifySession.commit();
+             }
+          }
+          finally
+          {
+             verifySession.close();
+          }
+       }
+       finally
+       {
+          verifyFactory.close();
+       }
+    }
+    finally
+    {
+       serverLocator.close();
+    }
+ }
+
public void testAutoFailbackThenFailover() throws Exception
{
locator.setBlockOnNonDurableSend(true);
@@ -253,7 +298,7 @@
if (createQueue)
{
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
false);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
}
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -288,6 +333,8 @@
}
ClientMessage message3 = consumer.receiveImmediate();
+
+ consumer.close();
Assert.assertNull(message3);
@@ -315,6 +362,7 @@
/**
 * Called when the session's connection fails. Counts down the latch the test is waiting on.
 *
 * @param me         the failure reported for the connection (not used here)
 * @param failedOver not used by this listener
 */
public void connectionFailed(final HornetQException me, boolean failedOver)
{
// NOTE(review): leftover stdout debug output; consider removing it or using the test logger.
+ System.out.println("Failed, me");
latch.countDown();
}
Modified:
branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-17 02:31:55 UTC (rev 11358)
+++ branches/Branch_2_2_EAP_cluster4/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-17 03:00:35 UTC (rev 11359)
@@ -38,6 +38,7 @@
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
@@ -530,7 +531,19 @@
locators.add(locatorWithoutHA);
return locatorWithoutHA;
}
+
+ /**
+  * Creates an HA-enabled server locator that connects through the in-VM connector.
+  *
+  * @param serverID id of the in-VM server to reach; when 0 the
+  *                 {@code TransportConstants.SERVER_ID_PROP_NAME} parameter is left unset
+  * @return a new locator, also added to {@code locators} in the same way as the non-HA
+  *         locator helper in this class
+  */
+ protected ServerLocator createInVMLocator(final int serverID)
+ {
+    Map<String, Object> params = new HashMap<String, Object>();
+    if (serverID != 0)
+    {
+       params.put(TransportConstants.SERVER_ID_PROP_NAME, serverID);
+    }
+
+    ServerLocator inVMLocator = HornetQClient.createServerLocatorWithHA(new TransportConfiguration(INVM_CONNECTOR_FACTORY, params));
+    // Register the locator like the sibling helper does, so it is not leaked when a test
+    // fails before closing it. NOTE(review): confirm that the teardown that drains
+    // `locators` tolerates locators the test has already closed.
+    locators.add(inVMLocator);
+    return inVMLocator;
+ }
+
protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws
Exception
{
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(connectorClass));