Author: borges
Date: 2011-09-26 06:35:30 -0400 (Mon, 26 Sep 2011)
New Revision: 11418
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
Log:
HORNETQ-720 Close the replicator in case we close the JournalStorageManager,
make sure we never send anything when replication is !enabled
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-26
10:34:14 UTC (rev 11417)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-26
10:35:30 UTC (rev 11418)
@@ -2055,8 +2055,10 @@
idGenerator.close();
}
+ if (replicator != null)
+ replicator.stop();
+
bindingsJournal.stop();
-
messageJournal.stop();
singleThreadExecutor.shutdown();
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-26
10:34:14 UTC (rev 11417)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-09-26
10:35:30 UTC (rev 11418)
@@ -101,10 +101,7 @@
public void appendAddRecord(final byte journalID, final long id, final byte
recordType, final EncodingSupport record)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType,
record));
- }
+ sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType,
record));
}
/* (non-Javadoc)
@@ -115,10 +112,7 @@
final byte recordType,
final EncodingSupport record) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType,
record));
- }
+ sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType,
record));
}
/* (non-Javadoc)
@@ -126,10 +120,7 @@
*/
public void appendDeleteRecord(final byte journalID, final long id) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
- }
+ sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
}
public void appendAddRecordTransactional(final byte journalID,
@@ -138,10 +129,7 @@
final byte recordType,
final EncodingSupport record) throws
Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id,
recordType, record));
- }
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id,
recordType, record));
}
/* (non-Javadoc)
@@ -153,10 +141,7 @@
final byte recordType,
final EncodingSupport record) throws
Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id,
recordType, record));
- }
+ sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id,
recordType, record));
}
/* (non-Javadoc)
@@ -164,10 +149,7 @@
*/
public void appendCommitRecord(final byte journalID, final long txID, final boolean
lineUp) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID),
lineUp);
- }
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
}
/* (non-Javadoc)
@@ -178,10 +160,7 @@
final long id,
final EncodingSupport record) throws
Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id,
record));
- }
+ sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
}
/* (non-Javadoc)
@@ -189,10 +168,7 @@
*/
public void appendDeleteRecordTransactional(final byte journalID, final long txID,
final long id) throws Exception
{
- if (enabled)
- {
sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id,
NullEncoding.instance));
- }
}
/* (non-Javadoc)
@@ -200,10 +176,7 @@
*/
public void appendPrepareRecord(final byte journalID, final long txID, final
EncodingSupport transactionData) throws Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID,
transactionData));
- }
+ sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID,
transactionData));
}
/* (non-Javadoc)
@@ -211,10 +184,7 @@
*/
public void appendRollbackRecord(final byte journalID, final long txID) throws
Exception
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
- }
+ sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
}
/* (non-Javadoc)
@@ -222,10 +192,7 @@
*/
public void pageClosed(final SimpleString storeName, final int pageNumber)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber,
false));
- }
+ sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber,
false));
}
/* (non-Javadoc)
@@ -233,10 +200,7 @@
*/
public void pageDeleted(final SimpleString storeName, final int pageNumber)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber,
true));
- }
+ sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, true));
}
/* (non-Javadoc)
@@ -244,10 +208,7 @@
*/
public void pageWrite(final PagedMessage message, final int pageNumber)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
- }
+ sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
}
/* (non-Javadoc)
@@ -255,10 +216,7 @@
*/
public void largeMessageBegin(final long messageId)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationLargeMessageBeingMessage(messageId));
- }
+ sendReplicatePacket(new ReplicationLargeMessageBeingMessage(messageId));
}
/* XXX Unused? */
@@ -275,10 +233,7 @@
*/
public void largeMessageWrite(final long messageId, final byte[] body)
{
- if (enabled)
- {
- sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body));
- }
+ sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body));
}
/* (non-Javadoc)
@@ -312,6 +267,7 @@
*/
public void stop() throws Exception
{
+
if (!started)
{
return;
@@ -333,7 +289,6 @@
ReplicationManagerImpl.log.warn("Error completing callback on
replication manager", e);
}
}
-
if (replicatingChannel != null)
{
replicatingChannel.close();
@@ -372,6 +327,7 @@
public void compareJournals(final JournalLoadInformation[] journalInfo) throws
HornetQException
{
+ if (enabled)
replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
}
@@ -388,6 +344,8 @@
private void sendReplicatePacket(final Packet packet, boolean lineUp)
{
+ if (!enabled)
+ return;
boolean runItNow = false;
OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
@@ -506,9 +464,12 @@
@Override
public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
{
- SequentialFile file = jf.getFile().copy();
- log.info("Replication: sending " + jf + " (size=" + file.size()
+ ") to backup. " + file);
- sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
+ if (enabled)
+ {
+ SequentialFile file = jf.getFile().copy();
+ log.info("Replication: sending " + jf + " (size=" +
file.size() + ") to backup. " + file);
+ sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
+ }
}
@Override
@@ -536,6 +497,8 @@
long maxBytesToSend)
throws Exception
{
+ if (!enabled)
+ return;
if (!file.isOpen())
{
file.open(1, false);
Modified:
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
===================================================================
---
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-09-26
10:34:14 UTC (rev 11417)
+++
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java 2011-09-26
10:35:30 UTC (rev 11418)
@@ -13,10 +13,6 @@
package org.hornetq.tests.integration.cluster.failover;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
-
/**
* A NettyReplicatedFailoverTest
*