[hornetq-commits] JBoss hornetq SVN: r11755 - in trunk: hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 24 07:25:38 EST 2011


Author: borges
Date: 2011-11-24 07:25:37 -0500 (Thu, 24 Nov 2011)
New Revision: 11755

Added:
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java
Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 HORNETQ-771 Authenticate Replicated backup request and handle replication-start error msg.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -1408,7 +1408,7 @@
       }
    }
 
-   private class Channel0Handler implements ChannelHandler
+   private final class Channel0Handler implements ChannelHandler
    {
       private final CoreRemotingConnection conn;
 
@@ -1498,9 +1498,12 @@
                serverLocator.notifyNodeUp(topMessage.getUniqueEventID(), topMessage.getNodeID(), topMessage.getPair(), topMessage.isLast());
             }
          }
+         else if (type == PacketImpl.BACKUP_REGISTRATION_FAILED)
+         {
+            // no-op
+         }
       }
 
-
    }
 
    public class CloseRunnable implements Runnable

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/ChannelImpl.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.protocol.core.impl;
 
+import java.util.EnumSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
@@ -59,10 +60,18 @@
       {
          this.id = id;
       }
+
+      protected static String idToString(long code)
+      {
+         for (CHANNEL_ID channel:EnumSet.allOf(CHANNEL_ID.class)){
+            if (channel.id==code) return channel.toString();
+         }
+         return Long.toString(code);
+      }
    }
 
    private static final Logger log = Logger.getLogger(ChannelImpl.class);
-   
+
    private static final boolean isTrace = log.isTraceEnabled();
 
    private volatile long id;
@@ -118,11 +127,11 @@
          resendCache = null;
       }
    }
-   
+
    public boolean supports(final byte packetType)
    {
       int version = connection.getClientVersion();
-      
+
       switch (packetType)
       {
          case PacketImpl.CLUSTER_TOPOLOGY_V2:
@@ -198,7 +207,7 @@
       synchronized (sendLock)
       {
          packet.setChannelID(id);
-         
+
          if (isTrace)
          {
             log.trace("Sending packet nonblocking " + packet + " on channeID=" + id);
@@ -237,7 +246,7 @@
          {
             lock.unlock();
          }
-         
+
          if (isTrace)
          {
             log.trace("Writing buffer for channelID=" + id);
@@ -351,7 +360,10 @@
    {
       if (confWindowSize < 0)
       {
-         throw new IllegalStateException("You can't set confirmationHandler on a connection with confirmation-window-size < 0. Look at the documentation for more information.");
+         final String msg =
+                  "You can't set confirmationHandler on a connection with confirmation-window-size < 0."
+                           + " Look at the documentation for more information.";
+         throw new IllegalStateException(msg);
       }
       commandConfirmationHandler = handler;
    }
@@ -575,4 +587,10 @@
 
       firstStoredCommandID += numberToClear;
    }
+
+   @Override
+   public String toString()
+   {
+      return "Channel[id=" + CHANNEL_ID.idToString(id) + ", handler=" + handler + "]";
+   }
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -17,7 +17,6 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
@@ -44,6 +43,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SubscribeClusterTopologyUpdatesMessageV2;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -211,33 +211,23 @@
             } else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
             {
                BackupRegistrationMessage msg = (BackupRegistrationMessage)packet;
+               ClusterConnection clusterConnection = acceptorUsed.getClusterConnection();
 
-               try {
-                  server.startReplication(rc, acceptorUsed.getClusterConnection(), getPair(msg.getConnector(), true));
-               } catch (HornetQException e){
-                 channel0.send(new BackupRegistrationFailedMessage(e));
-               }
-            }
-            else if (packet.getType() == PacketImpl.BACKUP_REGISTRATION_FAILED)
-            {
-               assert server.getConfiguration().isBackup();
-               assert !server.getConfiguration().isSharedStore();
-               log.warn("Replication failed to start because of exception with error " +
-                        ((BackupRegistrationFailedMessage)packet).getCause());
-               Executors.newSingleThreadExecutor().execute(new Runnable()
+               if (clusterConnection.verify(msg.getClusterUser(), msg.getClusterPassword()))
                {
-                  public void run()
+                  try
                   {
-                     try
-                     {
-                        server.stop();
-                     }
-                     catch (Exception e)
-                     {
-                        log.error("Error while stopping server: " + server, e);
-                     }
+                     server.startReplication(rc, clusterConnection, getPair(msg.getConnector(), true));
                   }
-               });
+                  catch (HornetQException e)
+                  {
+                     channel0.send(new BackupRegistrationFailedMessage(e));
+                  }
+               }
+               else
+               {
+                  channel0.send(new BackupRegistrationFailedMessage(null));
+               }
             }
          }
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -88,6 +88,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationFailedMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ClusterTopologyChangeMessage_V2;
@@ -547,6 +548,11 @@
             packet = new BackupRegistrationMessage();
             break;
          }
