[hornetq-commits] JBoss hornetq SVN: r12298 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Mar 13 12:45:45 EDT 2012


Author: borges
Date: 2012-03-13 12:45:44 -0400 (Tue, 13 Mar 2012)
New Revision: 12298

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 HORNETQ-776 Fixes

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-03-13 16:37:04 UTC (rev 12297)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2012-03-13 16:45:44 UTC (rev 12298)
@@ -322,7 +322,7 @@
 
    }
 
-   // lifecycle methods
+   // life-cycle methods
    // ----------------------------------------------------------------
 
    /*
@@ -2107,8 +2107,8 @@
             }
             clusterManager.start();
 
-            final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
-            serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
+            final TransportConfiguration tp = configuration.getConnectorConfigurations().get(liveConnectorName);
+            serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tp);
             quorumManager = new QuorumManager(serverLocator0, threadPool, getIdentity());
             replicationEndpoint.setQuorumManager(quorumManager);
 
@@ -2165,7 +2165,8 @@
             QuorumManager.BACKUP_ACTIVATION signal = quorumManager.waitForStatusChange();
 
             serverLocator0.close();
-            replicationEndpoint.stop();
+            if (replicationEndpoint != null)
+               replicationEndpoint.stop();
 
             if (failedToConnect || !started || signal == BACKUP_ACTIVATION.STOP)
                return;

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java	2012-03-13 16:37:04 UTC (rev 12297)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java	2012-03-13 16:45:44 UTC (rev 12298)
@@ -13,10 +13,13 @@
 
    public void testQuorumVoting() throws Exception
    {
-      int[] liveServerIDs = new int[] { 1, 2, 3 };
+      int[] liveServerIDs = new int[] { 0, 1, 2 };
       setupCluster();
 
-      startServers(0, 1, 2, 3, 4, 5);
+      startServers(0, 1, 2);
+      // BackupSyncDelay delay = new BackupSyncDelay(servers[4], servers[1],
+      // PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
+      startServers(3, 4, 5);
 
       for (int i : liveServerIDs)
       {
@@ -36,6 +39,9 @@
 
       waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
 
+      send(0, QUEUES_TESTADDRESS, 10, false, null);
+      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
       final TopologyListener liveTopologyListener = new TopologyListener("LIVE-1");
 
       locators[0].addClusterTopologyListener(liveTopologyListener);
@@ -52,6 +58,7 @@
 
       assertTrue(servers[3].waitForInitialization(2, TimeUnit.SECONDS));
       assertFalse("3 should have failed over ", servers[3].getConfiguration().isBackup());
+
       failNode(1);
       assertFalse("4 should have failed over ", servers[4].getConfiguration().isBackup());
    }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2012-03-13 16:37:04 UTC (rev 12297)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2012-03-13 16:45:44 UTC (rev 12298)
@@ -16,6 +16,7 @@
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.core.server.HornetQServer;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 
 /**
@@ -35,26 +36,41 @@
 public class BackupSyncDelay implements Interceptor
 {
 
-   private final ReplicationChannelHandler handler = new ReplicationChannelHandler();
-   private final TestableServer backup;
-   private final TestableServer live;
+   private final ReplicationChannelHandler handler;
+   private final HornetQServer backup;
+   private final HornetQServer live;
 
    public void deliverUpToDateMsg()
    {
-      live.removeInterceptor(this);
+      live.getRemotingService().removeInterceptor(this);
       if (backup.isStarted())
          handler.deliver();
    }
 
-   public BackupSyncDelay(TestableServer backup, TestableServer live)
+   /**
+    * @param backup
+    * @param live
+    * @param packetCode which packet is going to be intercepted.
+    */
+   public BackupSyncDelay(HornetQServer backup, HornetQServer live, byte packetCode)
    {
-      assert backup.getServer().getConfiguration().isBackup();
-      assert !live.getServer().getConfiguration().isBackup();
+      assert backup.getConfiguration().isBackup();
+      assert !live.getConfiguration().isBackup();
       this.backup = backup;
       this.live = live;
-      live.addInterceptor(this);
+      live.getRemotingService().addInterceptor(this);
+      handler = new ReplicationChannelHandler(packetCode);
    }
 
+   /**
+    * @param backupServer
+    * @param liveServer
+    */
+   public BackupSyncDelay(TestableServer backupServer, TestableServer liveServer)
+   {
+      this(backupServer.getServer(), liveServer.getServer(), PacketImpl.REPLICATION_START_FINISH_SYNC);
+   }
+
    @Override
    public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
    {
@@ -62,12 +78,12 @@
       {
          try
          {
-            ReplicationEndpoint repEnd = backup.getServer().getReplicationEndpoint();
+            ReplicationEndpoint repEnd = backup.getReplicationEndpoint();
             handler.addSubHandler(repEnd);
             Channel repChannel = repEnd.getChannel();
             repChannel.setHandler(handler);
             handler.setChannel(repChannel);
-            live.removeInterceptor(this);
+            live.getRemotingService().removeInterceptor(this);
          }
          catch (Exception e)
          {
@@ -80,6 +96,10 @@
    public static class ReplicationChannelHandler implements ChannelHandler
    {
 
+      public ReplicationChannelHandler(byte type)
+      {
+         this.typeToIntercept = type;
+      }
       private ReplicationEndpoint handler;
       private Packet onHold;
       private Channel channel;
@@ -87,6 +107,7 @@
       private volatile boolean delivered;
       private boolean receivedUpToDate;
       private boolean mustHold = true;
+      private final byte typeToIntercept;
 
       public void addSubHandler(ReplicationEndpoint handler)
       {
@@ -134,25 +155,32 @@
       @Override
       public synchronized void handlePacket(Packet packet)
       {
-
          if (onHold != null && deliver)
          {
             deliver();
          }
 
-         if (packet.getType() == PacketImpl.REPLICATION_START_FINISH_SYNC && mustHold)
+         if (typeToIntercept == PacketImpl.REPLICATION_START_FINISH_SYNC)
          {
-            ReplicationStartSyncMessage syncMsg = (ReplicationStartSyncMessage)packet;
-            if (syncMsg.isSynchronizationFinished() && !deliver)
+            if (packet.getType() == PacketImpl.REPLICATION_START_FINISH_SYNC && mustHold)
             {
-               receivedUpToDate = true;
-               assert onHold == null;
-               onHold = packet;
-               PacketImpl response = new ReplicationResponseMessage();
-               channel.send(response);
-               return;
+               ReplicationStartSyncMessage syncMsg = (ReplicationStartSyncMessage)packet;
+               if (syncMsg.isSynchronizationFinished() && !deliver)
+               {
+                  receivedUpToDate = true;
+                  assert onHold == null;
+                  onHold = packet;
+                  PacketImpl response = new ReplicationResponseMessage();
+                  channel.send(response);
+                  return;
+               }
             }
          }
+         else if (typeToIntercept == packet.getType())
+         {
+            channel.send(new ReplicationResponseMessage());
+            return;
+         }
 
          handler.handlePacket(packet);
       }
@@ -164,11 +192,6 @@
 
       private final Channel channel;
 
-      /**
-       * @param connection
-       * @param id
-       * @param confWindowSize
-       */
       public ChannelWrapper(Channel channel)
       {
          this.channel = channel;



More information about the hornetq-commits mailing list