Author: borges
Date: 2011-08-10 06:29:24 -0400 (Wed, 10 Aug 2011)
New Revision: 11177
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 Fix system to delay "up-to-date" msg.
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-10 10:28:45 UTC (rev 11176)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-10 10:29:24 UTC (rev 11177)
@@ -2,6 +2,7 @@
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.locks.Lock;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
@@ -16,9 +17,12 @@
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.CommandConfirmationHandler;
+import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.RemotingConnection;
@@ -209,6 +213,8 @@
handler.addSubHandler(repEnd);
Channel repChannel = repEnd.getChannel();
repChannel.setHandler(handler);
+ handler.setChannel(repChannel);
+ liveServer.removeInterceptor(this);
}
catch (Exception e)
{
@@ -223,33 +229,217 @@
private static class ReplicationChannelHandler implements ChannelHandler
{
- private ChannelHandler handler;
+ private ReplicationEndpoint handler;
private Packet onHold;
+ private Channel channel;
public volatile boolean deliver;
- public void addSubHandler(ChannelHandler handler)
+ public void addSubHandler(ReplicationEndpoint handler)
{
this.handler = handler;
}
+ public void setChannel(Channel channel)
+ {
+ this.channel = channel;
+ }
+
@Override
public void handlePacket(Packet packet)
{
+
if (onHold != null && deliver)
{
- handler.handlePacket(onHold);
+ // Deliver the held packet through a wrapper whose send() is a no-op: the response was already sent when the packet was put on hold
+ ChannelWrapper wrapper = new ChannelWrapper(channel);
+ handler.setChannel(wrapper);
+ try
+ {
+ handler.handlePacket(onHold);
+ }
+ finally
+ {
+ handler.setChannel(channel);
+ onHold = null;
+ }
}
+
if (packet.getType() == PacketImpl.REPLICATION_SYNC)
{
ReplicationJournalFileMessage syncMsg =
(ReplicationJournalFileMessage)packet;
if (syncMsg.isUpToDate())
{
+ assert onHold == null;
onHold = packet;
+ PacketImpl response = new ReplicationResponseMessage();
+ channel.send(response);
return;
}
}
+
handler.handlePacket(packet);
}
}
+
+ private static class ChannelWrapper implements Channel
+ {
+
+ private final Channel channel;
+
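+ /**
+ * Wraps a channel; {@link #send(Packet)} drops the packet instead of sending it.
+ *
+ * @param channel the wrapped channel, used for {@link #getID()} and {@link #toString()}
+ */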
+ public ChannelWrapper(Channel channel)
+ {
+ this.channel = channel;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ChannelWrapper(" + channel + ")";
+ }
+
+ @Override
+ public long getID()
+ {
+ return channel.getID();
+ }
+
+ @Override
+ public void send(Packet packet)
+ {
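+ // Deliberately a no-op: the packet is dropped rather than forwarded with
+ // channel.send(packet), so a handler using this wrapper sends no response.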
+ }
+
+ @Override
+ public void sendBatched(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void sendAndFlush(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Packet sendBlocking(Packet packet) throws HornetQException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setHandler(ChannelHandler handler)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void transferConnection(CoreRemotingConnection newConnection)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void replayCommands(int lastConfirmedCommandID)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getLastConfirmedCommandID()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void lock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void unlock()
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void returnBlocking()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Lock getLock()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CoreRemotingConnection getConnection()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void confirm(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCommandConfirmationHandler(CommandConfirmationHandler handler)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void flushConfirmations()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void handlePacket(Packet packet)
+ {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public void clearCommands()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getConfirmationWindowSize()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setTransferring(boolean transferring)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ }
}