+         case PacketImpl.BACKUP_REGISTRATION_FAILED:
+         {
+            packet = new BackupRegistrationFailedMessage();
+            break;
+         }
          case PacketImpl.REPLICATION_START_FINISH_SYNC:
          {
             packet = new ReplicationStartSyncMessage();

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationFailedMessage.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -13,28 +13,70 @@
 public final class BackupRegistrationFailedMessage extends PacketImpl
 {
 
-   int errorCode;
+   enum BackupRegistrationProblem
+   {
+      EXCEPTION(0), AUTHENTICATION(1);
+      final int code;
 
+      private BackupRegistrationProblem(int code)
+      {
+         this.code = code;
+      }
+   }
+
+   int errorCode = -1;
+   BackupRegistrationProblem problem;
+
    public BackupRegistrationFailedMessage(HornetQException e)
    {
       super(BACKUP_REGISTRATION_FAILED);
-      errorCode = e.getCode();
+      if (e != null)
+      {
+         errorCode = e.getCode();
+         problem = BackupRegistrationProblem.EXCEPTION;
+      }
+      else
+      {
+         problem = BackupRegistrationProblem.AUTHENTICATION;
+      }
    }
 
+   public BackupRegistrationFailedMessage()
+   {
+      super(BACKUP_REGISTRATION_FAILED);
+   }
+
    public int getCause()
    {
       return errorCode;
    }
 
+   public BackupRegistrationProblem getRegistrationProblem()
+   {
+      return problem;
+   }
+
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
-      buffer.writeInt(errorCode);
+      buffer.writeInt(problem.code);
+      if (problem == BackupRegistrationProblem.EXCEPTION)
+      {
+         buffer.writeInt(errorCode);
+      }
    }
 
    @Override
    public void decodeRest(final HornetQBuffer buffer)
    {
-      errorCode = buffer.readInt();
+      if (buffer.readInt() == BackupRegistrationProblem.AUTHENTICATION.code)
+      {
+         problem = BackupRegistrationProblem.AUTHENTICATION;
+      }
+      else
+      {
+         problem = BackupRegistrationProblem.EXCEPTION;
+         errorCode = buffer.readInt();
+      }
    }
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/BackupRegistrationMessage.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -1,6 +1,3 @@
-/**
- *
- */
 package org.hornetq.core.protocol.core.impl.wireformat;
 
 import org.hornetq.api.core.HornetQBuffer;
@@ -8,10 +5,13 @@
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 
 /**
- * Registers a backup node with its live server.
+ * Registers a given backup-server as the replicating backup of a live server (i.e. a regular
+ * HornetQ).
  * <p>
- * After registration the live server will initiate synchronization of its state with the new backup
- * node.
+ * If it succeeds the live server will start synchronizing its state with the new backup node, and
+ * replicating any new data. If it fails the backup server will receive a message indicating
+ * failure, and should shut down.
+ * @see BackupRegistrationFailedMessage
  */
 public final class BackupRegistrationMessage extends PacketImpl
 {
@@ -20,11 +20,17 @@
 
    private String nodeID;
 
-   public BackupRegistrationMessage(String nodeId, TransportConfiguration tc)
+   private String clusterUser;
+
+   private String clusterPassword;
+
+   public BackupRegistrationMessage(String nodeId, TransportConfiguration tc, String user, String password)
    {
       this();
       connector = tc;
       nodeID = nodeId;
+      clusterUser = user;
+      clusterPassword = password;
    }
 
    public BackupRegistrationMessage()
@@ -46,6 +52,8 @@
    public void encodeRest(final HornetQBuffer buffer)
    {
       buffer.writeString(nodeID);
+      buffer.writeString(clusterUser);
+      buffer.writeString(clusterPassword);
       connector.encode(buffer);
    }
 
@@ -53,8 +61,26 @@
    public void decodeRest(final HornetQBuffer buffer)
    {
       nodeID = buffer.readString();
+      clusterUser = buffer.readString();
+      clusterPassword = buffer.readString();
       connector = new TransportConfiguration();
       connector.decode(buffer);
    }
 
+   /**
+    * @return
+    */
+   public String getClusterUser()
+   {
+      return clusterUser;
+   }
+
+   /**
+    * @return
+    */
+   public String getClusterPassword()
+   {
+      return clusterPassword;
+   }
+
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -243,7 +243,8 @@
    public synchronized void start() throws Exception
    {
       Configuration config = server.getConfiguration();
-
+      try
+      {
       storage = server.getStorageManager();
       storage.start();
 
@@ -270,7 +271,12 @@
       pageManager.start();
 
       started = true;
-
+      }
+      catch (Exception e)
+      {
+         if (!server.isStopped())
+            throw e;
+      }
    }
 
    public synchronized void stop() throws Exception

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -98,7 +98,7 @@
    public ReplicationManagerImpl(CoreRemotingConnection remotingConnection, final ExecutorFactory executorFactory)
    {
       this.executorFactory = executorFactory;
-      replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
+      this.replicatingChannel = remotingConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
       this.remotingConnection = remotingConnection;
    }
 

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/ClusterConnection.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -27,7 +27,7 @@
  * A ClusterConnection
  *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * 
+ *
  * Created 23 Jan 2009 14:51:55
  *
  *
@@ -37,32 +37,40 @@
    SimpleString getName();
 
    String getNodeID();
-   
+
    HornetQServer getServer();
-   
+
    void nodeAnnounced(long eventUID, String nodeID, Pair<TransportConfiguration, TransportConfiguration> connectorPair, boolean backup);
 
    void addClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
-   
+
    void removeClusterTopologyListener(ClusterTopologyListener listener, boolean clusterConnection);
-   
+
    /**
     * @return a Map of node ID and addresses
     */
    Map<String, String> getNodes();
 
    void activate() throws Exception;
-   
+
    TransportConfiguration getConnector();
-   
+
    Topology getTopology();
-   
+
    void flushExecutor();
 
    // for debug
    String describe();
 
    void informTopology();
-   
+
    void announceBackup();
+
+   /**
+    * Verifies whether user and password match the ones configured for this ClusterConnection.
+    * @param clusterUser the user name to check
+    * @param clusterPassword the password to check
+    * @return {@code true} if both values match the configured ones
+    */
+   boolean verify(String clusterUser, String clusterPassword);
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -1502,7 +1502,7 @@
    @Override
    public String toString()
    {
-      return "ClusterConnectionImpl@" + System.identityHashCode(this)  + 
+      return "ClusterConnectionImpl@" + System.identityHashCode(this)  +
              "[nodeUUID=" + nodeUUID +
              ", connector=" +
              connector +
@@ -1563,9 +1563,6 @@
          }
       }
 
-      /* (non-Javadoc)
-       * @see java.lang.Object#toString()
-       */
       @Override
       public String toString()
       {
@@ -1590,4 +1587,10 @@
 
       }
    }
