[hornetq-commits] JBoss hornetq SVN: r10969 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: replication/impl and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Jul 12 09:31:04 EDT 2011
Author: ataylor
Date: 2011-07-12 09:31:03 -0400 (Tue, 12 Jul 2011)
New Revision: 10969
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
Announce the replicating backup from a background thread, and make sure to activate the cluster manager once the backup takes over.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-12 07:40:24 UTC (rev 10968)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-07-12 13:31:03 UTC (rev 10969)
@@ -1217,7 +1217,7 @@
updateArraysAndPairs();
}
- if (last)
+ if (last && topologyArray != null)
{
receivedTopology = true;
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-12 07:40:24 UTC (rev 10968)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-12 13:31:03 UTC (rev 10969)
@@ -87,7 +87,9 @@
// Used on tests, to simulate failures on delete pages
private boolean deletePages = true;
- // Constructors --------------------------------------------------
+ private boolean started;
+
+ // Constructors --------------------------------------------------
public ReplicationEndpointImpl(final HornetQServer server)
{
this.server = server;
@@ -198,7 +200,7 @@
*/
public boolean isStarted()
{
- return true;
+ return started;
}
/* (non-Javadoc)
@@ -229,6 +231,8 @@
pageManager.start();
+ started = true;
+
}
/* (non-Javadoc)
@@ -236,6 +240,10 @@
*/
public void stop() throws Exception
{
+ if(!started)
+ {
+ return;
+ }
// This could be null if the backup server is being
// shut down without any live server connecting here
if (channel != null)
@@ -269,6 +277,8 @@
largeMessages.clear();
pageManager.stop();
+
+ started = false;
}
/* (non-Javadoc)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 07:40:24 UTC (rev 10968)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-12 13:31:03 UTC (rev 10969)
@@ -537,44 +537,65 @@
initialisePart1();
clusterManager.start();
+
String liveConnectorName = configuration.getLiveConnectorName();
if (liveConnectorName == null)
{
throw new IllegalArgumentException("Cannot have a replicated backup without configuring its live-server!");
}
final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
- serverLocator =
+ serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
serverLocator.setReconnectAttempts(-1);
- final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
-
- if (liveServerSessionFactory == null)
+ threadPool.execute(new Runnable()
{
- // XXX
- throw new RuntimeException("Need to retry...");
- }
- CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
- Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
- Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+ @Override
+ public void run()
+ {
+ try
+ {
+ final ClientSessionFactory liveServerSessionFactory = serverLocator.connect();
- replicationChannel.setHandler(replicationEndpoint);
- connectToReplicationEndpoint(replicationChannel);
- replicationEndpoint.start();
+ if (liveServerSessionFactory == null)
+ {
+ // XXX
+ throw new RuntimeException("Need to retry...");
+ }
+ CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
+ Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
+ Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
- clusterManager.announceReplicatingBackup(pingChannel);
+ replicationChannel.setHandler(replicationEndpoint);
+ connectToReplicationEndpoint(replicationChannel);
+ replicationEndpoint.start();
+ clusterManager.announceReplicatingBackup(pingChannel);
+ }
+ catch (Exception e)
+ {
+ log.warn("unable to announce backup for replication", e);
+ }
+ }
+ });
+
+
log.info("HornetQ Backup Server version " + getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
"] started, waiting live to fail before it gets active");
started = true;
+
+ // Server node (i.e. live node) is not running, now the backup takes over.
+ // We must remember to close stuff we don't need any more.
nodeManager.awaitLiveNode();
- // Server node (i.e. Life node) is not running, now the backup takes over.
+ serverLocator.close();
replicationEndpoint.stop();
configuration.setBackup(false);
initialisePart2();
+ clusterManager.activate();
+
}
catch (Exception e)
{
More information about the hornetq-commits
mailing list