Author: borges
Date: 2011-10-17 11:33:57 -0400 (Mon, 17 Oct 2011)
New Revision: 11559
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Merge activation classes
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-10-17
15:33:40 UTC (rev 11558)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-10-17
15:33:57 UTC (rev 11559)
@@ -51,8 +51,6 @@
* A CoreProtocolManager
*
* @author Tim Fox
- *
- *
*/
public class CoreProtocolManager implements ProtocolManager
{
@@ -216,8 +214,8 @@
* HORNETQ-720 Instantiate a new server locator to call
notifyNodeUp(...)? Or send
* a CLUSTER_TOPOLOGY(_2?) message?
*/
-// server.getClusterManager().notifyNodeUp(msg.getNodeID(),
getPair(msg.getConnector(), true), true,
-// true);
+
acceptorUsed.getClusterConnection().nodeAnnounced(System.currentTimeMillis(),
msg.getNodeID(),
+
getPair(msg.getConnector(), true), true);
}
}
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-10-17
15:33:40 UTC (rev 11558)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-10-17
15:33:57 UTC (rev 11559)
@@ -40,8 +40,12 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
@@ -83,6 +87,7 @@
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
@@ -362,17 +367,14 @@
if (configuration.isSharedStore() &&
configuration.isPersistenceEnabled())
{
activation = new SharedStoreLiveActivation();
-
- // This should block until the lock is got
-
- activation.run();
}
else
{
activation = new NoSharedStoreLiveActivation();
+ }
- activation.run();
- }
+ activation.run();
+
started = true;
HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() +
@@ -1862,7 +1864,7 @@
}
}
- private class SharedStoreLiveActivation implements Activation
+ private final class SharedStoreLiveActivation implements Activation
{
public void run()
{
@@ -1923,7 +1925,7 @@
}
}
- private class SharedStoreBackupActivation implements Activation
+ private final class SharedStoreBackupActivation implements Activation
{
public void run()
{
@@ -1984,9 +1986,6 @@
}
}
- /**
- *
- */
public void close(boolean permanently) throws Exception
{
if (configuration.isBackup())
@@ -2029,7 +2028,7 @@
}
}
- private class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener
+ private final class ShutdownOnCriticalErrorListener implements
IOCriticalErrorListener
{
boolean failedAlready = false;
@@ -2065,30 +2064,143 @@
void close(boolean permanently) throws Exception;
}
- private class SharedNothingBackupActivation implements Activation
+ private final class SharedNothingBackupActivation implements Activation
{
+ private ServerLocatorInternal serverLocator;
+
public void run()
{
try
{
- // TODO
+ nodeManager.startBackup();
- // Try-Connect to live server using live-connector-ref
+ initialisePart1();
- // sit in loop and try and connect, if server is not live then it will return
NOT_LIVE
+ final String liveConnectorName = configuration.getLiveConnectorName();
+ if (liveConnectorName == null)
+ {
+ throw new IllegalArgumentException(
+ "Cannot have a replicated backup
without configuring its live-server!");
+ }
+ clusterManager.start();
+
+ final TransportConfiguration config =
configuration.getConnectorConfigurations().get(liveConnectorName);
+ serverLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
+ final QuorumManager quorumManager = new QuorumManager(serverLocator,
nodeManager.getNodeId().toString());
+
+ serverLocator.setReconnectAttempts(-1);
+
+ threadPool.execute(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ final ClientSessionFactory liveServerSessionFactory =
serverLocator.connect();
+ if (liveServerSessionFactory == null)
+ {
+ // XXX HORNETQ-768
+ throw new RuntimeException("Need to retry?");
+ }
+ CoreRemotingConnection liveConnection =
liveServerSessionFactory.getConnection();
+ Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id,
-1);
+ Channel replicationChannel =
liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+
+ connectToReplicationEndpoint(replicationChannel);
+ replicationEndpoint.start();
+ clusterManager.announceReplicatingBackup(pingChannel);
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to announce backup for replication.",
e);
+ }
+ }
+ });
+
+ log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
+ "] started, waiting live to fail before it gets active");
+ started = true;
+
+ // Server node (i.e. Life node) is not running, now the backup takes over.
+ // we must remember to close stuff we don't need any more
+ while (true)
+ {
+ nodeManager.awaitLiveNode();
+ if (quorumManager.isNodeDown())
+ {
+ break;
+ }
+ }
+
+ serverLocator.close();
+ replicationEndpoint.stop();
+
+ if (!isRemoteBackupUpToDate())
+ {
+ /*
+ * XXX HORNETQ-768 Live is down, and this server was not in sync. Perhaps
we should
+ * first try to wait a little longer to see if the 'live' comes
back?
+ */
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup
Server was not yet in sync with live");
+ }
+
+ configuration.setBackup(false);
+ synchronized (startUpLock)
+ {
+ if (!started)
+ return;
+ storageManager.start();
+ initialisePart2();
+ clusterManager.activate();
+ }
+
}
catch (Exception e)
{
+ if ((e instanceof InterruptedException || e instanceof IllegalStateException)
&& !started)
+ // do not log these errors if the server is being stopped.
+ return;
log.error("Failure in initialisation", e);
+ e.printStackTrace();
}
}
- public void close(boolean permanently) throws Exception
+ public void close(final boolean permanently) throws Exception
{
+ if (serverLocator != null)
+ {
+ serverLocator.close();
+ serverLocator = null;
+ }
+
+ if (configuration.isBackup())
+ {
+
+ long timeout = 30000;
+
+ long start = System.currentTimeMillis();
+
+ while (backupActivationThread.isAlive() && System.currentTimeMillis()
- start < timeout)
+ {
+ nodeManager.interrupt();
+
+ backupActivationThread.interrupt();
+
+ Thread.sleep(1000);
+ }
+
+ if (System.currentTimeMillis() - start >= timeout)
+ {
+ log.warn("Timed out waiting for backup activation to exit");
+ }
+
+ nodeManager.stopBackup();
+ }
}
}
- private class NoSharedStoreLiveActivation implements Activation
+ private final class NoSharedStoreLiveActivation implements Activation
{
public void run()
{
@@ -2115,7 +2227,14 @@
public void close(boolean permanently) throws Exception
{
-
+ if (permanently)
+ {
+ nodeManager.crashLiveServer();
+ }
+ else
+ {
+ nodeManager.pauseLiveServer();
+ }
}
}
Show replies by date