+
+   @Override
+   public boolean verify(String clusterUser0, String clusterPassword0)
+   {
+      return clusterUser.equals(clusterUser0) && clusterPassword.equals(clusterPassword0);
+   }
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -381,7 +381,8 @@
             log.warn("No connector with name '" + config.getConnectorName() + "'. backup cannot be announced.");
             return;
          }
-         liveChannel.send(new BackupRegistrationMessage(nodeUUID.toString(), connector));
+         liveChannel.send(new BackupRegistrationMessage(nodeUUID.toString(), connector, configuration.getClusterUser(),
+                                                        configuration.getClusterPassword()));
       }
       else
       {

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-11-24 12:24:54 UTC (rev 11754)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -1287,7 +1287,7 @@
 
       if (ConfigurationImpl.DEFAULT_CLUSTER_USER.equals(configuration.getClusterUser()) && ConfigurationImpl.DEFAULT_CLUSTER_PASSWORD.equals(configuration.getClusterPassword()))
       {
-         log.warn("Security risk! It has been detected that the cluster admin user and password " + "have not been changed from the installation default. "
+         log.warn("Security risk! HornetQ is running with the default cluster admin user and default password. "
                   + "Please see the HornetQ user guide, cluster chapter, for instructions on how to do this.");
       }
 
@@ -2047,7 +2047,7 @@
             replicationEndpoint.setQuorumManager(quorumManager);
 
             serverLocator0.setReconnectAttempts(-1);
-
+            serverLocator0.addInterceptor(new ReplicationError(HornetQServerImpl.this));
             threadPool.execute(new Runnable()
             {
                @Override
@@ -2064,7 +2064,6 @@
                      CoreRemotingConnection liveConnection = liveServerSessionFactory.getConnection();
                      Channel pingChannel = liveConnection.getChannel(CHANNEL_ID.PING.id, -1);
                      Channel replicationChannel = liveConnection.getChannel(CHANNEL_ID.REPLICATION.id, -1);
-
                      connectToReplicationEndpoint(replicationChannel);
                      replicationEndpoint.start();
                      clusterManager.announceReplicatingBackup(pingChannel);

Added: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java	                        (rev 0)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/ReplicationError.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -0,0 +1,51 @@
+/**
+ *
+ */
+package org.hornetq.core.server.impl;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+
+/**
+ * Stops the backup in case of an error at the start of Replication.
+ * <p>
+ * An interceptor is used for this task to avoid a server reference inside of the 'basic' channel-0
+ * handler at {@link ClientSessionFactoryImpl#Channel0Handler}, because {@link ClientSessionFactoryImpl}
+ * is also shipped in the HQ-client JAR (which does not include {@link HornetQServer}).
+ */
+final class ReplicationError implements Interceptor
+{
+   private final HornetQServer server;
+   private static final Logger log = Logger.getLogger(ReplicationError.class);
+
+   public ReplicationError(HornetQServer server)
+   {
+      this.server = server;
+   }
+
+   @Override
+   public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+   {
+      System.out.println(packet);
+      if (packet.getType() != PacketImpl.BACKUP_REGISTRATION_FAILED)
+         return true;
+      log.warn("Failed to register as backup. Stopping the server.");
+      try
+      {
+         server.stop();
+      }
+      catch (Exception e)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR, "error trying to stop " + server, e);
+      }
+
+      return false;
+   }
+
+}

Added: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java	                        (rev 0)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupAuthenticationTest.java	2011-11-24 12:25:37 UTC (rev 11755)
@@ -0,0 +1,79 @@
+/**
+ *
+ */
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.tests.util.TransportConfigurationUtils;
+
+public class BackupAuthenticationTest extends FailoverTestBase
+{
+   private static CountDownLatch latch;
+   @Override
+   public void setUp() throws Exception
+   {
+      startBackupServer = false;
+      latch = new CountDownLatch(1);
+      super.setUp();
+   }
+
+   public void testPasswordSetting() throws Exception
+   {
+      waitForServer(liveServer.getServer());
+      backupServer.start();
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      /*
+       * can't intercept the message at the backup, so we intercept the registration message at the
+       * live.
+       */
+      Thread.sleep(2000);
+      assertFalse("backup should have stopped", backupServer.isStarted());
+      backupConfig.setClusterPassword(CLUSTER_PASSWORD);
+      backupServer.start();
+      waitForServer(backupServer.getServer());
+   }
+
+   @Override
+   protected void createConfigs() throws Exception
+   {
+      createReplicatedConfigs();
+      backupConfig.setClusterPassword("crocodile");
+      liveConfig.setInterceptorClassNames(Arrays.asList(NotifyingInterceptor.class.getName()));
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(boolean live)
+   {
+      return TransportConfigurationUtils.getInVMAcceptor(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(boolean live)
+   {
+      return TransportConfigurationUtils.getInVMConnector(live);
+   }
+
+   public static final class NotifyingInterceptor implements Interceptor
+   {
+
+      @Override
+      public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+      {
+         System.out.println("intercept? wtf " + packet);
+         if (packet.getType() == PacketImpl.BACKUP_REGISTRATION)
+         {
+            latch.countDown();
+         }
+         return true;
+      }
+   }
+}



More information about the hornetq-commits mailing list