Author: borges
Date: 2011-11-24 07:24:34 -0500 (Thu, 24 Nov 2011)
New Revision: 11753
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Send the live server's nodeID as soon as possible (with the start-sync
message), as the backup needs it to detect failures.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-24
10:25:58 UTC (rev 11752)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-11-24
12:24:34 UTC (rev 11753)
@@ -396,8 +396,8 @@
pagingManager.lock();
try
{
- messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
+ messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES, nodeID);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS, nodeID);
pageFilesToSync = getPageInformationForSync(pagingManager);
largeMessageFilesToSync = getLargeMessageInformation();
}
@@ -527,11 +527,12 @@
}
}
- private JournalFile[] prepareJournalForCopy(Journal journal, JournalContent
contentType) throws Exception
+ private JournalFile[]
+ prepareJournalForCopy(Journal journal, JournalContent contentType, String
nodeID) throws Exception
{
journal.forceMoveNextFile();
JournalFile[] datafiles = journal.getDataFiles();
- replicator.sendStartSyncMessage(datafiles, contentType);
+ replicator.sendStartSyncMessage(datafiles, contentType, nodeID);
return datafiles;
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-11-24
10:25:58 UTC (rev 11752)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationStartSyncMessage.java 2011-11-24
12:24:34 UTC (rev 11753)
@@ -30,9 +30,10 @@
this.nodeID = nodeID;
}
- public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent
contentType)
+ public ReplicationStartSyncMessage(JournalFile[] datafiles, JournalContent
contentType, String nodeID)
{
this();
+ this.nodeID = nodeID;
synchronizationIsFinished = false;
ids = new long[datafiles.length];
for (int i = 0; i < datafiles.length; i++)
@@ -46,11 +47,9 @@
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeBoolean(synchronizationIsFinished);
+ buffer.writeString(nodeID);
if (synchronizationIsFinished)
- {
- buffer.writeString(nodeID);
return;
- }
buffer.writeByte(journalType.typeByte);
buffer.writeInt(ids.length);
for (long id : ids)
@@ -63,9 +62,9 @@
public void decodeRest(final HornetQBuffer buffer)
{
synchronizationIsFinished = buffer.readBoolean();
+ nodeID = buffer.readString();
if (synchronizationIsFinished)
{
- nodeID = buffer.readString();
return;
}
journalType = JournalContent.getType(buffer.readByte());
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-11-24
10:25:58 UTC (rev 11752)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-11-24
12:24:34 UTC (rev 11753)
@@ -93,7 +93,8 @@
* @param contentType
* @throws HornetQException
*/
- void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType) throws
HornetQException;
+ void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType, String
nodeID)
+
throws HornetQException;
/**
* Informs backup that data synchronization is done.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-24
10:25:58 UTC (rev 11752)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-11-24
12:24:34 UTC (rev 11753)
@@ -131,6 +131,7 @@
// Public --------------------------------------------------------
+ @Override
public void registerJournal(final byte id, final Journal journal)
{
if (journals == null || id >= journals.length)
@@ -150,6 +151,7 @@
journals[id] = journal;
}
+ @Override
public void handlePacket(final Packet packet)
{
PacketImpl response = new ReplicationResponseMessage();
@@ -341,17 +343,13 @@
started = false;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationEndpoint#getChannel()
- */
+ @Override
public Channel getChannel()
{
return channel;
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.replication.ReplicationEndpoint#setChannel(org.hornetq.core.remoting.Channel)
- */
+ @Override
public void setChannel(final Channel channel)
{
this.channel = channel;
@@ -568,7 +566,11 @@
synchronized (this)
{
if (!started)
- return;
+ return;
+ if (packet.getNodeID() != null)
+ {
+ quorumManager.setLiveID(packet.getNodeID());
+ }
Map<Long, JournalSyncFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
log.info("Journal " + packet.getJournalContentType() + ".
Reserving fileIDs for synchronization: " +
Arrays.toString(packet.getFileIds()));
@@ -580,7 +582,7 @@
FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
registerJournal(packet.getJournalContentType().typeByte, syncJournal);
}
- }
+ }
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
{
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-11-24
10:25:58 UTC (rev 11752)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-11-24
12:24:34 UTC (rev 11753)
@@ -110,9 +110,7 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long,
byte, org.hornetq.core.journal.EncodingSupport, boolean)
- */
+ @Override
public void appendUpdateRecord(final byte journalID,
final long id,
final byte recordType,
@@ -124,9 +122,7 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long,
boolean)
- */
+ @Override
public void appendDeleteRecord(final byte journalID, final long id) throws Exception
{
if (enabled)
@@ -585,10 +581,13 @@
}
@Override
- public void sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType)
throws HornetQException
+ public
+ void
+ sendStartSyncMessage(JournalFile[] datafiles, JournalContent contentType,
String nodeID)
+
throws HornetQException
{
if (enabled)
- sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType));
+ sendReplicatePacket(new ReplicationStartSyncMessage(datafiles, contentType,
nodeID));
}
@Override