Author: borges
Date: 2012-01-12 07:08:26 -0500 (Thu, 12 Jan 2012)
New Revision: 12012
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
Log:
HORNETQ-720 Fix synchronization issues with Large Messages
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -26,6 +26,7 @@
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -161,6 +162,8 @@
*/
LargeServerMessage createLargeMessage(long id, MessageInternal message) throws
Exception;
+ SequentialFile createFileForLargeMessage(final long messageID, String extension);
+
void prepare(long txID, Xid xid) throws Exception;
void commit(long txID) throws Exception;
@@ -274,4 +277,11 @@
* Typical scenario is a broken connection.
*/
void stopReplication();
+
+ /**
+ * @param appendFile
+ * @param messageID
+ * @param bytes
+ */
+ void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes)
throws Exception;
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -381,7 +381,7 @@
messageFiles = prepareJournalForCopy(originalMessageJournal,
JournalContent.MESSAGES, nodeID);
bindingsFiles = prepareJournalForCopy(originalBindingsJournal,
JournalContent.BINDINGS, nodeID);
pageFilesToSync = getPageInformationForSync(pagingManager);
- largeMessageFilesToSync = getLargeMessageInformation();
+ largeMessageFilesToSync = getLargeMessageInformation();
}
finally
{
@@ -469,14 +469,14 @@
private Map<SimpleString, Collection<Integer>>
getPageInformationForSync(PagingManager pagingManager)
throws Exception
{
- Map<SimpleString, Collection<Integer>> info = new
HashMap<SimpleString, Collection<Integer>>();
- for (SimpleString storeName : pagingManager.getStoreNames())
- {
- PagingStore store = pagingManager.getPageStore(storeName);
- info.put(storeName, store.getCurrentIds());
- store.forceAnotherPage();
- }
- return info;
+ Map<SimpleString, Collection<Integer>> info = new
HashMap<SimpleString, Collection<Integer>>();
+ for (SimpleString storeName : pagingManager.getStoreNames())
+ {
+ PagingStore store = pagingManager.getPageStore(storeName);
+ info.put(storeName, store.getCurrentIds());
+ store.forceAnotherPage();
+ }
+ return info;
}
private void sendLargeMessageFiles(Map<String, Long> largeMessageFilesToSync)
throws Exception
@@ -509,30 +509,35 @@
*/
private Map<String, Long> getLargeMessageInformation() throws Exception
{
- Map<String, Long> largeMessages = new HashMap<String, Long>();
- List<String> filenames = largeMessagesFactory.listFiles("msg");
- for (String filename : filenames)
- {
- SequentialFile seqFile =
largeMessagesFactory.createSequentialFile(filename, 1);
- long size = seqFile.size();
- largeMessages.put(filename, size);
- }
- return largeMessages;
+ final String prefix = "msg";
+ Map<String, Long> largeMessages = new HashMap<String, Long>();
+ List<String> filenames = largeMessagesFactory.listFiles(prefix);
+
+ List<Long> idList = new ArrayList<Long>();
+ for (String filename : filenames)
+ {
+ idList.add(Long.valueOf(filename.substring(0, filename.length() -
(prefix.length() + 1))));
+ SequentialFile seqFile = largeMessagesFactory.createSequentialFile(filename,
1);
+ long size = seqFile.size();
+ largeMessages.put(filename, size);
+ }
+ replicator.sendLargeMessageIdListMessage(idList);
+ return largeMessages;
}
/**
* Send an entire journal file to a replicating backup server.
*/
private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws
Exception
- {
- for (JournalFile jf : journalFiles)
- {
- if (!started)
- return;
- replicator.syncJournalFile(jf, type);
- jf.setCanReclaim(true);
- }
- }
+ {
+ for (JournalFile jf : journalFiles)
+ {
+ if (!started)
+ return;
+ replicator.syncJournalFile(jf, type);
+ jf.setCanReclaim(true);
+ }
+ }
private JournalFile[]
prepareJournalForCopy(Journal journal, JournalContent contentType, String
nodeID) throws Exception
@@ -546,23 +551,23 @@
@Override
public void waitOnOperations() throws Exception
{
- if (!started)
- {
- JournalStorageManager.log.warn("Server is stopped");
- throw new IllegalStateException("Server is stopped");
- }
- waitOnOperations(0);
+ if (!started)
+ {
+ JournalStorageManager.log.warn("Server is stopped");
+ throw new IllegalStateException("Server is stopped");
+ }
+ waitOnOperations(0);
}
@Override
public boolean waitOnOperations(final long timeout) throws Exception
{
- if (!started)
- {
- JournalStorageManager.log.warn("Server is stopped");
- throw new IllegalStateException("Server is stopped");
- }
- return getContext().waitCompletion(timeout);
+ if (!started)
+ {
+ JournalStorageManager.log.warn("Server is stopped");
+ throw new IllegalStateException("Server is stopped");
+ }
+ return getContext().waitCompletion(timeout);
}
@Override
@@ -665,7 +670,8 @@
return new LargeServerMessageImpl(this);
}
- protected final void addBytesToLargeMessage(final SequentialFile file, final long
messageId, final byte[] bytes)
+ public final void
+ addBytesToLargeMessage(final SequentialFile file, final long messageId, final
byte[] bytes)
throws Exception
{
readLock();
@@ -2241,7 +2247,7 @@
* @param messageID
* @return
*/
- SequentialFile createFileForLargeMessage(final long messageID, String extension)
+ public SequentialFile createFileForLargeMessage(final long messageID, String
extension)
{
return largeMessagesFactory.createSequentialFile(messageID + extension, -1);
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -54,7 +54,6 @@
private long pendingRecordID = -1;
private boolean paged;
- private boolean replicationSync;
// We should only use the NIO implementation on the Journal
private SequentialFile file;
@@ -107,9 +106,7 @@
paged = true;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#addBytes(byte[])
- */
+ @Override
public synchronized void addBytes(final byte[] bytes) throws Exception
{
validateFile();
@@ -239,6 +236,7 @@
return true;
}
+ @Override
public synchronized void deleteFile() throws Exception
{
validateFile();
@@ -430,14 +428,10 @@
private String getExtension()
{
- if (replicationSync)
- return ".sync";
return durable ? ".msg" : ".tmp";
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.server.LargeServerMessage#setLinkedMessage(org.hornetq.core.server.LargeServerMessage)
- */
+ @Override
public void setLinkedMessage(final LargeServerMessage message)
{
if (file != null)
@@ -521,18 +515,10 @@
return bytesRead;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.message.BodyEncoder#getLargeBodySize()
- */
+ @Override
public long getLargeBodySize()
{
return bodySize;
}
}
-
- @Override
- public void setReplicationSync(boolean sync)
- {
- replicationSync = sync;
- }
}
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
(rev 0)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -0,0 +1,135 @@
+/**
+ *
+ */
+package org.hornetq.core.persistence.impl.journal;
+
+import java.nio.ByteBuffer;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.replication.ReplicatedLargeMessage;
+import org.hornetq.core.server.LargeServerMessage;
+
+public final class LargeServerMessageInSync implements ReplicatedLargeMessage
+{
+ private static final Logger log = Logger.getLogger(LargeServerMessageInSync.class);
+
+ private static final String SYNC_EXTENSION = ".sync";
+ private final LargeServerMessage mainLM;
+ private final StorageManager storageManager;
+ private SequentialFile appendFile;
+ private boolean syncDone;
+
+ /**
+ * @param storageManager
+ */
+ public LargeServerMessageInSync(StorageManager storageManager)
+ {
+ mainLM = storageManager.createLargeMessage();
+ this.storageManager = storageManager;
+ }
+
+ public synchronized void joinSyncedData(ByteBuffer buffer) throws Exception
+ {
+ SequentialFile mainSeqFile = mainLM.getFile();
+ if (appendFile != null)
+ {
+ for (;;)
+ {
+ buffer.rewind();
+ int size = appendFile.read(buffer);
+ mainSeqFile.writeInternal(buffer);
+ if (size < buffer.capacity())
+ {
+ break;
+ }
+ }
+ deleteAppendFile();
+ }
+ syncDone = true;
+ }
+
+ public SequentialFile getSyncFile() throws HornetQException
+ {
+ return mainLM.getFile();
+ }
+
+ @Override
+ public void setDurable(boolean durable)
+ {
+ mainLM.setDurable(durable);
+ }
+
+ @Override
+ public synchronized void setMessageID(long id)
+ {
+ mainLM.setMessageID(id);
+ }
+
+ @Override
+ public synchronized void releaseResources()
+ {
+ mainLM.releaseResources();
+ if (appendFile != null && appendFile.isOpen())
+ {
+ try
+ {
+ appendFile.close();
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public synchronized void deleteFile() throws Exception
+ {
+ try
+ {
+ mainLM.deleteFile();
+ }
+ finally
+ {
+ deleteAppendFile();
+ }
+ }
+
+ /**
+ * @throws Exception
+ */
+ private void deleteAppendFile() throws Exception
+ {
+ if (appendFile != null)
+ {
+ if (appendFile.isOpen())
+ appendFile.close();
+ appendFile.delete();
+ }
+ }
+
+ @Override
+ public synchronized void addBytes(byte[] bytes) throws Exception
+ {
+ if (syncDone)
+ {
+ mainLM.addBytes(bytes);
+ return;
+ }
+
+ if (appendFile == null)
+ {
+ appendFile = storageManager.createFileForLargeMessage(mainLM.getMessageID(),
SYNC_EXTENSION);
+ }
+
+ if (!appendFile.isOpen())
+ {
+ appendFile.open();
+ }
+ storageManager.addBytesToLargeMessage(appendFile, mainLM.getMessageID(), bytes);
+ }
+
+}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -138,9 +138,7 @@
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#incrementDelayDeletionCount()
- */
+ @Override
public void incrementDelayDeletionCount()
{
@@ -158,51 +156,25 @@
return "LargeServerMessage[messageID=" + messageID + ",
durable=" + durable + ", address=" + getAddress() +
",properties=" + properties.toString() + "]";
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#setPaged()
- */
+ @Override
public void setPaged()
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#setPendingRecordID(long)
- */
+ @Override
public void setPendingRecordID(long pendingRecordID)
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#getPendingRecordID()
- */
+ @Override
public long getPendingRecordID()
{
return -1;
}
- /*
- * (non-Javadoc)
- * @see org.hornetq.core.server.LargeServerMessage#setReplicationSync(boolean)
- */
@Override
- public void setReplicationSync(boolean sync)
- {
- // TODO Auto-generated method stub
-
- }
-
- @Override
public SequentialFile getFile()
{
return null;
}
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -28,6 +28,7 @@
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -351,92 +352,67 @@
return true;
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#setReplicator(org.hornetq.core.replication.ReplicationManager)
- */
public void setReplicator(final ReplicationManager replicator)
{
throw new IllegalStateException("Null Persistence should never be used as
replicated");
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
- */
public void afterCompleteOperations(final IOAsyncTask run)
{
run.done();
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#waitOnOperations()
- */
+ @Override
public void waitOnOperations() throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#getContext()
- */
+ @Override
public OperationContext getContext()
{
return dummyContext;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#newContext()
- */
+ @Override
public OperationContext newContext(final Executor executor)
{
return dummyContext;
}
-
+ @Override
public OperationContext newSingleThreadContext()
{
return dummyContext;
}
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#setContext(org.hornetq.core.persistence.OperationContext)
- */
+ @Override
public void setContext(final OperationContext context)
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#clearContext()
- */
+ @Override
public void clearContext()
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#recoverAddressSettings()
- */
+ @Override
public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
{
return Collections.emptyList();
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#storeAddressSetting(org.hornetq.core.persistconfig.PersistedAddressSetting)
- */
+ @Override
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws
Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#recoverPersistedRoles()
- */
+ @Override
public List<PersistedRoles> recoverPersistedRoles() throws Exception
{
return Collections.emptyList();
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#storeSecurityRoles(org.hornetq.core.persistconfig.PersistedRoles)
- */
+ @Override
public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
{
}
@@ -512,54 +488,44 @@
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
- */
+ @Override
public void deletePageCounter(long txID, long recordID) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, long,
int)
- */
+ @Override
public long storePageCounterInc(long txID, long queueID, int add) throws Exception
{
return 0;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, int)
- */
+ @Override
public long storePageCounterInc(long queueID, int add) throws Exception
{
return 0;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#commit(long, boolean)
- */
+ @Override
public void commit(long txID, boolean lineUpContext) throws Exception
{
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
- */
+ @Override
public void lineUpContext()
{
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#confirmPendingLargeMessageTX(org.hornetq.core.transaction.Transaction,
long, long)
- */
+ @Override
public void confirmPendingLargeMessageTX(Transaction transaction, long messageID, long
recordID) throws Exception
{
}
+ @Override
public void confirmPendingLargeMessage(long recordID) throws Exception
{
}
+ @Override
public void stop(boolean ioCriticalError) throws Exception
{
}
@@ -598,4 +564,16 @@
{
// no-op
}
+
+ @Override
+ public SequentialFile createFileForLargeMessage(long messageID, String extension)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[]
bytes) throws Exception
+ {
+ // no-op
+ }
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -1,5 +1,9 @@
package org.hornetq.core.protocol.core.impl.wireformat;
+import java.security.InvalidParameterException;
+import java.util.Arrays;
+import java.util.List;
+
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
@@ -14,15 +18,57 @@
public class ReplicationStartSyncMessage extends PacketImpl
{
private long[] ids;
- private JournalContent journalType;
+ private SyncDataType dataType;
private boolean synchronizationIsFinished;
private String nodeID;
+ public enum SyncDataType
+ {
+ JournalBindings(JournalContent.BINDINGS.typeByte),
+ JournalMessages(JournalContent.MESSAGES.typeByte),
+ LargeMessages((byte)2);
+
+ private byte code;
+
+ private SyncDataType(byte code)
+ {
+ this.code = code;
+ }
+
+ public static JournalContent getJournalContentType(SyncDataType dataType)
+ {
+ return JournalContent.getType(dataType.code);
+ }
+
+ public static SyncDataType getDataType(byte code)
+ {
+ if (code == JournalBindings.code)
+ return JournalBindings;
+ if (code == JournalMessages.code)
+ return JournalMessages;
+ if (code == LargeMessages.code)
+ return LargeMessages;
+ throw new InvalidParameterException("invalid byte: " + code);
+ }
+ }
+
public ReplicationStartSyncMessage()
{
super(REPLICATION_START_FINISH_SYNC);
}
+ public ReplicationStartSyncMessage(List<Long> filenames)
+ {
+ this();
+ ids = new long[filenames.size()];
+ for (int i = 0; i < filenames.size(); i++)
+ {
+ ids[i] = filenames.get(i);
+ }
+ dataType = SyncDataType.LargeMessages;
+ nodeID = ""; // this value will be ignored
+ }
+
public ReplicationStartSyncMessage(String nodeID)
{
this();
@@ -40,7 +86,17 @@
{
ids[i] = datafiles[i].getFileID();
}
- journalType = contentType;
+ switch (contentType)
+ {
+ case MESSAGES:
+ dataType = SyncDataType.JournalMessages;
+ break;
+ case BINDINGS:
+ dataType = SyncDataType.JournalBindings;
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
}
@Override
@@ -50,7 +106,7 @@
buffer.writeString(nodeID);
if (synchronizationIsFinished)
return;
- buffer.writeByte(journalType.typeByte);
+ buffer.writeByte(dataType.code);
buffer.writeInt(ids.length);
for (long id : ids)
{
@@ -67,7 +123,7 @@
{
return;
}
- journalType = JournalContent.getType(buffer.readByte());
+ dataType = SyncDataType.getDataType(buffer.readByte());
int length = buffer.readInt();
ids = new long[length];
for (int i = 0; i < length; i++)
@@ -85,9 +141,9 @@
return synchronizationIsFinished;
}
- public JournalContent getJournalContentType()
+ public SyncDataType getDataType()
{
- return journalType;
+ return dataType;
}
public long[] getFileIds()
@@ -99,4 +155,58 @@
{
return nodeID;
}
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = super.hashCode();
+ result = prime * result + Arrays.hashCode(ids);
+ result = prime * result + ((dataType == null) ? 0 : dataType.hashCode());
+ result = prime * result + ((nodeID == null) ? 0 : nodeID.hashCode());
+ result = prime * result + (synchronizationIsFinished ? 1231 : 1237);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (!super.equals(obj))
+ {
+ return false;
+ }
+ if (!(obj instanceof ReplicationStartSyncMessage))
+ {
+ return false;
+ }
+ ReplicationStartSyncMessage other = (ReplicationStartSyncMessage)obj;
+ if (!Arrays.equals(ids, other.ids))
+ {
+ return false;
+ }
+ if (dataType != other.dataType)
+ {
+ return false;
+ }
+ if (nodeID == null)
+ {
+ if (other.nodeID != null)
+ {
+ return false;
+ }
+ }
+ else if (!nodeID.equals(other.nodeID))
+ {
+ return false;
+ }
+ if (synchronizationIsFinished != other.synchronizationIsFinished)
+ {
+ return false;
+ }
+ return true;
+ }
}
Added:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java
(rev 0)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicatedLargeMessage.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -0,0 +1,38 @@
+/**
+ *
+ */
+package org.hornetq.core.replication;
+
+import org.hornetq.core.server.LargeServerMessage;
+
+/**
+ *
+ */
+public interface ReplicatedLargeMessage
+{
+ /**
+ * @see LargeServerMessage#setDurable(boolean)
+ */
+ void setDurable(boolean b);
+
+ /**
+ * @see LargeServerMessage#setMessageID(long)
+ */
+ void setMessageID(long id);
+
+ /**
+ * @see LargeServerMessage#releaseResources()
+ */
+ void releaseResources();
+
+ /**
+ * @see LargeServerMessage#deleteFile()
+ */
+ void deleteFile() throws Exception;
+
+ /**
+ * @see LargeServerMessage#addBytes(byte[])
+ */
+ void addBytes(byte[] body) throws Exception;
+
+}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationEndpoint.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -44,6 +44,7 @@
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.persistence.impl.journal.LargeServerMessageInSync;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.ChannelHandler;
import org.hornetq.core.protocol.core.Packet;
@@ -64,9 +65,9 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage;
+import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage.SyncDataType;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.server.impl.QuorumManager;
@@ -97,7 +98,6 @@
/** Files reserved in each journal for synchronization of existing data from the
'live' server. */
private final Map<JournalContent, Map<Long, JournalSyncFile>>
filesReservedForSync =
new HashMap<JournalContent, Map<Long, JournalSyncFile>>();
- private final Map<Long, LargeServerMessage> largeMessagesOnSync = new
HashMap<Long, LargeServerMessage>();
/**
* Used to hold the real Journals before the backup is synchronized. This field should
be
@@ -111,8 +111,8 @@
private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>>
pageIndex =
new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer,
Page>>();
- private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
- new ConcurrentHashMap<Long, LargeServerMessage>();
+ private final ConcurrentMap<Long, ReplicatedLargeMessage> largeMessages =
+ new ConcurrentHashMap<Long, ReplicatedLargeMessage>();
// Used on tests, to simulate failures on delete pages
private boolean deletePages = true;
@@ -285,8 +285,7 @@
return;
}
- // This could be null if the backup server is being
- // shut down without any live server connecting here
+ // Channel may be null if there isn't a connection to a live server
if (channel != null)
{
channel.close();
@@ -309,18 +308,12 @@
pageIndex.clear();
- for (LargeServerMessage largeMessage : largeMessages.values())
+ for (ReplicatedLargeMessage largeMessage : largeMessages.values())
{
largeMessage.releaseResources();
}
largeMessages.clear();
- for (LargeServerMessage largeMessage : largeMessagesOnSync.values())
- {
- largeMessage.releaseResources();
- }
- largeMessagesOnSync.clear();
-
for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry :
filesReservedForSync.entrySet())
{
for (JournalSyncFile filesReserved : entry.getValue().values())
@@ -444,42 +437,17 @@
journal.synchronizationUnlock();
}
}
- synchronized (largeMessagesOnSync)
+ ByteBuffer buffer = ByteBuffer.allocate(4 * 1024);
+ for (Entry<Long, ReplicatedLargeMessage> entry : largeMessages.entrySet())
{
- synchronized (largeMessages)
+ ReplicatedLargeMessage lm = entry.getValue();
+ if (lm instanceof LargeServerMessageInSync)
{
- ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
- for (Entry<Long, LargeServerMessage> entry : largeMessages.entrySet())
- {
- Long id = entry.getKey();
- LargeServerMessage lm = entry.getValue();
- if (largeMessagesOnSync.containsKey(id))
- {
- SequentialFile sq = lm.getFile();
- LargeServerMessage mainLM = largeMessagesOnSync.get(id);
- SequentialFile mainSeqFile = mainLM.getFile();
- for (;;)
- {
- buffer.rewind();
- int size = sq.read(buffer);
- mainSeqFile.writeInternal(buffer);
- if (size < buffer.capacity())
- {
- break;
- }
- }
- }
- else
- {
- // these are large-messages created after sync started
- largeMessagesOnSync.put(id, lm);
- }
- }
- largeMessages.clear();
- largeMessages.putAll(largeMessagesOnSync);
- largeMessagesOnSync.clear();
+ LargeServerMessageInSync lmSync = (LargeServerMessageInSync)lm;
+ lmSync.joinSyncedData(buffer);
}
}
+
journalsHolder = null;
quorumManager.setLiveID(liveID);
server.setRemoteBackupUpToDate(liveID);
@@ -487,7 +455,7 @@
return;
}
- private void handleReplicationSynchronization(ReplicationSyncFileMessage msg) throws
Exception
+ private synchronized void handleReplicationSynchronization(ReplicationSyncFileMessage
msg) throws Exception
{
Long id = Long.valueOf(msg.getId());
byte[] data = msg.getData();
@@ -496,24 +464,19 @@
{
case LARGE_MESSAGE:
{
- synchronized (largeMessagesOnSync)
+ ReplicatedLargeMessage largeMessage = lookupLargeMessage(id, false);
+ if (!(largeMessage instanceof LargeServerMessageInSync))
{
- LargeServerMessage largeMessage = largeMessagesOnSync.get(id);
- if (largeMessage == null)
- {
- largeMessage = storage.createLargeMessage();
- largeMessage.setDurable(true);
- largeMessage.setMessageID(id);
- largeMessagesOnSync.put(id, largeMessage);
- }
- channel = largeMessage.getFile();
+ log.error("large message sync: largeMessage instance is incompatible
with it, ignoring data");
+ return;
}
+ LargeServerMessageInSync
largeMessageInSync=(LargeServerMessageInSync)largeMessage;
+ channel = largeMessageInSync.getSyncFile();
break;
}
case PAGE:
{
Page page = getPage(msg.getPageStore(), (int)msg.getId());
-
channel = page.getFile();
break;
}
@@ -565,31 +528,51 @@
return;
}
- final Journal journal = journalsHolder.get(packet.getJournalContentType());
+
synchronized (this)
{
if (!started)
return;
- if (packet.getNodeID() != null)
+ switch (packet.getDataType())
{
- quorumManager.setLiveID(packet.getNodeID());
- }
- Map<Long, JournalSyncFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
- log.info("Journal " + packet.getJournalContentType() + ".
Reserving fileIDs for synchronization: " +
- Arrays.toString(packet.getFileIds()));
+ case LargeMessages:
+ for (long msgID : packet.getFileIds())
+ {
+ createLargeMessage(msgID, true);
+ }
+ break;
+ case JournalBindings:
+ case JournalMessages:
- for (Entry<Long, JournalFile> entry :
journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
- {
- mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
+ final JournalContent journalContent =
SyncDataType.getJournalContentType(packet.getDataType());
+ final Journal journal = journalsHolder.get(journalContent);
+
+ if (packet.getNodeID() != null)
+ {
+ // At the start of replication, we still do not know which is the
nodeID that the live uses.
+ // This is the point where the backup gets this information.
+ quorumManager.setLiveID(packet.getNodeID());
+ }
+ Map<Long, JournalSyncFile> mapToFill =
filesReservedForSync.get(journalContent);
+ log.info("Journal " + journalContent + ". Reserving fileIDs
for synchronization: " +
+ Arrays.toString(packet.getFileIds()));
+
+ for (Entry<Long, JournalFile> entry :
journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
+ {
+ mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
+ }
+ FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
+ registerJournal(journalContent.typeByte, syncJournal);
+ break;
+ default:
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
"unhandled data type!");
}
- FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
- registerJournal(packet.getJournalContentType().typeByte, syncJournal);
}
}
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
{
- LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
+ ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), true);
if (message != null)
{
@@ -609,7 +592,7 @@
*/
private void handleLargeMessageWrite(final ReplicationLargeMessageWriteMessage packet)
throws Exception
{
- LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), false);
+ ReplicatedLargeMessage message = lookupLargeMessage(packet.getMessageId(), false);
if (message != null)
{
message.addBytes(packet.getBody());
@@ -624,9 +607,9 @@
compareJournalInformation(request.getJournalInformation());
}
- private LargeServerMessage lookupLargeMessage(final long messageId, final boolean
delete)
+ private ReplicatedLargeMessage lookupLargeMessage(final long messageId, final boolean
delete)
{
- LargeServerMessage message;
+ ReplicatedLargeMessage message;
if (delete)
{
@@ -635,18 +618,6 @@
else
{
message = largeMessages.get(messageId);
- if (message == null)
- {
- synchronized (largeMessages)
- {
- if (!server.isRemoteBackupUpToDate())
- {
- // in case we need to append data to a file while still sync'ing
the backup
- createLargeMessage(messageId, true);
- message = largeMessages.get(messageId);
- }
- }
- }
}
if (message == null)
@@ -671,10 +642,14 @@
private void createLargeMessage(final long id, boolean sync)
{
- LargeServerMessage msg = storage.createLargeMessage();
+ ReplicatedLargeMessage msg;
+ if (sync)
+ msg = new LargeServerMessageInSync(storage);
+ else
+ msg = storage.createLargeMessage();
+
msg.setDurable(true);
msg.setMessageID(id);
- msg.setReplicationSync(sync);
largeMessages.put(id, msg);
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -13,6 +13,7 @@
package org.hornetq.core.replication;
+import java.util.List;
import java.util.Set;
import org.hornetq.api.core.HornetQException;
@@ -125,4 +126,13 @@
* @throws Exception
*/
void syncPages(SequentialFile file, long id, SimpleString pageStore) throws
Exception;
+
+ /**
+ * Reserves several LargeMessage IDs in the backup.
+ * <p>
+ * Doing this before hand removes the need of synchronizing large-message deletes with
the
+ * largeMessageSyncList.
+ * @param largeMessageIDs
+ */
+ void sendLargeMessageIdListMessage(List<Long> largeMessageIDs);
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -17,6 +17,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -251,9 +252,7 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#largeMessageBegin(byte[])
- */
+ @Override
public void largeMessageBegin(final long messageId)
{
if (enabled)
@@ -270,9 +269,7 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#largeMessageWrite(long,
byte[])
- */
+ @Override
public void largeMessageWrite(final long messageId, final byte[] body)
{
if (enabled)
@@ -596,4 +593,12 @@
if (enabled)
sendReplicatePacket(new ReplicationStartSyncMessage(nodeID));
}
+
+ @Override
+ public void sendLargeMessageIdListMessage(List<Long> largeMessageIDs)
+ {
+ if (enabled)
+ sendReplicatePacket(new ReplicationStartSyncMessage(largeMessageIDs));
+
+ }
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/server/LargeServerMessage.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -15,21 +15,22 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.replication.ReplicatedLargeMessage;
/**
* A LargeMessage
*
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert
Suconic</a>
*/
-public interface LargeServerMessage extends ServerMessage
+public interface LargeServerMessage extends ServerMessage, ReplicatedLargeMessage
{
void addBytes(byte[] bytes) throws Exception;
/** When a large message is copied (e.g. ExpiryQueue) instead of copying the file, we
specify a link between the messages */
void setLinkedMessage(LargeServerMessage message);
-
+
void setPendingRecordID(long pendingRecordID);
-
+
long getPendingRecordID();
boolean isFileExists() throws Exception;
@@ -50,13 +51,6 @@
void decrementDelayDeletionCount() throws Exception;
/**
- * This method only has relevance in a backup server.
- * @param sync {@code true} if this file is meant for appends of a message that needs
to be
- * sync'ed with the live.
- */
- void setReplicationSync(boolean sync);
-
- /**
* @return
* @throws HornetQException
*/
Modified:
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
---
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -2725,12 +2725,13 @@
/**
* @param completeTransaction If the appendRecord is for a prepare or commit, where we
should
* update the number of pendingTransactions on the current file
+ * @throws Exception
*/
private JournalFile appendRecord(final JournalInternalRecord encoder,
final boolean completeTransaction,
final boolean sync,
final JournalTransaction tx,
- final IOAsyncTask parameterCallback) throws
Exception
+ final IOAsyncTask parameterCallback) throws Exception
{
checkJournalIsLoaded();
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -24,7 +24,7 @@
private static final int BACKUP_WAIT_TIME = 20;
private ServerLocatorInternal locator;
- private ClientSessionFactoryInternal sessionFactory;
+ protected ClientSessionFactoryInternal sessionFactory;
private ClientSession session;
private ClientProducer producer;
private BackupSyncDelay syncDelay;
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -40,12 +40,27 @@
getAllMessageFileIds(dir).size());
createProducerSendSomeMessages();
startBackupFinishSyncing();
- // File dir = new
- // File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
receiveMsgsInRange(0, n_msgs / 2);
assertEquals("we really ought to delete these after delivery", n_msgs /
2, getAllMessageFileIds(dir).size());
}
+ public void testDeleteLargeMessagesDuringSync() throws Exception
+ {
+ File dir = new
File(backupServer.getServer().getConfiguration().getLargeMessagesDirectory());
+ assertEquals("Should not have any large messages... previous test failed to
clean up?", 0,
+ getAllMessageFileIds(dir).size());
+ createProducerSendSomeMessages();
+
+ backupServer.start();
+ waitForComponent(backupServer.getServer(), 5);
+ receiveMsgsInRange(0, n_msgs / 2);
+
+ startBackupFinishSyncing();
+ backupServer.stop();
+
+ assertEquals("we really ought to delete these after delivery", n_msgs /
2, getAllMessageFileIds(dir).size());
+ }
+
private Set<Long> getAllMessageFileIds(File dir)
{
Set<Long> idsOnBkp = new HashSet<Long>();
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java
===================================================================
---
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2012-01-12
12:07:32 UTC (rev 12011)
+++
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/persistence/DeleteMessagesOnStartupTest.java 2012-01-12
12:08:26 UTC (rev 12012)
@@ -101,14 +101,4 @@
};
}
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-
}