Author: borges
Date: 2011-07-29 13:27:23 -0400 (Fri, 29 Jul 2011)
New Revision: 11070
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
Log:
HORNETQ-720 Add data file sync code and backupServer may only start if journals are
sync'ed.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-29
17:25:36 UTC (rev 11069)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-29
17:27:23 UTC (rev 11070)
@@ -343,10 +343,7 @@
}
/**
- * XXX FIXME Method ignores the synchronization of LargeMessages and Paging.
- * <p>
- * XXX A second version improvement would be to allow new operations to be sent to the
backup,
- * while we synchronize the existing logs.
+ * XXX FIXME HORNETQ-720 Method ignores the synchronization of LargeMessages and
Paging.
* @param replicationManager
* @throws HornetQException
*/
@@ -354,7 +351,7 @@
{
if (!started)
{
- throw new IllegalStateException("must be started...");
+ throw new IllegalStateException("JournalStorageManager must be
started...");
}
assert replicationManager != null;
replicator = replicationManager;
@@ -364,7 +361,7 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR,
"journals here are not JournalImpl. You
can't set a replicator!");
}
- // XXX NEED to take a global lock on the StorageManager.
+ // XXX HORNETQ-720 WRITE LOCK the StorageManager.
final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
@@ -378,14 +375,15 @@
localMessageJournal.writeUnlock();
localBindingsJournal.writeUnlock();
-
bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal,
replicator);
messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+ // XXX HORNETQ-720 UNLOCK StorageManager...
+
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
- // SEND "SYNC_DONE" msg to backup telling it can become operational.
+ replicator.sendSynchronizationDone();
}
/**
@@ -408,8 +406,8 @@
{
journal.setAutoReclaim(false);
/*
- * XXX need to check whether it is safe to proceed if compacting is running
(specially at the
- * end of it)
+ * XXX HORNETQ-720 need to check whether it is safe to proceed if compacting is
running
+ * (specially at the end of it)
*/
journal.forceMoveNextFile();
JournalFile[] datafiles = journal.getDataFiles();
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-07-29
17:25:36 UTC (rev 11069)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-07-29
17:27:23 UTC (rev 11070)
@@ -5,7 +5,11 @@
import org.hornetq.core.protocol.core.impl.PacketImpl;
/**
- * Used to copy JournalFile data over to the backup during synchronization.
+ * Message is used to:
+ * <ol>
+ * <li>copy JournalFile data over to the backup during synchronization;
+ * <li>send a up-to-date signal to backup;
+ * </ol>
*/
public final class ReplicationJournalFileMessage extends PacketImpl
{
@@ -13,7 +17,9 @@
private byte[] data;
private int dataSize;
private JournalContent journalType;
+ /** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()}
*/
private long fileId;
+ private boolean backupIsUpToDate = false;
public ReplicationJournalFileMessage()
{
@@ -33,18 +39,56 @@
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeLong(fileId);
+ if (fileId == -1)
+ return;
buffer.writeByte(journalType.typeByte);
buffer.writeInt(dataSize);
- buffer.writeBytes(data, 0, dataSize);
+ // sending -1 will close the file
+ if (dataSize > -1)
+ {
+ buffer.writeBytes(data, 0, dataSize);
+ }
}
@Override
public void decodeRest(final HornetQBuffer buffer)
{
fileId = buffer.readLong();
+ if (fileId == -1)
+ {
+ backupIsUpToDate = true;
+ return;
+ }
journalType = JournalContent.getType(buffer.readByte());
int size = buffer.readInt();
- data = new byte[size];
- buffer.readBytes(data);
+ if (size > -1)
+ {
+ data = new byte[size];
+ buffer.readBytes(data);
+ }
}
+
+ public long getFileId()
+ {
+ return fileId;
+ }
+
+ public byte[] getData()
+ {
+ return data;
+ }
+
+ public JournalContent getJournalContent()
+ {
+ return journalType;
+ }
+
+ /**
+ * @return {@code true} if the live has finished synchronizing its data and the backup
is
+ * therefore up-to-date, {@code false} otherwise.
+ */
+ public boolean isUpToDate()
+ {
+ return backupIsUpToDate;
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-29
17:25:36 UTC (rev 11069)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-29
17:27:23 UTC (rev 11070)
@@ -101,4 +101,11 @@
*/
void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws
HornetQException;
+ /**
+ * Informs backup that data synchronization is done.
+ * <p>
+ * So if 'live' fails, the (up-to-date) backup now may take over its duties.
+ */
+ void sendSynchronizationDone();
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-29
17:25:36 UTC (rev 11069)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-29
17:27:23 UTC (rev 11070)
@@ -13,6 +13,7 @@
package org.hornetq.core.replication.impl;
+import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -21,6 +22,8 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
@@ -42,6 +45,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -50,15 +54,12 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
-import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.HornetQServerImpl;
/**
- *
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
*/
public class ReplicationEndpointImpl implements ReplicationEndpoint
{
@@ -71,7 +72,7 @@
private static final boolean trace = log.isTraceEnabled();
- private final HornetQServer server;
+ private final HornetQServerImpl server;
private Channel channel;
@@ -94,7 +95,7 @@
private boolean started;
// Constructors --------------------------------------------------
- public ReplicationEndpointImpl(final HornetQServer server)
+ public ReplicationEndpointImpl(final HornetQServerImpl server)
{
this.server = server;
}
@@ -184,6 +185,10 @@
{
handleJournalFileIdReservation((ReplicationFileIdMessage)packet);
}
+ else if (type == PacketImpl.REPLICATION_SYNC)
+ {
+ handleReplicationSynchronization((ReplicationJournalFileMessage)packet);
+ }
else
{
log.warn("Packet " + packet + " can't be processed by the
ReplicationEndpoint");
@@ -362,21 +367,63 @@
// Private -------------------------------------------------------
- private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet)
throws HornetQException
+ private void handleReplicationSynchronization(ReplicationJournalFileMessage msg)
throws Exception
{
- final Journal journalIf = journals[packet.getJournalContentType().typeByte];
- if (journalIf.isStarted())
+ if (msg.isUpToDate())
{
- throw new HornetQException(HornetQException.INTERNAL_ERROR, "Journal can
not be started!");
+ // XXX HORNETQ-720 must reload journals(?)
+ for (Journal j : journals)
+ {
+ JournalImpl journal = (JournalImpl)j;
+ journal.finishRemoteBackupSync();
+ }
+ server.setRemoteBackupUpToDate(true);
+ return;
}
+ Journal journalIf = getJournal(msg.getJournalContent().typeByte);
+ JournalImpl journal = assertJournalImpl(journalIf);
+
+ long id = msg.getFileId();
+ JournalFile journalFile = journal.getRemoteBackupSyncFile(id);
+ byte[] data = msg.getData();
+ if (data == null)
+ {
+ journalFile.getFile().close();
+ }
+ else
+ {
+ SequentialFile sf = journalFile.getFile();
+ if (!sf.isOpen())
+ {
+ sf.open(1, false);
+ }
+ sf.writeDirect(ByteBuffer.wrap(data), true);
+ }
+ // journal.get
+ }
+
+ private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet)
throws Exception
+ {
+ if (server.isRemoteBackupUpToDate())
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "RemoteBackup
can not be up-to-date!");
+ }
+
+ final Journal journalIf = journals[packet.getJournalContentType().typeByte];
+
+ JournalImpl journal = assertJournalImpl(journalIf);
+ journal.createFilesForRemoteSync(packet.getFileIds());
+ }
+
+ private static JournalImpl assertJournalImpl(final Journal journalIf) throws
HornetQException
+ {
if (!(journalIf instanceof JournalImpl))
{
throw new HornetQException(HornetQException.INTERNAL_ERROR,
"Journals of backup server are expected to be
JournalImpl");
}
- JournalImpl journal = (JournalImpl)journalIf;
-
+ return (JournalImpl)journalIf;
}
private void handleLargeMessageEnd(final ReplicationLargemessageEndMessage packet)
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-29
17:25:36 UTC (rev 11069)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-29
17:27:23 UTC (rev 11070)
@@ -513,9 +513,10 @@
while (true)
{
int bytesRead = file.read(data);
+ // sending -1 bytes will close the file.
+ replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, data,
content, id));
if (bytesRead == -1)
break;
- replicatingChannel.sendBlocking(new ReplicationJournalFileMessage(bytesRead,
data, content, id));
}
// XXX probably need to sync the JournalFile(?)
throw new UnsupportedOperationException();
@@ -524,7 +525,13 @@
@Override
public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws
HornetQException
{
- replicatingChannel.sendBlocking(new ReplicationFileIdMessage(datafiles,
contentType));
+ replicatingChannel.send(new ReplicationFileIdMessage(datafiles, contentType));
}
+ @Override
+ public void sendSynchronizationDone()
+ {
+ replicatingChannel.send(new ReplicationJournalFileMessage(-1, null, null, -1));
+ }
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-29
17:25:36 UTC (rev 11069)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-07-29
17:27:23 UTC (rev 11070)
@@ -219,6 +219,12 @@
private boolean initialised;
+ /**
+ * Only applicable to 'remote backup servers'. If this flag is false the
backup may not become
+ * 'live'.
+ */
+ private volatile boolean backupUpToDate = true;
+
// private FailoverManager replicationFailoverManager;
private ReplicationManager replicationManager;
@@ -587,8 +593,20 @@
// Server node (i.e. Life node) is not running, now the backup takes over.
//we must remember to close stuff we don't need any more
nodeManager.awaitLiveNode();
+
serverLocator.close();
replicationEndpoint.stop();
+
+ if (!isRemoteBackupUpToDate())
+ {
+ /*
+ * XXX HORNETQ-720 Live is down, and this server was not in sync. Perhaps
we should
+ * first try to wait a little longer to see if the 'live' comes
back?
+ */
+ throw new RuntimeException("Backup Server was not yet in sync with
live");
+ }
+
+
configuration.setBackup(false);
initialisePart2();
@@ -603,7 +621,7 @@
}
public void close(final boolean permanently) throws Exception
- {
+ {
if(serverLocator != null)
{
serverLocator.close();
@@ -696,6 +714,7 @@
else
{
assert replicationEndpoint == null;
+ backupUpToDate = false;
replicationEndpoint = new ReplicationEndpointImpl(this);
activation = new SharedNothingBackupActivation();
}
@@ -2014,4 +2033,21 @@
journalStorageManager.setReplicator(replicationManager);
}
+
+ /**
+ * Whether a remote backup server was in sync with its live server. If it was not in
sync, it may
+ * not take over the live's functions.
+ * <p>
+ * A local backup server or a live server should always return {@code true}
+ * @return
+ */
+ public boolean isRemoteBackupUpToDate()
+ {
+ return backupUpToDate;
+ }
+
+ public void setRemoteBackupUpToDate(boolean isUpToDate)
+ {
+ backupUpToDate = isUpToDate;
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-07-29
17:25:36 UTC (rev 11069)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-07-29
17:27:23 UTC (rev 11070)
@@ -15,6 +15,7 @@
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -63,6 +64,8 @@
private final BlockingQueue<JournalFile> openedFiles = new
LinkedBlockingQueue<JournalFile>();
+ private Map<Long, JournalFile> filesReservedForSync;
+
private final AtomicLong nextFileID = new AtomicLong(0);
private final int maxAIO;
@@ -175,7 +178,7 @@
for (int i = 0; i < filesToCreate; i++)
{
// Keeping all files opened can be very costly (mainly on AIO)
- freeFiles.add(createFile(false, false, true, false));
+ freeFiles.add(createFile(false, false, true, false, -1));
}
}
@@ -416,7 +419,7 @@
if (nextFile == null)
{
- nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension);
+ nextFile = createFile(keepOpened, multiAIO, initFile, tmpCompactExtension, -1);
}
else
{
@@ -434,6 +437,12 @@
return nextFile;
}
+ public void createRemoteBackupSyncFile(long fileID) throws Exception
+ {
+ assert !filesReservedForSync.containsKey(Long.valueOf(fileID));
+ filesReservedForSync.put(Long.valueOf(fileID), createFile(false, false, false,
false, fileID));
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -449,9 +458,10 @@
private JournalFile createFile(final boolean keepOpened,
final boolean multiAIO,
final boolean init,
- final boolean tmpCompact) throws Exception
+ final boolean tmpCompact,
+ final long fileIdPreSet) throws Exception
{
- long fileID = generateFileID();
+ long fileID = fileIdPreSet != -1 ? fileIdPreSet : generateFileID();
String fileName;
@@ -560,6 +570,22 @@
return jf;
}
- // Inner classes -------------------------------------------------
+ /**
+ * @param id
+ * @return
+ */
+ public JournalFile getRemoteBackupSyncFile(long id)
+ {
+ return filesReservedForSync.get(Long.valueOf(id));
+ }
+ public Collection<? extends JournalFile> getSyncFiles()
+ {
+ return filesReservedForSync.values();
+ }
+
+ public void clearSyncFiles()
+ {
+ filesReservedForSync.clear();
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-29
17:25:36 UTC (rev 11069)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-07-29
17:27:23 UTC (rev 11070)
@@ -3266,4 +3266,64 @@
{
journalLock.writeLock().unlock();
}
+
+ /**
+ * @param fileIds
+ * @throws Exception
+ */
+ public void createFilesForRemoteSync(long[] fileIds) throws Exception
+ {
+ writeLock();
+ try
+ {
+ long maxID = -1;
+ for (long id : fileIds)
+ {
+ maxID = Math.max(maxID, id);
+ filesRepository.createRemoteBackupSyncFile(id);
+ }
+ }
+ finally
+ {
+ writeUnlock();
+ }
+ }
+
+ /**
+ * @param id
+ * @return
+ */
+ public JournalFile getRemoteBackupSyncFile(long id)
+ {
+ return filesRepository.getRemoteBackupSyncFile(id);
+ }
+
+ /**
+ *
+ */
+ public void finishRemoteBackupSync()
+ {
+ writeLock();
+ try
+ {
+ lockAppend.lock();
+ List<JournalFile> dataFilesToProcess = new
ArrayList<JournalFile>();
+ dataFilesToProcess.addAll(filesRepository.getDataFiles());
+ filesRepository.clearDataFiles();
+ dataFilesToProcess.addAll(filesRepository.getSyncFiles());
+ filesRepository.clearSyncFiles();
+ Collections.sort(dataFilesToProcess, new JournalFileComparator());
+ for (JournalFile file : dataFilesToProcess)
+ {
+ filesRepository.addDataFileOnTop(file);
+ }
+ // XXX HORNETQ-720 still missing a "reload" call
+ }
+ finally
+ {
+ lockAppend.unlock();
+ writeUnlock();
+ }
+
+ }
}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-07-29
17:25:36 UTC (rev 11069)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-07-29
17:27:23 UTC (rev 11070)
@@ -1,5 +1,8 @@
package org.hornetq.tests.integration.cluster.failover;
+import java.util.HashSet;
+import java.util.Set;
+
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.client.ClientConsumer;
@@ -7,6 +10,10 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ServerLocatorInternal;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.tests.integration.cluster.util.TestableServer;
import org.hornetq.tests.util.TransportConfigurationUtils;
public class BackupJournalSyncTest extends FailoverTestBase
@@ -15,6 +22,7 @@
private ServerLocatorInternal locator;
private ClientSessionFactoryInternal sessionFactory;
private ClientSession session;
+ private ClientProducer producer;
private static final int N_MSGS = 100;
@Override
@@ -38,15 +46,50 @@
backupServer.getServer().getNodeID());
}
+ public void testReserveFileIdValuesOnBackup() throws Exception
+ {
+ createProducerSendSomeMessages();
+ JournalImpl messageJournal = getMessageJournalFromServer(liveServer);
+ for (int i = 0; i < 5; i++)
+ {
+ messageJournal.forceMoveNextFile();
+ sendMessages(session, producer, N_MSGS);
+ }
+ backupServer.start();
+ waitForBackup(sessionFactory, 5);
+ // XXX HORNETQ-720 must wait for backup to sync!
+
+ JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+ Set<Long> bckpIds = getFileIds(backupMsgJournal);
+ assertFalse(bckpIds.isEmpty());
+ Set<Long> liveIds = getFileIds(messageJournal);
+ assertEquals("sets must match! " + liveIds, bckpIds, liveIds);
+ }
+
+ /**
+ * @param backupMsgJournal
+ * @return
+ */
+ private Set<Long> getFileIds(JournalImpl journal)
+ {
+ Set<Long> results = new HashSet<Long>();
+ for (JournalFile jf : journal.getDataFiles())
+ {
+ results.add(Long.valueOf(jf.getFileID()));
+ }
+ return results;
+ }
+
+ static JournalImpl getMessageJournalFromServer(TestableServer server)
+ {
+ JournalStorageManager sm =
(JournalStorageManager)server.getServer().getStorageManager();
+ return (JournalImpl)sm.getMessageJournal();
+ }
+
public void testMessageSync() throws Exception
{
- session = sessionFactory.createSession(true, true);
- session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
- ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+ createProducerSendSomeMessages();
- sendMessages(session, producer, N_MSGS);
- session.start();
-
receiveMsgs(0, N_MSGS / 2);
assertFalse("backup is not started!", backupServer.isStarted());
@@ -56,11 +99,20 @@
waitForBackup(sessionFactory, 5);
crash(session);
-
// consume N/2 from 'new' live (the old backup)
receiveMsgs(N_MSGS / 2, N_MSGS);
}
+ private void createProducerSendSomeMessages() throws HornetQException, Exception
+ {
+ session = sessionFactory.createSession(true, true);
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
+ producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ sendMessages(session, producer, N_MSGS);
+ session.start();
+ }
+
private void receiveMsgs(int start, int end) throws HornetQException
{
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);