Author: borges
Date: 2011-07-29 13:24:59 -0400 (Fri, 29 Jul 2011)
New Revision: 11068
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
Removed:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Send fileID values used by live to backup
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-29
04:25:11 UTC (rev 11067)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-07-29
17:24:59 UTC (rev 11068)
@@ -160,7 +160,22 @@
public enum JournalContent
{
- MESSAGES, BINDINGS;
+ BINDINGS((byte)0), MESSAGES((byte)1);
+
+ public final byte typeByte;
+
+ JournalContent(byte b){
+ typeByte = b;
+ }
+
+ public static JournalContent getType(byte type)
+ {
+ if (MESSAGES.typeByte == type)
+ return MESSAGES;
+ if (BINDINGS.typeByte == type)
+ return BINDINGS;
+ throw new RuntimeException("invalid byte");
+ }
}
private Journal messageJournal;
@@ -337,6 +352,10 @@
*/
public void setReplicator(ReplicationManager replicationManager) throws Exception
{
+ if (!started)
+ {
+ throw new IllegalStateException("must be started...");
+ }
assert replicationManager != null;
replicator = replicationManager;
@@ -345,24 +364,28 @@
throw new HornetQException(HornetQException.INTERNAL_ERROR,
"journals here are not JournalImpl. You
can't set a replicator!");
}
- JournalImpl localMessageJournal = (JournalImpl)messageJournal;
- JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
- if (false)
- {
+ // XXX NEED to take a global lock on the StorageManager.
+
+ final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
+ final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
+
localMessageJournal.writeLock();
localBindingsJournal.writeLock();
- JournalFile[] messageFiles = prepateJournalForCopy(localMessageJournal);
- JournalFile[] bindingsFiles = prepateJournalForCopy(localBindingsJournal);
+ JournalFile[] messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
+ JournalFile[] bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
+
localMessageJournal.writeUnlock();
localBindingsJournal.writeUnlock();
+
+ bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal,
replicator);
+ messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
- }
- // XXX NEED to take a global lock on the StorageManager.
- bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal,
replicator);
- messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+
+ // SEND "SYNC_DONE" msg to backup telling it can become operational.
}
/**
@@ -381,12 +404,12 @@
}
}
- private JournalFile[] prepateJournalForCopy(JournalImpl journal) throws Exception
+ private JournalFile[] prepareJournalForCopy(JournalImpl journal, JournalContent
contentType) throws Exception
{
journal.setAutoReclaim(false);
/*
- * need to check whether it is safe to proceed if compacting is running (specially
at the end
- * of it)
+ * XXX need to check whether it is safe to proceed if compacting is running
(specially at the
+ * end of it)
*/
journal.forceMoveNextFile();
JournalFile[] datafiles = journal.getDataFiles();
@@ -394,6 +417,7 @@
{
jf.setCanReclaim(false);
}
+ replicator.reserveFileIds(datafiles, contentType);
return datafiles;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-07-29
04:25:11 UTC (rev 11067)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2011-07-29
17:24:59 UTC (rev 11068)
@@ -106,6 +106,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -531,6 +532,11 @@
packet = new HaBackupRegistrationMessage();
break;
}
+ case PacketImpl.REPLICATION_FILE_ID:
+ {
+ packet = new ReplicationFileIdMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-07-29
04:25:11 UTC (rev 11067)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/PacketImpl.java 2011-07-29
17:24:59 UTC (rev 11068)
@@ -196,6 +196,8 @@
public static final byte HA_BACKUP_REGISTRATION = 113;
+ public static final byte REPLICATION_FILE_ID = 120;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java
(rev 0)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationFileIdMessage.java 2011-07-29
17:24:59 UTC (rev 11068)
@@ -0,0 +1,65 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.impl.JournalFile;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Send all fileIDs used in the live server to the backup.
+ */
+public class ReplicationFileIdMessage extends PacketImpl
+{
+
+ private long[] ids;
+ private JournalContent journalType;
+
+ public ReplicationFileIdMessage()
+ {
+ super(REPLICATION_FILE_ID);
+ }
+
+ public ReplicationFileIdMessage(JournalFile[] datafiles, JournalContent contentType)
+ {
+ this();
+ ids = new long[datafiles.length];
+ for (int i = 0; i < datafiles.length; i++)
+ {
+ ids[i] = datafiles[i].getFileID();
+ }
+ journalType = contentType;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeByte(journalType.typeByte);
+ buffer.writeInt(ids.length);
+ for (long id : ids)
+ {
+ buffer.writeLong(id);
+ }
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ journalType = JournalContent.getType(buffer.readByte());
+ int length = buffer.readInt();
+ ids = new long[length];
+ for (int i = 0; i < length; i++)
+ {
+ ids[i] = buffer.readLong();
+ }
+ }
+
+ public JournalContent getJournalContentType()
+ {
+ return journalType;
+ }
+
+ public long[] getFileIds()
+ {
+ return ids;
+ }
+}
Deleted:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java 2011-07-29
04:25:11 UTC (rev 11067)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFile.java 2011-07-29
17:24:59 UTC (rev 11068)
@@ -1,29 +0,0 @@
-package org.hornetq.core.protocol.core.impl.wireformat;
-
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
-import org.hornetq.core.protocol.core.impl.PacketImpl;
-
-/**
- * Used to copy JournalFile data over to the backup during synchronization.
- */
-public final class ReplicationJournalFile extends PacketImpl
-{
-
- private byte[] data;
- private int dataSize;
- private JournalContent journalType;
-
- public ReplicationJournalFile()
- {
- super(REPLICATION_SYNC);
- }
-
- public ReplicationJournalFile(int size, byte[] data, JournalContent content)
- {
- this();
- this.dataSize = size;
- this.data = data;
- this.journalType = content;
- }
-
-}
Added:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
(rev 0)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-07-29
17:24:59 UTC (rev 11068)
@@ -0,0 +1,50 @@
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * Used to copy JournalFile data over to the backup during synchronization.
+ */
+public final class ReplicationJournalFileMessage extends PacketImpl
+{
+
+ private byte[] data;
+ private int dataSize;
+ private JournalContent journalType;
+ private long fileId;
+
+ public ReplicationJournalFileMessage()
+ {
+ super(REPLICATION_SYNC);
+ }
+
+ public ReplicationJournalFileMessage(int size, byte[] data, JournalContent content,
long id)
+ {
+ this();
+ this.fileId = id;
+ this.dataSize = size;
+ this.data = data;
+ this.journalType = content;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeLong(fileId);
+ buffer.writeByte(journalType.typeByte);
+ buffer.writeInt(dataSize);
+ buffer.writeBytes(data, 0, dataSize);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ fileId = buffer.readLong();
+ journalType = JournalContent.getType(buffer.readByte());
+ int size = buffer.readInt();
+ data = new byte[size];
+ buffer.readBytes(data);
+ }
+}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-29
04:25:11 UTC (rev 11067)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-07-29
17:24:59 UTC (rev 11068)
@@ -93,4 +93,12 @@
*/
void sendJournalFile(JournalFile jf, JournalContent type) throws IOException,
HornetQException;
+ /**
+ * Reserve the following fileIDs in the backup server.
+ * @param datafiles
+ * @param contentType
+ * @throws HornetQException
+ */
+ void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws
HornetQException;
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-29
04:25:11 UTC (rev 11067)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-07-29
17:24:59 UTC (rev 11068)
@@ -21,6 +21,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
@@ -28,6 +29,7 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -39,6 +41,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -80,10 +83,11 @@
private JournalLoadInformation[] journalLoadInformation;
- private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>>
pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer,
Page>>();
+ private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>>
pageIndex =
+ new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer,
Page>>();
+ private final ConcurrentMap<Long, LargeServerMessage> largeMessages =
+ new ConcurrentHashMap<Long, LargeServerMessage>();
- private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new
ConcurrentHashMap<Long, LargeServerMessage>();
-
// Used on tests, to simulate failures on delete pages
private boolean deletePages = true;
@@ -176,6 +180,10 @@
handleCompareDataMessage((ReplicationCompareDataMessage)packet);
response = new NullResponseMessage();
}
+ else if (type == PacketImpl.REPLICATION_FILE_ID)
+ {
+ handleJournalFileIdReservation((ReplicationFileIdMessage)packet);
+ }
else
{
log.warn("Packet " + packet + " can't be processed by the
ReplicationEndpoint");
@@ -215,8 +223,8 @@
server.getManagementService().setStorageManager(storage);
- registerJournal((byte)1, storage.getMessageJournal());
- registerJournal((byte)0, storage.getBindingsJournal());
+ registerJournal(JournalContent.MESSAGES.typeByte, storage.getMessageJournal());
+ registerJournal(JournalContent.BINDINGS.typeByte, storage.getBindingsJournal());
// We only need to load internal structures on the backup...
journalLoadInformation = storage.loadInternalOnly();
@@ -353,9 +361,24 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
- /**
- * @param packet
- */
+
+ private void handleJournalFileIdReservation(final ReplicationFileIdMessage packet)
throws HornetQException
+ {
+ final Journal journalIf = journals[packet.getJournalContentType().typeByte];
+ if (journalIf.isStarted())
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR, "Journal can
not be started!");
+ }
+
+ if (!(journalIf instanceof JournalImpl))
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
+ "Journals of backup server are expected to be
JournalImpl");
+ }
+ JournalImpl journal = (JournalImpl)journalIf;
+
+ }
+
private void handleLargeMessageEnd(final ReplicationLargemessageEndMessage packet)
{
LargeServerMessage message = lookupLargeMessage(packet.getMessageId(), true);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-29
04:25:11 UTC (rev 11067)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-07-29
17:24:59 UTC (rev 11068)
@@ -44,7 +44,8 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationCompareDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFile;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationFileIdMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.ReplicationJournalFileMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageBeingMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargeMessageWriteMessage;
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationLargemessageEndMessage;
@@ -507,15 +508,23 @@
public void sendJournalFile(JournalFile jf, JournalContent content) throws
IOException, HornetQException
{
FileInputStream file = new FileInputStream(jf.getFile().getFileName());
- byte[] data = new byte[1 << 17]; // about 130 kB
+ final long id = jf.getFileID();
+ final byte[] data = new byte[1 << 17]; // about 130 kB
while (true)
{
int bytesRead = file.read(data);
if (bytesRead == -1)
break;
- replicatingChannel.sendBlocking(new ReplicationJournalFile(bytesRead, data,
content));
+ replicatingChannel.sendBlocking(new ReplicationJournalFileMessage(bytesRead,
data, content, id));
}
+ // XXX probably need to sync the JournalFile(?)
throw new UnsupportedOperationException();
}
+ @Override
+ public void reserveFileIds(JournalFile[] datafiles, JournalContent contentType) throws
HornetQException
+ {
+ replicatingChannel.sendBlocking(new ReplicationFileIdMessage(datafiles,
contentType));
+ }
+
}