[hornetq-commits] JBoss hornetq SVN: r8195 - in trunk: src/main/org/hornetq/core/journal/impl and 17 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 3 11:55:04 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-11-03 11:55:02 -0500 (Tue, 03 Nov 2009)
New Revision: 8195

Added:
   trunk/src/main/org/hornetq/core/journal/JournalLoadInformation.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCompareDataMessage.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java
Modified:
   trunk/src/main/org/hornetq/core/journal/Journal.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
   trunk/src/main/org/hornetq/core/persistence/StorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
   trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/src/main/org/hornetq/core/server/HornetQServer.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
   trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
   trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-125 - Replication work

Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -15,7 +15,6 @@
 
 import java.util.List;
 
-import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.server.HornetQComponent;
 
 /**
@@ -77,12 +76,19 @@
 
    // Load
    
-   long load(LoaderCallback reloadManager) throws Exception;
+   JournalLoadInformation load(LoaderCallback reloadManager) throws Exception;
 
+   /** Loads the journal's internal data structures without exposing any data.
+    *  This is only useful if you're using the journal but are not interested in the current data.
+    *  Useful in situations where the journal is being replicated, copied, etc. */
+   JournalLoadInformation loadInternalOnly() throws Exception;
 
-   long load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception;
 
+   JournalLoadInformation load(List<RecordInfo> committedRecords, List<PreparedTransactionInfo> preparedTransactions, TransactionFailureCallback transactionFailure) throws Exception;
+
    int getAlignment() throws Exception;
+   
+   int getNumberOfRecords();
 
    void perfBlast(int pages) throws Exception;
 

Added: trunk/src/main/org/hornetq/core/journal/JournalLoadInformation.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/JournalLoadInformation.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/JournalLoadInformation.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal;
+
+/**
+ * This is a POJO containing information about the journal during load time.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class JournalLoadInformation
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+   private int numberOfRecords = 0;
+
+   private long maxID = -1;
+   
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public JournalLoadInformation()
+   {
+      super();
+   }
+
+   /**
+    * @param numberOfRecords
+    * @param maxID
+    */
+   public JournalLoadInformation(final int numberOfRecords, final long maxID)
+   {
+      super();
+      this.numberOfRecords = numberOfRecords;
+      this.maxID = maxID;
+   }
+   
+   
+   
+
+   // Public --------------------------------------------------------
+
+
+   /**
+    * @return the numberOfRecords
+    */
+   public int getNumberOfRecords()
+   {
+      return numberOfRecords;
+   }
+
+   /**
+    * @param numberOfRecords the numberOfRecords to set
+    */
+   public void setNumberOfRecords(final int numberOfRecords)
+   {
+      this.numberOfRecords = numberOfRecords;
+   }
+
+   /**
+    * @return the maxID
+    */
+   public long getMaxID()
+   {
+      return maxID;
+   }
+
+   /**
+    * @param maxID the maxID to set
+    */
+   public void setMaxID(final long maxID)
+   {
+      this.maxID = maxID;
+   }
+
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#hashCode()
+    */
+   @Override
+   public int hashCode()
+   {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + (int)(maxID ^ (maxID >>> 32));
+      result = prime * result + numberOfRecords;
+      return result;
+   }
+
+   /* (non-Javadoc)
+    * @see java.lang.Object#equals(java.lang.Object)
+    */
+   @Override
+   public boolean equals(Object obj)
+   {
+      if (this == obj)
+         return true;
+      if (obj == null)
+         return false;
+      if (getClass() != obj.getClass())
+         return false;
+      JournalLoadInformation other = (JournalLoadInformation)obj;
+      if (maxID != other.maxID)
+         return false;
+      if (numberOfRecords != other.numberOfRecords)
+         return false;
+      return true;
+   }
+
+   
+   /* (non-Javadoc)
+    * @see java.lang.Object#toString()
+    */
+   @Override
+   public String toString()
+   {
+      return "JournalLoadInformation [maxID=" + maxID + ", numberOfRecords=" + numberOfRecords + "]";
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -36,6 +36,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -44,6 +45,7 @@
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.IOCallback;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.LoaderCallback;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
@@ -1163,7 +1165,7 @@
    {
       appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
    }
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
     */
@@ -1172,8 +1174,6 @@
       appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
    }
 
-
-
    /** 
     * 
     * <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction 
@@ -1360,19 +1360,48 @@
       return fileFactory.getAlignment();
    }
 
+   public synchronized JournalLoadInformation loadInternalOnly() throws Exception
+   {
+      LoaderCallback dummyLoader = new LoaderCallback()
+      {
+
+         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+         {
+         }
+
+         public void updateRecord(RecordInfo info)
+         {
+         }
+
+         public void deleteRecord(long id)
+         {
+         }
+
+         public void addRecord(RecordInfo info)
+         {
+         }
+
+         public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+         {
+         }
+      };
+
+      return this.load(dummyLoader);
+   }
+
    /**
     * @see JournalImpl#load(LoaderCallback)
     */
-   public synchronized long load(final List<RecordInfo> committedRecords,
-                                 final List<PreparedTransactionInfo> preparedTransactions,
-                                 final TransactionFailureCallback failureCallback) throws Exception
+   public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
+                                               final List<PreparedTransactionInfo> preparedTransactions,
+                                               final TransactionFailureCallback failureCallback) throws Exception
    {
       final Set<Long> recordsToDelete = new HashSet<Long>();
       final List<RecordInfo> records = new ArrayList<RecordInfo>();
 
       final int DELETE_FLUSH = 20000;
 
-      long maxID = load(new LoaderCallback()
+      JournalLoadInformation info = load(new LoaderCallback()
       {
          public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
          {
@@ -1429,7 +1458,7 @@
          }
       }
 
-      return maxID;
+      return info;
    }
 
    /**
@@ -1649,7 +1678,7 @@
     * <p> * FileID and NumberOfElements are the transaction summary, and they will be repeated (N)umberOfFiles times </p> 
     * 
     * */
-   public synchronized long load(final LoaderCallback loadManager) throws Exception
+   public synchronized JournalLoadInformation load(final LoaderCallback loadManager) throws Exception
    {
       if (state != STATE_STARTED)
       {
@@ -1676,7 +1705,8 @@
 
       int lastDataPos = SIZE_HEADER;
 
-      long maxID = -1;
+      final AtomicLong maxID = new AtomicLong(-1);
+      // long maxID = -1;
 
       for (final JournalFile file : orderedFiles)
       {
@@ -1687,12 +1717,23 @@
          int resultLastPost = readJournalFile(fileFactory, file, new JournalReaderCallback()
          {
 
+            private void checkID(final long id)
+            {
+               if (id > maxID.longValue())
+               {
+                  maxID.set(id);
+               }
+            }
+
             public void onReadAddRecord(final RecordInfo info) throws Exception
             {
                if (trace && LOAD_TRACE)
                {
                   trace("AddRecord: " + info);
                }
+
+               checkID(info.id);
+
                hasData.set(true);
 
                loadManager.addRecord(info);
@@ -1706,6 +1747,9 @@
                {
                   trace("UpdateRecord: " + info);
                }
+
+               checkID(info.id);
+
                hasData.set(true);
 
                loadManager.updateRecord(info);
@@ -1753,6 +1797,8 @@
                   trace((info.isUpdate ? "updateRecordTX: " : "addRecordTX: ") + info + ", txid = " + transactionID);
                }
 
+               checkID(info.id);
+
                hasData.set(true);
 
                TransactionHolder tx = loadTransactions.get(transactionID);
@@ -2034,11 +2080,21 @@
 
             // Remove the transactionInfo
             transactions.remove(transaction.transactionID);
-            
-            loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
+
+            loadManager.failedTransaction(transaction.transactionID,
+                                          transaction.recordInfos,
+                                          transaction.recordsToDelete);
          }
          else
          {
+            for (RecordInfo info : transaction.recordInfos)
+            {
+               if (info.id > maxID.get())
+               {
+                  maxID.set(info.id);
+               }
+            }
+
             PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
 
             info.records.addAll(transaction.recordInfos);
@@ -2053,7 +2109,7 @@
 
       checkReclaimStatus();
 
-      return maxID;
+      return new JournalLoadInformation(records.size(), maxID.longValue());
    }
 
    /** 
@@ -2531,6 +2587,11 @@
       }
    }
 
+   public int getNumberOfRecords()
+   {
+      return this.records.size();
+   }
+
    // Public
    // -----------------------------------------------------------------------------
 
@@ -2854,7 +2915,7 @@
             currentFile.getFile().write(bb, sync);
          }
 
-         return currentFile;         
+         return currentFile;
       }
       finally
       {
@@ -3355,7 +3416,7 @@
    {
 
       private static NullEncoding instance = new NullEncoding();
-      
+
       public static NullEncoding getInstance()
       {
          return instance;

Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -40,7 +40,6 @@
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.impl.RoutingContextImpl;
 import org.hornetq.core.server.impl.ServerProducerCreditManager;
 import org.hornetq.core.server.impl.ServerProducerCreditManagerImpl;
 import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
@@ -971,6 +970,9 @@
       }
 
       depageTransaction.commit();
+      
+      // The StorageManager checks internally whether replication is in use and, if so, performs the proper cleanup
+      storageManager.completeReplication();
 
       if (isTrace)
       {

Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -18,6 +18,7 @@
 
 import javax.transaction.xa.Xid;
 
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
@@ -119,10 +120,10 @@
 
    /** This method is only useful at the backup side. We only load internal structures making the journals ready for
     *  append mode on the backup side. */
-   void loadInternalOnly() throws Exception;
+   JournalLoadInformation[] loadInternalOnly() throws Exception;
 
    
-   public void loadMessageJournal(final PostOffice postOffice,
+   JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
                                   final PagingManager pagingManager,
                                   final ResourceManager resourceManager,
                                   final Map<Long, Queue> queues,
@@ -139,7 +140,7 @@
    
    void deleteQueueBinding(long queueBindingID) throws Exception;
    
-   void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception;
+   JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception;
 
    //grouping relateed operations
    void addGrouping(GroupBinding groupBinding) throws Exception;

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -38,7 +38,7 @@
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.SequentialFile;
@@ -678,7 +678,7 @@
 
    }
 
-   public void loadMessageJournal(final PostOffice postOffice,
+   public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
                                   final PagingManager pagingManager,
                                   final ResourceManager resourceManager,
                                   final Map<Long, Queue> queues,
@@ -690,7 +690,7 @@
 
       Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
 
-      messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(messages));
+      JournalLoadInformation info = messageJournal.load(records, preparedTransactions, new LargeMessageTXFailureCallback(messages));
       
       ArrayList<LargeServerMessage> largeMessages = new ArrayList<LargeServerMessage>();
 
@@ -919,6 +919,8 @@
       {
          messageJournal.perfBlast(perfBlastPages);
       }
+      
+      return info;
    }
 
    /**
@@ -1189,13 +1191,13 @@
       bindingsJournal.appendDeleteRecord(queueBindingID, true);
    }
 
-   public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<GroupingInfo> groupingInfos) throws Exception
+   public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, final List<GroupingInfo> groupingInfos) throws Exception
    {
       List<RecordInfo> records = new ArrayList<RecordInfo>();
 
       List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
 
-      bindingsJournal.load(records, preparedTransactions, null);
+      JournalLoadInformation bindingsInfo = bindingsJournal.load(records, preparedTransactions, null);
 
       for (RecordInfo record : records)
       {
@@ -1239,6 +1241,8 @@
             throw new IllegalStateException("Invalid record type " + rec);
          }
       }
+      
+      return bindingsInfo;
    }
 
    // HornetQComponent implementation
@@ -1296,34 +1300,13 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
     */
-   public void loadInternalOnly() throws Exception
+   public JournalLoadInformation[] loadInternalOnly() throws Exception
    {
-      LoaderCallback dummyLoader = new LoaderCallback()
-      {
-
-         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
-         {
-         }
-
-         public void updateRecord(RecordInfo info)
-         {
-         }
-
-         public void deleteRecord(long id)
-         {
-         }
-
-         public void addRecord(RecordInfo info)
-         {
-         }
-
-         public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
-         {
-         }
-      };
-
-      bindingsJournal.load(dummyLoader);
-      messageJournal.load(dummyLoader);
+      JournalLoadInformation[] info = new JournalLoadInformation[2];
+      info[0] = bindingsJournal.loadInternalOnly();
+      info[1] = messageJournal.loadInternalOnly();
+      
+      return info;
    }
 
    // Public -----------------------------------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -20,16 +20,17 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.logging.Logger;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.persistence.QueueBindingInfo;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.GroupingInfo;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
@@ -50,8 +51,6 @@
  */
 public class NullStorageManager implements StorageManager
 {
-   private static final Logger log = Logger.getLogger(NullStorageManager.class);
-   
    private final AtomicLong idSequence = new AtomicLong(0);
    
    private UUID id;
@@ -80,9 +79,9 @@
    {
    }
 
-   public void loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
+   public JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
    {
-
+      return new JournalLoadInformation();
    }
 
    public void prepare(final long txID, final Xid xid) throws Exception
@@ -252,12 +251,13 @@
    {
    }
    
-   public void loadMessageJournal(PostOffice postOffice,
+   public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
                                   PagingManager pagingManager,
                                   ResourceManager resourceManager,
                                   Map<Long, Queue> queues,
                                   Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
    {
+      return new JournalLoadInformation();
    }
 
    public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
@@ -271,8 +271,9 @@
    /* (non-Javadoc)
     * @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
     */
-   public void loadInternalOnly() throws Exception
+   public JournalLoadInformation[] loadInternalOnly() throws Exception
    {
+      return null;
    }
 
    /* (non-Javadoc)
@@ -334,4 +335,12 @@
    {
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.StorageManager#setReplicator(org.hornetq.core.replication.ReplicationManager)
+    */
+   public void setReplicator(ReplicationManager replicator)
+   {
+      throw new IllegalStateException("Null Persistence should never be used as replicated");
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -28,6 +28,7 @@
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
@@ -91,6 +92,7 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -457,6 +459,11 @@
             packet = new ReplicationLargeMessageWriteMessage();
             break;
          }
+         case REPLICATION_COMPARE_DATA:
+         {
+            packet = new ReplicationCompareDataMessage();
+            break;
+         }
          case SESS_FORCE_CONSUMER_DELIVERY:
          {
             packet = new SessionForceConsumerDelivery();

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -170,6 +170,8 @@
    public static final byte REPLICATION_LARGE_MESSAGE_END = 90;
    
    public static final byte REPLICATION_LARGE_MESSAGE_WRITE = 91;
+   
+   public static final byte REPLICATION_COMPARE_DATA = 92;
    // Static --------------------------------------------------------
 
    public PacketImpl(final byte type)

Added: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCompareDataMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCompareDataMessage.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationCompareDataMessage.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * Message used to check whether the journals on the live and
+ * backup nodes are equivalent and can therefore be used for replication.
+ * The backup journal needs to be an exact copy of the live node before it starts.
+ * @author <a href="mailto:tim.fox at jboss.com">Clebert Suconic</a>
+ */
+public class ReplicationCompareDataMessage extends PacketImpl
+{
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private JournalLoadInformation[] journalInformation;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public ReplicationCompareDataMessage(final JournalLoadInformation[] journalInformation)
+   {
+      super(REPLICATION_COMPARE_DATA);
+
+      this.journalInformation = journalInformation;
+   }
+
+   public ReplicationCompareDataMessage()
+   {
+      super(REPLICATION_COMPARE_DATA);
+   }
+
+   // Public --------------------------------------------------------
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + 
+             DataConstants.SIZE_INT + (journalInformation.length * (DataConstants.SIZE_INT + DataConstants.SIZE_LONG)) +
+             DataConstants.SIZE_INT;
+
+   }
+
+   @Override
+   public void encodeBody(final HornetQBuffer buffer)
+   {
+      buffer.writeInt(journalInformation.length);
+      for (JournalLoadInformation info : journalInformation)
+      {
+         buffer.writeInt(info.getNumberOfRecords());
+         buffer.writeLong(info.getMaxID());
+      }
+   }
+
+   @Override
+   public void decodeBody(final HornetQBuffer buffer)
+   {
+      int numberOfJournals = buffer.readInt();
+
+      this.journalInformation = new JournalLoadInformation[numberOfJournals];
+
+      for (int i = 0; i < numberOfJournals; i++)
+      {
+         this.journalInformation[i] = new JournalLoadInformation();
+         this.journalInformation[i].setNumberOfRecords(buffer.readInt());
+         this.journalInformation[i].setMaxID(buffer.readLong());
+      }
+   }
+
+   /**
+    * @return the journalInformation
+    */
+   public JournalLoadInformation[] getJournalInformation()
+   {
+      return journalInformation;
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}

Modified: trunk/src/main/org/hornetq/core/replication/ReplicationEndpoint.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationEndpoint.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationEndpoint.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -13,6 +13,8 @@
 
 package org.hornetq.core.replication;
 
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.server.HornetQComponent;
@@ -30,5 +32,7 @@
    void setChannel(Channel channel);
 
    Channel getChannel();
+   
+   void compareJournalInformation(JournalLoadInformation[] journalInformation) throws HornetQException;
 
 }

Modified: trunk/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationManager.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -15,7 +15,9 @@
 
 import java.util.Set;
 
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.server.HornetQComponent;
 import org.hornetq.utils.SimpleString;
@@ -80,4 +82,10 @@
    
    void largeMessageDelete(long messageId);
 
+   /**
+    * Submits the given journal load information for comparison with the
+    * replication target. ReplicationManagerImpl implements this as a blocking
+    * send of a ReplicationCompareDataMessage.
+    *
+    * @param journalInfo load information of the local journals; HornetQServerImpl
+    *                    passes the bindings journal at index 0 and the message journal at index 1
+    * @throws HornetQException presumably when the comparison fails or the
+    *                          request cannot be delivered - confirm against the implementation
+    */
+   void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException;
+
 }

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -17,6 +17,7 @@
 
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.LoaderCallback;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
@@ -44,7 +45,12 @@
 
    // Attributes ----------------------------------------------------
 
-   private static final boolean trace = log.isTraceEnabled();
+   private static final boolean trace = false;
+   
+   /**
+    * Emits a trace-level message through this class's logger instead of
+    * printing to standard output, so that trace output honours the logging
+    * configuration.
+    *
+    * @param message the text to trace
+    */
+   private static void trace(String message)
+   {
+      log.trace(message);
+   }
 
    private final ReplicationManager replicationManager;
 
@@ -64,10 +70,6 @@
 
    // Static --------------------------------------------------------
    
-   private static void trace(String message)
-   {
-      log.trace(message);
-   }
 
    // Constructors --------------------------------------------------
 
@@ -335,7 +337,7 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
     */
-   public long load(final List<RecordInfo> committedRecords,
+   public JournalLoadInformation load(final List<RecordInfo> committedRecords,
                     final List<PreparedTransactionInfo> preparedTransactions,
                     final TransactionFailureCallback transactionFailure) throws Exception
    {
@@ -348,7 +350,7 @@
     * @throws Exception
     * @see org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
     */
-   public long load(final LoaderCallback reloadManager) throws Exception
+   public JournalLoadInformation load(final LoaderCallback reloadManager) throws Exception
    {
       return localJournal.load(reloadManager);
    }
@@ -397,6 +399,22 @@
       return localJournal.isStarted();
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#loadInternalOnly()
+    *
+    * Delegates straight to localJournal and returns its result unchanged.
+    */
+   public JournalLoadInformation loadInternalOnly() throws Exception
+   {
+      return localJournal.loadInternalOnly();
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.journal.Journal#getNumberOfRecords()
+    *
+    * Delegates straight to localJournal and returns its result unchanged.
+    */
+   public int getNumberOfRecords()
+   {
+      return localJournal.getNumberOfRecords();
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -14,7 +14,8 @@
 package org.hornetq.core.replication.impl;
 
 import java.util.ArrayList;
-import java.util.concurrent.Executor;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.hornetq.core.replication.ReplicationContext;
 
@@ -27,75 +28,78 @@
  */
 public class ReplicationContextImpl implements ReplicationContext
 {
-   final Executor executor;
+   private List<Runnable> tasks;
    
-   private ArrayList<Runnable> tasks;
+   private AtomicInteger pendings = new AtomicInteger(0);
    
-   private volatile int pendings;
+   private volatile boolean complete = false;
    
    /**
     * @param executor
     */
-   public ReplicationContextImpl(Executor executor)
+   public ReplicationContextImpl()
    {
       super();
-      this.executor = executor;
    }
 
    /** To be called by the replication manager, when new replication is added to the queue */
-   public synchronized void linedUp()
+   public void linedUp()
    {
-      pendings++;
+      pendings.incrementAndGet();
    }
 
+   /**
+    * Queues an action to be run by {@code flush()} after the replication
+    * operation is completed. Several actions may be queued.
+    *
+    * @param runnable the action to queue
+    * @throws IllegalStateException if this context has already been marked complete
+    */
+   public void addReplicationAction(Runnable runnable)
+   {
+      // Sanity check, this shouldn't happen
+      if (complete)
+      {
+         throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
+      }
+
+      // Created lazily. A plain ArrayList is enough: actions are only added from a
+      // single thread, and no Runnable is added once the context is complete.
+      List<Runnable> queued = tasks;
+      if (queued == null)
+      {
+         queued = new ArrayList<Runnable>();
+         tasks = queued;
+      }
+
+      queued.add(runnable);
+   }
+
    /** To be called by the replication manager, when data is confirmed on the channel */
    public synchronized void replicated()
    {
-      if (--pendings == 0)
+      if (pendings.decrementAndGet() == 0 && complete)
       {
          flush();
       }
    }
 
-   /**
-    * 
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.replication.ReplicationToken#complete()
     */
-   public void flush()
+   public synchronized void complete()
    {
+      complete = true;
+      if (pendings.get() == 0 && complete)
+      {
+         flush();
+      }
+  }
+   
+   public synchronized void flush()
+   {
       if (tasks != null)
       {
          for (Runnable run : tasks)
          {
-            executor.execute(run);
+            run.run();
          }
          tasks.clear();
       }
    }
    
-   /** You may have several actions to be done after a replication operation is completed. */
-   public synchronized void addReplicationAction(Runnable runnable)
-   {
-      if (pendings == 0)
-      {
-         executor.execute(runnable);
-      }
-      else
-      {
-         if (tasks == null)
-         {
-            tasks = new ArrayList<Runnable>();
-         }
-         
-         tasks.add(runnable);
-      }
-   }
-
-   /* (non-Javadoc)
-    * @see org.hornetq.core.replication.ReplicationToken#complete()
-    */
-   public void complete()
-   {
-      // TODO Auto-generated method stub
-      
-   }
+   
 }

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -16,12 +16,15 @@
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
 
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import org.hornetq.core.config.Configuration;
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.Page;
 import org.hornetq.core.paging.PagedMessage;
@@ -31,10 +34,13 @@
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -84,6 +90,8 @@
 
    private PagingManager pageManager;
 
+   private JournalLoadInformation[] journalLoadInformation;
+
    private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
 
    private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new ConcurrentHashMap<Long, LargeServerMessage>();
@@ -101,6 +109,8 @@
     */
    public void handlePacket(final Packet packet)
    {
+      PacketImpl response = new ReplicationResponseMessage();
+
       try
       {
          if (packet.getType() == PacketImpl.REPLICATION_APPEND)
@@ -147,6 +157,11 @@
          {
             handleLargeMessageEnd((ReplicationLargemessageEndMessage)packet);
          }
+         else if (packet.getType() == REPLICATION_COMPARE_DATA)
+         {
+            handleCompareDataMessage((ReplicationCompareDataMessage)packet);
+            response = new NullResponseMessage();
+         }
          else
          {
             log.warn("Packet " + packet + " can't be processed by the ReplicationEndpoint");
@@ -154,10 +169,10 @@
       }
       catch (Exception e)
       {
-         // TODO: what to do when the IO fails on the backup side? should we shutdown the backup?
          log.warn(e.getMessage(), e);
+         response = new HornetQExceptionMessage((HornetQException)e);
       }
-      channel.send(new ReplicationResponseMessage());
+      channel.send(response);
    }
 
    /* (non-Javadoc)
@@ -182,7 +197,7 @@
       messagingJournal = storage.getMessageJournal();
 
       // We only need to load internal structures on the backup...
-      storage.loadInternalOnly();
+      journalLoadInformation = storage.loadInternalOnly();
 
       pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(config.getPagingDirectory(),
                                                                     server.getExecutorFactory()),
@@ -199,7 +214,12 @@
     */
    public void stop() throws Exception
    {
-      channel.close();
+      // This could be null if the backup server is being
+      // shut down without any live server connecting here
+      if (channel != null)
+      {
+         channel.close();
+      }
       storage.stop();
 
       for (ConcurrentMap<Integer, Page> map : pageIndex.values())
@@ -243,6 +263,52 @@
       this.channel = channel;
    }
 
+   /**
+    * Checks the journal load information received from the live node against
+    * the information this endpoint obtained from {@code storage.loadInternalOnly()}.
+    *
+    * @param journalInformation the load information sent by the live node
+    * @throws HornetQException with code INTERNAL_ERROR when either side has no
+    *         information or the number of journals differs, and with code
+    *         ILLEGAL_STATE when any pair of entries is not equal
+    */
+   public void compareJournalInformation(JournalLoadInformation[] journalInformation) throws HornetQException
+   {
+      // Report a missing argument as a protocol error instead of failing with a NullPointerException
+      if (journalInformation == null)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR,
+                                    "Live Node did not send any journal information to compare");
+      }
+
+      if (this.journalLoadInformation == null)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR,
+                                    "Backup node has no journal information loaded, so the journals can't be compared");
+      }
+
+      // Either side may hold more journals, so report both counts
+      if (this.journalLoadInformation.length != journalInformation.length)
+      {
+         throw new HornetQException(HornetQException.INTERNAL_ERROR,
+                                    "Live Node and backup node contain a different number of journals (live = " +
+                                    journalInformation.length +
+                                    ", backup = " +
+                                    this.journalLoadInformation.length +
+                                    "). Probably a version match error");
+      }
+
+      for (int i = 0; i < journalInformation.length; i++)
+      {
+         if (!journalInformation[i].equals(this.journalLoadInformation[i]))
+         {
+            log.warn("Journal comparisson mismatch:\n" + journalParametersToString(journalInformation));
+            throw new HornetQException(HornetQException.ILLEGAL_STATE,
+                                       "Backup node can't connect to the live node as the data differs");
+         }
+      }
+
+   }
+
+   /**
+    * Renders the received journal information next to the information loaded
+    * on this node, for use in the mismatch warning. Arrays of any length
+    * (including null) are tolerated, so building the message can never hide
+    * the real comparison error; for two journals the output is unchanged.
+    *
+    * @param journalInformation the load information received from the live node
+    * @return a multi-line, human readable description of both sides
+    */
+   private String journalParametersToString(JournalLoadInformation[] journalInformation)
+   {
+      final String separator = "**********************************************************";
+
+      StringBuilder text = new StringBuilder();
+
+      text.append(separator).append("\n");
+      text.append("parameters:\n");
+      appendJournalLines(text, journalInformation);
+
+      text.append(separator).append("\n");
+      text.append("Expected:").append("\n");
+      appendJournalLines(text, this.journalLoadInformation);
+
+      text.append(separator);
+
+      return text.toString();
+   }
+
+   /**
+    * Appends one "label = information" line per journal. Index 0 is labelled
+    * "Bindings" and index 1 "Messaging"; any further entry gets a generic label.
+    */
+   private static void appendJournalLines(StringBuilder text, JournalLoadInformation[] journals)
+   {
+      if (journals == null)
+      {
+         text.append("<no journal information>\n");
+         return;
+      }
+
+      for (int i = 0; i < journals.length; i++)
+      {
+         String label;
+         if (i == 0)
+         {
+            label = "Bindings";
+         }
+         else if (i == 1)
+         {
+            label = "Messaging";
+         }
+         else
+         {
+            label = "Journal[" + i + "]";
+         }
+
+         text.append(label).append(" = ").append(journals[i]).append("\n");
+      }
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------
@@ -279,6 +345,15 @@
          message.addBytes(packet.getBody());
       }
    }
+   
+      /**
+    * Handles a REPLICATION_COMPARE_DATA packet by comparing the journal
+    * information it carries with the information held by this endpoint.
+    *
+    * @param request the compare-data packet received on the replication channel
+    * @throws HornetQException propagated from compareJournalInformation when the comparison fails
+    */
+   private void handleCompareDataMessage(ReplicationCompareDataMessage request) throws HornetQException
+   {
+      compareJournalInformation(request.getJournalInformation());
+   }
+   
 
    private LargeServerMessage lookupLargeMessage(long messageId, boolean delete)
    {
@@ -301,7 +376,6 @@
       return message;
 
    }
-
    /**
     * @param packet
     */

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -22,6 +22,7 @@
 import org.hornetq.core.client.impl.FailoverManager;
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.remoting.Channel;
@@ -33,6 +34,7 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
@@ -86,7 +88,7 @@
    private final Queue<ReplicationContext> pendingTokens = new ConcurrentLinkedQueue<ReplicationContext>();
 
    private final ConcurrentHashSet<ReplicationContext> activeContexts = new ConcurrentHashSet<ReplicationContext>();
-
+ 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -304,8 +306,19 @@
     */
    public synchronized void start() throws Exception
    {
+      if (started)
+      {
+         throw new IllegalStateException("ReplicationManager is already started");
+      }
       connection = failoverManager.getConnection();
 
+      if (connection == null)
+      {
+         log.warn("Backup server MUST be started before live server. Initialisation will not proceed.");
+         throw new HornetQException(HornetQException.ILLEGAL_STATE,
+                                    "Backup server MUST be started before live server. Initialisation will not proceed.");
+      }
+
       long channelID = connection.generateChannelID();
 
       Channel mainChannel = connection.getChannel(1, -1);
@@ -381,7 +394,7 @@
       ReplicationContext token = tlReplicationContext.get();
       if (token == null)
       {
-         token = new ReplicationContextImpl(executor);
+         token = new ReplicationContextImpl();
          activeContexts.add(token);
          tlReplicationContext.set(token);
       }
@@ -414,6 +427,7 @@
                activeContexts.remove(token);
             }
          });
+         token.complete();
       }
    }
 
@@ -455,7 +469,16 @@
          repliToken.replicated();
       }
    }
+   
+   /**
+    * Wraps the journal load information in a ReplicationCompareDataMessage and
+    * sends it over the replicating channel with a blocking send. The returned
+    * response is not inspected here.
+    *
+    * @see org.hornetq.core.replication.ReplicationManager#compareJournals(org.hornetq.core.journal.JournalLoadInformation[])
+    */
+   public void compareJournals(JournalLoadInformation[] journalInfo) throws HornetQException
+   {
+      final ReplicationCompareDataMessage compareRequest = new ReplicationCompareDataMessage(journalInfo);
+
+      replicatingChannel.sendBlocking(compareRequest);
+   }
 
+
    private void replicated()
    {
       ReplicationContext tokenPolled = pendingTokens.poll();

Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -20,6 +20,7 @@
 
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.management.ManagementService;
 import org.hornetq.core.management.impl.HornetQServerControlImpl;
 import org.hornetq.core.persistence.StorageManager;
@@ -73,7 +74,9 @@
 
    ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastReceivedCommandID) throws Exception;
 
-   ReplicationEndpoint createReplicationEndpoint(Channel channel) throws Exception;
+   /**
+    * Connects the given channel to this server's replication endpoint and returns that endpoint.
+    * The journal at the backup server has to be equivalent to the journal used on the live node,
+    * or else the backup node is out of sync.
+    *
+    * @param channel the channel the live node replicates through
+    * @return the replication endpoint the channel was attached to
+    * @throws Exception HornetQServerImpl throws a HornetQException (ILLEGAL_STATE) when this
+    *                   server is not a backup or the endpoint already has a channel
+    */
+   ReplicationEndpoint connectToReplicationEndpoint(Channel channel) throws Exception;
 
    CreateSessionResponseMessage createSession(String name,
                                               long channelID,                                              

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -23,7 +23,6 @@
 import org.hornetq.core.server.HandleStatus;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.impl.RoutingContextImpl;
 import org.hornetq.core.transaction.Transaction;
 import org.hornetq.core.transaction.impl.TransactionImpl;
 import org.hornetq.utils.Future;
@@ -145,40 +144,41 @@
 
       tx.commit();
 
-      
-      Runnable action = new Runnable()
+      if (storageManager.isReplicated())
       {
-         public void run()
+         storageManager.afterReplicated(new Runnable()
          {
-            
-            count++;
-      
-            if (count == batchSize)
+            public void run()
             {
-               // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
-               // long time in the case there are many messages in the queue
-               active = false;
-      
-               
-               executor.execute(new Prompter());
-      
-               count = 0;
+               execPrompter();
             }
-            
-         }
-      };
-      
-      if (storageManager.isReplicated())
-      {
-         storageManager.afterReplicated(action);
+         });
          storageManager.completeReplication();
       }
       else
       {
-         action.run();
+         execPrompter();
       }
    }
+   
+   /**
+    * Counts one more handled message and, once a full batch has been reached,
+    * deactivates this redistributor, schedules a Prompter on the executor and
+    * resets the counter.
+    */
+   private void execPrompter()
+   {
+      count++;
+
+      if (count != batchSize)
+      {
+         // Batch not complete yet, nothing else to do
+         return;
+      }
+
+      // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
+      // long time in the case there are many messages in the queue
+      active = false;
+
+      executor.execute(new Prompter());
+
+      count = 0;
+   }
+
    private class Prompter implements Runnable
    {
       public void run()

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -15,8 +15,8 @@
 
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
 
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.logging.Logger;
@@ -194,7 +194,7 @@
       {
          Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
 
-         ReplicationEndpoint endpoint = server.createReplicationEndpoint(channel);
+         ReplicationEndpoint endpoint = server.connectToReplicationEndpoint(channel);
 
          channel.setHandler(endpoint);
 

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -50,6 +50,7 @@
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.filter.Filter;
 import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.impl.SyncSpeedTest;
 import org.hornetq.core.logging.LogDelegateFactory;
 import org.hornetq.core.logging.Logger;
@@ -267,13 +268,14 @@
    {
       initialiseLogging();
 
-      log.info((configuration.isBackup() ? "backup" : "live") + " server is starting..");
-
       if (started)
       {
+         log.info((configuration.isBackup() ? "backup" : "live") + " is already started, ignoring the call to start..");
          return;
       }
 
+      log.info((configuration.isBackup() ? "backup" : "live") + " server is starting..");
+
       if (configuration.isRunSyncSpeedTest())
       {
          SyncSpeedTest test = new SyncSpeedTest();
@@ -285,6 +287,11 @@
 
       if (configuration.isBackup())
       {
+         if (!configuration.isSharedStore())
+         {
+            this.replicationEndpoint = new ReplicationEndpointImpl(this);
+            this.replicationEndpoint.start();
+         }
          // We defer actually initialisation until the live node has contacted the backup
          log.info("Backup server initialised");
       }
@@ -658,19 +665,20 @@
       return new CreateSessionResponseMessage(version.getIncrementingVersion());
    }
 
-   public synchronized ReplicationEndpoint createReplicationEndpoint(final Channel channel) throws Exception
+   public synchronized ReplicationEndpoint connectToReplicationEndpoint(final Channel channel) throws Exception
    {
       if (!configuration.isBackup())
       {
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is not a backup server");
       }
 
-      if (replicationEndpoint == null)
+      if (replicationEndpoint.getChannel() != null)
       {
-         replicationEndpoint = new ReplicationEndpointImpl(this);
-         replicationEndpoint.setChannel(channel);
-         replicationEndpoint.start();
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup replication server is already connected to another server");
       }
+      
+      replicationEndpoint.setChannel(channel);
+      
 
       return replicationEndpoint;
    }
@@ -891,7 +899,7 @@
    {
       String backupConnectorName = configuration.getBackupConnectorName();
 
-      if (backupConnectorName != null)
+      if (!configuration.isSharedStore() && backupConnectorName != null)
       {
          TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
 
@@ -1086,9 +1094,13 @@
          }
       }
       deployGroupingHandlerConfiguration(configuration.getGroupingHandlerConfiguration());
+      
       // Load the journal and populate queues, transactions and caches in memory
-      loadJournal();
+      JournalLoadInformation[] journalInfo = loadJournals();
+      
+      compareJournals(journalInfo);
 
+
       // Deploy any queues in the Configuration class - if there's no file deployment we still need
       // to load those
       deployQueuesFromConfiguration();
@@ -1149,6 +1161,17 @@
       initialised = true;
    }
 
+   /**
+    * Hands the journal load information to the replication manager for
+    * comparison. Does nothing when no replication manager is set.
+    *
+    * @param journalInfo the load information returned by loadJournals()
+    * @throws Exception whatever the replication manager's compareJournals throws
+    */
+   private void compareJournals(JournalLoadInformation[] journalInfo) throws Exception
+   {
+      if (replicationManager == null)
+      {
+         // No replication manager set, so there is nothing to compare against
+         return;
+      }
+
+      replicationManager.compareJournals(journalInfo);
+   }
+
    private void deployQueuesFromConfiguration() throws Exception
    {
       for (QueueConfiguration config : configuration.getQueueConfigurations())
@@ -1160,13 +1183,15 @@
       }
    }
 
-   private void loadJournal() throws Exception
+   private JournalLoadInformation[] loadJournals() throws Exception
    {
+      JournalLoadInformation[] journalInfo = new JournalLoadInformation[2];
+      
       List<QueueBindingInfo> queueBindingInfos = new ArrayList<QueueBindingInfo>();
 
       List<GroupingInfo> groupingInfos = new ArrayList<GroupingInfo>();
 
-      storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
+      journalInfo[0] = storageManager.loadBindingJournal(queueBindingInfos, groupingInfos);
 
       // Set the node id - must be before we load the queues into the postoffice, but after we load the journal
       setNodeID();
@@ -1206,7 +1231,7 @@
 
       Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
 
-      storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager, queues, duplicateIDMap);
+      journalInfo[1] = storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager, queues, duplicateIDMap);
 
       for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet())
       {
@@ -1219,6 +1244,8 @@
             cache.load(entry.getValue());
          }
       }
+      
+      return journalInfo;
    }
 
    private void setNodeID() throws Exception

Modified: trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/client/SessionFactoryTest.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -853,6 +853,7 @@
       Configuration backupConf = new ConfigurationImpl();
       backupConf.setSecurityEnabled(false);
       backupConf.setClustered(true);
+      backupConf.setSharedStore(true);
       backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
       backupConf.getAcceptorConfigurations()
                 .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
@@ -873,6 +874,7 @@
       connectors.put(liveTC.getName(), liveTC);
       liveConf.setConnectorConfigurations(connectors);
       liveConf.setBackupConnectorName(backupTC.getName());
+      liveConf.setSharedStore(true);
       liveConf.setClustered(true);
 
       List<Pair<String, String>> connectorNames = new ArrayList<Pair<String, String>>();

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -29,7 +29,6 @@
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.ClientSessionFactory;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.client.impl.FailoverManagerImpl;
 import org.hornetq.core.config.Configuration;
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
@@ -37,6 +36,7 @@
 import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
@@ -45,11 +45,10 @@
 import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.server.group.GroupingHandler;
 import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.core.server.cluster.RemoteQueueBinding;
-import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
 import org.hornetq.integration.transports.netty.TransportConstants;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.Pair;
@@ -68,7 +67,7 @@
 {
    private static final Logger log = Logger.getLogger(ClusterTestBase.class);
 
-   private static final int[] PORTS = {TransportConstants.DEFAULT_PORT,
+   private static final int[] PORTS = { TransportConstants.DEFAULT_PORT,
                                        TransportConstants.DEFAULT_PORT + 1,
                                        TransportConstants.DEFAULT_PORT + 2,
                                        TransportConstants.DEFAULT_PORT + 3,
@@ -77,8 +76,7 @@
                                        TransportConstants.DEFAULT_PORT + 6,
                                        TransportConstants.DEFAULT_PORT + 7,
                                        TransportConstants.DEFAULT_PORT + 8,
-                                       TransportConstants.DEFAULT_PORT + 9,
-   };
+                                       TransportConstants.DEFAULT_PORT + 9, };
 
    private static final long WAIT_TIMEOUT = 10000;
 
@@ -136,7 +134,7 @@
 
    protected HornetQServer[] servers = new HornetQServer[MAX_SERVERS];
 
-   private ClientSessionFactory[] sfs = new ClientSessionFactory[MAX_SERVERS];
+   protected ClientSessionFactory[] sfs = new ClientSessionFactory[MAX_SERVERS];
 
    protected void waitForMessages(int node, final String address, final int count) throws Exception
    {
@@ -169,11 +167,11 @@
       }
       while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
 
-      //System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
+      // System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
 
       throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount +
-            ", expecting = " +
-            count);
+                                      ", expecting = " +
+                                      count);
    }
 
    protected void waitForServerRestart(int node) throws Exception
@@ -181,7 +179,7 @@
       long start = System.currentTimeMillis();
       do
       {
-         if(servers[node].isInitialised())
+         if (servers[node].isInitialised())
          {
             return;
          }
@@ -201,15 +199,15 @@
                                   final int consumerCount,
                                   final boolean local) throws Exception
    {
-//      log.info("waiting for bindings on node " + node +
-//               " address " +
-//               address +
-//               " count " +
-//               count +
-//               " consumerCount " +
-//               consumerCount +
-//               " local " +
-//               local);
+      System.out.println("waiting for bindings on node " + node +
+                         " address " +
+                         address +
+                         " count " +
+                         count +
+                         " consumerCount " +
+                         consumerCount +
+                         " local " +
+                         local);
       HornetQServer server = this.servers[node];
 
       if (server == null)
@@ -237,7 +235,7 @@
          {
             if ((binding instanceof LocalQueueBinding && local) || (binding instanceof RemoteQueueBinding && !local))
             {
-               QueueBinding qBinding = (QueueBinding) binding;
+               QueueBinding qBinding = (QueueBinding)binding;
 
                bindingCount++;
 
@@ -245,7 +243,7 @@
             }
          }
 
-         //log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
+         // log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
 
          if (bindingCount == count && totConsumers == consumerCount)
          {
@@ -260,8 +258,8 @@
       // System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
 
       String msg = "Timed out waiting for bindings (bindingCount = " + bindingCount +
-            ", totConsumers = " +
-            totConsumers;
+                   ", totConsumers = " +
+                   totConsumers;
 
       log.error(msg);
 
@@ -438,12 +436,23 @@
       session.close();
    }
 
-   protected void sendWithProperty(int node, String address, int numMessages, boolean durable, SimpleString key, SimpleString val) throws Exception
+   protected void sendWithProperty(int node,
+                                   String address,
+                                   int numMessages,
+                                   boolean durable,
+                                   SimpleString key,
+                                   SimpleString val) throws Exception
    {
       sendInRange(node, address, 0, numMessages, durable, key, val);
    }
 
-   protected void sendInRange(int node, String address, int msgStart, int msgEnd, boolean durable, SimpleString key, SimpleString val) throws Exception
+   protected void sendInRange(int node,
+                              String address,
+                              int msgStart,
+                              int msgEnd,
+                              boolean durable,
+                              SimpleString key,
+                              SimpleString val) throws Exception
    {
       ClientSessionFactory sf = this.sfs[node];
 
@@ -475,8 +484,11 @@
 
    protected void setUpGroupHandler(GroupingHandlerConfiguration.TYPE type, int node, int timeout)
    {
-      this.servers[node].getConfiguration().setGroupingHandlerConfiguration(
-            new GroupingHandlerConfiguration(new SimpleString("grouparbitrator"), type, new SimpleString("queues"), timeout));
+      this.servers[node].getConfiguration()
+                        .setGroupingHandlerConfiguration(new GroupingHandlerConfiguration(new SimpleString("grouparbitrator"),
+                                                                                          type,
+                                                                                          new SimpleString("queues"),
+                                                                                          timeout));
    }
 
    protected void setUpGroupHandler(GroupingHandler groupingHandler, int node)
@@ -499,17 +511,12 @@
       verifyReceiveAllInRangeNotBefore(false, -1, msgStart, msgEnd, consumerIDs);
    }
 
-   protected void verifyReceiveAllWithGroupIDRoundRobin(
-         int msgStart,
-         int msgEnd,
-         int... consumerIDs) throws Exception
+   protected void verifyReceiveAllWithGroupIDRoundRobin(int msgStart, int msgEnd, int... consumerIDs) throws Exception
    {
       verifyReceiveAllWithGroupIDRoundRobin(true, -1, msgStart, msgEnd, consumerIDs);
    }
 
-   protected int verifyReceiveAllOnSingleConsumer(int msgStart,
-                                                  int msgEnd,
-                                                  int... consumerIDs) throws Exception
+   protected int verifyReceiveAllOnSingleConsumer(int msgStart, int msgEnd, int... consumerIDs) throws Exception
    {
       return verifyReceiveAllOnSingleConsumer(true, msgStart, msgEnd, consumerIDs);
    }
@@ -553,7 +560,7 @@
                assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
             }
 
-            SimpleString id = (SimpleString) message.getProperty(MessageImpl.HDR_GROUP_ID);
+            SimpleString id = (SimpleString)message.getProperty(MessageImpl.HDR_GROUP_ID);
             System.out.println("received " + id + " on consumer " + consumerIDs[i]);
             if (groupIdsReceived.get(id) == null)
             {
@@ -561,20 +568,20 @@
             }
             else if (groupIdsReceived.get(id) != i)
             {
-               fail("consumer " + groupIdsReceived.get(id) + " already bound to groupid " + id + " received on consumer " + i);
+               fail("consumer " + groupIdsReceived.get(id) +
+                    " already bound to groupid " +
+                    id +
+                    " received on consumer " +
+                    i);
             }
 
          }
 
       }
 
-
    }
 
-   protected int verifyReceiveAllOnSingleConsumer(boolean ack,
-                                                  int msgStart,
-                                                  int msgEnd,
-                                                  int... consumerIDs) throws Exception
+   protected int verifyReceiveAllOnSingleConsumer(boolean ack, int msgStart, int msgEnd, int... consumerIDs) throws Exception
    {
       int groupIdsReceived = -1;
       for (int i = 0; i < consumerIDs.length; i++)
@@ -649,7 +656,7 @@
                assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
             }
 
-            if (j != (Integer) (message.getProperty(COUNT_PROP)))
+            if (j != (Integer)(message.getProperty(COUNT_PROP)))
             {
                outOfOrder = true;
                System.out.println("Message j=" + j + " was received out of order = " + message.getProperty(COUNT_PROP));
@@ -707,8 +714,8 @@
             if (message != null)
             {
                log.info("check receive Consumer " + consumerIDs[i] +
-                     " received message " +
-                     message.getProperty(COUNT_PROP));
+                        " received message " +
+                        message.getProperty(COUNT_PROP));
             }
             else
             {
@@ -779,7 +786,7 @@
 
             if (message != null)
             {
-               int count = (Integer) message.getProperty(COUNT_PROP);
+               int count = (Integer)message.getProperty(COUNT_PROP);
 
                Integer prevCount = countMap.get(i);
 
@@ -799,7 +806,7 @@
                   message.acknowledge();
                }
 
-               //log.info("consumer " + consumerIDs[i] +" returns " + count);
+               // log.info("consumer " + consumerIDs[i] +" returns " + count);
             }
             else
             {
@@ -841,7 +848,7 @@
 
             if (message != null)
             {
-               int count = (Integer) message.getProperty(COUNT_PROP);
+               int count = (Integer)message.getProperty(COUNT_PROP);
 
                // log.info("consumer " + consumerIDs[i] + " received message " + count);
 
@@ -889,7 +896,7 @@
 
          assertNotNull(list);
 
-         int elem = (Integer) list.poll();
+         int elem = (Integer)list.poll();
 
          assertEquals(messageCounts[i], elem);
 
@@ -929,7 +936,7 @@
 
          if (message != null)
          {
-            int count = (Integer) message.getProperty(COUNT_PROP);
+            int count = (Integer)message.getProperty(COUNT_PROP);
 
             ints.add(count);
          }
@@ -1011,18 +1018,21 @@
          serverTotc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, params);
       }
 
-      Map<String, Object> backupParams = generateParams(backupNode, netty);
+      TransportConfiguration serverBackuptc = null;
 
-      TransportConfiguration serverBackuptc;
+      if (backupNode != -1)
+      {
+         Map<String, Object> backupParams = generateParams(backupNode, netty);
 
-      if (netty)
-      {
-         serverBackuptc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, backupParams);
+         if (netty)
+         {
+            serverBackuptc = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, backupParams);
+         }
+         else
+         {
+            serverBackuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams);
+         }
       }
-      else
-      {
-         serverBackuptc = new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams);
-      }
 
       ClientSessionFactory sf = new ClientSessionFactoryImpl(serverTotc, serverBackuptc);
 
@@ -1068,6 +1078,16 @@
 
    protected void setupServer(int node, boolean fileStorage, boolean netty, boolean backup, int backupNode)
    {
+      setupServer(node, fileStorage, true, netty, backup, backupNode);
+   }
+
+   protected void setupServer(int node,
+                              boolean fileStorage,
+                              boolean sharedStorage,
+                              boolean netty,
+                              boolean backup,
+                              int backupNode)
+   {
       if (servers[node] != null)
       {
          throw new IllegalArgumentException("Already a server at node " + node);
@@ -1076,15 +1096,28 @@
       Configuration configuration = new ConfigurationImpl();
 
       configuration.setSecurityEnabled(false);
-      configuration.setBindingsDirectory(getBindingsDir(node, backup));
       configuration.setJournalMinFiles(2);
       configuration.setJournalMaxAIO(1000);
-      configuration.setJournalDirectory(getJournalDir(node, backup));
       configuration.setJournalFileSize(100 * 1024);
       configuration.setJournalType(JournalType.ASYNCIO);
       configuration.setJournalMaxAIO(1000);
-      configuration.setPagingDirectory(getPageDir(node, backup));
-      configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, backup));
+      configuration.setSharedStore(sharedStorage);
+      if (sharedStorage)
+      {
+         // Shared storage will share the node between the backup and live node
+         int nodeDirectoryToUse = backupNode == -1 ? node : backupNode;
+         configuration.setBindingsDirectory(getBindingsDir(nodeDirectoryToUse, false));
+         configuration.setJournalDirectory(getJournalDir(nodeDirectoryToUse, false));
+         configuration.setPagingDirectory(getPageDir(nodeDirectoryToUse, false));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(nodeDirectoryToUse, false));
+      }
+      else
+      {
+         configuration.setBindingsDirectory(getBindingsDir(node, backup));
+         configuration.setJournalDirectory(getJournalDir(node, backup));
+         configuration.setPagingDirectory(getPageDir(node, backup));
+         configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, backup));
+      }
       configuration.setClustered(true);
       configuration.setJournalCompactMinFiles(0);
       configuration.setBackup(backup);
@@ -1141,7 +1174,6 @@
       servers[node] = server;
    }
 
-  
    protected void setupServerWithDiscovery(int node,
                                            String groupAddress,
                                            int port,
@@ -1238,21 +1270,21 @@
          configuration.getConnectorConfigurations().put(nettytc_c.getName(), nettytc_c);
 
          connectorPairs.add(new Pair<String, String>(nettytc_c.getName(),
-               nettyBackuptc == null ? null : nettyBackuptc.getName()));
+                                                     nettyBackuptc == null ? null : nettyBackuptc.getName()));
       }
       else
       {
          connectorPairs.add(new Pair<String, String>(invmtc_c.getName(), invmBackuptc == null ? null
-               : invmBackuptc.getName()));
+                                                                                             : invmBackuptc.getName()));
       }
 
       BroadcastGroupConfiguration bcConfig = new BroadcastGroupConfiguration("bg1",
-            null,
-            -1,
-            groupAddress,
-            port,
-            250,
-            connectorPairs);
+                                                                             null,
+                                                                             -1,
+                                                                             groupAddress,
+                                                                             port,
+                                                                             250,
+                                                                             connectorPairs);
 
       configuration.getBroadcastGroupConfigurations().add(bcConfig);
 
@@ -1280,7 +1312,7 @@
       if (netty)
       {
          params.put(org.hornetq.integration.transports.netty.TransportConstants.PORT_PROP_NAME,
-               org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
+                    org.hornetq.integration.transports.netty.TransportConstants.DEFAULT_PORT + node);
       }
       else
       {
@@ -1351,12 +1383,12 @@
       pairs.add(connectorPair);
 
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-            address,
-            100,
-            true,
-            forwardWhenNoConsumers,
-            maxHops,
-            pairs);
+                                                                                      address,
+                                                                                      100,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      pairs);
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
 
@@ -1402,12 +1434,12 @@
       }
 
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-            address,
-            250,
-            true,
-            forwardWhenNoConsumers,
-            maxHops,
-            pairs);
+                                                                                      address,
+                                                                                      250,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      pairs);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
@@ -1466,18 +1498,16 @@
 
          Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), serverBackupTotc.getName());
 
-         // Pair<String, String> connectorPair = new Pair<String, String>(serverTotc.getName(), null);
-
          pairs.add(connectorPair);
       }
 
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-            address,
-            250,
-            true,
-            forwardWhenNoConsumers,
-            maxHops,
-            pairs);
+                                                                                      address,
+                                                                                      250,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      pairs);
 
       serverFrom.getConfiguration().getClusterConfigurations().add(clusterConf);
    }
@@ -1498,12 +1528,12 @@
       }
 
       ClusterConnectionConfiguration clusterConf = new ClusterConnectionConfiguration(name,
-            address,
-            100,
-            true,
-            forwardWhenNoConsumers,
-            maxHops,
-            discoveryGroupName);
+                                                                                      address,
+                                                                                      100,
+                                                                                      true,
+                                                                                      forwardWhenNoConsumers,
+                                                                                      maxHops,
+                                                                                      discoveryGroupName);
       List<ClusterConnectionConfiguration> clusterConfs = server.getConfiguration().getClusterConfigurations();
 
       clusterConfs.add(clusterConf);

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -85,15 +85,15 @@
    {
       super(name);
    }
-   
+
    public FailoverTest()
    {
    }
-   
+
    abstract class BaseListener implements SessionFailureListener
    {
       public void beforeReconnect(HornetQException me)
-      {            
+      {
       }
    }
 
@@ -115,7 +115,7 @@
          public void connectionFailed(HornetQException me)
          {
             latch.countDown();
-         }                 
+         }
       }
 
       session.addFailureListener(new MyListener());
@@ -136,7 +136,7 @@
       }
 
       fail(session, latch);
-      
+
       log.info("got here 1");
 
       ClientConsumer consumer = session.createConsumer(ADDRESS);
@@ -169,6 +169,77 @@
       assertEquals(0, sf.numConnections());
    }
 
+   
+   /** It doesn't fail, but it restarts both servers, live and backup; the data should be received after the restart,
+    *  and the servers should be able to connect without any problems. */
+   public void testRestartServers() throws Exception
+   {
+      ClientSessionFactoryInternal sf = getSessionFactory();
+
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnPersistentSend(true);
+
+      ClientSession session = sf.createSession(true, true);
+
+      session.createQueue(ADDRESS, ADDRESS, null, true);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(true);
+
+         setBody(i, message);
+
+         message.putIntProperty("counter", i);
+
+         producer.send(message);
+      }
+
+      session.commit();
+
+      session.close();
+
+      server0Service.stop();
+      server1Service.stop();
+
+      server1Service.start();
+      server0Service.start();
+
+      sf = getSessionFactory();
+
+      sf.setBlockOnNonPersistentSend(true);
+      sf.setBlockOnPersistentSend(true);
+
+      session = sf.createSession(true, true);
+
+      ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+
+         assertNotNull(message);
+
+         assertMessageBody(i, message);
+
+         assertEquals(i, message.getProperty("counter"));
+
+         message.acknowledge();
+      }
+
+      log.info("closing session");
+      session.close();
+
+      assertEquals(0, sf.numSessions());
+
+      assertEquals(0, sf.numConnections());
+   }
+
    /**
     * @param session
     * @param latch
@@ -176,7 +247,7 @@
     */
    private void fail(ClientSession session, final CountDownLatch latch) throws InterruptedException
    {
-      
+
       RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
 
       // Simulate failure on connection
@@ -202,14 +273,13 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
             latch.countDown();
          }
-         
-         
+
       }
 
       session.addFailureListener(new MyListener());
@@ -228,7 +298,7 @@
 
          producer.send(message);
       }
-      
+
       fail(session, latch);
 
       assertTrue(session.isRollbackOnly());
@@ -272,7 +342,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -330,7 +400,7 @@
       }
 
       assertNull(consumer.receive(1000));
-      
+
       session.commit();
 
       session.close();
@@ -353,7 +423,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -391,15 +461,15 @@
       fail(session, latch);
 
       session.commit();
-      
+
       session.close();
-      
+
       session = sf.createSession(false, false);
-      
+
       consumer = session.createConsumer(ADDRESS);
 
       session.start();
-      
+
       for (int i = 0; i < numMessages; i++)
       {
          // Only the persistent messages will survive
@@ -419,7 +489,7 @@
       }
 
       assertNull(consumer.receive(1000));
-      
+
       session.commit();
 
       session.close();
@@ -428,7 +498,7 @@
 
       assertEquals(0, sf.numConnections());
    }
-   
+
    public void testTransactedMessagesConsumedSoRollback() throws Exception
    {
       ClientSessionFactoryInternal sf = getSessionFactory();
@@ -442,7 +512,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -491,7 +561,7 @@
       fail(session2, latch);
 
       assertTrue(session2.isRollbackOnly());
-      
+
       try
       {
          session2.commit();
@@ -525,7 +595,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -578,7 +648,7 @@
       fail(session2, latch);
 
       assertFalse(session2.isRollbackOnly());
-      
+
       consumer = session2.createConsumer(ADDRESS);
 
       for (int i = numMessages / 2; i < numMessages; i++)
@@ -597,7 +667,7 @@
       session2.commit();
 
       assertNull(consumer.receive(1000));
-      
+
       session1.close();
 
       session2.close();
@@ -622,7 +692,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -692,7 +762,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -765,7 +835,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -839,7 +909,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -926,7 +996,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -1011,7 +1081,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -1072,7 +1142,7 @@
       // Wait to be informed of failure
 
       boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
-      
+
       log.info("waited for latch");
 
       assertTrue(ok);
@@ -1087,9 +1157,9 @@
       {
          assertEquals(XAException.XA_RBOTHER, e.errorCode);
       }
-      
-      //Thread.sleep(30000);
 
+      // Thread.sleep(30000);
+
       session1.close();
 
       session2.close();
@@ -1113,7 +1183,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -1197,7 +1267,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -1244,7 +1314,7 @@
 
       Map<ClientSession, List<ClientConsumer>> sessionConsumerMap = new HashMap<ClientSession, List<ClientConsumer>>();
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          CountDownLatch latch = new CountDownLatch(1);
 
@@ -1342,7 +1412,6 @@
       assertEquals(0, sf.numConnections());
    }
 
-   
    /*
     * Browser will get reset to beginning after failover
     */
@@ -1359,7 +1428,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -1439,7 +1508,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -1522,7 +1591,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -1619,7 +1688,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -1687,7 +1756,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -1708,11 +1777,11 @@
          ClientMessage message = session.createClientMessage(true);
 
          if (i == 0)
-         { 
+         {
             // Only need to add it on one message per tx
             message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, new SimpleString(txID));
          }
-         
+
          setBody(i, message);
 
          message.putIntProperty("counter", i);
@@ -1787,7 +1856,7 @@
          ClientMessage message = session2.createClientMessage(true);
 
          if (i == 0)
-         { 
+         {
             // Only need to add it on one message per tx
             message.putStringProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID, new SimpleString(txID));
          }
@@ -1843,7 +1912,7 @@
 
       final CountDownLatch latch = new CountDownLatch(1);
 
-      class MyListener  extends BaseListener
+      class MyListener extends BaseListener
       {
          public void connectionFailed(HornetQException me)
          {
@@ -2007,7 +2076,6 @@
       }
    }
 
-
    /**
     * @param i
     * @param message

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -80,6 +80,6 @@
 
    void setupMasterServer(int i, boolean fileStorage, boolean netty)
    {
-      setupServer(i, fileStorage, netty, 2);
+      setupServer(i, fileStorage, false, netty, false, 2);
    }
 }

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -12,28 +12,22 @@
  */
 package org.hornetq.tests.integration.cluster.failover;
 
-import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQ;
-import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
-import org.hornetq.core.server.cluster.MessageFlowRecord;
-import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.message.impl.MessageImpl;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 import org.hornetq.core.client.ClientSession;
 import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.cluster.MessageFlowRecord;
+import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
+import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
 import org.hornetq.utils.SimpleString;
 
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 /**
  * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
  *         Created Oct 26, 2009

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -135,7 +135,7 @@
 
          for (int i = 0; i < MIDDLE; i++)
          {
-            ClientMessage msg = cons.receive(10000);
+            ClientMessage msg = cons.receive(20000);
             assertNotNull(msg);
             msg.acknowledge();
             if (transacted && i % 10 == 0)

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/ReplicatedDistrubtionTest.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -0,0 +1,315 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.SessionFailureListener;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A ReplicatedDistrubtionTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicatedDistrubtionTest extends ClusterTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   static final SimpleString ADDRESS = new SimpleString("test.SomeAddress");
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testRedistribution() throws Exception
+   {
+      setupSessionFactory(1, 0, true, true);
+      setupSessionFactory(3, 2, true, true);
+
+      ClientSession sessionOne = sfs[1].createSession(true, true);
+
+      ClientSession sessionThree = sfs[3].createSession(false, false);
+
+      sessionOne.createQueue(ADDRESS, ADDRESS, true);
+
+      sessionThree.createQueue(ADDRESS, ADDRESS, true);
+
+      ClientConsumer consThree = sessionThree.createConsumer(ADDRESS);
+
+      sessionThree.start();
+
+      waitForBindings(3, "test.SomeAddress", 1, 1, true);
+
+      try
+      {
+         ClientProducer producer = sessionOne.createProducer(ADDRESS);
+
+         for (int i = 0; i < 100; i++)
+         {
+            ClientMessage msg = sessionOne.createClientMessage(true);
+            msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+            msg.putIntProperty(new SimpleString("key"), i);
+            producer.send(msg);
+         }
+
+         sessionOne.commit();
+
+         for (int i = 0; i < 50; i++)
+         {
+            ClientMessage msg = consThree.receive(15000);
+
+            assertNotNull(msg);
+
+            System.out.println(i + " msg = " + msg);
+
+            int received = (Integer)msg.getProperty(new SimpleString("key"));
+
+            if (i != received)
+            {
+               // Shouldn't this be a failure?
+               System.out.println(i + "!=" + received);
+            }
+            msg.acknowledge();
+         }
+
+         sessionThree.commit();
+
+         // consThree.close();
+
+         // TODO: Remove this sleep. If a node fails,
+         // redistribution may lose messages between the nodes.
+         Thread.sleep(500);
+
+         fail(sessionThree);
+
+         // sessionThree.close();
+         //         
+         // setupSessionFactory(2, -1, true);
+         //         
+         // sessionThree = sfs[2].createSession(true, true);
+         //         
+         // sessionThree.start();
+
+         // consThree = sessionThree.createConsumer(ADDRESS);
+
+         for (int i = 50; i < 100; i++)
+         {
+            ClientMessage msg = consThree.receive(15000);
+
+            assertNotNull(msg);
+
+            System.out.println(i + " msg = " + msg);
+
+            int received = (Integer)msg.getProperty(new SimpleString("key"));
+
+            if (i != received)
+            {
+               // Shouldn't this be a failure?
+               System.out.println(i + "!=" + received);
+            }
+            msg.acknowledge();
+         }
+
+         assertNull(consThree.receiveImmediate());
+
+         sessionThree.commit();
+
+         sessionOne.start();
+
+         ClientConsumer consOne = sessionOne.createConsumer(ADDRESS);
+
+         assertNull(consOne.receiveImmediate());
+
+      }
+      finally
+      {
+         sessionOne.close();
+         sessionThree.close();
+      }
+   }
+
+   public void testSimpleRedistributionOverReplication() throws Exception
+   {
+      setupSessionFactory(1, 0, true, true);
+      setupSessionFactory(3, 2, true, true);
+
+      ClientSession sessionOne = sfs[1].createSession(true, true);
+
+      ClientSession sessionThree = sfs[3].createSession(false, false);
+
+      sessionOne.createQueue(ADDRESS, ADDRESS, true);
+
+      sessionThree.createQueue(ADDRESS, ADDRESS, true);
+
+      ClientConsumer consThree = sessionThree.createConsumer(ADDRESS);
+
+      sessionThree.start();
+
+      waitForBindings(3, "test.SomeAddress", 1, 1, true);
+
+      try
+      {
+         ClientProducer producer = sessionOne.createProducer(ADDRESS);
+
+         for (int i = 0; i < 100; i++)
+         {
+            ClientMessage msg = sessionOne.createClientMessage(true);
+            msg.setBody(ChannelBuffers.wrappedBuffer(new byte[1024]));
+            msg.putIntProperty(new SimpleString("key"), i);
+            producer.send(msg);
+         }
+
+         sessionOne.commit();
+
+         for (int i = 0; i < 100; i++)
+         {
+            ClientMessage msg = consThree.receive(15000);
+
+            assertNotNull(msg);
+
+            System.out.println(i + " msg = " + msg);
+
+            int received = (Integer)msg.getProperty(new SimpleString("key"));
+
+            if (i != received)
+            {
+               // Shouldn't this be a failure?
+               System.out.println(i + "!=" + received);
+            }
+            msg.acknowledge();
+         }
+
+         sessionThree.commit();
+
+         sessionOne.start();
+
+         ClientConsumer consOne = sessionOne.createConsumer(ADDRESS);
+
+         assertNull(consOne.receiveImmediate());
+
+      }
+      finally
+      {
+         sessionOne.close();
+         sessionThree.close();
+      }
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
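+   /**
+    * Simulates a failure on the session's connection and waits up to one second for the failure listener to be notified.
+    * @param session the session whose underlying connection is failed
+    * @throws InterruptedException if interrupted while waiting for the failure notification
+    */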
+   private void fail(final ClientSession session) throws InterruptedException
+   {
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      class MyListener implements SessionFailureListener
+      {
+         public void connectionFailed(final HornetQException me)
+         {
+            latch.countDown();
+         }
+
+         /* (non-Javadoc)
+          * @see org.hornetq.core.client.SessionFailureListener#beforeReconnect(org.hornetq.core.exception.HornetQException)
+          */
+         public void beforeReconnect(final HornetQException exception)
+         {
+         }
+      }
+
+      session.addFailureListener(new MyListener());
+
+      RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+
+      // Simulate failure on connection
+      conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+      // Wait to be informed of failure
+
+      boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
+
+      assertTrue(ok);
+   }
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      setupServer(0, true, isShared(), true, true, -1);
+      setupServer(1, true, isShared(), true, false, 0);
+      setupServer(2, true, isShared(), true, true, -1);
+      setupServer(3, true, isShared(), true, true,  2);
+
+      setupClusterConnectionWithBackups("test", "test", false, 1, true, 1, new int[] { 3 }, new int[] { 2 });
+
+      AddressSettings as = new AddressSettings();
+      as.setRedistributionDelay(0);
+
+      getServer(0).getAddressSettingsRepository().addMatch("test.*", as);
+      getServer(1).getAddressSettingsRepository().addMatch("test.*", as);
+      getServer(2).getAddressSettingsRepository().addMatch("test.*", as);
+      getServer(2).getAddressSettingsRepository().addMatch("test.*", as);
+
+      servers[0].start();
+      servers[2].start();
+      servers[1].start();
+      servers[3].start();
+   }
+
+   protected boolean isShared()
+   {
+      return false;
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      servers[2].stop();
+      servers[0].stop();
+      servers[1].stop();
+      servers[3].stop();
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/SharedStoreDistributionTest.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+/**
+ * A SharedStoreDistributionTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class SharedStoreDistributionTest extends ReplicatedDistrubtionTest
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+   
+   protected boolean isShared()
+   {
+      return true;
+   }
+
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/HornetQConnectionFactoryTest.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -863,6 +863,7 @@
                 .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory",
                                                 backupParams));
       backupConf.setBackup(true);
+      backupConf.setSharedStore(true);
       backupService = HornetQ.newHornetQServer(backupConf, false);
       backupService.start();
 
@@ -877,6 +878,7 @@
       connectors.put(backupTC.getName(), backupTC);
       connectors.put(liveTC.getName(), liveTC);
       liveConf.setConnectorConfigurations(connectors);
+      liveConf.setSharedStore(true);
       liveConf.setBackupConnectorName(backupTC.getName());
       liveConf.setClustered(true);
 

Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -37,6 +37,7 @@
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.LoaderCallback;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
@@ -107,7 +108,8 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
          manager.start();
          manager.stop();
       }
@@ -117,6 +119,87 @@
       }
    }
 
+   public void testInvalidJournal() throws Exception
+   {
+
+      Configuration config = createDefaultConfig(false);
+
+      config.setBackup(true);
+
+      HornetQServer server = new HornetQServerImpl(config);
+
+      FailoverManager failoverManager = createFailoverManager();
+
+      server.start();
+
+      try
+      {
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
+         manager.start();
+         try
+         {
+            manager.compareJournals(new JournalLoadInformation[]{new JournalLoadInformation(2,2), new JournalLoadInformation(2,2)});
+            fail("Exception was expected");
+         }
+         catch (HornetQException e)
+         {
+            e.printStackTrace();
+            assertEquals(HornetQException.ILLEGAL_STATE, e.getCode());
+         }
+
+         manager.compareJournals(new JournalLoadInformation[]{new JournalLoadInformation(), new JournalLoadInformation()});
+
+         manager.stop();
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
+   // should throw an exception if a second server connects to the same backup
+   public void testInvalidConnection() throws Exception
+   {
+
+      Configuration config = createDefaultConfig(false);
+
+      config.setBackup(true);
+
+      HornetQServer server = new HornetQServerImpl(config);
+
+      FailoverManager failoverManager = createFailoverManager();
+
+      server.start();
+
+      try
+      {
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
+
+         manager.start();
+
+         try
+         {
+            ReplicationManagerImpl manager2 = new ReplicationManagerImpl(failoverManager,
+                                                                         executor);
+
+            manager2.start();
+            fail("Exception was expected");
+         }
+         catch (Exception e)
+         {
+         }
+
+         manager.stop();
+
+      }
+      finally
+      {
+         server.stop();
+      }
+   }
+
    public void testConnectIntoNonBackup() throws Exception
    {
 
@@ -132,7 +215,8 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
 
          try
          {
@@ -166,7 +250,8 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
          manager.start();
 
          Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -184,11 +269,9 @@
          replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
          replicatedJournal.appendRollbackRecord(3, false);
 
-         blockOnReplication(manager);
-
          assertEquals(1, manager.getActiveTokens().size());
 
-         manager.closeContext();
+         blockOnReplication(manager);
 
          for (int i = 0; i < 100; i++)
          {
@@ -272,11 +355,11 @@
       config.setBackup(true);
 
       ArrayList<String> intercepts = new ArrayList<String>();
-      
+
       intercepts.add(TestInterceptor.class.getName());
-      
+
       config.setInterceptorClassNames(intercepts);
-      
+
       HornetQServer server = new HornetQServerImpl(config);
 
       server.start();
@@ -285,7 +368,8 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                      executor);
          manager.start();
 
          Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -308,7 +392,7 @@
          });
 
          manager.closeContext();
-         
+
          server.stop();
 
          assertTrue(latch.await(50, TimeUnit.SECONDS));
@@ -336,9 +420,28 @@
 
       });
 
+      manager.closeContext();
+
       assertTrue(latch.await(30, TimeUnit.SECONDS));
    }
 
+   public void testNoServer() throws Exception
+   {
+      FailoverManager failoverManager = createFailoverManager();
+
+      try
+      {
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
+         manager.start();
+         fail("Exception expected");
+      }
+      catch (HornetQException expected)
+      {
+         assertEquals(HornetQException.ILLEGAL_STATE, expected.getCode());
+      }
+   }
+
    public void testNoActions() throws Exception
    {
 
@@ -354,7 +457,8 @@
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+                                                                     executor);
          manager.start();
 
          Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -371,11 +475,13 @@
             }
 
          });
-         assertTrue(latch.await(1, TimeUnit.SECONDS));
+
          assertEquals(1, manager.getActiveTokens().size());
 
          manager.closeContext();
 
+         assertTrue(latch.await(1, TimeUnit.SECONDS));
+
          for (int i = 0; i < 100; i++)
          {
             // This is asynchronous. Have to wait completion
@@ -505,7 +611,6 @@
 
    };
 
-
    static class FakeJournal implements Journal
    {
 
@@ -649,21 +754,21 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
        */
-      public long load(LoaderCallback reloadManager) throws Exception
+      public JournalLoadInformation load(LoaderCallback reloadManager) throws Exception
       {
 
-         return 0;
+         return new JournalLoadInformation();
       }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
        */
-      public long load(List<RecordInfo> committedRecords,
-                       List<PreparedTransactionInfo> preparedTransactions,
-                       TransactionFailureCallback transactionFailure) throws Exception
+      public JournalLoadInformation load(List<RecordInfo> committedRecords,
+                                         List<PreparedTransactionInfo> preparedTransactions,
+                                         TransactionFailureCallback transactionFailure) throws Exception
       {
 
-         return 0;
+         return new JournalLoadInformation();
       }
 
       /* (non-Javadoc)
@@ -699,5 +804,21 @@
 
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#loadInternalOnly()
+       */
+      public JournalLoadInformation loadInternalOnly() throws Exception
+      {
+         return new JournalLoadInformation();
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.journal.Journal#getNumberOfRecords()
+       */
+      public int getNumberOfRecords()
+      {
+         return 0;
+      }
+
    }
 }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-03 15:02:43 UTC (rev 8194)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java	2009-11-03 16:55:02 UTC (rev 8195)
@@ -28,6 +28,7 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.core.buffers.ChannelBuffers;
+import org.hornetq.core.journal.JournalLoadInformation;
 import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
@@ -47,6 +48,7 @@
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.ReplicationManager;
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
@@ -945,8 +947,9 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
        */
-      public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
+      public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos, List<GroupingInfo> groupingInfos) throws Exception
       {
+         return new JournalLoadInformation();
       }
 
       public void addGrouping(GroupBinding groupBinding) throws Exception
@@ -962,12 +965,13 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
        */
-      public void loadMessageJournal(PostOffice postOffice,
+      public JournalLoadInformation loadMessageJournal(PostOffice postOffice,
                                      PagingManager pagingManager,
                                      ResourceManager resourceManager,
                                      Map<Long, Queue> queues,
                                      Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
       {
+         return new JournalLoadInformation();
       }
 
       /* (non-Javadoc)
@@ -1166,10 +1170,9 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
        */
-      public void loadInternalOnly() throws Exception
+      public JournalLoadInformation[] loadInternalOnly() throws Exception
       {
-         
-         
+         return null; 
       }
 
       /* (non-Javadoc)
@@ -1206,6 +1209,13 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.StorageManager#setReplicator(org.hornetq.core.replication.ReplicationManager)
+       */
+      public void setReplicator(ReplicationManager replicator)
+      {
+      }
+
    }
 
    class FakeStoreFactory implements PagingStoreFactory



More information about the hornetq-commits mailing list