[hornetq-commits] JBoss hornetq SVN: r8039 - in branches/Replication_Clebert: src/main/org/hornetq/core/journal/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 2 17:07:24 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-02 17:07:23 -0400 (Fri, 02 Oct 2009)
New Revision: 8039

Added:
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
   branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java
   branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Changes

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java	2009-10-02 17:15:12 UTC (rev 8038)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/Journal.java	2009-10-02 21:07:23 UTC (rev 8039)
@@ -86,22 +86,5 @@
 
    void perfBlast(int pages) throws Exception;
 
-   /** This method is called automatically when a new file is opened.
-    * @return true if it needs to re-check due to cleanup or other factors  */
-   boolean checkReclaimStatus() throws Exception;
 
-   /** This method check for the need of compacting based on the minCompactPercentage 
-    * This method is usually called automatically when new files are opened
-   */
-   void checkCompact() throws Exception;
-
-   /**
-    * Eliminate deleted records of the journal.
-    * @throws Exception 
-    */
-   void compact() throws Exception;
-   
-   
-   JournalFile[] getDataFiles();
-
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java	2009-10-02 17:15:12 UTC (rev 8038)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/TestableJournal.java	2009-10-02 21:07:23 UTC (rev 8039)
@@ -13,6 +13,7 @@
 
 package org.hornetq.core.journal;
 
+import org.hornetq.core.journal.impl.JournalFile;
 
 /**
  * 
@@ -52,6 +53,14 @@
 
    boolean isAutoReclaim();
 
+   void compact() throws Exception;
    
+   /** This method is called automatically when a new file is opened.
+    * @return true if it needs to re-check due to cleanup or other factors  */
+   boolean checkReclaimStatus() throws Exception;
 
+   
+   JournalFile[] getDataFiles();
+   
+
 }

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-02 17:15:12 UTC (rev 8038)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-10-02 21:07:23 UTC (rev 8039)
@@ -2159,7 +2159,7 @@
       return (compactMinFiles * compactPercentage);
    }
 
