Author: borges
Date: 2011-09-30 04:58:28 -0400 (Fri, 30 Sep 2011)
New Revision: 11449
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
HORNETQ-720 Do not use AIO during backup synchronization
Also fix the logic at BackupSyncJournalTest for testing fileID reservation.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -1140,7 +1140,7 @@
{
continue;
}
- replicator.syncPages(fileFactory, sFile, id, getAddress());
+ replicator.syncPages(sFile, id, getAddress());
}
}
finally
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -24,7 +24,6 @@
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
-import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -32,7 +31,6 @@
import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
@@ -225,13 +223,6 @@
long storePageCounterInc(long queueID, int add) throws Exception;
/**
- * @param journalContent
- * @return {@code true} if the underlying {@link SequentialFileFactory} has callback
support.
- * @see SequentialFileFactory#isSupportsCallbacks()
- */
- boolean hasCallbackSupport(JournalContent journalContent);
-
- /**
* @return the bindings journal
*/
Journal getBindingsJournal();
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -441,7 +441,6 @@
PagingStore store = manager.getPageStore(entry.getKey());
store.sendPages(replicator, entry.getValue());
}
-
}
/**
@@ -471,7 +470,7 @@
SequentialFile seqFile = largeMessagesFactory.createSequentialFile(fileName,
1);
if (!seqFile.exists())
continue;
- replicator.syncLargeMessageFile(largeMessagesFactory, seqFile, size,
getLargeMessageIdFromFilename(fileName));
+ replicator.syncLargeMessageFile(seqFile, size,
getLargeMessageIdFromFilename(fileName));
}
}
@@ -509,7 +508,7 @@
{
for (JournalFile jf : journalFiles)
{
- replicator.syncJournalFile(factory, jf, type);
+ replicator.syncJournalFile(jf, type);
jf.setCanReclaim(true);
}
}
@@ -3888,13 +3887,6 @@
journal.stop();
}
- public boolean hasCallbackSupport(JournalContent journalContent)
- {
- if (journalContent == JournalContent.MESSAGES)
- return messageJournalFileFactory.isSupportsCallbacks();
- return bindingsJournalFileFactory.isSupportsCallbacks();
- }
-
@Override
public boolean addToPage(PagingManager pagingManager,
SimpleString address,
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -37,7 +37,6 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.replication.ReplicationManager;
@@ -574,12 +573,6 @@
}
@Override
- public boolean hasCallbackSupport(JournalContent content)
- {
- return false;
- }
-
- @Override
public Journal getBindingsJournal()
{
return null;
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -20,7 +20,6 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
@@ -89,13 +88,6 @@
void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
/**
- * Sends the whole content of the file to be duplicated.
- * @throws HornetQException
- * @throws Exception
- */
- void syncJournalFile(SequentialFileFactory factory, JournalFile jf, JournalContent
type) throws Exception;
-
- /**
* Reserve the following fileIDs in the backup server.
* @param datafiles
* @param contentType
@@ -111,11 +103,17 @@
void sendSynchronizationDone();
/**
+ * Sends the whole content of the file to be duplicated.
+ * @throws HornetQException
+ * @throws Exception
+ */
+ void syncJournalFile(JournalFile jf, JournalContent type) throws Exception;
+
+ /**
* @param seqFile
* @throws Exception
*/
- void syncLargeMessageFile(SequentialFileFactory fctr, SequentialFile seqFile, long
size, long id)
-
throws Exception;
+ void syncLargeMessageFile(SequentialFile seqFile, long size, long id) throws
Exception;
/**
* @param file
@@ -123,5 +121,5 @@
* @param pageStore
* @throws Exception
*/
- void syncPages(SequentialFileFactory factory, SequentialFile file, long id,
SimpleString pageStore) throws Exception;
+ void syncPages(SequentialFile file, long id, SimpleString pageStore) throws
Exception;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -23,8 +23,10 @@
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalFilesRepository;
import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
@@ -579,7 +581,7 @@
}
@Override
- public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long,
JournalFile> mapToFill) throws Exception
+ public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws
Exception
{
throw new UnsupportedOperationException("This method should only be called at
a replicating backup");
}
@@ -620,6 +622,18 @@
throw new UnsupportedOperationException();
}
+ @Override
+ public SequentialFileFactory getFileFactory()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalFilesRepository getFilesRepository()
+ {
+ throw new UnsupportedOperationException();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -13,7 +13,11 @@
package org.hornetq.core.replication.impl;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -85,8 +89,8 @@
private final JournalLoadInformation[] journalLoadInformation = new
JournalLoadInformation[2];
/** Files reserved in each journal for synchronization of existing data from the
'live' server. */
- private final Map<JournalContent, Map<Long, JournalFile>>
filesReservedForSync =
- new HashMap<JournalContent, Map<Long, JournalFile>>();
+ private final Map<JournalContent, Map<Long, JournalSyncFile>>
filesReservedForSync =
+ new HashMap<JournalContent, Map<Long, JournalSyncFile>>();
private Map<Long, LargeServerMessage> largeMessagesOnSync = new HashMap<Long,
LargeServerMessage>();
/**
@@ -240,7 +244,7 @@
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
- filesReservedForSync.put(jc, new HashMap<Long, JournalFile>());
+ filesReservedForSync.put(jc, new HashMap<Long, JournalSyncFile>());
// We only need to load internal structures on the backup...
journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly();
}
@@ -446,7 +450,7 @@
{
Long id = Long.valueOf(msg.getId());
byte[] data = msg.getData();
- SequentialFile sf;
+ SequentialFile channel;
switch (msg.getFileType())
{
case LARGE_MESSAGE:
@@ -461,38 +465,44 @@
largeMessage.setMessageID(id);
largeMessagesOnSync.put(id, largeMessage);
}
- sf = largeMessage.getFile();
+ channel = largeMessage.getFile();
}
break;
}
- case JOURNAL:
- {
- JournalFile journalFile =
filesReservedForSync.get(msg.getJournalContent()).get(id);
- sf = journalFile.getFile();
- break;
- }
case PAGE:
{
Page page = getPage(msg.getPageStore(), (int)msg.getId());
- sf = page.getFile();
+ channel = page.getFile();
break;
}
+ case JOURNAL:
+ {
+ JournalSyncFile journalSyncFile =
filesReservedForSync.get(msg.getJournalContent()).get(id);
+ FileChannel channel2 = journalSyncFile.getChannel();
+ if (data == null)
+ {
+ channel2.close();
+ return;
+ }
+ channel2.write(ByteBuffer.wrap(data));
+ return;
+ }
default:
throw new HornetQException(HornetQException.INTERNAL_ERROR, "Unhandled
file type " + msg.getFileType());
}
if (data == null)
{
- sf.close();
+ channel.close();
return;
}
- if (!sf.isOpen())
+ if (!channel.isOpen())
{
- sf.open(1, false);
+ channel.open(1, false);
}
- sf.writeDirect(data);
+ channel.writeDirect(ByteBuffer.wrap(data), true);
}
/**
@@ -516,10 +526,12 @@
final Journal journal = journalsHolder.get(packet.getJournalContentType());
- Map<Long, JournalFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
- JournalFile current = journal.createFilesForBackupSync(packet.getFileIds(),
mapToFill);
- registerJournal(packet.getJournalContentType().typeByte,
- new FileWrapperJournal(current,
storage.hasCallbackSupport(packet.getJournalContentType())));
+ Map<Long, JournalSyncFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
+ for (Entry<Long, JournalFile> entry :
journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
+ {
+ mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
+ }
+ registerJournal(packet.getJournalContentType().typeByte, new
FileWrapperJournal(journal));
}
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
@@ -808,4 +820,38 @@
{
return journals[journalID];
}
+
+ public static class JournalSyncFile
+ {
+
+ private FileChannel channel;
+ private final File file;
+
+ public JournalSyncFile(JournalFile jFile) throws Exception
+ {
+ SequentialFile seqFile = jFile.getFile();
+ file = seqFile.getJavaFile();
+ seqFile.close();
+ }
+
+ FileChannel getChannel() throws Exception
+ {
+ if (channel == null)
+ {
+ channel = new FileOutputStream(file).getChannel();
+ }
+ return channel;
+ }
+
+ void close() throws IOException
+ {
+ channel.close();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "JournalSyncFile(file=" + file.getAbsolutePath() +
")";
+ }
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -13,7 +13,9 @@
package org.hornetq.core.replication.impl;
+import java.io.FileInputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
@@ -26,7 +28,6 @@
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
@@ -463,28 +464,26 @@
}
@Override
- public void syncJournalFile(SequentialFileFactory factory, JournalFile jf,
JournalContent content) throws Exception
+ public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
{
if (enabled)
{
SequentialFile file = jf.getFile().copy();
log.info("Replication: sending " + jf + " (size=" +
file.size() + ") to backup. " + file);
- sendLargeFile(content, null, jf.getFileID(), file, factory, Long.MAX_VALUE);
+ sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
}
}
@Override
- public void
- syncLargeMessageFile(SequentialFileFactory factory, SequentialFile file, long size,
long id) throws Exception
+ public void syncLargeMessageFile(SequentialFile file, long size, long id) throws
Exception
{
- sendLargeFile(null, null, id, file, factory, size);
+ sendLargeFile(null, null, id, file, size);
}
@Override
- public void
- syncPages(SequentialFileFactory factory, SequentialFile file, long id, SimpleString
queueName) throws Exception
+ public void syncPages(SequentialFile file, long id, SimpleString queueName) throws
Exception
{
- sendLargeFile(null, queueName, id, file, factory, Long.MAX_VALUE);
+ sendLargeFile(null, queueName, id, file, Long.MAX_VALUE);
}
/**
@@ -500,9 +499,7 @@
SimpleString pageStore,
final long id,
SequentialFile file,
- SequentialFileFactory factory,
- long maxBytesToSend)
- throws Exception
+ long maxBytesToSend) throws Exception
{
if (!enabled)
return;
@@ -510,16 +507,17 @@
{
file.open();
}
- final ByteBuffer buffer = factory.newBuffer(1 << 17);
+ final FileChannel channel = (new
FileInputStream(file.getJavaFile())).getChannel();
try
{
+ final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
while (true)
- {
- buffer.rewind();
- int bytesRead = file.read(buffer);
- int toSend = bytesRead;
- if (bytesRead > 0)
{
+ buffer.clear();
+ int bytesRead = channel.read(buffer);
+ int toSend = bytesRead;
+ if (bytesRead > 0)
+ {
if (bytesRead >= maxBytesToSend)
{
toSend = (int)maxBytesToSend;
@@ -541,7 +539,7 @@
}
finally
{
- factory.releaseBuffer(buffer);
+ channel.close();
}
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -17,6 +17,7 @@
import java.util.Map;
import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalFilesRepository;
import org.hornetq.core.server.HornetQComponent;
/**
@@ -148,13 +149,11 @@
* During the synchronization between a live server and backup, we reserve in the
backup the
* journal file IDs used in the live server. This call also makes sure the files are
created
* empty without any kind of headers added.
- * @param fileIds ids to reserve for synchronization
- * @param mapToFill map to be filled with id and journal file pairs for
<b>synchronization</b>.
- * @return a new {@link JournalFile} to be used for regular
<b>replication</b> during
- * synchronization
+ * @param fileIds IDs to reserve for synchronization
+ * @return map to be filled with id and journal file pairs for
<b>synchronization</b>.
* @throws Exception
*/
- JournalFile createFilesForBackupSync(long[] fileIds, Map<Long, JournalFile>
mapToFill) throws Exception;
+ Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws
Exception;
/**
* @return whether automatic reclaiming of Journal files is enabled
@@ -190,4 +189,9 @@
*/
JournalFile[] getDataFiles();
+ SequentialFileFactory getFileFactory();
+
+ JournalFilesRepository getFilesRepository();
+
+ int getFileSize();
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/SequentialFile.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -13,6 +13,7 @@
package org.hornetq.core.journal;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -128,4 +129,9 @@
void copyTo(SequentialFile newFileName) throws Exception;
void setTimedBuffer(TimedBuffer buffer);
+
+ /**
+ * Returns a native File of the file underlying this sequential file.
+ */
+ File getJavaFile();
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/TestableJournal.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -37,8 +37,6 @@
void debugWait() throws Exception;
- int getFileSize();
-
int getMinFiles();
String getFilePrefix();
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AIOSequentialFile.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -24,7 +24,6 @@
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.logging.Logger;
/**
*
@@ -35,8 +34,6 @@
*/
public class AIOSequentialFile extends AbstractSequentialFile
{
- private static final Logger log = Logger.getLogger(AIOSequentialFile.class);
-
private boolean opened = false;
private final int maxIO;
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -389,4 +389,9 @@
}
+ @Override
+ public File getJavaFile()
+ {
+ return getFile().getAbsoluteFile();
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -15,6 +15,8 @@
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecordTX;
@@ -36,15 +38,14 @@
private final ConcurrentMap<Long, AtomicInteger> transactions = new
ConcurrentHashMap<Long, AtomicInteger>();
- private final JournalFile currentFile;
-
/**
- * @param file
+ * @param journal
+ * @throws Exception
*/
- public FileWrapperJournal(JournalFile file, boolean hasCallbackSupport)
+ public FileWrapperJournal(Journal journal) throws Exception
{
- super(hasCallbackSupport);
- currentFile = file;
+ super(journal.getFileFactory(), journal.getFilesRepository(),
journal.getFileSize());
+ setUpCurrentFile(JournalImpl.SIZE_HEADER);
}
@Override
@@ -56,7 +57,11 @@
@Override
public void stop() throws Exception
{
- currentFile.getFile().close();
+ SequentialFile seqFile = currentFile.getFile();
+ long pos = seqFile.position();
+ seqFile.close();
+ seqFile.open();
+ seqFile.position(pos);
}
@Override
@@ -92,7 +97,7 @@
{
callback.storeLineUp();
}
-
+ switchFileIfNecessary(encoder.getEncodeSize());
encoder.setFileID(currentFile.getRecordID());
if (callback != null)
@@ -189,6 +194,12 @@
return defaultValue.get();
}
+ @Override
+ public String toString()
+ {
+ return FileWrapperJournal.class.getName() + "(currentFile=[" +
currentFile + "], hash=" + super.toString() + ")";
+ }
+
// UNSUPPORTED STUFF
@Override
@@ -260,7 +271,7 @@
}
@Override
- public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long,
JournalFile> mapToFill) throws Exception
+ public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws
Exception
{
throw new UnsupportedOperationException();
}
@@ -300,4 +311,22 @@
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ void scheduleReclaim()
+ {
+ // no-op
+ }
+
+ @Override
+ public SequentialFileFactory getFileFactory()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public JournalFilesRepository getFilesRepository()
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -3,16 +3,35 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
+import org.hornetq.core.logging.Logger;
abstract class JournalBase
{
- private final boolean hasCallbackSupport;
+ protected final JournalFilesRepository filesRepository;
+ protected final SequentialFileFactory fileFactory;
+ protected volatile JournalFile currentFile;
+ protected final int fileSize;
- public JournalBase(boolean hasCallbackSupport)
+ private static final Logger log = Logger.getLogger(JournalBase.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ public JournalBase(SequentialFileFactory fileFactory, JournalFilesRepository
journalFilesRepository, int fileSize)
{
- this.hasCallbackSupport = hasCallbackSupport;
+ if (fileSize < JournalImpl.MIN_FILE_SIZE)
+ {
+ throw new IllegalArgumentException("File size cannot be less than " +
JournalImpl.MIN_FILE_SIZE + " bytes");
+ }
+ if (fileSize % fileFactory.getAlignment() != 0)
+ {
+ throw new IllegalArgumentException("Invalid journal-file-size " +
fileSize + ", It should be multiple of " +
+ fileFactory.getAlignment());
+ }
+ this.fileFactory = fileFactory;
+ this.filesRepository = journalFilesRepository;
+ this.fileSize = fileSize;
}
abstract public void appendAddRecord(final long id, final byte recordType, final
EncodingSupport record,
@@ -179,9 +198,56 @@
}
}
+ /**
+ * @param size
+ * @throws Exception
+ */
+ protected void switchFileIfNecessary(int size) throws Exception
+ {
+ // We take into account the fileID used on the Header
+ if (size > fileSize -
currentFile.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER))
+ {
+ throw new IllegalArgumentException("Record is too large to store " +
size);
+ }
+
+ if (!currentFile.getFile().fits(size))
+ {
+ moveNextFile(true);
+
+ // The same check needs to be done at the new file also
+ if (!currentFile.getFile().fits(size))
+ {
+ // Sanity check, this should never happen
+ throw new IllegalStateException("Invalid logic on buffer
allocation");
+ }
+ }
+ }
+
+ abstract void scheduleReclaim();
+
+ // You need to guarantee lock.acquire() before calling this method
+ protected void moveNextFile(final boolean scheduleReclaim) throws Exception
+ {
+ filesRepository.closeFile(currentFile);
+
+ currentFile = filesRepository.openFile();
+
+ if (scheduleReclaim)
+ {
+ scheduleReclaim();
+ }
+
+ if (trace)
+ {
+ log.info("moveNextFile: " + currentFile);
+ }
+
+ fileFactory.activateBuffer(currentFile.getFile());
+ }
+
protected SyncIOCompletion getSyncCallback(final boolean sync)
{
- if (hasCallbackSupport)
+ if (fileFactory.isSupportsCallbacks())
{
if (sync)
{
@@ -213,4 +279,40 @@
}
}
+ /**
+ * @param lastDataPos
+ * @throws Exception
+ */
+ protected void setUpCurrentFile(int lastDataPos) throws Exception
+ {
+ // Create any more files we need
+
+ filesRepository.ensureMinFiles();
+
+ // The current file is the last one that has data
+
+ currentFile = filesRepository.pollLastDataFile();
+
+ if (currentFile != null)
+ {
+ currentFile.getFile().open();
+
+
currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
+ }
+ else
+ {
+ currentFile = filesRepository.getFreeFile();
+
+ filesRepository.openFile(currentFile, true);
+ }
+
+ fileFactory.activateBuffer(currentFile.getFile());
+
+ filesRepository.pushOpenedFile();
+ }
+
+ public int getFileSize()
+ {
+ return fileSize;
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -33,8 +33,6 @@
* Guaranteeing that they will be delivered in order to the Journal
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
- *
- *
*/
public class JournalFilesRepository
{
@@ -91,6 +89,18 @@
final int fileSize,
final int minFiles)
{
+ if (filePrefix == null)
+ {
+ throw new IllegalArgumentException("filePrefix cannot be null");
+ }
+ if (fileExtension == null)
+ {
+ throw new IllegalArgumentException("fileExtension cannot be null");
+ }
+ if (maxAIO <= 0)
+ {
+ throw new IllegalArgumentException("maxAIO must be a positive
number");
+ }
this.fileFactory = fileFactory;
this.maxAIO = maxAIO;
this.filePrefix = filePrefix;
@@ -447,9 +457,9 @@
* Creates files for journal synchronization of a replicated backup.
* @param isCurrent a current file is initialized and kept open.
*/
- public JournalFile createRemoteBackupSyncFile(long fileID, boolean isCurrent) throws
Exception
+ public JournalFile createRemoteBackupSyncFile(long fileID) throws Exception
{
- return createFile(isCurrent, false, isCurrent, false, fileID);
+ return createFile(false, false, true, false, fileID);
}
// Package protected ---------------------------------------------
@@ -576,4 +586,11 @@
return jf;
}
+ /** Returns a description built from the dataFiles, freeFiles and openedFiles fields. */
+ @Override
+ public String toString()
+ {
+ return "JournalFilesRepository(dataFiles=" + dataFiles + ",
freeFiles=" + freeFiles + ", openedFiles=" +
+ openedFiles + ")";
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -18,6 +18,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -171,18 +172,12 @@
private final int userVersion;
- private final int fileSize;
-
private final int minFiles;
private final float compactPercentage;
private final int compactMinFiles;
- private final SequentialFileFactory fileFactory;
-
- private final JournalFilesRepository filesRepository;
-
// Compacting may replace this structure
private final ConcurrentMap<Long, JournalRecord> records = new
ConcurrentHashMap<Long, JournalRecord>();
@@ -212,8 +207,6 @@
*/
private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
- private volatile JournalFile currentFile;
-
private volatile JournalState state = JournalState.STOPPED;
private final Reclaimer reclaimer = new Reclaimer();
@@ -229,47 +222,12 @@
final String fileExtension,
final int maxAIO)
{
- this(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory,
filePrefix, fileExtension, maxAIO, 0);
- }
-
- public JournalImpl(final int fileSize,
- final int minFiles,
- final int compactMinFiles,
- final int compactPercentage,
- final SequentialFileFactory fileFactory,
- final String filePrefix,
- final String fileExtension,
- final int maxAIO,
- final int userVersion)
- {
- super(fileFactory.isSupportsCallbacks());
- if (fileSize < JournalImpl.MIN_FILE_SIZE)
- {
- throw new IllegalArgumentException("File size cannot be less than " +
JournalImpl.MIN_FILE_SIZE + " bytes");
- }
- if (fileSize % fileFactory.getAlignment() != 0)
- {
- throw new IllegalArgumentException("Invalid journal-file-size " +
fileSize +
- ", It should be multiple of " +
- fileFactory.getAlignment());
- }
+ super(fileFactory, new JournalFilesRepository(fileFactory, filePrefix,
fileExtension, 0, maxAIO, fileSize,
+ minFiles), fileSize);
if (minFiles < 2)
{
throw new IllegalArgumentException("minFiles cannot be less than 2");
}
- if (filePrefix == null)
- {
- throw new NullPointerException("filePrefix is null");
- }
- if (fileExtension == null)
- {
- throw new NullPointerException("fileExtension is null");
- }
- if (maxAIO <= 0)
- {
- throw new IllegalStateException("maxAIO should aways be a positive
number");
- }
-
if (compactPercentage < 0 || compactPercentage > 100)
{
throw new IllegalArgumentException("Compact Percentage out of
range");
@@ -285,28 +243,14 @@
}
this.compactMinFiles = compactMinFiles;
-
- this.fileSize = fileSize;
-
this.minFiles = minFiles;
-
- this.fileFactory = fileFactory;
-
- filesRepository = new JournalFilesRepository(fileFactory,
- filePrefix,
- fileExtension,
- userVersion,
- maxAIO,
- fileSize,
- minFiles);
-
- this.userVersion = userVersion;
+ this.userVersion = 0;
}
@Override
public String toString()
{
- return super.toString() + " " + state;
+ return "JournalImpl(state=" + state + ", currentFile=[" +
currentFile + "], hash=" + super.toString() + ")";
}
public void runDirectJournalBlast() throws Exception
@@ -2041,31 +1985,8 @@
return new JournalLoadInformation(0, -1);
}
- // Create any more files we need
+ setUpCurrentFile(lastDataPos);
- filesRepository.ensureMinFiles();
-
- // The current file is the last one that has data
-
- currentFile = filesRepository.pollLastDataFile();
-
- if (currentFile != null)
- {
- currentFile.getFile().open();
-
-
currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
- }
- else
- {
- currentFile = filesRepository.getFreeFile();
-
- filesRepository.openFile(currentFile, true);
- }
-
- fileFactory.activateBuffer(currentFile.getFile());
-
- filesRepository.pushOpenedFile();
-
setJournalState(JournalState.LOADED);
for (TransactionHolder transaction : loadTransactions.values())
@@ -2162,7 +2083,7 @@
totalLiveSize += file.getLiveSize();
}
- long totalBytes = (long)dataFiles.length * (long)fileSize;
+ long totalBytes = dataFiles.length * (long)fileSize;
long compactMargin = (long)(totalBytes * compactPercentage);
@@ -2224,7 +2145,7 @@
// TestableJournal implementation
// --------------------------------------------------------------
- public void setAutoReclaim(final boolean autoReclaim)
+ public synchronized void setAutoReclaim(final boolean autoReclaim)
{
this.autoReclaim = autoReclaim;
}
@@ -2329,6 +2250,7 @@
return records.size();
}
+ @Override
public int getFileSize()
{
return fileSize;
@@ -2775,26 +2697,10 @@
final IOAsyncTask callback;
- int size = encoder.getEncodeSize();
+ final int size = encoder.getEncodeSize();
- // We take into account the fileID used on the Header
- if (size > fileSize -
currentFile.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER))
- {
- throw new IllegalArgumentException("Record is too large to store " +
size);
- }
+ switchFileIfNecessary(size);
- if (!currentFile.getFile().fits(size))
- {
- moveNextFile(true);
-
- // The same check needs to be done at the new file also
- if (!currentFile.getFile().fits(size))
- {
- // Sanity check, this should never happen
- throw new IllegalStateException("Invalid logic on buffer
allocation");
- }
- }
-
if (tx != null)
{
// The callback of a transaction has to be taken inside the lock,
@@ -2842,28 +2748,9 @@
return currentFile;
}
- // You need to guarantee lock.acquire() before calling this method
- private void moveNextFile(final boolean scheduleReclaim) throws Exception
+ @Override
+ void scheduleReclaim()
{
- filesRepository.closeFile(currentFile);
-
- currentFile = filesRepository.openFile();
-
- if (scheduleReclaim)
- {
- scheduleReclaim();
- }
-
- if (JournalImpl.trace)
- {
- JournalImpl.trace("moveNextFile: " + currentFile);
- }
-
- fileFactory.activateBuffer(currentFile.getFile());
- }
-
- private void scheduleReclaim()
- {
if (state != JournalState.LOADED)
{
return;
@@ -3102,21 +2989,21 @@
* @throws Exception
*/
@Override
- public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long,
JournalFile> map) throws Exception
+ public synchronized Map<Long, JournalFile> createFilesForBackupSync(long[]
fileIds) throws Exception
{
writeLock();
try
{
+ Map<Long, JournalFile> map = new HashMap<Long, JournalFile>();
log.info("Reserving fileIDs before synchronization: " +
Arrays.toString(fileIds));
long maxID = -1;
for (long id : fileIds)
{
maxID = Math.max(maxID, id);
- map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id,
false));
+ map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
}
- maxID += 1;
filesRepository.setNextFileID(maxID);
- return filesRepository.createRemoteBackupSyncFile(maxID, true);
+ return map;
}
finally
{
@@ -3128,4 +3015,16 @@
{
return autoReclaim;
}
+ /** Returns the fileFactory field. */
+ @Override
+ public SequentialFileFactory getFileFactory()
+ {
+ return fileFactory;
+ }
+ /** Returns the filesRepository field. */
+ @Override
+ public JournalFilesRepository getFilesRepository()
+ {
+ return filesRepository;
+ }
}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -28,7 +28,7 @@
private ClientSession session;
private ClientProducer producer;
private BackupSyncDelay syncDelay;
- protected int n_msgs = 10;
+ protected int n_msgs = 20;
@Override
protected void setUp() throws Exception
@@ -64,12 +64,19 @@
}
backupServer.start();
- syncDelay.deliverUpToDateMsg();
+
+ // Deliver messages with Backup in-sync
waitForBackup(sessionFactory, BACKUP_WAIT_TIME, false);
+ sendMessages(session, producer, n_msgs);
+ // Deliver messages with Backup up-to-date
+ syncDelay.deliverUpToDateMsg();
+ waitForBackup(sessionFactory, BACKUP_WAIT_TIME, true);
// SEND more messages, now with the backup replicating
sendMessages(session, producer, n_msgs);
+
Set<Long> liveIds = getFileIds(messageJournal);
+ int size = messageJournal.getFileSize();
PagingStore ps = liveServer.getServer().getPagingManager().getPageStore(ADDRESS);
if (ps.getPageSizeBytes() == PAGE_SIZE)
{
@@ -79,11 +86,14 @@
finishSyncAndFailover();
JournalImpl backupMsgJournal = getMessageJournalFromServer(backupServer);
+ System.out.println("backup journal " + backupMsgJournal);
+ System.out.println("live journal " + messageJournal);
+ assertEquals("file sizes must be the same", size,
backupMsgJournal.getFileSize());
Set<Long> backupIds = getFileIds(backupMsgJournal);
assertEquals("File IDs must match!", liveIds, backupIds);
// "+ 2": there two other calls that send N_MSGS.
- for (int i = 0; i < totalRounds + 2; i++)
+ for (int i = 0; i < totalRounds + 3; i++)
{
receiveMsgsInRange(0, n_msgs);
}
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -1571,7 +1571,7 @@
ClientMessage message = consumer.receiveImmediate();
- Assert.assertNull("Null message", message);
+ Assert.assertNull("expecting null message", message);
session2.close();
@@ -1610,7 +1610,7 @@
backupServer.start();
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertTrue("session failure listener", latch.await(5,
TimeUnit.SECONDS));
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
@@ -1772,7 +1772,7 @@
backupServer.start();
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertTrue("session failure listener", latch.await(5,
TimeUnit.SECONDS));
ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -84,6 +84,7 @@
private Packet onHold;
private Channel channel;
public volatile boolean deliver;
+ private volatile boolean delivered;
private boolean receivedUpToDate;
private boolean mustHold = true;
@@ -97,6 +98,8 @@
deliver = true;
if (!receivedUpToDate)
return;
+ if (delivered)
+ return;
if (onHold == null)
{
@@ -108,6 +111,7 @@
try
{
handler.handlePacket(onHold);
+ delivered = true;
}
finally
{
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -51,8 +51,10 @@
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.journal.impl.JournalFilesRepository;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
@@ -853,7 +855,7 @@
}
@Override
- public JournalFile createFilesForBackupSync(long[] fileIds, Map<Long,
JournalFile> mapToFill) throws Exception
+ public Map<Long, JournalFile> createFilesForBackupSync(long[] fileIds) throws
Exception
{
return null;
}
@@ -894,5 +896,17 @@
return null;
}
+ @Override
+ public SequentialFileFactory getFileFactory()
+ {
+ return null; // this implementation always returns null (presumably a test stub - confirm against the enclosing class)
+ }
+ /** Always returns null in this implementation (presumably a test stub - confirm against the enclosing class). */
+ @Override
+ public JournalFilesRepository getFilesRepository()
+ {
+ return null;
+ }
+ }
+
}
}
Modified:
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-30
08:55:35 UTC (rev 11448)
+++
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2011-09-30
08:58:28 UTC (rev 11449)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.journal.impl.fakes;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -28,7 +29,6 @@
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.TimedBuffer;
-import org.hornetq.core.logging.Logger;
/**
*
@@ -40,8 +40,6 @@
*/
public class FakeSequentialFileFactory implements SequentialFileFactory
{
- private static final Logger log = Logger.getLogger(FakeSequentialFileFactory.class);
-
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -285,11 +283,6 @@
}
}
- public boolean isSendError()
- {
- return sendError;
- }
-
public void setSendError(final boolean sendError)
{
this.sendError = sendError;
@@ -688,6 +681,12 @@
HornetQBuffer outbuffer = HornetQBuffers.wrappedBuffer(buffer);
write(outbuffer, true);
}
+ /** Not supported here: always throws UnsupportedOperationException. */
+ @Override
+ public File getJavaFile()
+ {
+ throw new UnsupportedOperationException();
+ }
}
/* (non-Javadoc)