[hornetq-commits] JBoss hornetq SVN: r12298 - in trunk: tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Mar 13 12:45:45 EDT 2012
Author: borges
Date: 2012-03-13 12:45:44 -0400 (Tue, 13 Mar 2012)
New Revision: 12298
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
Log:
HORNETQ-720 HORNETQ-776 Fixes
Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-13 16:37:04 UTC (rev 12297)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2012-03-13 16:45:44 UTC (rev 12298)
@@ -322,7 +322,7 @@
}
- // lifecycle methods
+ // life-cycle methods
// ----------------------------------------------------------------
/*
@@ -2107,8 +2107,8 @@
}
clusterManager.start();
- final TransportConfiguration config = configuration.getConnectorConfigurations().get(liveConnectorName);
- serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(config);
+ final TransportConfiguration tp = configuration.getConnectorConfigurations().get(liveConnectorName);
+ serverLocator0 = (ServerLocatorInternal)HornetQClient.createServerLocatorWithHA(tp);
quorumManager = new QuorumManager(serverLocator0, threadPool, getIdentity());
replicationEndpoint.setQuorumManager(quorumManager);
@@ -2165,7 +2165,8 @@
QuorumManager.BACKUP_ACTIVATION signal = quorumManager.waitForStatusChange();
serverLocator0.close();
- replicationEndpoint.stop();
+ if (replicationEndpoint != null)
+ replicationEndpoint.stop();
if (failedToConnect || !started || signal == BACKUP_ACTIVATION.STOP)
return;
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-13 16:37:04 UTC (rev 12297)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/QuorumFailOverTest.java 2012-03-13 16:45:44 UTC (rev 12298)
@@ -13,10 +13,13 @@
public void testQuorumVoting() throws Exception
{
- int[] liveServerIDs = new int[] { 1, 2, 3 };
+ int[] liveServerIDs = new int[] { 0, 1, 2 };
setupCluster();
- startServers(0, 1, 2, 3, 4, 5);
+ startServers(0, 1, 2);
+ // BackupSyncDelay delay = new BackupSyncDelay(servers[4], servers[1],
+ // PacketImpl.REPLICATION_SCHEDULED_FAILOVER);
+ startServers(3, 4, 5);
for (int i : liveServerIDs)
{
@@ -36,6 +39,9 @@
waitForBindings(0, QUEUES_TESTADDRESS, 1, 1, true);
+ send(0, QUEUES_TESTADDRESS, 10, false, null);
+ verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);
+
final TopologyListener liveTopologyListener = new TopologyListener("LIVE-1");
locators[0].addClusterTopologyListener(liveTopologyListener);
@@ -52,6 +58,7 @@
assertTrue(servers[3].waitForInitialization(2, TimeUnit.SECONDS));
assertFalse("3 should have failed over ", servers[3].getConfiguration().isBackup());
+
failNode(1);
assertFalse("4 should have failed over ", servers[4].getConfiguration().isBackup());
}
Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2012-03-13 16:37:04 UTC (rev 12297)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2012-03-13 16:45:44 UTC (rev 12298)
@@ -16,6 +16,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
+import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.RemotingConnection;
/**
@@ -35,26 +36,41 @@
public class BackupSyncDelay implements Interceptor
{
- private final ReplicationChannelHandler handler = new ReplicationChannelHandler();
- private final TestableServer backup;
- private final TestableServer live;
+ private final ReplicationChannelHandler handler;
+ private final HornetQServer backup;
+ private final HornetQServer live;
public void deliverUpToDateMsg()
{
- live.removeInterceptor(this);
+ live.getRemotingService().removeInterceptor(this);
if (backup.isStarted())
handler.deliver();
}
- public BackupSyncDelay(TestableServer backup, TestableServer live)
+ /**
+ * @param backup
+ * @param live
+ * @param packetCode which packet is going to be intercepted.
+ */
+ public BackupSyncDelay(HornetQServer backup, HornetQServer live, byte packetCode)
{
- assert backup.getServer().getConfiguration().isBackup();
- assert !live.getServer().getConfiguration().isBackup();
+ assert backup.getConfiguration().isBackup();
+ assert !live.getConfiguration().isBackup();
this.backup = backup;
this.live = live;
- live.addInterceptor(this);
+ live.getRemotingService().addInterceptor(this);
+ handler = new ReplicationChannelHandler(packetCode);
}
+ /**
+ * @param backupServer
+ * @param liveServer
+ */
+ public BackupSyncDelay(TestableServer backupServer, TestableServer liveServer)
+ {
+ this(backupServer.getServer(), liveServer.getServer(), PacketImpl.REPLICATION_START_FINISH_SYNC);
+ }
+
@Override
public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
{
@@ -62,12 +78,12 @@
{
try
{
- ReplicationEndpoint repEnd = backup.getServer().getReplicationEndpoint();
+ ReplicationEndpoint repEnd = backup.getReplicationEndpoint();
handler.addSubHandler(repEnd);
Channel repChannel = repEnd.getChannel();
repChannel.setHandler(handler);
handler.setChannel(repChannel);
- live.removeInterceptor(this);
+ live.getRemotingService().removeInterceptor(this);
}
catch (Exception e)
{
@@ -80,6 +96,10 @@
public static class ReplicationChannelHandler implements ChannelHandler
{
+ public ReplicationChannelHandler(byte type)
+ {
+ this.typeToIntercept = type;
+ }
private ReplicationEndpoint handler;
private Packet onHold;
private Channel channel;
@@ -87,6 +107,7 @@
private volatile boolean delivered;
private boolean receivedUpToDate;
private boolean mustHold = true;
+ private final byte typeToIntercept;
public void addSubHandler(ReplicationEndpoint handler)
{
@@ -134,25 +155,32 @@
@Override
public synchronized void handlePacket(Packet packet)
{
-
if (onHold != null && deliver)
{
deliver();
}
- if (packet.getType() == PacketImpl.REPLICATION_START_FINISH_SYNC && mustHold)
+ if (typeToIntercept == PacketImpl.REPLICATION_START_FINISH_SYNC)
{
- ReplicationStartSyncMessage syncMsg = (ReplicationStartSyncMessage)packet;
- if (syncMsg.isSynchronizationFinished() && !deliver)
+ if (packet.getType() == PacketImpl.REPLICATION_START_FINISH_SYNC && mustHold)
{
- receivedUpToDate = true;
- assert onHold == null;
- onHold = packet;
- PacketImpl response = new ReplicationResponseMessage();
- channel.send(response);
- return;
+ ReplicationStartSyncMessage syncMsg = (ReplicationStartSyncMessage)packet;
+ if (syncMsg.isSynchronizationFinished() && !deliver)
+ {
+ receivedUpToDate = true;
+ assert onHold == null;
+ onHold = packet;
+ PacketImpl response = new ReplicationResponseMessage();
+ channel.send(response);
+ return;
+ }
}
}
+ else if (typeToIntercept == packet.getType())
+ {
+ channel.send(new ReplicationResponseMessage());
+ return;
+ }
handler.handlePacket(packet);
}
@@ -164,11 +192,6 @@
private final Channel channel;
- /**
- * @param connection
- * @param id
- * @param confWindowSize
- */
public ChannelWrapper(Channel channel)
{
this.channel = channel;
More information about the hornetq-commits
mailing list