-   public synchronized void cleanUp(final JournalFile file) throws Exception
+   private synchronized void cleanUp(final JournalFile file) throws Exception
    {
       if (state != STATE_LOADED)
       {
@@ -2236,7 +2236,7 @@
 
    }
 
-   public void checkCompact() throws Exception
+   private void checkCompact() throws Exception
    {
       if (compactMinFiles == 0)
       {

Added: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java	                        (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java	2009-10-02 21:07:23 UTC (rev 8039)
@@ -0,0 +1,355 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import java.util.List;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
+import org.hornetq.core.replication.ReplicationManager;
+
+/**
+ * A ReplicatedJournalImpl
+ *
+ * @author <mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicatedJournalImpl implements Journal
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private final ReplicationManager replicationManager;
+
+   private final Journal replicatedJournal;
+
+   private final byte journalID;
+
+   /**
+    * @param journaID
+    * @param replicatedJournal
+    * @param replicationManager
+    */
+   public ReplicatedJournalImpl(byte journaID, Journal replicatedJournal, ReplicationManager replicationManager)
+   {
+      super();
+      this.journalID = journaID;
+      this.replicatedJournal = replicatedJournal;
+      this.replicationManager = replicationManager;
+   }
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @param sync
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[], boolean)
+    */
+   public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+   {
+      replicationManager.appendAddRecord(journalID, id, recordType, new ByteArrayEncoding(record));
+      replicatedJournal.appendAddRecord(id, recordType, record, sync);
+   }
+
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @param sync
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
+    */
+   public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+   {
+      replicationManager.appendAddRecord(journalID, id, recordType, record);
+      replicatedJournal.appendAddRecord(id, recordType, record, sync);
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @param recordType
+    * @param record
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, byte[])
+    */
+   public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+   {
+      replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, new ByteArrayEncoding(record));
+      replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @param recordType
+    * @param record
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
+    */
+   public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
+   {
+      replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
+      replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
+   }
+
+   /**
+    * @param txID
+    * @param sync
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean)
+    */
+   public void appendCommitRecord(long txID, boolean sync) throws Exception
+   {
+      replicationManager.appendCommitRecord(journalID, txID);
+      replicatedJournal.appendCommitRecord(txID, sync);
+   }
+
+   /**
+    * @param id
+    * @param sync
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean)
+    */
+   public void appendDeleteRecord(long id, boolean sync) throws Exception
+   {
+      replicationManager.appendDeleteRecord(journalID, id);
+      replicatedJournal.appendDeleteRecord(id, sync);
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @param record
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, byte[])
+    */
+   public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception
+   {
+      replicationManager.appendDeleteRecordTransactional(journalID, txID, id, new ByteArrayEncoding(record));
+      replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @param record
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, org.hornetq.core.journal.EncodingSupport)
+    */
+   public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
+   {
+      replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
+      replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long)
+    */
+   public void appendDeleteRecordTransactional(long txID, long id) throws Exception
+   {
+      replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
+      replicatedJournal.appendDeleteRecordTransactional(txID, id);
+   }
+
+   /**
+    * @param txID
+    * @param transactionData
+    * @param sync
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
+    */
+   public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
+   {
+      replicationManager.appendPrepareRecord(journalID, txID, new ByteArrayEncoding(transactionData));
+      replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
+   }
+
+   /**
+    * @param txID
+    * @param transactionData
+    * @param sync
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, org.hornetq.core.journal.EncodingSupport, boolean)
+    */
+   public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception
+   {
+      replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+      replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
+   }
+
+   /**
+    * @param txID
+    * @param sync
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean)
+    */
+   public void appendRollbackRecord(long txID, boolean sync) throws Exception
+   {
+      replicationManager.appendRollbackRecord(journalID, txID);
+      replicatedJournal.appendRollbackRecord(txID, sync);
+   }
+
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @param sync
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[], boolean)
+    */
+   public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+   {
+      replicationManager.appendUpdateRecord(journalID, id, recordType, new ByteArrayEncoding(record));
+      replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
+   }
+
+   /**
+    * @param id
+    * @param recordType
+    * @param record
+    * @param sync
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
+    */
+   public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+   {
+      replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+      replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @param recordType
+    * @param record
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, byte[])
+    */
+   public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+   {
+      replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, new ByteArrayEncoding(record));
+      replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+   }
+
+   /**
+    * @param txID
+    * @param id
+    * @param recordType
+    * @param record
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
+    */
+   public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
+   {
+      replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
+      replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+   }
+
+   /**
+    * @param committedRecords
+    * @param preparedTransactions
+    * @param transactionFailure
+    * @return
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
+    */
+   public long load(List<RecordInfo> committedRecords,
+                    List<PreparedTransactionInfo> preparedTransactions,
+                    TransactionFailureCallback transactionFailure) throws Exception
+   {
+      return replicatedJournal.load(committedRecords, preparedTransactions, transactionFailure);
+   }
+
+   /**
+    * @param reloadManager
+    * @return
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
+    */
+   public long load(LoaderCallback reloadManager) throws Exception
+   {
+      return replicatedJournal.load(reloadManager);
+   }
+
+   /**
+    * @param pages
+    * @throws Exception
+    * @see org.hornetq.core.journal.Journal#perfBlast(int)
+    */
+   public void perfBlast(int pages) throws Exception
+   {
+      replicatedJournal.perfBlast(pages);
+   }
+
+   /**
+    * @throws Exception
+    * @see org.hornetq.core.server.HornetQComponent#start()
+    */
+   public void start() throws Exception
+   {
+      replicatedJournal.start();
+   }
+
+   /**
+    * @throws Exception
+    * @see org.hornetq.core.server.HornetQComponent#stop()
+    */
+   public void stop() throws Exception
+   {
+      replicatedJournal.stop();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#getAlignment()
+    */
+   public int getAlignment() throws Exception
+   {
+      return replicatedJournal.getAlignment();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.server.HornetQComponent#isStarted()
+    */
+   public boolean isStarted()
+   {
+      return replicatedJournal.isStarted();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-02 17:15:12 UTC (rev 8038)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-02 21:07:23 UTC (rev 8039)
@@ -66,7 +66,7 @@
 
    private boolean started;
 
-   private boolean playedResponsesOnFailure;
+   private volatile boolean enabled;
 
    private final Object replicationLock = new Object();
 
@@ -100,80 +100,120 @@
 
    public void appendAddRecord(final byte journalID, final long id, final byte recordType, final EncodingSupport record)
    {
-      sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, record));
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationAddMessage(journalID, false, id, recordType, record));
+      }
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecord(byte, long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
     */
-   public void appendUpdateRecord(byte journalID, long id, byte recordType, EncodingSupport record) throws Exception
+   public void appendUpdateRecord(final byte journalID,
+                                  final long id,
+                                  final byte recordType,
+                                  final EncodingSupport record) throws Exception
    {
-      sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, record));
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationAddMessage(journalID, true, id, recordType, record));
+      }
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecord(byte, long, boolean)
     */
-   public void appendDeleteRecord(byte journalID, long id) throws Exception
+   public void appendDeleteRecord(final byte journalID, final long id) throws Exception
    {
-      sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationDeleteMessage(journalID, id));
+      }
    }
 
