[hornetq-commits] JBoss hornetq SVN: r11418 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/replication/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 26 06:35:31 EDT 2011


Author: borges
Date: 2011-09-26 06:35:30 -0400 (Mon, 26 Sep 2011)
New Revision: 11418

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
Log:
HORNETQ-720 Close the replicator when the JournalStorageManager is closed,
and make sure we never send anything when replication is not enabled.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-26 10:34:14 UTC (rev 11417)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-26 10:35:30 UTC (rev 11418)
@@ -2055,8 +2055,10 @@
          idGenerator.close();
       }
 
+      if (replicator != null)
+         replicator.stop();
+
       bindingsJournal.stop();
-
       messageJournal.stop();
 
       singleThreadExecutor.shutdown();

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-26 10:34:14 UTC (rev 11417)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-09-26 10:35:30 UTC (rev 11418)
@@ -101,10 +101,7 @@
 
    public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport record)
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, record));
-      }
+      sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, record));
    }
 
    /* (non-Javadoc)
@@ -115,10 +112,7 @@
                                   final byte recordType,
                                   final EncodingSupport record) throws Exception
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, record));
-      }
+      sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, record));
    }
 
    /* (non-Javadoc)
@@ -126,10 +120,7 @@
     */
    public void appendDeleteRecord(final byte journalID, final long id) throws Exception
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
-      }
+      sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
    }
 
    public void appendAddRecordTransactional(final byte journalID,
@@ -138,10 +129,7 @@
                                             final byte recordType,
                                             final EncodingSupport record) throws Exception
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id, recordType, record));
-      }
+      sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id, recordType, record));
    }
 
    /* (non-Javadoc)
@@ -153,10 +141,7 @@
                                                final byte recordType,
                                                final EncodingSupport record) throws Exception
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id, recordType, record));
-      }
+      sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id, recordType, record));
    }
 
    /* (non-Javadoc)
@@ -164,10 +149,7 @@
     */
    public void appendCommitRecord(final byte journalID, final long txID, final boolean lineUp) throws Exception
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
-      }
+      sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID), lineUp);
    }
 
    /* (non-Javadoc)
@@ -178,10 +160,7 @@
                                                final long id,
                                                final EncodingSupport record) throws Exception
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
-      }
+      sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
    }
 
    /* (non-Javadoc)
@@ -189,10 +168,7 @@
     */
    public void appendDeleteRecordTransactional(final byte journalID, final long txID, final long id) throws Exception
    {
-      if (enabled)
-      {
          sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, NullEncoding.instance));
-      }
    }
 
    /* (non-Javadoc)
@@ -200,10 +176,7 @@
     */
    public void appendPrepareRecord(final byte journalID, final long txID, final EncodingSupport transactionData) throws Exception
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData));
-      }
+      sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData));
    }
 
    /* (non-Javadoc)
@@ -211,10 +184,7 @@
     */
    public void appendRollbackRecord(final byte journalID, final long txID) throws Exception
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
-      }
+      sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
    }
 
    /* (non-Javadoc)
@@ -222,10 +192,7 @@
     */
    public void pageClosed(final SimpleString storeName, final int pageNumber)
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, false));
-      }
+      sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, false));
    }
 
    /* (non-Javadoc)
@@ -233,10 +200,7 @@
     */
    public void pageDeleted(final SimpleString storeName, final int pageNumber)
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, true));
-      }
+      sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, true));
    }
 
    /* (non-Javadoc)
@@ -244,10 +208,7 @@
     */
    public void pageWrite(final PagedMessage message, final int pageNumber)
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
-      }
+      sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
    }
 
    /* (non-Javadoc)
@@ -255,10 +216,7 @@
     */
    public void largeMessageBegin(final long messageId)
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationLargeMessageBeingMessage(messageId));
-      }
+      sendReplicatePacket(new ReplicationLargeMessageBeingMessage(messageId));
    }
 
    /* XXX Unused? */
@@ -275,10 +233,7 @@
     */
    public void largeMessageWrite(final long messageId, final byte[] body)
    {
-      if (enabled)
-      {
-         sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body));
-      }
+      sendReplicatePacket(new ReplicationLargeMessageWriteMessage(messageId, body));
    }
 
    /* (non-Javadoc)
@@ -312,6 +267,7 @@
     */
    public void stop() throws Exception
    {
+
       if (!started)
       {
          return;
@@ -333,7 +289,6 @@
             ReplicationManagerImpl.log.warn("Error completing callback on replication manager", e);
          }
       }
-
       if (replicatingChannel != null)
       {
          replicatingChannel.close();
@@ -372,6 +327,7 @@
 
    public void compareJournals(final JournalLoadInformation[] journalInfo) throws HornetQException
    {
+      if (enabled)
       replicatingChannel.sendBlocking(new ReplicationCompareDataMessage(journalInfo));
    }
 
@@ -388,6 +344,8 @@
 
    private void sendReplicatePacket(final Packet packet, boolean lineUp)
    {
+      if (!enabled)
+         return;
       boolean runItNow = false;
 
       OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
@@ -506,9 +464,12 @@
    @Override
    public void syncJournalFile(JournalFile jf, JournalContent content) throws Exception
    {
-      SequentialFile file = jf.getFile().copy();
-      log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
-      sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
+      if (enabled)
+      {
+         SequentialFile file = jf.getFile().copy();
+         log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
+         sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
+      }
    }
 
    @Override
@@ -536,6 +497,8 @@
             long maxBytesToSend)
             throws Exception
    {
+      if (!enabled)
+         return;
       if (!file.isOpen())
       {
          file.open(1, false);

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java	2011-09-26 10:34:14 UTC (rev 11417)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/NettyReplicatedFailoverTest.java	2011-09-26 10:35:30 UTC (rev 11418)
@@ -13,10 +13,6 @@
 
 package org.hornetq.tests.integration.cluster.failover;
 
-import org.hornetq.core.config.Configuration;
-import org.hornetq.tests.integration.cluster.util.SameProcessHornetQServer;
-import org.hornetq.tests.integration.cluster.util.TestableServer;
-
 /**
  * A NettyReplicatedFailoverTest
  *



More information about the hornetq-commits mailing list