Author: borges
Date: 2011-11-24 07:25:37 -0500 (Thu, 24 Nov 2011)
New Revision: 11755
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 HORNETQ-771 Autheticate Replicated backup request and handle replication-start
error msg.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -1408,7 +1408,7 @@
}
}
- private class Channel0Handler implements ChannelHandler
+ private final class Channel0Handler implements ChannelHandler
{
private final CoreRemotingConnection conn;
@@ -1498,9 +1498,12 @@
serverLocator.notifyNodeUp(topMessage.getUniqueEventID(),
topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
}
}
+ else if (type == PacketImpl.BACKUP_REGISTRATION_FAILED)
+ {
+ // no-op
+ }
}
-
}
public class CloseRunnable implements Runnable
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -13,6 +13,7 @@
package org.hornetq.core.protocol.core.impl;
+import java.util.EnumSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -59,10 +60,18 @@
{
this.id = id;
}
+
+ protected static String idToString(long code)
+ {
+ for (CHANNEL_ID channel:EnumSet.allOf(CHANNEL_ID.class)){
+ if (channel.id==code) return channel.toString();
+ }
+ return Long.toString(code);
+ }
}
private static final Logger log = Logger.getLogger(ChannelImpl.class);
-
+
private static final boolean isTrace = log.isTraceEnabled();
private volatile long id;
@@ -118,11 +127,11 @@
resendCache = null;
}
}
-
+
public boolean supports(final byte packetType)
{
int version = connection.getClientVersion();
-
+
switch (packetType)
{
case PacketImpl.CLUSTER_TOPOLOGY_V2:
@@ -198,7 +207,7 @@
synchronized (sendLock)
{
packet.setChannelID(id);
-
+
if (isTrace)
{
log.trace("Sending packet nonblocking " + packet + " on
channeID=" + id);
@@ -237,7 +246,7 @@
{
lock.unlock();
}
-
+
if (isTrace)
{
log.trace("Writing buffer for channelID=" + id);
@@ -351,7 +360,10 @@
{
if (confWindowSize < 0)
{
- throw new IllegalStateException("You can't set confirmationHandler on a
connection with confirmation-window-size < 0. Look at the documentation for more
information.");
+ final String msg =
+ "You can't set confirmationHandler on a connection with
confirmation-window-size < 0."
+ + " Look at the documentation for more
information.";
+ throw new IllegalStateException(msg);
}
commandConfirmationHandler = handler;
}
@@ -575,4 +587,10 @@
firstStoredCommandID += numberToClear;
}
+
+ @Override
+ public String toString()
+ {
+ return "Channel[id=" + CHANNEL_ID.idToString(id) + ", handler="
+ handler + "]";
+ }
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -17,7 +17,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
@@ -44,6 +43,7 @@
import
org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -211,33 +211,23 @@
} else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
{
BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
+ ClusterConnection clusterConnection =
acceptorUsed.getClusterConnection();
- try {
- server.startReplication(rc, acceptorUsed.getClusterConnection(),
getPair(msg.getConnector(), true));
- } catch (HornetQException e){
- channel0.send(new BackupRegistrationFailedMessage(e));
- }
- }
- else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION_FAILED)
- {
- assert server.getConfiguration().isBackup();
- assert !server.getConfiguration().isSharedStore();
- log.warn("Replication failed to start because of exception with error
" +
- ((BackupRegistrationFailedMessage)packet).getCause());
- Executors.newSingleThreadExecutor().execute(new Runnable()
+ if (clusterConnection.verify(msg.getClusterUser(),
msg.getClusterPassword()))
{
- public void run()
+ try
{
- try
- {
- server.stop();
- }
- catch (Exception e)
- {
- log.error("Error while stopping server: " + server,
e);
- }
+ server.startReplication(rc, clusterConnection,
getPair(msg.getConnector(), true));
}
- });
+ catch (HornetQException e)
+ {
+ channel0.send(new BackupRegistrationFailedMessage(e));
+ }
+ }
+ else
+ {
+ channel0.send(new BackupRegistrationFailedMessage(null));
+ }
}
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -88,6 +88,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationFailedMessage;
import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
@@ -547,6 +548,11 @@
packet = new BackupRegistrationMessage();
break;
}
+ case PacketImpl.BACKUP_REGISTRATION_FAILED:
+ {
+ packet = new BackupRegistrationFailedMessage();
+ break;
+ }
case PacketImpl.REPLICATION_START_FINISH_SYNC:
{
packet = new ReplicationStartSyncMessage();
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -13,28 +13,70 @@
public final class BackupRegistrationFailedMessage extends PacketImpl
{
- int errorCode;
+ enum BackupRegistrationProblem
+ {
+ EXCEPTION(0), AUTHENTICATION(1);
+ final int code;
+ private BackupRegistrationProblem(int code)
+ {
+ this.code = code;
+ }
+ }
+
+ int errorCode = -1;
+ BackupRegistrationProblem problem;
+
public BackupRegistrationFailedMessage(HornetQException e)
{
super(BACKUP_REGISTRATION_FAILED);
- errorCode = e.getCode();
+ if (e != null)
+ {
+ errorCode = e.getCode();
+ problem = BackupRegistrationProblem.EXCEPTION;
+ }
+ else
+ {
+ problem = BackupRegistrationProblem.AUTHENTICATION;
+ }
}
+ public BackupRegistrationFailedMessage()
+ {
+ super(BACKUP_REGISTRATION_FAILED);
+ }
+
public int getCause()
{
return errorCode;
}
+ public BackupRegistrationProblem getRegistrationProblem()
+ {
+ return problem;
+ }
+
@Override
public void encodeRest(final HornetQBuffer buffer)
{
- buffer.writeInt(errorCode);
+ buffer.writeInt(problem.code);
+ if (problem == BackupRegistrationProblem.EXCEPTION)
+ {
+ buffer.writeInt(errorCode);
+ }
}
@Override
public void decodeRest(final HornetQBuffer buffer)
{
- errorCode = buffer.readInt();
+ if (buffer.readInt() == BackupRegistrationProblem.AUTHENTICATION.code)
+ {
+ problem = BackupRegistrationProblem.AUTHENTICATION;
+ }
+ else
+ {
+ problem = BackupRegistrationProblem.EXCEPTION;
+ errorCode = buffer.readInt();
+ }
}
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -1,6 +1,3 @@
-/**
- *
- */
package org.hornetq.core.protocol.core.impl.wireformat;
import org.hornetq.api.core.HornetQBuffer;
@@ -8,10 +5,13 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
- * Registers a backup node with its live server.
+ * Registers a given backup-server as the replicating backup of a live server (i.e. a
regular
+ * HornetQ).
* <p>
- * After registration the live server will initiate synchronization of its state with the
new backup
- * node.
+ * If it succeeds the backup will start synchronization of its state with the new backup
node, and
+ * replicating any new data. If it fails the backup server will receive a message
indicating
+ * failure, and should shutdown.
+ * @see BackupRegistrationFailedMessage
*/
public final class BackupRegistrationMessage extends PacketImpl
{
@@ -20,11 +20,17 @@
private String nodeID;
- public BackupRegistrationMessage(String nodeId, TransportConfiguration tc)
+ private String clusterUser;
+
+ private String clusterPassword;
+
+ public BackupRegistrationMessage(String nodeId, TransportConfiguration tc, String
user, String password)
{
this();
connector = tc;
nodeID = nodeId;
+ clusterUser = user;
+ clusterPassword = password;
}
public BackupRegistrationMessage()
@@ -46,6 +52,8 @@
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeString(nodeID);
+ buffer.writeString(clusterUser);
+ buffer.writeString(clusterPassword);
connector.encode(buffer);
}
@@ -53,8 +61,26 @@
public void decodeRest(final HornetQBuffer buffer)
{
nodeID = buffer.readString();
+ clusterUser = buffer.readString();
+ clusterPassword = buffer.readString();
connector = new TransportConfiguration();
connector.decode(buffer);
}
+ /**
+ * @return
+ */
+ public String getClusterUser()
+ {
+ return clusterUser;
+ }
+
+ /**
+ * @return
+ */
+ public String getClusterPassword()
+ {
+ return clusterPassword;
+ }
+
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -243,7 +243,8 @@
public synchronized void start() throws Exception
{
Configuration config = server.getConfiguration();
-
+ try
+ {
storage = server.getStorageManager();
storage.start();
@@ -270,7 +271,12 @@
pageManager.start();
started = true;
-
+ }
+ catch (Exception e)
+ {
+ if (!server.isStopped())
+ throw e;
+ }
}
public synchronized void stop() throws Exception
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -98,7 +98,7 @@
public ReplicationManagerImpl(CoreRemotingConnection remotingConnection, final
ExecutorFactory executorFactory)
{
this.executorFactory = executorFactory;
- replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+ this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id,
-1);
this.remotingConnection = remotingConnection;
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -27,7 +27,7 @@
* A ClusterConnection
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
+ *
* Created 23 Jan 2009 14:51:55
*
*
@@ -37,32 +37,40 @@
SimpleString getName();
String getNodeID();
-
+
HornetQServer getServer();
-
+
void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration,
TransportConfiguration> connectorPair, boolean backup);
void addClusterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
-
+
void removeClusterTopologyListener(ClusterTopologyListener listener, boolean
clusterConnection);
-
+
/**
* @return a Map of node ID and addresses
*/
Map<String, String> getNodes();
void activate() throws Exception;
-
+
TransportConfiguration getConnector();
-
+
Topology getTopology();
-
+
void flushExecutor();
// for debug
String describe();
void informTopology();
-
+
void announceBackup();
+
+ /**
+ * Verifies whether user and password match the ones configured for this
ClusterConnection.
+ * @param clusterUser
+ * @param clusterPassword
+ * @return
+ */
+ boolean verify(String clusterUser, String clusterPassword);
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -1502,7 +1502,7 @@
@Override
public String toString()
{
- return "ClusterConnectionImpl@" + System.identityHashCode(this) +
+ return "ClusterConnectionImpl@" + System.identityHashCode(this) +
"[nodeUUID=" + nodeUUID +
", connector=" +
connector +
@@ -1563,9 +1563,6 @@
}
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
public String toString()
{
@@ -1590,4 +1587,10 @@
}
}
+
+ @Override
+ public boolean verify(String clusterUser0, String clusterPassword0)
+ {
+ return clusterUser.equals(clusterUser0) &&
clusterPassword.equals(clusterPassword0);
+ }
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -381,7 +381,8 @@
log.warn("No connector with name '" + config.getConnectorName()
+ "'. backup cannot be announced.");
return;
}
- liveChannel.send(new BackupRegistrationMessage(nodeUUID.toString(),
connector));
+ liveChannel.send(new BackupRegistrationMessage(nodeUUID.toString(), connector,
configuration.getClusterUser(),
+
configuration.getClusterPassword()));
}
else
{
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-24
12:24:54 UTC (rev 11754)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -1287,7 +1287,7 @@
if (ConfigurationImpl.DEFAULT_CLUSTER_USER.equals(configuration.getClusterUser())
&&
ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD.equals(configuration.getClusterPassword()))
{
- log.warn("Security risk! It has been detected that the cluster admin user
and password " + "have not been changed from the installation default. "
+ log.warn("Security risk! HornetQ is running with the default cluster admin
user and default password. "
+ "Please see the HornetQ user guide, cluster chapter, for
instructions on how to do this.");
}
@@ -2047,7 +2047,7 @@
replicationEndpoint.setQuorumManager(quorumManager);
serverLocator0.setReconnectAttempts(-1);
-
+ serverLocator0.addInterceptor(new ReplicationError(HornetQServerImpl.this));
threadPool.execute(new Runnable()
{
@Override
@@ -2064,7 +2064,6 @@
CoreRemotingConnection liveConnection =
liveServerSessionFactory.getConnection();
Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id,
-1);
Channel replicationChannel =
liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
-
connectToReplicationEndpoint(replicationChannel);
replicationEndpoint.start();
clusterManager.announceReplicatingBackup(pingChannel);
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
(rev 0)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -0,0 +1,51 @@
+/**
+ *
+ */
+package org.hornetq.core.server.impl;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+
+/**
+ * Stops the backup in case of an error at the start of Replication.
+ * <p>
+ * Using an interceptor for the task to avoid a server reference inside of the
'basic' channel-0
+ * handler at {@link ClientSessionFactoryImpl#Channel0Handler}. As {@link
ClientSessionFactoryImpl}
+ * is also shipped in the HQ-client JAR (which does not include {@link HornetQServer}).
+ */
+final class ReplicationError implements Interceptor
+{
+ private final HornetQServer server;
+ private static final Logger log = Logger.getLogger(ReplicationError.class);
+
+ public ReplicationError(HornetQServer server)
+ {
+ this.server = server;
+ }
+
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) throws
HornetQException
+ {
+ System.out.println(packet);
+ if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED)
+ return true;
+ log.warn("Failed to register as backup. Stopping the server.");
+ try
+ {
+ server.stop();
+ }
+ catch (Exception e)
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "error trying
to stop " + server, e);
+ }
+
+ return false;
+ }
+
+}
Added:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java
(rev 0)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java 2011-11-24
12:25:37 UTC (rev 11755)
@@ -0,0 +1,79 @@
+/**
+ *
+ */
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.TransportConfigurationUtils;
+
+public class BackupAuthenticationTest extends FailoverTestBase
+{
+ private static CountDownLatch latch;
+ @Override
+ public void setUp() throws Exception
+ {
+ startBackupServer = false;
+ latch = new CountDownLatch(1);
+ super.setUp();
+ }
+
+ public void testPasswordSetting() throws Exception
+ {
+ waitForServer(liveServer.getServer());
+ backupServer.start();
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+ /*
+ * can't intercept the message at the backup, so we intercept the registration
message at the
+ * live.
+ */
+ Thread.sleep(2000);
+ assertFalse("backup should have stopped", backupServer.isStarted());
+ backupConfig.setClusterPassword(CLUSTER_PASSWORD);
+ backupServer.start();
+ waitForServer(backupServer.getServer());
+ }
+
+ @Override
+ protected void createConfigs() throws Exception
+ {
+ createReplicatedConfigs();
+ backupConfig.setClusterPassword("crocodile");
+
liveConfig.setInterceptorClassNames(Arrays.asList(NotifyingInterceptor.class.getName()));
+ }
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMAcceptor(live);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
+ {
+ return TransportConfigurationUtils.getInVMConnector(live);
+ }
+
+ public static final class NotifyingInterceptor implements Interceptor
+ {
+
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) throws
HornetQException
+ {
+ System.out.println("intercept? wtf " + packet);
+ if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
+ {
+ latch.countDown();
+ }
+ return true;
+ }
+ }
+}