-   public void appendAddRecordTransactional(byte journalID, long txID, long id, byte recordType, EncodingSupport record) throws Exception
+   public void appendAddRecordTransactional(final byte journalID,
+                                            final long txID,
+                                            final long id,
+                                            final byte recordType,
+                                            final EncodingSupport record) throws Exception
    {
-      sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id, recordType, record));
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationAddTXMessage(journalID, false, txID, id, recordType, record));
+      }
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendUpdateRecordTransactional(byte, long, long, byte, org.hornetq.core.journal.EncodingSupport)
     */
-   public void appendUpdateRecordTransactional(byte journalID,
-                                               long txID,
-                                               long id,
-                                               byte recordType,
-                                               EncodingSupport record) throws Exception
+   public void appendUpdateRecordTransactional(final byte journalID,
+                                               final long txID,
+                                               final long id,
+                                               final byte recordType,
+                                               final EncodingSupport record) throws Exception
    {
-      sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id, recordType, record));
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationAddTXMessage(journalID, true, txID, id, recordType, record));
+      }
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendCommitRecord(byte, long, boolean)
     */
-   public void appendCommitRecord(byte journalID, long txID) throws Exception
+   public void appendCommitRecord(final byte journalID, final long txID) throws Exception
    {
-      sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+      }
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecordTransactional(byte, long, long, org.hornetq.core.journal.EncodingSupport)
     */
-   public void appendDeleteRecordTransactional(byte journalID, long txID, long id, EncodingSupport record) throws Exception
+   public void appendDeleteRecordTransactional(final byte journalID,
+                                               final long txID,
+                                               final long id,
+                                               final EncodingSupport record) throws Exception
    {
-      sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, record));
+      }
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendDeleteRecordTransactional(byte, long, long)
     */
-   public void appendDeleteRecordTransactional(byte journalID, long txID, long id) throws Exception
+   public void appendDeleteRecordTransactional(final byte journalID, final long txID, final long id) throws Exception
    {
-      sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, NullEncoding.instance));
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationDeleteTXMessage(journalID, txID, id, NullEncoding.instance));
+      }
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendPrepareRecord(byte, long, org.hornetq.core.journal.EncodingSupport, boolean)
     */
-   public void appendPrepareRecord(byte journalID, long txID, EncodingSupport transactionData) throws Exception
+   public void appendPrepareRecord(final byte journalID, final long txID, final EncodingSupport transactionData) throws Exception
    {
-      sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData));
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationPrepareMessage(journalID, txID, transactionData));
+      }
    }
 
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#appendRollbackRecord(byte, long, boolean)
     */
-   public void appendRollbackRecord(byte journalID, long txID) throws Exception
+   public void appendRollbackRecord(final byte journalID, final long txID) throws Exception
    {
-      sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+      if (enabled)
+      {
+         sendReplicatePacket(new ReplicationCommitMessage(journalID, false, txID));
+      }
    }
 
    /* (non-Javadoc)
@@ -181,7 +221,7 @@
     */
    public synchronized boolean isStarted()
    {
-      return this.started;
+      return started;
    }
 
    /* (non-Javadoc)
@@ -195,16 +235,18 @@
 
       Channel mainChannel = connection.getChannel(1, -1, false);
 
-      this.replicatingChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
+      replicatingChannel = connection.getChannel(channelID, WINDOW_SIZE, false);
 
-      this.replicatingChannel.setHandler(this.responseHandler);
+      replicatingChannel.setHandler(responseHandler);
 
       CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(channelID,
                                                                                                     WINDOW_SIZE);
 
       mainChannel.sendBlocking(replicationStartPackage);
 
-      this.started = true;
+      started = true;
+
+      enabled = true;
    }
 
    /* (non-Javadoc)
@@ -217,7 +259,7 @@
          replicatingChannel.close();
       }
 
-      this.started = false;
+      started = false;
 
       if (connection != null)
       {
@@ -242,7 +284,7 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#addReplicationAction(java.lang.Runnable)
     */
-   public void addReplicationAction(Runnable runnable)
+   public void addReplicationAction(final Runnable runnable)
    {
       getReplicationToken().addReplicationAction(runnable);
    }
@@ -267,7 +309,7 @@
          });
       }
    }
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
     */
@@ -276,7 +318,6 @@
       return activeTokens;
    }
 
