Author: borges
Date: 2011-08-05 11:00:17 -0400 (Fri, 05 Aug 2011)
New Revision: 11141
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
Log:
HORNETQ-720 Control when backup reaches "up-to-date" status during tests
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-05
14:59:02 UTC (rev 11140)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-05
15:00:17 UTC (rev 11141)
@@ -21,7 +21,7 @@
private JournalContent journalType;
/** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()}
*/
private long fileId;
- private boolean backupIsUpToDate = false;
+ private boolean backupIsUpToDate;
private byte[] byteArray;
public ReplicationJournalFileMessage()
@@ -33,6 +33,7 @@
{
this();
this.fileId = id;
+ this.backupIsUpToDate = id == -1;
this.dataSize = size;
this.data = buffer;
this.journalType = content;
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java 2011-08-05
14:59:02 UTC (rev 11140)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/IOCompletion.java 2011-08-05
15:00:17 UTC (rev 11141)
@@ -23,4 +23,4 @@
public interface IOCompletion extends IOAsyncTask
{
void storeLineUp();
-}
+}
\ No newline at end of file
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-05
14:59:02 UTC (rev 11140)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-05
15:00:17 UTC (rev 11141)
@@ -4,6 +4,7 @@
import java.util.Set;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientProducer;
@@ -13,7 +14,14 @@
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.ChannelHandler;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
+import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.TransportConfigurationUtils;
@@ -24,6 +32,7 @@
private ClientSessionFactoryInternal sessionFactory;
private ClientSession session;
private ClientProducer producer;
+ private ReplicationChannelHandler handler;
private static final int N_MSGS = 100;
@Override
@@ -49,6 +58,8 @@
public void testReserveFileIdValuesOnBackup() throws Exception
{
+ handler = new ReplicationChannelHandler();
+ liveServer.addInterceptor(new BackupSyncDelay(handler));
createProducerSendSomeMessages();
JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
for (int i = 0; i < 5; i++)
@@ -57,11 +68,14 @@
sendMessages(session, producer, N_MSGS);
}
backupServer.start();
- waitForBackup(sessionFactory, 10);
+ waitForBackup(sessionFactory, 10, false);
// SEND more messages, now with the backup replicating
sendMessages(session, producer, N_MSGS);
+ handler.notifyAll();
+ waitForBackup(sessionFactory, 10, true);
+
Set<Long> liveIds = getFileIds(messageJournal);
assertFalse("should not be initialized",
backupServer.getServer().isInitialised());
crash(session);
@@ -146,6 +160,10 @@
@Override
protected void tearDown() throws Exception
{
+ if (handler != null)
+ {
+ handler.notifyAll();
+ }
if (sessionFactory != null)
sessionFactory.close();
if (session != null)
@@ -173,4 +191,71 @@
return TransportConfigurationUtils.getInVMConnector(live);
}
+ private class BackupSyncDelay implements Interceptor // Test interceptor: on backup registration, routes the backup's replication channel through the supplied handler.
+ {
+
+ private final ReplicationChannelHandler handler; // wrapper installed on the backup's replication channel
+
+ public BackupSyncDelay(ReplicationChannelHandler handler)
+ {
+ this.handler = handler;
+ // The handler is only stored here; it is wired into the channel later, inside intercept().
+ }
+
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) throws
HornetQException
+ {
+ if (packet.getType() == PacketImpl.HA_BACKUP_REGISTRATION) // only backup-registration packets trigger the re-wiring
+ {
+ try
+ {
+ ReplicationEndpoint repEnd =
backupServer.getServer().getReplicationEndpoint();
+ handler.addSubHandler(repEnd); // the real endpoint becomes the delegate of the wrapper
+ Channel repChannel = repEnd.getChannel();
+ repChannel.setHandler(handler); // from now on the wrapper receives the channel's packets first
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e); // any failure while re-wiring is rethrown unchecked, keeping the cause
+ }
+ }
+ return true; // always returns true, whatever the packet type
+ }
+
+ }
+
+ private static class ReplicationChannelHandler implements ChannelHandler // Wrapping handler that tries to hold back the "up-to-date" sync packet before delegating.
+ {
+
+ private ChannelHandler handler; // delegate; unset until addSubHandler() is called
+
+ public void addSubHandler(ChannelHandler handler) // replaces the current delegate (only one is kept)
+ {
+ this.handler = handler;
+ }
+
+ @Override
+ public void handlePacket(Packet packet)
+ {
+ System.out.println(packet); // prints every packet seen to stdout
+ if (packet.getType() == PacketImpl.REPLICATION_SYNC)
+ {
+ ReplicationJournalFileMessage syncMsg =
(ReplicationJournalFileMessage)packet;
+ if (syncMsg.isUpToDate())
+ {
+ // Hold the message that notifies the backup that sync is done, until another thread calls notifyAll() on this handler.
+ try
+ {
+ wait(); // NOTE(review): called outside any synchronized block/method on this object, so per Object.wait() this should throw IllegalMonitorStateException instead of blocking; the notifyAll() callers need the monitor too - confirm and fix together
+ }
+ catch (InterruptedException e)
+ {
+ // Interruption is ignored; the packet is then forwarded below as usual.
+ }
+ }
+ }
+ handler.handlePacket(packet); // forward to the delegate; NOTE(review): NPE if addSubHandler() was never called - confirm the ordering guarantees
+ }
+
+ }
}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-05
14:59:02 UTC (rev 11140)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTestBase.java 2011-08-05
15:00:17 UTC (rev 11141)
@@ -240,12 +240,22 @@
*/
protected void waitForBackup(ClientSessionFactoryInternal sessionFactory, long
seconds) throws Exception
{
+ waitForBackup(sessionFactory, seconds, true);
+ }
+
+ /**
+ * @param sessionFactory
+ * @param seconds
+ * @param waitForSync whether to wait for sync'ing data with the live to finish or
not
+ */
+ protected void waitForBackup(ClientSessionFactoryInternal sessionFactory, long
seconds, boolean waitForSync)
+ {
final long toWait = seconds * 1000;
final long time = System.currentTimeMillis();
final HornetQServerImpl actualServer =
(HornetQServerImpl)backupServer.getServer();
while (true)
{
- if (sessionFactory.getBackupConnector() != null &&
actualServer.isRemoteBackupUpToDate())
+ if (sessionFactory.getBackupConnector() != null &&
(actualServer.isRemoteBackupUpToDate() || !waitForSync))
{
break;
}
@@ -263,9 +273,6 @@
//ignore
}
}
-
- System.out.println("Backup server state: [started=" +
actualServer.isStarted() + ", upToDate=" +
- actualServer.isRemoteBackupUpToDate() + "]");
}
protected TransportConfiguration getNettyAcceptorTransportConfiguration(final boolean
live)
Show replies by date