Author: borges
Date: 2011-06-27 10:18:52 -0400 (Mon, 27 Jun 2011)
New Revision: 10886
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Add some replication support
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-27 14:14:47 UTC (rev 10885)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-27 14:18:52 UTC (rev 10886)
@@ -38,8 +38,12 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.CoreQueueConfiguration;
@@ -76,6 +80,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
@@ -133,7 +138,7 @@
// ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(HornetQServerImpl.class);
-
+
// JMS Topics (which are outside of the scope of the core API) will require a dumb subscription with a dummy-filter at this current version
// as a way to keep its existence valid and TCK tests
// That subscription needs an invalid filter, however paging needs to ignore any subscription with this filter.
@@ -147,7 +152,7 @@
// Attributes
// -----------------------------------------------------------------------------------
-
+
private final Version version;
private final HornetQSecurityManager securityManager;
@@ -165,7 +170,7 @@
private volatile QueueFactory queueFactory;
private volatile PagingManager pagingManager;
-
+
private volatile PostOffice postOffice;
private volatile ExecutorService threadPool;
@@ -187,7 +192,7 @@
private volatile RemotingService remotingService;
private volatile ManagementService managementService;
-
+
private volatile ConnectorsService connectorsService;
private MemoryManager memoryManager;
@@ -217,9 +222,9 @@
private final Set<ActivateCallback> activateCallbacks = new
HashSet<ActivateCallback>();
private volatile GroupingHandler groupingHandler;
-
+
private NodeManager nodeManager;
-
+
// Used to identify the server on tests... useful on debugging testcases
private String identity;
@@ -352,7 +357,7 @@
nodeManager.startLiveNode();
initialisePart2();
-
+
log.info("Server is now live");
}
catch (Exception e)
@@ -392,11 +397,11 @@
log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "]
started, waiting live to fail before it gets active");
nodeManager.awaitLiveNode();
-
+
configuration.setBackup(false);
-
+
initialisePart2();
-
+
clusterManager.activate();
log.info("Backup Server is now live");
@@ -511,11 +516,48 @@
{
try
{
- // TODO
+ nodeManager.startBackup();
+ initialisePart1();
+
+ clusterManager.start();
// Try-Connect to live server using live-connector-ref
+ String liveConnectorsName = configuration.getLiveConnectorName();
+ if (liveConnectorsName == null)
+ {
+ throw new IllegalArgumentException("Cannot have a replicated backup
without configuring its live-server!");
+ }
+ final TransportConfiguration config =
configuration.getConnectorConfigurations().get(liveConnectorsName);
+ log.info("config is " + config);
+ final ServerLocatorInternal serverLocator =
+
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(config);
+ // XXX Need to retry the connection a couple of times
// sit in loop and try and connect, if server is not live then it will return NOT_LIVE
+ final ClientSessionFactory liveServerSessionFactory =
serverLocator.connect();
+
+ if (liveServerSessionFactory != null)
+ {
+ log.debug("announce backup to live-server");
+ liveServerSessionFactory.getConnection()
+ .getChannel(0, -1)
+ .send(new
NodeAnnounceMessage(getNodeID().toString(), true, config));
+ log.info("backup announced");
+ }
+
+ started = true;
+
+ log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() +
+ "] started, waiting live to fail before it gets active");
+
+ nodeManager.awaitLiveNode();
+
+ // XXX ???
+ configuration.setBackup(false);
+
+ // XXX
+
+ initialisePart2();
}
catch (Exception e)
{
@@ -523,7 +565,7 @@
}
}
- public void close(boolean permanently) throws Exception
+ public void close(final boolean permanently) throws Exception
{
}
}
@@ -556,7 +598,7 @@
test.run();
}
-
+
if (!configuration.isBackup())
{
if (configuration.isSharedStore() &&
configuration.isPersistenceEnabled())
@@ -577,10 +619,9 @@
HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "]
started");
}
-
-
- if (configuration.isBackup())
+ else
{
+ // server is Backup
if (configuration.isSharedStore())
{
activation = new SharedStoreBackupActivation();
@@ -588,7 +629,6 @@
else
{
// Replicated
-
activation = new SharedNothingBackupActivation();
}
@@ -635,14 +675,14 @@
managementService.removeNotificationListener(groupingHandler);
groupingHandler = null;
}
-
+
if (clusterManager != null)
{
clusterManager.stop();
}
}
-
+
// We close all the exception in an attempt to let any pending IO to finish
// to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
// It may still be possible to have this scenario on a real failure (without the use of XA)
@@ -729,9 +769,9 @@
{
memoryManager.stop();
}
-
+
threadPool.shutdown();
-
+
scheduledPool.shutdown();
try
@@ -747,7 +787,7 @@
}
threadPool = null;
-
+
try
{
if (!scheduledPool.awaitTermination(10, TimeUnit.SECONDS))
@@ -761,7 +801,7 @@
}
threadPool = null;
-
+
scheduledPool = null;
pagingManager = null;
@@ -805,22 +845,22 @@
// HornetQServer implementation
// -----------------------------------------------------------
-
+
public void setIdentity(String identity)
{
this.identity = identity;
}
-
+
public String getIdentity()
{
return identity;
}
-
+
public ScheduledExecutorService getScheduledPool()
{
return scheduledPool;
}
-
+
public Configuration getConfiguration()
{
return configuration;
@@ -830,7 +870,7 @@
{
return mbeanServer;
}
-
+
public PagingManager getPagingManager()
{
return pagingManager;
@@ -860,7 +900,7 @@
{
return securityRepository;
}
-
+
public NodeManager getNodeManager()
{
return nodeManager;
@@ -1025,18 +1065,18 @@
{
return createQueue(address, queueName, filterString, durable, temporary, false);
}
-
+
public Queue locateQueue(SimpleString queueName) throws Exception
{
Binding binding = postOffice.getBinding(queueName);
-
+
Bindable queue = binding.getBindable();
-
+
if (!(queue instanceof Queue))
{
throw new IllegalStateException("locateQueue should only be used to locate
queues");
}
-
+
return (Queue) binding.getBindable();
}
@@ -1063,7 +1103,7 @@
}
Queue queue = (Queue)binding.getBindable();
-
+
if (queue.getPageSubscription() != null)
{
queue.getPageSubscription().close();
@@ -1177,7 +1217,7 @@
protected PagingManager createPagingManager()
{
-
+
return new PagingManagerImpl(new
PagingStoreFactoryNIO(configuration.getPagingDirectory(),
configuration.getJournalBufferSize_NIO(),
scheduledPool,
@@ -1187,8 +1227,8 @@
addressSettingsRepository);
}
- /**
- * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
+ /**
+ * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
*/
protected StorageManager createStorageManager()
{
@@ -1392,7 +1432,7 @@
addressSettingsDeployer.start();
}
-
+
deployAddressSettingsFromConfiguration();
storageManager.start();
@@ -1450,7 +1490,7 @@
// Load the journal and populate queues, transactions and caches in memory
pagingManager.reloadStores();
-
+
JournalLoadInformation[] journalInfo = loadJournals();
compareJournals(journalInfo);
@@ -1488,7 +1528,7 @@
// this needs to be done before clustering is fully activated
callActivateCallbacks();
- // Deply any pre-defined diverts
+ // Deploy any pre-defined diverts
deployDiverts();
if (deploymentManager != null)
@@ -1564,11 +1604,11 @@
for (QueueBindingInfo queueBindingInfo : queueBindingInfos)
{
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
-
+
Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
PageSubscription subscription =
pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(),
filter, true);
-
+
Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
queueBindingInfo.getAddress(),
queueBindingInfo.getQueueName(),
@@ -1585,8 +1625,8 @@
managementService.registerAddress(queueBindingInfo.getAddress());
managementService.registerQueue(queue, queueBindingInfo.getAddress(),
storageManager);
-
-
+
+
}
for (GroupingInfo groupingInfo : groupingInfos)
@@ -1672,12 +1712,12 @@
}
Filter filter = FilterImpl.createFilter(filterString);
-
+
long queueID = storageManager.generateUniqueID();
PageSubscription pageSubscription;
-
-
+
+
if (filterString != null &&
filterString.toString().equals(GENERIC_IGNORED_FILTER))
{
pageSubscription = null;
@@ -1771,7 +1811,7 @@
managementService.registerDivert(divert, config);
}
-
+
public void destroyDivert(SimpleString name) throws Exception
{
Binding binding = postOffice.getBinding(name);
@@ -1814,7 +1854,7 @@
managementService.addNotificationListener(groupingHandler);
}
}
-
+
public void deployBridge(BridgeConfiguration config) throws Exception
{
if (clusterManager != null)
@@ -1822,7 +1862,7 @@
clusterManager.deployBridge(config);
}
}
-
+
public void destroyBridge(String name) throws Exception
{
if (clusterManager != null)
@@ -1875,7 +1915,7 @@
{
return sessions.get(sessionName);
}
-
+
/**
* Check if journal directory exists or create it (if configured to do so)
*/
@@ -1896,7 +1936,7 @@
}
}
}
-
+
@Override
public String toString()
{
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-27 14:14:47 UTC (rev 10885)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-06-27 14:18:52 UTC (rev 10886)
@@ -56,6 +56,7 @@
// Constants -----------------------------------------------------
protected static final SimpleString ADDRESS = new
SimpleString("FailoverTestAddress");
+ protected static final String LIVE_NODE_NAME = "hqLIVE";
// Attributes ----------------------------------------------------
@@ -167,13 +168,15 @@
config1.setSecurityEnabled(false);
config1.setSharedStore(false);
config1.setBackup(true);
+ config1.setLiveConnectorName(LIVE_NODE_NAME);
backupConfig = config1;
+
backupServer = createBackupServer();
Configuration config0 = super.createDefaultConfig();
config0.getAcceptorConfigurations().clear();
config0.getAcceptorConfigurations().add(getAcceptorTransportConfiguration(true));
-
+ config0.setName(LIVE_NODE_NAME);
config0.getConnectorConfigurations().put("toBackup",
getConnectorTransportConfiguration(false));
//liveConfig.setBackupConnectorName("toBackup");
config0.setSecurityEnabled(false);
@@ -181,8 +184,8 @@
liveConfig = config0;
liveServer = createLiveServer();
+ liveServer.start();
backupServer.start();
- liveServer.start();
}
@Override