-
    private void sendReplicatePacket(final Packet packet)
    {
       boolean runItNow = false;
@@ -286,7 +327,7 @@
 
       synchronized (replicationLock)
       {
-         if (playedResponsesOnFailure)
+         if (!enabled)
          {
             // Already replicating channel failed, so just play the action now
 
@@ -336,7 +377,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.remoting.ChannelHandler#handlePacket(org.hornetq.core.remoting.Packet)
        */
-      public void handlePacket(Packet packet)
+      public void handlePacket(final Packet packet)
       {
          if (packet.getType() == PacketImpl.REPLICATION_RESPONSE)
          {
@@ -366,5 +407,4 @@
 
    }
 
-
 }

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-02 17:15:12 UTC (rev 8038)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-02 21:07:23 UTC (rev 8039)
@@ -34,6 +34,11 @@
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
@@ -42,6 +47,7 @@
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.impl.ReplicatedJournalImpl;
 import org.hornetq.core.replication.impl.ReplicationManagerImpl;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.impl.HornetQServerImpl;
@@ -147,21 +153,23 @@
          ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
          manager.start();
 
-         manager.appendPrepareRecord((byte)0, 100, new FakeData());
+         Journal replicatedJournal = new ReplicatedJournalImpl((byte)1, new FakeJournal(), manager);
 
-         manager.appendAddRecord((byte)0, 1, (byte)1, new FakeData());
-         manager.appendUpdateRecord((byte)0, 1, (byte)2, new FakeData());
-         manager.appendDeleteRecord((byte)0, 1);
-         manager.appendAddRecordTransactional((byte)0, 2, 2, (byte)1, new FakeData());
-         manager.appendUpdateRecordTransactional((byte)0, 2, 2, (byte)2, new FakeData());
-         manager.appendCommitRecord((byte)0, 2);
+         replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
 
-         manager.appendDeleteRecordTransactional((byte)0, 3, 4, new FakeData());
-         manager.appendPrepareRecord((byte)0, 3, new FakeData());
-         manager.appendRollbackRecord((byte)0, 3);
+         replicatedJournal.appendAddRecord(1, (byte)1, new FakeData(), false);
+         replicatedJournal.appendUpdateRecord(1, (byte)2, new FakeData(), false);
+         replicatedJournal.appendDeleteRecord(1, false);
+         replicatedJournal.appendAddRecordTransactional(2, 2, (byte)1, new FakeData());
+         replicatedJournal.appendUpdateRecordTransactional(2, 2, (byte)2, new FakeData());
+         replicatedJournal.appendCommitRecord(2, false);
 
+         replicatedJournal.appendDeleteRecordTransactional(3, 4, new FakeData());
+         replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
+         replicatedJournal.appendRollbackRecord(3, false);
+
          final CountDownLatch latch = new CountDownLatch(1);
-         manager.getReplicationToken().addReplicationAction(new Runnable()
+         manager.addReplicationAction(new Runnable()
          {
 
             public void run()
@@ -172,10 +180,10 @@
          });
          assertTrue(latch.await(1, TimeUnit.SECONDS));
          assertEquals(1, manager.getActiveTokens().size());
-         
+
          manager.completeToken();
-         
-         for (int i = 0 ; i < 100; i++)
+
+         for (int i = 0; i < 100; i++)
          {
             // This is asynchronous. Have to wait completion
             if (manager.getActiveTokens().size() == 0)
@@ -292,7 +300,7 @@
       connectionManager = null;
 
       scheduledExecutor = null;
-      
+
       super.tearDown();
 
    }
@@ -300,4 +308,199 @@
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------
+
+   static class FakeJournal implements Journal
+   {
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[], boolean)
+       */
+      public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
+       */
+      public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, byte[])
+       */
+      public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
+       */
+      public void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean)
+       */
+      public void appendCommitRecord(long txID, boolean sync) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean)
+       */
+      public void appendDeleteRecord(long id, boolean sync) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, byte[])
+       */
+      public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, org.hornetq.core.journal.EncodingSupport)
+       */
+      public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long)
+       */
+      public void appendDeleteRecordTransactional(long txID, long id) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, org.hornetq.core.journal.EncodingSupport, boolean)
+       */
+      public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
+       */
+      public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean)
+       */
+      public void appendRollbackRecord(long txID, boolean sync) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[], boolean)
+       */
+      public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
+       */
+      public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, byte[])
+       */
+      public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
+       */
+      public void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#getAlignment()
+       */
+      public int getAlignment() throws Exception
+      {
+
+         return 0;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
+       */
+      public long load(LoaderCallback reloadManager) throws Exception
+      {
+
+         return 0;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
+       */
+      public long load(List<RecordInfo> committedRecords,
+                       List<PreparedTransactionInfo> preparedTransactions,
+                       TransactionFailureCallback transactionFailure) throws Exception
+      {
+
+         return 0;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#perfBlast(int)
+       */
+      public void perfBlast(int pages) throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.HornetQComponent#isStarted()
+       */
+      public boolean isStarted()
+      {
+
+         return false;
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.HornetQComponent#start()
+       */
+      public void start() throws Exception
+      {
+
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.server.HornetQComponent#stop()
+       */
+      public void stop() throws Exception
+      {
+
+      }
+
+   }
 }



More information about the hornetq-commits mailing list