Author: clebert.suconic(a)jboss.com
Date: 2011-09-19 20:50:32 -0400 (Mon, 19 Sep 2011)
New Revision: 11371
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
Refactoring clustering manager / cluster connection
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -53,6 +53,7 @@
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -367,7 +368,7 @@
// ConnectionLifeCycleListener implementation
--------------------------------------------------
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection,
final ProtocolType protocol)
{
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorImpl.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -1111,6 +1111,11 @@
}
}
+ public String getIdentity()
+ {
+ return identity;
+ }
+
public void setIdentity(String identity)
{
this.identity = identity;
@@ -1282,8 +1287,8 @@
{
log.debug("nodeDown " + this + " nodeID=" + nodeID + "
as being down", new Exception("trace"));
}
-
- if (topology.removeMember(eventTime, nodeID))
+
+ if (!(isClusterConnection() && nodeID.equals(this.getNodeID())) &&
topology.removeMember(eventTime, nodeID))
{
if (topology.isEmpty())
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ServerLocatorInternal.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -39,8 +39,12 @@
void setAfterConnectionInternalListener(AfterConnectInternalListener listener);
- /** Used to better identify Cluster Connection Locators on logs while debugging logs
*/
+ /** Used to better identify Cluster Connection Locators on logs. To facilitate
eventual debugging.
+ *
+ * This method used to be on tests interface, but I'm now making it part of the
public interface since*/
void setIdentity(String identity);
+
+ String getIdentity();
void setNodeID(String nodeID);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-19
23:47:54 UTC (rev 11370)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -108,7 +108,7 @@
{
if (log.isDebugEnabled())
{
- log.info(this + "::Live node " + nodeId + "=" +
memberInput);
+ log.debug(this + "::node " + nodeId + "=" +
memberInput);
}
memberInput.setUniqueEventID(System.currentTimeMillis());
mapTopology.remove(nodeId);
@@ -212,7 +212,7 @@
currentMember +
", memberInput=" +
memberInput +
- "newMember=" + newMember);
+ "newMember=" + newMember, new Exception
("trace"));
}
@@ -301,7 +301,7 @@
{
if (member.getUniqueEventID() > uniqueEventID)
{
- log.info("The removeMember was issued before the node " + nodeId
+ " was started, ignoring call");
+ log.debug("The removeMember was issued before the node " +
nodeId + " was started, ignoring call");
member = null;
}
else
@@ -482,22 +482,17 @@
public synchronized String describe(final String text)
{
- String desc = text + "\n";
+ String desc = text + "topology on " + this + ":\n";
for (Entry<String, TopologyMember> entry : new HashMap<String,
TopologyMember>(mapTopology).entrySet())
{
desc += "\t" + entry.getKey() + " => " + entry.getValue()
+ "\n";
}
desc += "\t" + "nodes=" + nodes() + "\t" +
"members=" + members();
- return desc;
- }
-
- public void clear()
- {
- if (Topology.log.isDebugEnabled())
+ if (mapTopology.isEmpty())
{
- Topology.log.debug(this + "::clear", new
Exception("trace"));
+ desc += "\tEmpty";
}
- mapTopology.clear();
+ return desc;
}
public int members()
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -42,6 +42,7 @@
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
/**
@@ -68,7 +69,7 @@
this.interceptors = interceptors;
}
- public ConnectionEntry createConnectionEntry(final Connection connection)
+ public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final
Connection connection)
{
final Configuration config = server.getConfiguration();
@@ -177,16 +178,18 @@
};
final boolean isCC = msg.isClusterConnection();
-
- server.getClusterManager().addClusterTopologyListener(listener, isCC);
-
- rc.addCloseListener(new CloseListener()
+ if (acceptorUsed.getClusterConnection() != null)
{
- public void connectionClosed()
+
acceptorUsed.getClusterConnection().addClusterTopologyListener(listener, isCC);
+
+ rc.addCloseListener(new CloseListener()
{
- server.getClusterManager().removeClusterTopologyListener(listener,
isCC);
- }
- });
+ public void connectionClosed()
+ {
+
acceptorUsed.getClusterConnection().removeClusterTopologyListener(listener, isCC);
+ }
+ });
+ }
}
else if (packet.getType() == PacketImpl.NODE_ANNOUNCE)
{
@@ -205,7 +208,8 @@
{
log.trace("Server " + server + " receiving nodeUp from
NodeID=" + msg.getNodeID() + ", pair=" + pair);
}
- server.getClusterManager().nodeAnnounced(msg.getCurrentEventID(),
msg.getNodeID(), pair, msg.isBackup());
+
+ acceptorUsed.getClusterConnection().nodeAnnounced(msg.getCurrentEventID(),
msg.getNodeID(), pair, msg.isBackup());
}
}
});
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -38,6 +38,7 @@
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.UUIDGenerator;
@@ -109,7 +110,7 @@
// ProtocolManager implementation --------------------------------
- public ConnectionEntry createConnectionEntry(final Connection connection)
+ public ConnectionEntry createConnectionEntry(final Acceptor acceptorUsed, final
Connection connection)
{
StompConnection conn = new StompConnection(connection, this);
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -54,16 +55,21 @@
private volatile boolean started;
private final ExecutorFactory executorFactory;
+
+ private final ClusterConnection clusterConnection;
private boolean paused;
private NotificationService notificationService;
- public InVMAcceptor(final Map<String, Object> configuration,
+ public InVMAcceptor(final ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
final Executor threadPool)
{
+ this.clusterConnection = clusterConnection;
+
this.handler = handler;
this.listener = listener;
@@ -73,6 +79,11 @@
executorFactory = new OrderedExecutorFactory(threadPool);
}
+ public ClusterConnection getClusterConnection()
+ {
+ return clusterConnection;
+ }
+
public synchronized void start() throws Exception
{
if (started)
@@ -189,7 +200,7 @@
throw new IllegalStateException("Acceptor is not started");
}
- new InVMConnection(id, connectionID, remoteHandler, new Listener(connector),
clientExecutor);
+ new InVMConnection(this, id, connectionID, remoteHandler, new Listener(connector),
clientExecutor);
}
public void disconnect(final String connectionID)
@@ -209,6 +220,8 @@
private class Listener implements ConnectionLifeCycleListener
{
+ //private static Listener instance = new Listener();
+
private final InVMConnector connector;
Listener(final InVMConnector connector)
@@ -216,14 +229,14 @@
this.connector = connector;
}
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection,
final ProtocolType protocol)
{
if (connections.putIfAbsent((String)connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id
" + connection.getID());
}
- listener.connectionCreated(connection, protocol);
+ listener.connectionCreated(acceptor, connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -17,6 +17,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -31,14 +32,15 @@
*/
public class InVMAcceptorFactory implements AcceptorFactory
{
- public Acceptor createAcceptor(final Map<String, Object> configuration,
+ public Acceptor createAcceptor(final ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
final BufferHandler handler,
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new InVMAcceptor(configuration, handler, listener, threadPool);
+ return new InVMAcceptor(clusterConnection, configuration, handler, listener,
threadPool);
}
public Set<String> getAllowableProperties()
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -57,15 +58,17 @@
private volatile boolean closing;
- public InVMConnection(final int serverID,
+ public InVMConnection(final Acceptor acceptor,
+ final int serverID,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
final Executor executor)
{
- this(serverID, UUIDGenerator.getInstance().generateSimpleStringUUID().toString(),
handler, listener, executor);
+ this(acceptor, serverID,
UUIDGenerator.getInstance().generateSimpleStringUUID().toString(), handler, listener,
executor);
}
- public InVMConnection(final int serverID,
+ public InVMConnection(final Acceptor acceptor,
+ final int serverID,
final String id,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
@@ -81,7 +84,7 @@
this.executor = executor;
- listener.connectionCreated(this, ProtocolType.CORE);
+ listener.connectionCreated(acceptor, this, ProtocolType.CORE);
}
public void close()
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/invm/InVMConnector.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -172,19 +172,20 @@
final ConnectionLifeCycleListener
listener,
final Executor serverExecutor)
{
- return new InVMConnection(id, handler, listener, serverExecutor);
+ // No acceptor on a client connection
+ return new InVMConnection(null, id, handler, listener, serverExecutor);
}
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection,
final ProtocolType protocol)
{
if (connections.putIfAbsent((String)connection.getID(), connection) != null)
{
throw new IllegalArgumentException("Connection already exists with id
" + connection.getID());
}
- listener.connectionCreated(connection, protocol);
+ listener.connectionCreated(acceptor, connection, protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -37,6 +37,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.stomp.WebSocketServerHandler;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -87,6 +88,8 @@
{
static final Logger log = Logger.getLogger(NettyAcceptor.class);
+ private ClusterConnection clusterConnection;
+
private ChannelFactory channelFactory;
private volatile ChannelGroup serverChannelGroup;
@@ -158,6 +161,7 @@
private final long batchDelay;
private final boolean directDeliver;
+
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
@@ -166,6 +170,21 @@
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
+ this(null, configuration, handler, decoder, listener, threadPool,
scheduledThreadPool);
+ }
+
+
+ public NettyAcceptor(final ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
+ final BufferHandler handler,
+ final BufferDecoder decoder,
+ final ConnectionLifeCycleListener listener,
+ final Executor threadPool,
+ final ScheduledExecutorService scheduledThreadPool)
+ {
+
+ this.clusterConnection = clusterConnection;
+
this.handler = handler;
this.decoder = decoder;
@@ -618,6 +637,14 @@
{
this.notificationService = notificationService;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.spi.core.remoting.Acceptor#getClusterConnection()
+ */
+ public ClusterConnection getClusterConnection()
+ {
+ return clusterConnection;
+ }
// Inner classes
-----------------------------------------------------------------------------
@@ -633,7 +660,7 @@
@Override
public void channelConnected(final ChannelHandlerContext ctx, final
ChannelStateEvent e) throws Exception
{
- new NettyConnection(e.getChannel(), new Listener(), !httpEnabled &&
batchDelay > 0, directDeliver);
+ new NettyConnection(NettyAcceptor.this, e.getChannel(), new Listener(),
!httpEnabled && batchDelay > 0, directDeliver);
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
if (sslHandler != null)
@@ -662,14 +689,14 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection,
final ProtocolType protocol)
{
if (connections.putIfAbsent(connection.getID(), (NettyConnection)connection) !=
null)
{
throw new IllegalArgumentException("Connection already exists with id
" + connection.getID());
}
- listener.connectionCreated(connection, NettyAcceptor.this.protocol);
+ listener.connectionCreated(acceptor, connection, NettyAcceptor.this.protocol);
}
public void connectionDestroyed(final Object connectionID)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptorFactory.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -18,6 +18,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -31,14 +32,15 @@
*/
public class NettyAcceptorFactory implements AcceptorFactory
{
- public Acceptor createAcceptor(final Map<String, Object> configuration,
+ public Acceptor createAcceptor(final ClusterConnection connection,
+ final Map<String, Object> configuration,
final BufferHandler handler,
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new NettyAcceptor(configuration, handler, decoder, listener, threadPool,
scheduledThreadPool);
+ return new NettyAcceptor(connection, configuration, handler, decoder, listener,
threadPool, scheduledThreadPool);
}
public Set<String> getAllowableProperties()
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnection.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -21,6 +21,7 @@
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.spi.core.remoting.ReadyListener;
@@ -72,6 +73,15 @@
boolean batchingEnabled,
boolean directDeliver)
{
+ this(null, channel, listener, batchingEnabled, directDeliver);
+ }
+
+ public NettyConnection(final Acceptor acceptor,
+ final Channel channel,
+ final ConnectionLifeCycleListener listener,
+ boolean batchingEnabled,
+ boolean directDeliver)
+ {
this.channel = channel;
this.listener = listener;
@@ -80,7 +90,7 @@
this.directDeliver = directDeliver;
- listener.connectionCreated(this, ProtocolType.CORE);
+ listener.connectionCreated(acceptor, this, ProtocolType.CORE);
}
// Public --------------------------------------------------------
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/impl/netty/NettyConnector.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -35,6 +35,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -495,7 +496,8 @@
ch.getPipeline().get(HornetQChannelHandler.class).active = true;
}
- NettyConnection conn = new NettyConnection(ch, new Listener(), !httpEnabled
&& batchDelay > 0, false);
+ // No acceptor on a client connection
+ NettyConnection conn = new NettyConnection(null, ch, new Listener(),
!httpEnabled && batchDelay > 0, false);
return conn;
}
@@ -689,7 +691,7 @@
private class Listener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection,
final ProtocolType protocol)
{
if (connections.putIfAbsent(connection.getID(), connection) != null)
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -39,6 +39,7 @@
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.impl.ServerSessionImpl;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.spi.core.protocol.ConnectionEntry;
@@ -94,6 +95,8 @@
private final ScheduledExecutorService scheduledThreadPool;
private FailureCheckAndFlushThread failureCheckAndFlushThread;
+
+ private final ClusterManager clusterManager;
private Map<ProtocolType, ProtocolManager> protocolMap = new
ConcurrentHashMap<ProtocolType, ProtocolManager>();
@@ -101,7 +104,8 @@
// Constructors --------------------------------------------------
- public RemotingServiceImpl(final Configuration config,
+ public RemotingServiceImpl(final ClusterManager clusterManager,
+ final Configuration config,
final HornetQServer server,
final ManagementService managementService,
final ScheduledExecutorService scheduledThreadPool)
@@ -109,6 +113,8 @@
transportConfigs = config.getAcceptorConfigurations();
this.server = server;
+
+ this.clusterManager = clusterManager;
ClassLoader loader = Thread.currentThread().getContextClassLoader();
for (String interceptorClass : config.getInterceptorClassNames())
@@ -202,7 +208,9 @@
ProtocolManager manager = protocolMap.get(protocol);
- Acceptor acceptor = factory.createAcceptor(info.getParams(),
+ // TODO: parameterize the cluster connection
+ Acceptor acceptor =
factory.createAcceptor(clusterManager.getDefaultConnection(),
+ info.getParams(),
new DelegatingBufferHandler(),
manager,
this,
@@ -370,7 +378,7 @@
return protocolMap.get(protocol);
}
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection,
final ProtocolType protocol)
{
if (server == null)
{
@@ -384,7 +392,7 @@
throw new IllegalArgumentException("Unknown protocol " + protocol);
}
- ConnectionEntry entry = pmgr.createConnectionEntry(connection);
+ ConnectionEntry entry = pmgr.createConnectionEntry(acceptor, connection);
if (isTrace)
{
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterConnection.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -15,9 +15,11 @@
import java.util.Map;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClusterTopologyListener;
+import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.HornetQServer;
@@ -37,7 +39,13 @@
String getNodeID();
HornetQServer getServer();
+
+ void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean backup);
+ void addClusterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
+
+ void removeClusterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
+
/**
* @return a Map of node ID and addresses
*/
@@ -47,8 +55,14 @@
TransportConfiguration getConnector();
+ Topology getTopology();
+
void flushExecutor();
// for debug
String describe();
+
+ void informTopology();
+
+ void announceBackup();
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/ClusterManager.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -16,11 +16,7 @@
import java.util.Map;
import java.util.Set;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClusterTopologyListener;
-import org.hornetq.core.client.impl.Topology;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.server.HornetQComponent;
@@ -37,26 +33,26 @@
Map<String, Bridge> getBridges();
Set<ClusterConnection> getClusterConnections();
+
+ /**
+ * Return the default ClusterConnection to be used case it's not defined by the
acceptor
+ * @return
+ */
+ ClusterConnection getDefaultConnection();
ClusterConnection getClusterConnection(SimpleString name);
Set<BroadcastGroup> getBroadcastGroups();
-
- void addClusterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
- void removeClusterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
-
void activate();
- void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean backup);
-
- Topology getTopology();
-
void flushExecutor();
void announceBackup() throws Exception;
+
+ void deploy() throws Exception;
- void deployBridge(BridgeConfiguration config) throws Exception;
+ void deployBridge(BridgeConfiguration config, boolean start) throws Exception;
void destroyBridge(String name) throws Exception;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -33,6 +33,8 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.client.impl.AfterConnectInternalListener;
@@ -46,6 +48,7 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
@@ -79,8 +82,6 @@
private final ExecutorFactory executorFactory;
- private final Topology clusterManagerTopology;
-
private final Executor executor;
private final HornetQServer server;
@@ -140,9 +141,17 @@
private final Set<TransportConfiguration> allowableConnections = new
HashSet<TransportConfiguration>();
private final ClusterManagerInternal manager;
+
+
+ // Stuff that used to be on the ClusterManager
+
+ private final Topology topology = new Topology(this);
+
+ private volatile ServerLocatorInternal backupServerLocator;
+
+
public ClusterConnectionImpl(final ClusterManagerInternal manager,
- final Topology clusterManagerTopology,
final TransportConfiguration[] tcConfigs,
final TransportConfiguration connector,
final SimpleString name,
@@ -204,6 +213,8 @@
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
+
+ this.topology.setExecutor(executor);
this.server = server;
@@ -227,8 +238,6 @@
this.callTimeout = callTimeout;
- this.clusterManagerTopology = clusterManagerTopology;
-
clusterConnector = new StaticClusterConnector(tcConfigs);
if (tcConfigs != null && tcConfigs.length > 0)
@@ -244,7 +253,6 @@
}
public ClusterConnectionImpl(final ClusterManagerImpl manager,
- final Topology clusterManagerTopology,
DiscoveryGroupConfiguration dg,
final TransportConfiguration connector,
final SimpleString name,
@@ -308,6 +316,8 @@
this.executorFactory = executorFactory;
this.executor = executorFactory.getExecutor();
+
+ this.topology.setExecutor(executor);
this.server = server;
@@ -330,11 +340,9 @@
clusterConnector = new DiscoveryClusterConnector(dg);
this.manager = manager;
-
- this.clusterManagerTopology = clusterManagerTopology;
}
- public void start() throws Exception
+ public void start() throws Exception
{
synchronized (this)
{
@@ -410,13 +418,21 @@
props);
managementService.sendNotification(notification);
}
+
+
executor.execute(new Runnable()
{
public void run()
{
synchronized (ClusterConnectionImpl.this)
{
+ if (backupServerLocator != null)
+ {
+ backupServerLocator.close();
+ backupServerLocator = null;
+ }
+
if (serverLocator != null)
{
serverLocator.close();
@@ -430,12 +446,97 @@
started = false;
}
+
+ public void announceBackup()
+ {
+ this.backupServerLocator = clusterConnector.createServerLocator(false);
+
+ backupServerLocator.setReconnectAttempts(-1);
+ backupServerLocator.setInitialConnectAttempts(-1);
+
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug(ClusterConnectionImpl.this + ":: announcing " +
connector + " to " + backupServerLocator);
+ }
+ ClientSessionFactory backupSessionFactory =
backupServerLocator.connect();
+ if (backupSessionFactory != null)
+ {
+ backupSessionFactory.getConnection()
+ .getChannel(0, -1)
+ .send(new
NodeAnnounceMessage(System.currentTimeMillis(),
+ nodeUUID.toString(),
+ true,
+ connector,
+ null));
+ log.info("backup announced");
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to announce backup, retrying", e);
+ }
+ }
+ });
+ }
+
+ private TopologyMember getLocalMember()
+ {
+ return topology.getMember(manager.getNodeId());
+ }
+
+ public void addClusterTopologyListener(final ClusterTopologyListener listener, final
boolean clusterConnection)
+ {
+ topology.addClusterTopologyListener(listener);
+
+ // no need to use an executor here since the Topology is already using one
+ topology.sendTopology(listener);
+ }
+
+ public void removeClusterTopologyListener(final ClusterTopologyListener listener,
final boolean clusterConnection)
+ {
+ topology.removeClusterTopologyListener(listener);
+ }
+
+ public Topology getTopology()
+ {
+ return topology;
+ }
+
+ public void nodeAnnounced(final long uniqueEventID,
+ final String nodeID,
+ final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
+ final boolean backup)
+ {
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID +
connectorPair);
+ }
+
+ TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
+ newMember.setUniqueEventID(uniqueEventID);
+ if (backup)
+ {
+ topology.updateBackup(nodeID, new TopologyMember(connectorPair.a,
connectorPair.b));
+ }
+ else
+ {
+ topology.updateMember(uniqueEventID, nodeID, newMember);
+ }
+ }
+
/* (non-Javadoc)
* @see
org.hornetq.core.client.impl.AfterConnectInternalListener#onConnection(org.hornetq.core.client.impl.ClientSessionFactoryInternal)
*/
public void onConnection(ClientSessionFactoryInternal sf)
{
- TopologyMember localMember = manager.getLocalMember();
+ TopologyMember localMember = getLocalMember();
sf.sendNodeAnnounce(localMember.getUniqueEventID(),
manager.getNodeId(),
false,
@@ -498,9 +599,27 @@
}
backup = false;
+
+ topology.updateAsLive(manager.getNodeId(), new TopologyMember(connector, null));
- serverLocator = clusterConnector.createServerLocator();
+ if (backupServerLocator != null)
+ {
+ // todo we could use the topology of this to preempt it arriving from the cc
+ try
+ {
+ backupServerLocator.close();
+ }
+ catch (Exception e)
+ {
+ log.warn("problem closing backup session factory", e);
+ }
+ backupServerLocator = null;
+ }
+
+
+ serverLocator = clusterConnector.createServerLocator(true);
+
if (serverLocator != null)
{
@@ -509,7 +628,7 @@
log.debug("DuplicateDetection is disabled, sending clustered messages
blocked");
}
- final TopologyMember currentMember =
clusterManagerTopology.getMember(nodeUUID.toString());
+ final TopologyMember currentMember = topology.getMember(manager.getNodeId());
if (currentMember == null)
{
@@ -554,6 +673,7 @@
log.debug("sending notification: " + notification);
managementService.sendNotification(notification);
}
+
}
public TransportConfiguration getConnector()
@@ -660,7 +780,6 @@
{
log.debug(this + "::Creating record for nodeID=" + nodeID +
", connectorPair=" + connectorPair);
}
- log.info(this + "::Creating record for nodeID=" + nodeID +
", connectorPair=" + connectorPair);
// New node - create a new flow record
@@ -701,7 +820,26 @@
}
}
}
+
+ public synchronized void informTopology()
+ {
+ String nodeID = server.getNodeID().toString();
+
+ TopologyMember localMember;
+
+ if (backup)
+ {
+ localMember = new TopologyMember(null, connector);
+ }
+ else
+ {
+ localMember = new TopologyMember(connector, null);
+ }
+ topology.updateAsLive(nodeID, localMember);
+ }
+
+
private void createNewRecord(final long eventUID,
final String targetNodeID,
final TransportConfiguration connector,
@@ -709,7 +847,7 @@
final Queue queue,
final boolean start) throws Exception
{
- final ServerLocatorInternal targetLocator = new
ServerLocatorImpl(clusterManagerTopology, false, connector);
+ final ServerLocatorInternal targetLocator = new ServerLocatorImpl(topology, false,
connector);
String nodeId;
@@ -1365,7 +1503,8 @@
@Override
public String toString()
{
- return "ClusterConnectionImpl [nodeUUID=" + nodeUUID +
+ return "ClusterConnectionImpl@" + System.identityHashCode(this) +
+ "[nodeUUID=" + nodeUUID +
", connector=" +
connector +
", address=" +
@@ -1395,7 +1534,7 @@
interface ClusterConnector
{
- ServerLocatorInternal createServerLocator();
+ ServerLocatorInternal createServerLocator(boolean includeTopology);
}
private class StaticClusterConnector implements ClusterConnector
@@ -1407,7 +1546,7 @@
this.tcConfigs = tcConfigs;
}
- public ServerLocatorInternal createServerLocator()
+ public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
if (tcConfigs != null && tcConfigs.length > 0)
{
@@ -1415,7 +1554,9 @@
{
log.debug(ClusterConnectionImpl.this + "Creating a serverLocator for
" + Arrays.toString(tcConfigs));
}
- return new ServerLocatorImpl(clusterManagerTopology, true, tcConfigs);
+ ServerLocatorImpl locator = new
ServerLocatorImpl(includeTopology?topology:null, true, tcConfigs);
+ locator.setClusterConnection(true);
+ return locator;
}
else
{
@@ -1443,9 +1584,11 @@
this.dg = dg;
}
- public ServerLocatorInternal createServerLocator()
+ public ServerLocatorInternal createServerLocator(boolean includeTopology)
{
- return new ServerLocatorImpl(clusterManagerTopology, true, dg);
+ ServerLocatorImpl locator = new ServerLocatorImpl(includeTopology?topology:null,
true, dg);
+ return locator;
+
}
}
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -30,15 +30,10 @@
import java.util.concurrent.ScheduledFuture;
import org.hornetq.api.core.DiscoveryGroupConfiguration;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.ClientSessionFactory;
-import org.hornetq.api.core.client.ClusterTopologyListener;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.Topology;
-import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.config.BridgeConfiguration;
import org.hornetq.core.config.BroadcastGroupConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
@@ -46,7 +41,6 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.protocol.core.impl.wireformat.NodeAnnounceMessage;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.cluster.Bridge;
@@ -83,6 +77,8 @@
private final PostOffice postOffice;
private final ScheduledExecutorService scheduledExecutor;
+
+ private ClusterConnection defaultClusterConnection;
private final ManagementService managementService;
@@ -99,10 +95,6 @@
// the cluster connections which links this node to other cluster nodes
private final Map<String, ClusterConnection> clusterConnections = new
HashMap<String, ClusterConnection>();
- private final Topology topology = new Topology(this);
-
- private volatile ServerLocatorInternal backupServerLocator;
-
private final Set<ServerLocatorInternal> clusterLocators = new
ConcurrentHashSet<ServerLocatorInternal>();
private final Executor executor;
@@ -126,8 +118,6 @@
executor = executorFactory.getExecutor();;
- topology.setExecutor(executor);
-
this.server = server;
this.postOffice = postOffice;
@@ -152,7 +142,6 @@
out.println("Information on " + this);
out.println("*******************************************************");
- out.println("Topology: " + topology.describe("Toopology on " +
this));
for (ClusterConnection conn : this.clusterConnections.values())
{
@@ -163,29 +152,24 @@
return str.toString();
}
+
+ public ClusterConnection getDefaultConnection()
+ {
+ return defaultClusterConnection;
+ }
public String toString()
{
return "ClusterManagerImpl[server=" + server + "]@" +
System.identityHashCode(this);
}
- public TopologyMember getLocalMember()
- {
- return topology.getMember(nodeUUID.toString());
- }
-
public String getNodeId()
{
return nodeUUID.toString();
}
- public synchronized void start() throws Exception
+ public synchronized void deploy() throws Exception
{
- if (started)
- {
- return;
- }
-
if (clustered)
{
for (BroadcastGroupConfiguration config :
configuration.getBroadcastGroupConfigurations())
@@ -193,43 +177,44 @@
deployBroadcastGroup(config);
}
- String connectorName = null;
-
for (ClusterConnectionConfiguration config :
configuration.getClusterConfigurations())
{
- if (connectorName == null)
- {
- connectorName = config.getConnectorName();
- break;
- }
- }
+ deployClusterConnection(config);
+ }
+ }
+ }
- if (connectorName != null)
+ public synchronized void start() throws Exception
+ {
+ if (started)
+ {
+ return;
+ }
+
+ for (BroadcastGroup group: broadcastGroups.values())
+ {
+ if (!backup)
{
- TransportConfiguration nodeConnector =
configuration.getConnectorConfigurations().get(connectorName);
- if (nodeConnector == null)
- {
- log.warn("No connecor with name '" + connectorName +
- "'. The cluster connection will not be
deployed.");
- return;
- }
-
- // Now announce presence
- announceNode(nodeConnector);
-
- for (ClusterConnectionConfiguration config :
configuration.getClusterConfigurations())
- {
- deployClusterConnection(config);
- }
+ group.start();
}
-
}
+
+ for (ClusterConnection conn : clusterConnections.values())
+ {
+ conn.start();
+ if (backup)
+ {
+ conn.informTopology();
+ conn.announceBackup();
+ }
+ }
for (BridgeConfiguration config : configuration.getBridgeConfigurations())
{
- deployBridge(config);
+ deployBridge(config, !backup);
}
+
started = true;
}
@@ -267,12 +252,6 @@
}
bridges.clear();
-
- if (backupServerLocator != null)
- {
- backupServerLocator.close();
- backupServerLocator = null;
- }
}
for (ServerLocatorInternal clusterLocator : clusterLocators)
@@ -289,31 +268,9 @@
clusterLocators.clear();
started = false;
- clusterConnections.clear();
+ clearClusterConnections();
}
- public void nodeAnnounced(final long uniqueEventID,
- final String nodeID,
- final Pair<TransportConfiguration,
TransportConfiguration> connectorPair,
- final boolean backup)
- {
- if (log.isDebugEnabled())
- {
- log.debug(this + "::NodeAnnounced, backup=" + backup + nodeID +
connectorPair);
- }
-
- TopologyMember newMember = new TopologyMember(connectorPair.a, connectorPair.b);
- newMember.setUniqueEventID(uniqueEventID);
- if (backup)
- {
- topology.updateBackup(nodeID, new TopologyMember(connectorPair.a,
connectorPair.b));
- }
- else
- {
- topology.updateMember(uniqueEventID, nodeID, newMember);
- }
- }
-
public void flushExecutor()
{
Future future = new Future();
@@ -350,24 +307,6 @@
return clusterConnections.get(name.toString());
}
- public void addClusterTopologyListener(final ClusterTopologyListener listener, final
boolean clusterConnection)
- {
- topology.addClusterTopologyListener(listener);
-
- // no need to use an executor here since the Topology is already using one
- topology.sendTopology(listener);
- }
-
- public void removeClusterTopologyListener(final ClusterTopologyListener listener,
final boolean clusterConnection)
- {
- topology.removeClusterTopologyListener(listener);
- }
-
- public Topology getTopology()
- {
- return topology;
- }
-
// backup node becomes live
public synchronized void activate()
{
@@ -375,27 +314,6 @@
{
backup = false;
- String nodeID = server.getNodeID().toString();
-
- TopologyMember member = topology.getMember(nodeID);
- // swap backup as live and send it to everybody
- member = new TopologyMember(member.getConnector().b, null);
- topology.updateAsLive(nodeID, member);
-
- if (backupServerLocator != null)
- {
- // todo we could use the topology of this to preempt it arriving from the cc
- try
- {
- backupServerLocator.close();
- }
- catch (Exception e)
- {
- log.warn("problem closing backup session factory", e);
- }
- backupServerLocator = null;
- }
-
for (BroadcastGroup broadcastGroup : broadcastGroups.values())
{
try
@@ -432,31 +350,15 @@
log.warn("unable to start bridge " + bridge.getName(), e);
}
}
-
- topology.sendMember(nodeID);
}
}
public void announceBackup() throws Exception
{
- List<ClusterConnectionConfiguration> configs =
this.configuration.getClusterConfigurations();
- if (!configs.isEmpty())
+ for (ClusterConnection conn : this.clusterConnections.values())
{
- ClusterConnectionConfiguration config = configs.get(0);
-
- TransportConfiguration connector =
configuration.getConnectorConfigurations().get(config.getConnectorName());
-
- if (connector == null)
- {
- log.warn("No connecor with name '" + config.getConnectorName()
+ "'. backup cannot be announced.");
- return;
- }
- announceBackup(config, connector);
+ conn.announceBackup();
}
- else
- {
- log.warn("no cluster connections defined, unable to announce
backup");
- }
}
public void addClusterLocator(final ServerLocatorInternal serverLocator)
@@ -468,114 +370,9 @@
{
this.clusterLocators.remove(serverLocator);
}
-
- private synchronized void announceNode(final TransportConfiguration nodeConnector)
+
+ public synchronized void deployBridge(final BridgeConfiguration config, final boolean
start) throws Exception
{
- String nodeID = server.getNodeID().toString();
-
- TopologyMember localMember;
- if (backup)
- {
- localMember = new TopologyMember(null, nodeConnector);
- }
- else
- {
- localMember = new TopologyMember(nodeConnector, null);
- }
-
- topology.updateAsLive(nodeID, localMember);
- }
-
- private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration
config) throws Exception
- {
- if (broadcastGroups.containsKey(config.getName()))
- {
- ClusterManagerImpl.log.warn("There is already a broadcast-group with name
" + config.getName() +
- " deployed. This one will not be
deployed.");
-
- return;
- }
-
- InetAddress localAddress = null;
- if (config.getLocalBindAddress() != null)
- {
- localAddress = InetAddress.getByName(config.getLocalBindAddress());
- }
-
- InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
-
- BroadcastGroupImpl group = new BroadcastGroupImpl(nodeUUID.toString(),
- config.getName(),
- localAddress,
- config.getLocalBindPort(),
- groupAddress,
- config.getGroupPort(),
- !backup);
-
- for (String connectorInfo : config.getConnectorInfos())
- {
- TransportConfiguration connector =
configuration.getConnectorConfigurations().get(connectorInfo);
-
- if (connector == null)
- {
- logWarnNoConnector(config.getName(), connectorInfo);
-
- return;
- }
-
- group.addConnector(connector);
- }
-
- ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
- 0L,
-
config.getBroadcastPeriod(),
-
MILLISECONDS);
-
- group.setScheduledFuture(future);
-
- broadcastGroups.put(config.getName(), group);
-
- managementService.registerBroadcastGroup(group, config);
-
- if (!backup)
- {
- group.start();
- }
- }
-
- private void logWarnNoConnector(final String connectorName, final String bgName)
- {
- ClusterManagerImpl.log.warn("There is no connector deployed with name
'" + connectorName +
- "'. The broadcast group with name '"
+
- bgName +
- "' will not be deployed.");
- }
-
- private TransportConfiguration[] connectorNameListToArray(final List<String>
connectorNames)
- {
- TransportConfiguration[] tcConfigs =
(TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
-
connectorNames.size());
- int count = 0;
- for (String connectorName : connectorNames)
- {
- TransportConfiguration connector =
configuration.getConnectorConfigurations().get(connectorName);
-
- if (connector == null)
- {
- ClusterManagerImpl.log.warn("No connector defined with name '"
+ connectorName +
- "'. The bridge will not be
deployed.");
-
- return null;
- }
-
- tcConfigs[count++] = connector;
- }
-
- return tcConfigs;
- }
-
- public synchronized void deployBridge(final BridgeConfiguration config) throws
Exception
- {
if (config.getName() == null)
{
ClusterManagerImpl.log.warn("Must specify a unique name for each bridge.
This one will not be deployed.");
@@ -702,11 +499,12 @@
bridges.put(config.getName(), bridge);
managementService.registerBridge(bridge, config);
-
- if (!backup)
+
+ if (start)
{
bridge.start();
}
+
}
public void destroyBridge(final String name) throws Exception
@@ -726,11 +524,49 @@
bridge.flushExecutor();
}
- private synchronized void deployClusterConnection(final ClusterConnectionConfiguration
config) throws Exception
+ // for testing
+ public void clear()
{
+ for (Bridge bridge : bridges.values())
+ {
+ try
+ {
+ bridge.stop();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ bridges.clear();
+ for (ClusterConnection clusterConnection : clusterConnections.values())
+ {
+ try
+ {
+ clusterConnection.stop();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ clearClusterConnections();
+ }
+
+ // Private methods
----------------------------------------------------------------------------------------------------
+
+
+ private void clearClusterConnections()
+ {
+ clusterConnections.clear();
+ this.defaultClusterConnection = null;
+ }
+
+ private void deployClusterConnection(final ClusterConnectionConfiguration config)
throws Exception
+ {
if (config.getName() == null)
{
- ClusterManagerImpl.log.warn("Must specify a unique name for each cluster.
This one will not be deployed.");
+ ClusterManagerImpl.log.warn("Must specify a unique name for each cluster
connection. This one will not be deployed.");
return;
}
@@ -781,7 +617,6 @@
}
clusterConnection = new ClusterConnectionImpl(this,
- topology,
dg,
connector,
new
SimpleString(config.getName()),
@@ -819,7 +654,6 @@
}
clusterConnection = new ClusterConnectionImpl(this,
- topology,
tcConfigs,
connector,
new
SimpleString(config.getName()),
@@ -847,6 +681,11 @@
config.isAllowDirectConnectionsOnly());
}
+ if (defaultClusterConnection == null)
+ {
+ defaultClusterConnection = clusterConnection;
+ }
+
managementService.registerCluster(clusterConnection, config);
clusterConnections.put(config.getName(), clusterConnection);
@@ -855,75 +694,8 @@
{
log.debug("ClusterConnection.start at " + clusterConnection, new
Exception("trace"));
}
- clusterConnection.start();
-
- if (backup)
- {
- announceBackup(config, connector);
- }
}
-
- private void announceBackup(final ClusterConnectionConfiguration config, final
TransportConfiguration connector) throws Exception
- {
- if (config.getStaticConnectors() != null)
- {
- TransportConfiguration[] tcConfigs =
connectorNameListToArray(config.getStaticConnectors());
-
- backupServerLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(tcConfigs);
- backupServerLocator.setReconnectAttempts(-1);
- backupServerLocator.setInitialConnectAttempts(-1);
- }
- else if (config.getDiscoveryGroupName() != null)
- {
- DiscoveryGroupConfiguration dg =
configuration.getDiscoveryGroupConfigurations()
-
.get(config.getDiscoveryGroupName());
-
- if (dg == null)
- {
- ClusterManagerImpl.log.warn("No discovery group with name '" +
config.getDiscoveryGroupName() +
- "'. The cluster connection will not be
deployed.");
- }
-
- backupServerLocator =
(ServerLocatorInternal)HornetQClient.createServerLocatorWithoutHA(dg);
- backupServerLocator.setReconnectAttempts(-1);
- backupServerLocator.setInitialConnectAttempts(-1);
- }
- else
- {
- return;
- }
- log.info("announcing backup");
- executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- if (log.isDebugEnabled())
- {
- log.debug(ClusterManagerImpl.this + ":: announcing " +
connector + " to " + backupServerLocator);
- }
- ClientSessionFactory backupSessionFactory =
backupServerLocator.connect();
- if (backupSessionFactory != null)
- {
- backupSessionFactory.getConnection()
- .getChannel(0, -1)
- .send(new
NodeAnnounceMessage(System.currentTimeMillis(),
- nodeUUID.toString(),
- true,
- connector,
- null));
- log.info("backup announced");
- }
- }
- catch (Exception e)
- {
- log.warn("Unable to announce backup, retrying", e);
- }
- }
- });
- }
-
+
private Transformer instantiateTransformer(final String transformerClassName)
{
Transformer transformer = null;
@@ -945,32 +717,89 @@
return transformer;
}
- // for testing
- public void clear()
+
+ private synchronized void deployBroadcastGroup(final BroadcastGroupConfiguration
config) throws Exception
{
- for (Bridge bridge : bridges.values())
+ if (broadcastGroups.containsKey(config.getName()))
{
- try
+ ClusterManagerImpl.log.warn("There is already a broadcast-group with name
" + config.getName() +
+ " deployed. This one will not be
deployed.");
+
+ return;
+ }
+
+ InetAddress localAddress = null;
+ if (config.getLocalBindAddress() != null)
+ {
+ localAddress = InetAddress.getByName(config.getLocalBindAddress());
+ }
+
+ InetAddress groupAddress = InetAddress.getByName(config.getGroupAddress());
+
+ BroadcastGroupImpl group = new BroadcastGroupImpl(nodeUUID.toString(),
+ config.getName(),
+ localAddress,
+ config.getLocalBindPort(),
+ groupAddress,
+ config.getGroupPort(),
+ !backup);
+
+ for (String connectorInfo : config.getConnectorInfos())
+ {
+ TransportConfiguration connector =
configuration.getConnectorConfigurations().get(connectorInfo);
+
+ if (connector == null)
{
- bridge.stop();
+ logWarnNoConnector(config.getName(), connectorInfo);
+
+ return;
}
- catch (Exception e)
- {
- log.warn(e.getMessage(), e);
- }
+
+ group.addConnector(connector);
}
- bridges.clear();
- for (ClusterConnection clusterConnection : clusterConnections.values())
+
+ ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(group,
+ 0L,
+
config.getBroadcastPeriod(),
+
MILLISECONDS);
+
+ group.setScheduledFuture(future);
+
+ broadcastGroups.put(config.getName(), group);
+
+ managementService.registerBroadcastGroup(group, config);
+ }
+
+ private void logWarnNoConnector(final String connectorName, final String bgName)
+ {
+ ClusterManagerImpl.log.warn("There is no connector deployed with name
'" + connectorName +
+ "'. The broadcast group with name '"
+
+ bgName +
+ "' will not be deployed.");
+ }
+
+ private TransportConfiguration[] connectorNameListToArray(final List<String>
connectorNames)
+ {
+ TransportConfiguration[] tcConfigs =
(TransportConfiguration[])Array.newInstance(TransportConfiguration.class,
+
connectorNames.size());
+ int count = 0;
+ for (String connectorName : connectorNames)
{
- try
+ TransportConfiguration connector =
configuration.getConnectorConfigurations().get(connectorName);
+
+ if (connector == null)
{
- clusterConnection.stop();
+ ClusterManagerImpl.log.warn("No connector defined with name '"
+ connectorName +
+ "'. The bridge will not be
deployed.");
+
+ return null;
}
- catch (Exception e)
- {
- e.printStackTrace();
- }
+
+ tcConfigs[count++] = connector;
}
- clusterConnections.clear();
+
+ return tcConfigs;
}
+
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerInternal.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -14,7 +14,6 @@
package org.hornetq.core.server.cluster.impl;
import org.hornetq.core.client.impl.ServerLocatorInternal;
-import org.hornetq.core.client.impl.TopologyMember;
import org.hornetq.core.server.cluster.ClusterManager;
/**
@@ -30,8 +29,6 @@
void removeClusterLocator(ServerLocatorInternal locator);
- TopologyMember getLocalMember();
-
String getNodeId();
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -227,7 +227,12 @@
// Used to identify the server on tests... useful on debugging testcases
private String identity;
+
+ private Thread backupActivationThread;
+ private Activation activation;
+
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -289,11 +294,6 @@
// lifecycle methods
// ----------------------------------------------------------------
- private interface Activation extends Runnable
- {
- void close(boolean permanently) throws Exception;
- }
-
/*
* Can be overridden for tests
*/
@@ -309,259 +309,6 @@
}
}
- private class NoSharedStoreLiveActivation implements Activation
- {
- public void run()
- {
- try
- {
- initialisePart1();
-
- initialisePart2();
-
- if (identity != null)
- {
- log.info("Server " + identity + " is now live");
- }
- else
- {
- log.info("Server is now live");
- }
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
-
- }
- }
-
- private class SharedStoreLiveActivation implements Activation
- {
- public void run()
- {
- try
- {
- log.info("Waiting to obtain live lock");
-
- checkJournalDirectory();
-
- initialisePart1();
-
- if(nodeManager.isBackupLive())
- {
- //looks like we've failed over at some point need to inform that we
are the backup so when the current live
- // goes down they failover to us
- clusterManager.announceBackup();
- Thread.sleep(configuration.getFailbackDelay());
- }
-
- nodeManager.startLiveNode();
-
- if (stopped)
- {
- return;
- }
-
- initialisePart2();
-
- log.info("Server is now live");
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- if(permanently)
- {
- nodeManager.crashLiveServer();
- }
- else
- {
- nodeManager.pauseLiveServer();
- }
- }
- }
-
-
- private class SharedStoreBackupActivation implements Activation
- {
-
- volatile boolean closed = false;
- public void run()
- {
- try
- {
- nodeManager.startBackup();
-
- initialisePart1();
-
- clusterManager.start();
-
- started = true;
-
- log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "]
started, waiting live to fail before it gets active");
-
- nodeManager.awaitLiveNode();
-
- configuration.setBackup(false);
-
- if (stopped)
- {
- return;
- }
-
- initialisePart2();
-
- clusterManager.activate();
-
- log.info("Backup Server is now live");
-
- nodeManager.releaseBackup();
- if(configuration.isAllowAutoFailBack())
- {
- class FailbackChecker implements Runnable
- {
- boolean restarting = false;
- public void run()
- {
- try
- {
- if(!restarting && nodeManager.isAwaitingFailback())
- {
- log.info("live server wants to restart, restarting server
in backup");
- restarting = true;
- Thread t = new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- log.debug(HornetQServerImpl.this + "::Stopping
live node in favor of failback");
- stop(true);
- // We need to wait some time before we start the
backup again
- // otherwise we may eventually start before the live
had a chance to get it
- Thread.sleep(configuration.getFailbackDelay());
- configuration.setBackup(true);
- log.debug(HornetQServerImpl.this + "::Starting
backup node now after failback");
- start();
- }
- catch (Exception e)
- {
- log.warn("unable to restart server, please kill
and restart manually", e);
- }
- }
- });
- t.start();
- }
- }
- catch (Exception e)
- {
- log.debug(e.getMessage(), e);
- //hopefully it will work next call
- }
- }
- }
- scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l,
TimeUnit.MILLISECONDS);
- }
- }
- catch (InterruptedException e)
- {
- //this is ok, we are being stopped
- }
- catch (ClosedChannelException e)
- {
- //this is ok too, we are being stopped
- }
- catch (Exception e)
- {
- if(!(e.getCause() instanceof InterruptedException))
- {
- log.error("Failure in initialisation", e);
- }
- }
- catch(Throwable e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- if (configuration.isBackup())
- {
- long timeout = 30000;
-
- long start = System.currentTimeMillis();
-
- while (backupActivationThread.isAlive() && System.currentTimeMillis()
- start < timeout)
- {
- nodeManager.interrupt();
-
- backupActivationThread.interrupt();
-
- backupActivationThread.join(1000);
-
- }
-
- if (System.currentTimeMillis() - start >= timeout)
- {
- threadDump("Timed out waiting for backup activation to exit");
- }
-
- nodeManager.stopBackup();
- }
- else
- {
- //if we are now live, behave as live
- // We need to delete the file too, otherwise the backup will failover when we
shutdown or if the backup is
- // started before the live
- if(permanently)
- {
- nodeManager.crashLiveServer();
- }
- else
- {
- nodeManager.pauseLiveServer();
- }
- }
- }
- }
-
- private class SharedNothingBackupActivation implements Activation
- {
- public void run()
- {
- try
- {
- // TODO
-
- // Try-Connect to live server using live-connector-ref
-
- // sit in loop and try and connect, if server is not live then it will return
NOT_LIVE
- }
- catch (Exception e)
- {
- log.error("Failure in initialisation", e);
- }
- }
-
- public void close(boolean permanently) throws Exception
- {
- }
- }
-
- private Thread backupActivationThread;
-
- private Activation activation;
-
public synchronized void start() throws Exception
{
stopped = false;
@@ -611,6 +358,7 @@
}
+ // The activation on fail-back may change the value of isBackup, for that reason we
are not using else here
if (configuration.isBackup())
{
if (configuration.isSharedStore())
@@ -1069,7 +817,6 @@
return new HashSet<ServerSession>(sessions.values());
}
- // TODO - should this really be here?? It's only used in tests
public boolean isInitialised()
{
synchronized (initialiseLock)
@@ -1232,9 +979,145 @@
return connectorsService;
}
- // Public
- //
---------------------------------------------------------------------------------------
+
+ public synchronized boolean checkActivate() throws Exception
+ {
+ if (configuration.isBackup())
+ {
+ // Handle backup server activation
+ if (!configuration.isSharedStore())
+ {
+ if (replicationEndpoint == null)
+ {
+ HornetQServerImpl.log.warn("There is no replication endpoint,
can't activate this backup server");
+
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
"Can't activate the server");
+ }
+
+ replicationEndpoint.stop();
+ }
+
+ // Complete the startup procedure
+
+ HornetQServerImpl.log.info("Activating backup server");
+
+ configuration.setBackup(false);
+
+ initialisePart2();
+ }
+
+ return true;
+ }
+
+ public void deployDivert(DivertConfiguration config) throws Exception
+ {
+ if (config.getName() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify a name for each divert. This one
will not be deployed.");
+
+ return;
+ }
+
+ if (config.getAddress() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify an address for each divert. This
one will not be deployed.");
+
+ return;
+ }
+
+ if (config.getForwardingAddress() == null)
+ {
+ HornetQServerImpl.log.warn("Must specify an forwarding address for each
divert. This one will not be deployed.");
+
+ return;
+ }
+
+ SimpleString sName = new SimpleString(config.getName());
+
+ if (postOffice.getBinding(sName) != null)
+ {
+ HornetQServerImpl.log.warn("Binding already exists with name " + sName
+ ", divert will not be deployed");
+
+ return;
+ }
+
+ SimpleString sAddress = new SimpleString(config.getAddress());
+
+ Transformer transformer =
instantiateTransformer(config.getTransformerClassName());
+
+ Filter filter = FilterImpl.createFilter(config.getFilterString());
+
+ Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
+ sName,
+ new SimpleString(config.getRoutingName()),
+ config.isExclusive(),
+ filter,
+ transformer,
+ postOffice,
+ storageManager);
+
+ Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress,
divert);
+
+ postOffice.addBinding(binding);
+
+ managementService.registerDivert(divert, config);
+ }
+
+ public void destroyDivert(SimpleString name) throws Exception
+ {
+ Binding binding = postOffice.getBinding(name);
+ if (binding == null)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for
divert " + name);
+ }
+ if (!(binding instanceof DivertBinding))
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding "
+ name + " is not a divert");
+ }
+
+ postOffice.removeBinding(name);
+ }
+
+
+
+ public void deployBridge(BridgeConfiguration config) throws Exception
+ {
+ if (clusterManager != null)
+ {
+ clusterManager.deployBridge(config, true);
+ }
+ }
+
+ public void destroyBridge(String name) throws Exception
+ {
+ if (clusterManager != null)
+ {
+ clusterManager.destroyBridge(name);
+ }
+ }
+
+ public ServerSession getSessionByID(String sessionName)
+ {
+ return sessions.get(sessionName);
+ }
+
+ // PUBLIC -------
+
+ public String toString()
+ {
+ if (identity != null)
+ {
+ return "HornetQServerImpl::" + identity;
+ }
+ else
+ {
+ return "HornetQServerImpl::" + (nodeManager != null ?
"serverUUID=" + nodeManager.getUUID() : "");
+ }
+ }
+
+
+
// Package protected
// ----------------------------------------------------------------------------
@@ -1296,34 +1179,6 @@
// Private
//
--------------------------------------------------------------------------------------
- // private boolean startReplication() throws Exception
- // {
- // String backupConnectorName = configuration.getBackupConnectorName();
- //
- // if (!configuration.isSharedStore() && backupConnectorName != null)
- // {
- // TransportConfiguration backupConnector =
configuration.getConnectorConfigurations().get(backupConnectorName);
- //
- // if (backupConnector == null)
- // {
- // HornetQServerImpl.log.warn("connector with name '" +
backupConnectorName +
- // "' is not defined in the configuration.");
- // }
- // else
- // {
- //
- // replicationFailoverManager =
createBackupConnectionFailoverManager(backupConnector,
- // threadPool,
- // scheduledPool);
- //
- // replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
executorFactory);
- // replicationManager.start();
- // }
- // }
- //
- // return true;
- // }
-
private void callActivateCallbacks()
{
for (ActivateCallback callback : activateCallbacks)
@@ -1340,44 +1195,6 @@
}
}
- public synchronized boolean checkActivate() throws Exception
- {
- if (configuration.isBackup())
- {
- // Handle backup server activation
-
- if (!configuration.isSharedStore())
- {
- if (replicationEndpoint == null)
- {
- HornetQServerImpl.log.warn("There is no replication endpoint,
can't activate this backup server");
-
- throw new HornetQException(HornetQException.INTERNAL_ERROR,
"Can't activate the server");
- }
-
- replicationEndpoint.stop();
- }
-
- // Complete the startup procedure
-
- HornetQServerImpl.log.info("Activating backup server");
-
- configuration.setBackup(false);
-
- initialisePart2();
- }
-
- return true;
- }
-
- private class FileActivateRunner implements Runnable
- {
- public void run()
- {
-
- }
- }
-
private void initialiseLogging()
{
LogDelegateFactory logDelegateFactory =
(LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
@@ -1414,8 +1231,6 @@
managementService = new ManagementServiceImpl(mbeanServer, configuration);
- remotingService = new RemotingServiceImpl(configuration, this, managementService,
scheduledPool);
-
if (configuration.getMemoryMeasureInterval() != -1)
{
memoryManager = new
MemoryManagerImpl(configuration.getMemoryWarningThreshold(),
@@ -1470,6 +1285,23 @@
configuration.isPersistIDCache(),
addressSettingsRepository);
+ // This can't be created until node id is set
+ clusterManager = new ClusterManagerImpl(executorFactory,
+ this,
+ postOffice,
+ scheduledPool,
+ managementService,
+ configuration,
+ nodeManager.getUUID(),
+ configuration.isBackup(),
+ configuration.isClustered());
+
+
+ clusterManager.deploy();
+
+
+ remotingService = new RemotingServiceImpl(clusterManager, configuration, this,
managementService, scheduledPool);
+
messagingServerControl = managementService.registerServer(postOffice,
storageManager,
configuration,
@@ -1527,18 +1359,6 @@
deploySecurityFromConfiguration();
deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
-
- // This can't be created until node id is set
- clusterManager = new ClusterManagerImpl(executorFactory,
- this,
- postOffice,
- scheduledPool,
- managementService,
- configuration,
- nodeManager.getUUID(),
- configuration.isBackup(),
- configuration.isClustered());
-
}
/*
@@ -1604,10 +1424,10 @@
// We do this at the end - we don't want things like MDBs or other connections
connecting to a backup server until
// it is activated
- remotingService.start();
-
clusterManager.start();
+ remotingService.start();
+
initialised = true;
}
@@ -1826,76 +1646,6 @@
}
}
- public void deployDivert(DivertConfiguration config) throws Exception
- {
- if (config.getName() == null)
- {
- HornetQServerImpl.log.warn("Must specify a name for each divert. This one
will not be deployed.");
-
- return;
- }
-
- if (config.getAddress() == null)
- {
- HornetQServerImpl.log.warn("Must specify an address for each divert. This
one will not be deployed.");
-
- return;
- }
-
- if (config.getForwardingAddress() == null)
- {
- HornetQServerImpl.log.warn("Must specify an forwarding address for each
divert. This one will not be deployed.");
-
- return;
- }
-
- SimpleString sName = new SimpleString(config.getName());
-
- if (postOffice.getBinding(sName) != null)
- {
- HornetQServerImpl.log.warn("Binding already exists with name " + sName
+ ", divert will not be deployed");
-
- return;
- }
-
- SimpleString sAddress = new SimpleString(config.getAddress());
-
- Transformer transformer =
instantiateTransformer(config.getTransformerClassName());
-
- Filter filter = FilterImpl.createFilter(config.getFilterString());
-
- Divert divert = new DivertImpl(new SimpleString(config.getForwardingAddress()),
- sName,
- new SimpleString(config.getRoutingName()),
- config.isExclusive(),
- filter,
- transformer,
- postOffice,
- storageManager);
-
- Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress,
divert);
-
- postOffice.addBinding(binding);
-
- managementService.registerDivert(divert, config);
- }
-
- public void destroyDivert(SimpleString name) throws Exception
- {
- Binding binding = postOffice.getBinding(name);
- if (binding == null)
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "No binding for
divert " + name);
- }
- if (!(binding instanceof DivertBinding))
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Binding "
+ name + " is not a divert");
- }
-
- postOffice.removeBinding(name);
- }
-
-
private synchronized void deployGroupingHandlerConfiguration(final
GroupingHandlerConfiguration config) throws Exception
{
if (config != null)
@@ -1922,22 +1672,6 @@
managementService.addNotificationListener(groupingHandler);
}
}
-
- public void deployBridge(BridgeConfiguration config) throws Exception
- {
- if (clusterManager != null)
- {
- clusterManager.deployBridge(config);
- }
- }
-
- public void destroyBridge(String name) throws Exception
- {
- if (clusterManager != null)
- {
- clusterManager.destroyBridge(name);
- }
- }
private Transformer instantiateTransformer(final String transformerClassName)
{
@@ -1979,11 +1713,6 @@
}
- public ServerSession getSessionByID(String sessionName)
- {
- return sessions.get(sessionName);
- }
-
/**
* Check if journal directory exists or create it (if configured to do so)
*/
@@ -2005,18 +1734,284 @@
}
}
- public String toString()
+ /**
+ * To be called by backup trying to fail back the server
+ */
+ private void startFailbackChecker()
{
- if (identity != null)
+ scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l,
TimeUnit.MILLISECONDS);
+ }
+
+
+ // Inner classes
+ // --------------------------------------------------------------------------------
+
+ class FailbackChecker implements Runnable
+ {
+ boolean restarting = false;
+ public void run()
{
- return "HornetQServerImpl::" + identity;
+ try
+ {
+ if(!restarting && nodeManager.isAwaitingFailback())
+ {
+ log.info("live server wants to restart, restarting server in
backup");
+ restarting = true;
+ Thread t = new Thread(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ log.debug(HornetQServerImpl.this + "::Stopping live node in
favor of failback");
+ stop(true);
+ // We need to wait some time before we start the backup again
+ // otherwise we may eventually start before the live had a chance
to get it
+ Thread.sleep(configuration.getFailbackDelay());
+ configuration.setBackup(true);
+ log.debug(HornetQServerImpl.this + "::Starting backup node
now after failback");
+ start();
+ }
+ catch (Exception e)
+ {
+ log.warn("unable to restart server, please kill and restart
manually", e);
+ }
+ }
+ });
+ t.start();
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
- else
+ }
+
+
+
+ private class SharedStoreLiveActivation implements Activation
+ {
+ public void run()
{
- return "HornetQServerImpl::" + (nodeManager != null ?
"serverUUID=" + nodeManager.getUUID() : "");
+ try
+ {
+ log.info("Waiting to obtain live lock");
+
+ checkJournalDirectory();
+
+ if (log.isDebugEnabled())
+ {
+ log.debug("First part initialization on " + this);
+ }
+
+ initialisePart1();
+
+ if(nodeManager.isBackupLive())
+ {
+ //looks like we've failed over at some point need to inform that we
are the backup so when the current live
+ // goes down they failover to us
+ if (log.isDebugEnabled())
+ {
+ log.debug("announcing backup to the former live" + this);
+ }
+
+ clusterManager.announceBackup();
+ Thread.sleep(configuration.getFailbackDelay());
+ }
+
+ nodeManager.startLiveNode();
+
+ if (stopped)
+ {
+ return;
+ }
+
+ initialisePart2();
+
+ log.info("Server is now live");
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
}
+
+ public void close(boolean permanently) throws Exception
+ {
+ if(permanently)
+ {
+ nodeManager.crashLiveServer();
+ }
+ else
+ {
+ nodeManager.pauseLiveServer();
+ }
+ }
}
- // Inner classes
- // --------------------------------------------------------------------------------
+
+ private class SharedStoreBackupActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ nodeManager.startBackup();
+
+ initialisePart1();
+
+ clusterManager.start();
+
+ started = true;
+
+ log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "]
started, waiting live to fail before it gets active");
+
+ nodeManager.awaitLiveNode();
+
+ configuration.setBackup(false);
+
+ if (stopped)
+ {
+ return;
+ }
+
+ initialisePart2();
+
+ clusterManager.activate();
+
+ log.info("Backup Server is now live");
+
+ nodeManager.releaseBackup();
+ if(configuration.isAllowAutoFailBack())
+ {
+ startFailbackChecker();
+ }
+ }
+ catch (InterruptedException e)
+ {
+ //this is ok, we are being stopped
+ }
+ catch (ClosedChannelException e)
+ {
+ //this is ok too, we are being stopped
+ }
+ catch (Exception e)
+ {
+ if(!(e.getCause() instanceof InterruptedException))
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+ catch(Throwable e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
+ /**
+ *
+ */
+ public void close(boolean permanently) throws Exception
+ {
+ if (configuration.isBackup())
+ {
+ long timeout = 30000;
+
+ long start = System.currentTimeMillis();
+
+ while (backupActivationThread.isAlive() && System.currentTimeMillis()
- start < timeout)
+ {
+ nodeManager.interrupt();
+
+ backupActivationThread.interrupt();
+
+ backupActivationThread.join(1000);
+
+ }
+
+ if (System.currentTimeMillis() - start >= timeout)
+ {
+ threadDump("Timed out waiting for backup activation to exit");
+ }
+
+ nodeManager.stopBackup();
+ }
+ else
+ {
+ //if we are now live, behave as live
+ // We need to delete the file too, otherwise the backup will failover when we
shutdown or if the backup is
+ // started before the live
+ if(permanently)
+ {
+ nodeManager.crashLiveServer();
+ }
+ else
+ {
+ nodeManager.pauseLiveServer();
+ }
+ }
+ }
+ }
+
+ private interface Activation extends Runnable
+ {
+ void close(boolean permanently) throws Exception;
+ }
+
+ private class SharedNothingBackupActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ // TODO
+
+ // Try-Connect to live server using live-connector-ref
+
+ // sit in loop and try and connect, if server is not live then it will return
NOT_LIVE
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
+ public void close(boolean permanently) throws Exception
+ {
+ }
+ }
+
+ private class NoSharedStoreLiveActivation implements Activation
+ {
+ public void run()
+ {
+ try
+ {
+ initialisePart1();
+
+ initialisePart2();
+
+ if (identity != null)
+ {
+ log.info("Server " + identity + " is now live");
+ }
+ else
+ {
+ log.info("Server is now live");
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Failure in initialisation", e);
+ }
+ }
+
+ public void close(boolean permanently) throws Exception
+ {
+
+ }
+ }
+
+
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -1062,7 +1062,7 @@
return;
}
-
+
consumer.receiveCredits(credits);
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -14,6 +14,7 @@
package org.hornetq.spi.core.protocol;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferDecoder;
import org.hornetq.spi.core.remoting.Connection;
@@ -26,7 +27,7 @@
*/
public interface ProtocolManager extends BufferDecoder
{
- ConnectionEntry createConnectionEntry(Connection connection);
+ ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection);
public void removeHandler(final String name);
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/Acceptor.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -14,6 +14,7 @@
package org.hornetq.spi.core.remoting;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.management.NotificationService;
/**
@@ -31,6 +32,11 @@
void pause();
/**
+ * @return the cluster connection associated with this Acceptor
+ */
+ ClusterConnection getClusterConnection();
+
+ /**
* Set the notification service for this acceptor to use.
*
* @param notificationService the notification service
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -18,6 +18,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.core.server.cluster.ClusterConnection;
+
/**
* A factory for creating acceptors.
* <p/>
@@ -40,7 +42,8 @@
* @param scheduledThreadPool a scheduled thread pool
* @return an acceptor
*/
- Acceptor createAcceptor(final Map<String, Object> configuration,
+ Acceptor createAcceptor(ClusterConnection clusterConnection,
+ final Map<String, Object> configuration,
BufferHandler handler,
BufferDecoder decoder,
ConnectionLifeCycleListener listener,
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/spi/core/remoting/ConnectionLifeCycleListener.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -23,11 +23,13 @@
public interface ConnectionLifeCycleListener
{
/**
- * called when a connection is created.
+ * This method is used both by client connector creation and server connection
creation through acceptors.
+ * the acceptor will be set to null on client operations
*
+ * @param The acceptor here will be always null on a client connection created event.
* @param connection the connection that has been created
*/
- void connectionCreated(Connection connection, ProtocolType protocol);
+ void connectionCreated(Acceptor acceptor, Connection connection, ProtocolType
protocol);
/**
* called when a connection is destroyed.
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -940,10 +940,10 @@
for (ClusterConnection cc : clusterManager.getClusterConnections())
{
out += cc.describe() + "\n";
+ out += cc.getTopology().describe();
}
}
out += "\n\nfull topology:";
- out += clusterManager.getTopology().describe();
return out + br;
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -131,10 +131,6 @@
waitForTopology(servers[0], 2);
waitForTopology(servers[1], 2);
-
- System.out.println(servers[0].getClusterManager().getTopology().describe());
-
- System.out.println(servers[1].getClusterManager().getTopology().describe());
setupSessionFactory(0, isNetty(), true);
setupSessionFactory(1, isNetty(), true);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/SimpleSymmetricClusterTest.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -153,17 +153,6 @@
waitForTopology(servers[1], 3);
waitForTopology(servers[2], 3);
- for (int i = 0 ; i < 3; i++)
- {
- System.out.println("top[" + i + "]=" +
servers[i].getClusterManager().getTopology().describe());
- }
-
- for (int i = 0; i <= 2; i++)
- {
- log.info("*************************************\n " + servers[i] +
- " topology:\n" +
- servers[i].getClusterManager().getTopology().describe());
- }
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
@@ -196,12 +185,6 @@
startServers(0, 1);
- for (int i = 0; i <= 1; i++)
- {
- log.info("*************************************\n " + servers[i] +
- " topology:\n" +
- servers[i].getClusterManager().getTopology().describe());
- }
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
@@ -266,13 +249,6 @@
for (int i = 0; i <= 4; i++)
{
- log.info("*************************************\n " + servers[i] +
- " topology:\n" +
- servers[i].getClusterManager().getTopology().describe());
- }
-
- for (int i = 0; i <= 4; i++)
- {
setupSessionFactory(i, isNetty());
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/TwoWayTwoNodeClusterTest.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -137,8 +137,6 @@
startServers(0, 1);
waitForTopology(servers[0], 2);
- System.out.println(servers[0].getClusterManager().getTopology().describe());
- System.out.println(servers[1].getClusterManager().getTopology().describe());
waitForTopology(servers[1], 2);
for (int i = 0; i < 10; i++)
@@ -148,7 +146,6 @@
log.info("#stop #test #" + i);
stopServers(1);
- System.out.println(servers[0].getClusterManager().getTopology().describe());
waitForTopology(servers[0], 1, 2000);
log.info("#start #test #" + i);
startServers(1);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/FailBackAutoTest.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -28,6 +28,7 @@
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
@@ -76,18 +77,28 @@
locator.setBlockOnDurableSend(true);
locator.setFailoverOnInitialConnection(true);
locator.setReconnectAttempts(-1);
+ ((ServerLocatorInternal)locator).setIdentity("testAutoFailback");
+
ClientSessionFactoryInternal sf = createSessionFactoryAndWaitForTopology(locator,
2);
final CountDownLatch latch = new CountDownLatch(1);
ClientSession session = sendAndConsume(sf, true);
+
+ System.out.println(locator.getTopology().describe());
MyListener listener = new MyListener(latch);
session.addFailureListener(listener);
+
+ System.out.println(locator.getTopology().describe());
liveServer.crash();
-
+
assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ log.info("backup (nowLive) topology = " +
backupServer.getServer().getClusterManager().getDefaultConnection().getTopology().describe());
+
+ log.info("Server Crash!!!");
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -97,6 +108,11 @@
producer.send(message);
+ verifyMessageOnServer(1, 1);
+
+ System.out.println(locator.getTopology().describe());
+
+
session.removeFailureListener(listener);
final CountDownLatch latch2 = new CountDownLatch(1);
@@ -107,6 +123,10 @@
log.info("******* starting live server back");
liveServer.start();
+
+ Thread.sleep(1000);
+
+ System.out.println("After failback: " +
locator.getTopology().describe());
assertTrue(latch2.await(5, TimeUnit.SECONDS));
@@ -118,6 +138,8 @@
session.close();
+ verifyMessageOnServer(0, 1);
+
sf.close();
Assert.assertEquals(0, sf.numSessions());
@@ -125,6 +147,29 @@
Assert.assertEquals(0, sf.numConnections());
}
+ /**
+ * @throws Exception
+ * @throws HornetQException
+ */
+ private void verifyMessageOnServer(final int server, final int numberOfMessages)
throws Exception, HornetQException
+ {
+ ServerLocator backupLocator = createInVMLocator(server);
+ ClientSessionFactory factorybkp = backupLocator.createSessionFactory();
+ ClientSession sessionbkp = factorybkp.createSession(false, false);
+ sessionbkp.start();
+ ClientConsumer consumerbkp = sessionbkp.createConsumer(ADDRESS);
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumerbkp.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ sessionbkp.commit();
+ }
+ sessionbkp.close();
+ factorybkp.close();
+ backupLocator.close();
+ }
+
public void testAutoFailbackThenFailover() throws Exception
{
locator.setBlockOnNonDurableSend(true);
@@ -253,7 +298,7 @@
if (createQueue)
{
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
false);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
}
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -288,6 +333,8 @@
}
ClientMessage message3 = consumer.receiveImmediate();
+
+ consumer.close();
Assert.assertNull(message3);
@@ -315,6 +362,7 @@
public void connectionFailed(final HornetQException me, boolean failedOver)
{
+ System.out.println("Failed, me");
latch.countDown();
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -12,16 +12,9 @@
*/
package org.hornetq.tests.integration.cluster.failover;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import junit.framework.Assert;
-
-import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
/**
@@ -167,6 +160,7 @@
closeSessionFactory(0);
Thread.sleep(1000);
+
servers[0].stop(true);
waitForServerRestart(2);
@@ -213,16 +207,4 @@
abstract boolean isSharedServer();
- private void fail(final RemotingConnection conn, final CountDownLatch latch) throws
InterruptedException
- {
- // Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
- // Wait to be informed of failure
-
- boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
-
- Assert.assertTrue(ok);
- }
-
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleBackupsFailoverTestBase.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -193,7 +193,7 @@
{
if (server != null)
{
- log.info("failed topology, Topology on server = " +
server.getClusterManager().getTopology().describe());
+ log.info("failed topology, Topology on server = " +
server.getClusterManager().describe());
}
}
assertTrue("expected " + topologyMembers + " members", ok);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/failover/MultipleLivesMultipleBackupsFailoverTest.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -98,8 +98,6 @@
Thread.sleep(500);
servers.get(0).crash(session);
- System.out.println("server3 " +
servers.get(3).getServer().getClusterManager().getTopology().describe());
-
int liveAfter0 = waitForNewLive(10000, true, servers, 1, 2);
ServerLocator locator2 = getServerLocator(3);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -29,6 +29,7 @@
import org.hornetq.core.remoting.impl.netty.NettyConnector;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -544,7 +545,7 @@
latch = connCreatedLatch;
}
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection,
final ProtocolType protocol)
{
this.connection = connection;
if (latch != null)
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnector.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -89,7 +89,7 @@
*/
public MockConnection(final int serverID, final BufferHandler handler, final
ConnectionLifeCycleListener listener)
{
- super(serverID, handler, listener, Executors.newSingleThreadExecutor());
+ super(null, serverID, handler, listener, Executors.newSingleThreadExecutor());
}
@Override
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -63,7 +63,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection
connection, final ProtocolType protocol)
{
}
@@ -74,7 +74,8 @@
};
- Acceptor acceptor = factory.createAcceptor(params,
+ Acceptor acceptor = factory.createAcceptor(null,
+ params,
handler,
null,
listener,
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -28,6 +28,7 @@
import org.hornetq.core.remoting.impl.netty.NettyAcceptor;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -80,7 +81,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection
connection, final ProtocolType protocol)
{
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -24,6 +24,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.remoting.impl.netty.NettyConnection;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.tests.util.RandomUtil;
@@ -220,7 +221,7 @@
class MyListener implements ConnectionLifeCycleListener
{
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection connection,
final ProtocolType protocol)
{
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -23,6 +23,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.remoting.impl.netty.NettyConnector;
import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -66,7 +67,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection
connection, final ProtocolType protocol)
{
}
public void connectionReadyForWrites(Object connectionID, boolean ready)
@@ -106,7 +107,7 @@
{
}
- public void connectionCreated(final Connection connection, final ProtocolType
protocol)
+ public void connectionCreated(final Acceptor acceptor, final Connection
connection, final ProtocolType protocol)
{
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-19
23:47:54 UTC (rev 11370)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2011-09-20
00:50:32 UTC (rev 11371)
@@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.management.MBeanServer;
@@ -37,11 +38,13 @@
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
import org.hornetq.core.server.NodeManager;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -112,8 +115,15 @@
log.debug("waiting for " + nodes + " on the topology for server =
" + server);
long start = System.currentTimeMillis();
+
+ Set<ClusterConnection> ccs =
server.getClusterManager().getClusterConnections();
+
+ if (ccs.size() != 1)
+ {
+ throw new IllegalStateException("You need a single cluster connection on
this version of waitForTopology on ServiceTestBase");
+ }
- Topology topology = server.getClusterManager().getTopology();
+ Topology topology = ccs.iterator().next().getTopology();
do
{
@@ -521,7 +531,19 @@
locators.add(locatorWithoutHA);
return locatorWithoutHA;
}
+
+ protected ServerLocator createInVMLocator(final int serverID)
+ {
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ if (serverID != 0)
+ {
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, serverID);
+ }
+
+ return HornetQClient.createServerLocatorWithHA(new
TransportConfiguration(INVM_CONNECTOR_FACTORY, server1Params));
+ }
+
protected ClientSessionFactoryImpl createFactory(final String connectorClass) throws
Exception
{
ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(new
TransportConfiguration(connectorClass));