Author: borges
Date: 2011-06-28 11:21:14 -0400 (Tue, 28 Jun 2011)
New Revision: 10893
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Trigger replication from 'live'
(which fails as current code requires it to be started from the backup).
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-28
15:19:24 UTC (rev 10892)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-06-28
15:21:14 UTC (rev 10893)
@@ -18,8 +18,19 @@
import java.net.InetAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.HornetQException;
@@ -57,13 +68,13 @@
private final Set<ClusterTopologyListener> topologyListeners = new
HashSet<ClusterTopologyListener>();
- private Set<ClientSessionFactory> factories = new
HashSet<ClientSessionFactory>();
+ private final Set<ClientSessionFactory> factories = new
HashSet<ClientSessionFactory>();
private TransportConfiguration[] initialConnectors;
- private DiscoveryGroupConfiguration discoveryGroupConfiguration;
+ private final DiscoveryGroupConfiguration discoveryGroupConfiguration;
- private StaticConnector staticConnector = new StaticConnector();
+ private final StaticConnector staticConnector = new StaticConnector();
private Topology topology = new Topology();
@@ -158,14 +169,14 @@
private boolean backup;
private final Exception e = new Exception();
-
+
// To be called when there are ServerLocator being finalized.
// To be used on test assertions
public static Runnable finalizeCallback = null;
-
+
public static synchronized void clearThreadPools()
{
-
+
if (globalThreadPool != null)
{
globalThreadPool.shutdown();
@@ -184,7 +195,7 @@
globalThreadPool = null;
}
}
-
+
if (globalScheduledThreadPool != null)
{
globalScheduledThreadPool.shutdown();
@@ -1318,8 +1329,9 @@
}
}
- class StaticConnector implements Serializable
+ final class StaticConnector implements Serializable
{
+ private static final long serialVersionUID = 959566606042156036L;
private List<Connector> connectors;
public ClientSessionFactory connect() throws HornetQException
@@ -1344,14 +1356,14 @@
try
{
-
+
List<Future<ClientSessionFactory>> futuresList = new
ArrayList<Future<ClientSessionFactory>>();
-
+
for (Connector conn : connectors)
{
futuresList.add(threadPool.submit(conn));
}
-
+
for (int i = 0, futuresSize = futuresList.size(); i < futuresSize; i++)
{
Future<ClientSessionFactory> future = futuresList.get(i);
@@ -1359,7 +1371,9 @@
{
csf = future.get();
if (csf != null)
+ {
break;
+ }
}
catch (Exception e)
{
@@ -1415,6 +1429,7 @@
}
}
+ @Override
public void finalize() throws Throwable
{
if (!closed && finalizeCheck)
@@ -1423,7 +1438,7 @@
System.identityHashCode(this));
log.warn("The ServerLocator you didn't close was created
here:", e);
-
+
if (ServerLocatorImpl.finalizeCallback != null)
{
ServerLocatorImpl.finalizeCallback.run();
@@ -1437,7 +1452,7 @@
class Connector implements Callable<ClientSessionFactory>
{
- private TransportConfiguration initialConnector;
+ private final TransportConfiguration initialConnector;
private volatile ClientSessionFactoryInternal factory;
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-06-28
15:19:24 UTC (rev 10892)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-06-28
15:21:14 UTC (rev 10893)
@@ -54,6 +54,9 @@
private final List<Interceptor> interceptors;
+ private final Map<String, ServerSessionPacketHandler> sessionHandlers =
+ new ConcurrentHashMap<String, ServerSessionPacketHandler>();
+
public CoreProtocolManager(final HornetQServer server, final List<Interceptor>
interceptors)
{
this.server = server;
@@ -153,8 +156,18 @@
else if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION)
{
HaBackupRegistrationMessage msg = (HaBackupRegistrationMessage)packet;
- System.out.println("HA_BACKUP_REGISTRATION: " + msg);
+ System.out.println("HA_BACKUP_REGISTRATION: " + msg + "
connector=" + msg.getConnector());
System.out.println("HA_BR: " + server.getIdentity() + ",
toString=" + server);
+ try
+ {
+ server.addHaBackup(msg.getConnector());
+ }
+ catch (Exception e)
+ {
+ // XXX This is not what we want
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
}
}
});
@@ -164,8 +177,6 @@
return entry;
}
- private final Map<String, ServerSessionPacketHandler> sessionHandlers = new
ConcurrentHashMap<String, ServerSessionPacketHandler>();
-
public ServerSessionPacketHandler getSessionHandler(final String sessionName)
{
return sessionHandlers.get(sessionName);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-06-28
15:19:24 UTC (rev 10892)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/HornetQServer.java 2011-06-28
15:21:14 UTC (rev 10893)
@@ -20,6 +20,7 @@
import javax.management.MBeanServer;
import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
@@ -49,23 +50,23 @@
* This interface defines the internal interface of the HornetQ Server exposed to other
components of the server. The
* external management interface of the HornetQ Server is defined by the
HornetQServerManagement interface This
* interface is never exposed outside the HornetQ server, e.g. by JMX or other means
- *
+ *
* @author <a href="tim.fox(a)jboss.com">Tim Fox</a>
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
*/
public interface HornetQServer extends HornetQComponent
{
-
+
void setIdentity(String identity);
-
+
String getIdentity();
-
+
Configuration getConfiguration();
RemotingService getRemotingService();
StorageManager getStorageManager();
-
+
PagingManager getPagingManager();
ManagementService getManagementService();
@@ -75,12 +76,12 @@
MBeanServer getMBeanServer();
Version getVersion();
-
+
NodeManager getNodeManager();
/**
* Returns the resource to manage this HornetQ server.
- *
+ *
* Using this control will throw IllegalStateException if the
* server is not properly started.
*/
@@ -90,7 +91,7 @@
void unregisterActivateCallback(ActivateCallback callback);
- /** The journal at the backup server has to be equivalent as the journal used on the
live node.
+ /** The journal at the backup server has to be equivalent as the journal used on the
live node.
* Or else the backup node is out of sync. */
ReplicationEndpoint connectToReplicationEndpoint(Channel channel) throws Exception;
@@ -143,31 +144,31 @@
SimpleString filterString,
boolean durable,
boolean temporary) throws Exception;
-
+
Queue locateQueue(SimpleString queueName) throws Exception;
void destroyQueue(SimpleString queueName, ServerSession session) throws Exception;
ScheduledExecutorService getScheduledPool();
-
+
ExecutorFactory getExecutorFactory();
void setGroupingHandler(GroupingHandler groupingHandler);
GroupingHandler getGroupingHandler();
-
+
ReplicationEndpoint getReplicationEndpoint();
-
+
ReplicationManager getReplicationManager();
- boolean checkActivate() throws Exception;
-
+ boolean checkActivate() throws Exception;
+
void deployDivert(DivertConfiguration config) throws Exception;
void destroyDivert(SimpleString name) throws Exception;
ConnectorsService getConnectorsService();
-
+
void deployBridge(BridgeConfiguration config) throws Exception;
void destroyBridge(String name) throws Exception;
@@ -175,4 +176,10 @@
ServerSession getSessionByID(String sessionID);
void stop(boolean failoverOnServerShutdown) throws Exception;
+
+ /**
+ * @param connector
+ * @throws Exception
+ */
+ void addHaBackup(TransportConfiguration connector) throws Exception;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-28
15:19:24 UTC (rev 10892)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-28
15:21:14 UTC (rev 10893)
@@ -41,8 +41,10 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.Configuration;
@@ -85,6 +87,7 @@
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.Role;
import org.hornetq.core.security.SecurityStore;
@@ -119,6 +122,7 @@
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
import org.hornetq.utils.OrderedExecutorFactory;
@@ -209,6 +213,8 @@
private final Map<String, ServerSession> sessions = new
ConcurrentHashMap<String, ServerSession>();
+ private final Set<String> sharedNothingBackups = new
ConcurrentHashSet<String>();
+
private final Object initialiseLock = new Object();
private boolean initialised;
@@ -576,6 +582,8 @@
private Activation activation;
+ private ServerLocator serverLocator;
+
public synchronized void start() throws Exception
{
initialiseLogging();
@@ -1247,31 +1255,23 @@
// Private
//
--------------------------------------------------------------------------------------
- // private boolean startReplication() throws Exception
- // {
- // // get list of backup names!
- //
- // if (!configuration.isSharedStore() && backupConnectorName != null)
- // {
- // TransportConfiguration backupConnector =
- // configuration.getConnectorConfigurations().get(backupConnectorName);
- //
- // if (backupConnector == null)
- // {
- // HornetQServerImpl.log.warn("connector with name '" +
backupConnectorName +
- // "' is not defined in the configuration.");
- // return false;
- // }
- // replicationFailoverManager =
createBackupConnectionFailoverManager(backupConnector,
- // threadPool, scheduledPool);
- //
- // replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
executorFactory);
- // replicationManager.start();
- // }
- //
- // return true;
- // }
+ private boolean startReplication(TransportConfiguration connector) throws Exception
+ {
+ assert !configuration.isSharedStore();
+ if (configuration.isSharedStore() || connector == null)
+ {
+ return true;
+ }
+ serverLocator = HornetQClient.createServerLocatorWithHA(connector);
+ ClientSessionFactoryInternal replicationFailoverManager =
+
(ClientSessionFactoryInternal)serverLocator.createSessionFactory(connector);
+ replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
executorFactory);
+ replicationManager.start();
+
+ return true;
+ }
+
private void callActivateCallbacks()
{
for (ActivateCallback callback : activateCallbacks)
@@ -1942,6 +1942,12 @@
return "HornetQServerImpl::" + (identity == null ? "" :
(identity + ", ")) + (nodeManager != null ? ("serverUUID=" +
nodeManager.getUUID()) : "");
}
- // Inner classes
- // --------------------------------------------------------------------------------
+ @Override
+ public void addHaBackup(TransportConfiguration connector) throws Exception
+ {
+ log.info(connector + " " + connector.getFactoryClassName() + "
" + connector.getParams() + " " +
+ replicationManager);
+ startReplication(connector);
+ // throw new UnsupportedOperationException("unimplemented");
+ }
}