[hornetq-commits] JBoss hornetq SVN: r11177 - branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Aug 10 06:29:24 EDT 2011


Author: borges
Date: 2011-08-10 06:29:24 -0400 (Wed, 10 Aug 2011)
New Revision: 11177

Modified:
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 Fix system to delay "up-to-date" msg.

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-10 10:28:45 UTC (rev 11176)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java	2011-08-10 10:29:24 UTC (rev 11177)
@@ -2,6 +2,7 @@
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.locks.Lock;
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
@@ -16,9 +17,12 @@
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.CommandConfirmationHandler;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.Packet;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.replication.ReplicationEndpoint;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -209,6 +213,8 @@
                handler.addSubHandler(repEnd);
                Channel repChannel = repEnd.getChannel();
                repChannel.setHandler(handler);
+               handler.setChannel(repChannel);
+               liveServer.removeInterceptor(this);
             }
             catch (Exception e)
             {
@@ -223,33 +229,217 @@
    private static class ReplicationChannelHandler implements ChannelHandler
    {
 
-      private ChannelHandler handler;
+      private ReplicationEndpoint handler;
       private Packet onHold;
+      private Channel channel;
       public volatile boolean deliver;
 
-      public void addSubHandler(ChannelHandler handler)
+      public void addSubHandler(ReplicationEndpoint handler)
       {
          this.handler = handler;
       }
 
+      public void setChannel(Channel channel)
+      {
+         this.channel = channel;
+      }
+
       @Override
       public void handlePacket(Packet packet)
       {
+
          if (onHold != null && deliver)
          {
-            handler.handlePacket(onHold);
+            // Use wrapper to avoid sending a response
+            ChannelWrapper wrapper = new ChannelWrapper(channel);
+            handler.setChannel(wrapper);
+            try
+            {
+               handler.handlePacket(onHold);
+            }
+            finally
+            {
+               handler.setChannel(channel);
+               onHold = null;
+            }
          }
+
          if (packet.getType() == PacketImpl.REPLICATION_SYNC)
          {
             ReplicationJournalFileMessage syncMsg = (ReplicationJournalFileMessage)packet;
             if (syncMsg.isUpToDate())
             {
+               assert onHold == null;
                onHold = packet;
+               PacketImpl response = new ReplicationResponseMessage();
+               channel.send(response);
                return;
             }
          }
+
          handler.handlePacket(packet);
       }
 
    }
+
+   private static class ChannelWrapper implements Channel
+   {
+
+      private final Channel channel;
+
+      /**
+       * @param connection
+       * @param id
+       * @param confWindowSize
+       */
+      public ChannelWrapper(Channel channel)
+      {
+         this.channel = channel;
+      }
+
+      @Override
+      public String toString()
+      {
+         return "ChannelWrapper(" + channel + ")";
+      }
+
+      @Override
+      public long getID()
+      {
+         return channel.getID();
+      }
+
+      @Override
+      public void send(Packet packet)
+      {
+         // no-op
+         // channel.send(packet);
+      }
+
+      @Override
+      public void sendBatched(Packet packet)
+      {
+         throw new UnsupportedOperationException();
+
+      }
+
+      @Override
+      public void sendAndFlush(Packet packet)
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Packet sendBlocking(Packet packet) throws HornetQException
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void setHandler(ChannelHandler handler)
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void close()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void transferConnection(CoreRemotingConnection newConnection)
+      {
+         throw new UnsupportedOperationException();
+
+      }
+
+      @Override
+      public void replayCommands(int lastConfirmedCommandID)
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public int getLastConfirmedCommandID()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void lock()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void unlock()
+      {
+         throw new UnsupportedOperationException();
+
+      }
+
+      @Override
+      public void returnBlocking()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Lock getLock()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public CoreRemotingConnection getConnection()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void confirm(Packet packet)
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void setCommandConfirmationHandler(CommandConfirmationHandler handler)
+      {
+         throw new UnsupportedOperationException();
+
+      }
+
+      @Override
+      public void flushConfirmations()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void handlePacket(Packet packet)
+      {
+         throw new UnsupportedOperationException();
+
+      }
+
+      @Override
+      public void clearCommands()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public int getConfirmationWindowSize()
+      {
+         throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void setTransferring(boolean transferring)
+      {
+         throw new UnsupportedOperationException();
+      }
+
+   }
 }



More information about the hornetq-commits mailing list