[hornetq-commits] JBoss hornetq SVN: r11503 - in branches/HORNETQ-720_Replication: hornetq-core/src/main/java/org/hornetq/core/server/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 10 09:35:57 EDT 2011


Author: borges
Date: 2011-10-10 09:35:57 -0400 (Mon, 10 Oct 2011)
New Revision: 11503

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java
   branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
   branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
HORNETQ-720 JournalWrapper to always manipulate files through wrapped JournalImpl

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-10-10 13:35:23 UTC (rev 11502)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java	2011-10-10 13:35:57 UTC (rev 11503)
@@ -26,7 +26,6 @@
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.journal.impl.JournalFilesRepository;
 import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.OperationContext;
@@ -616,13 +615,6 @@
       throw new UnsupportedOperationException();
    }
 
-   @Override
-   public JournalFilesRepository getFilesRepository()
-   {
-      throw new UnsupportedOperationException();
-   }
-
-   @Override
    public int getFileSize()
    {
       return localJournal.getFileSize();

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-10-10 13:35:23 UTC (rev 11502)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-10-10 13:35:57 UTC (rev 11503)
@@ -317,6 +317,11 @@
       }
 
       filesReservedForSync.clear();
+      for (Journal j : journals)
+      {
+         if (j instanceof FileWrapperJournal)
+            j.stop();
+      }
       pageManager.stop();
 
       // Storage needs to be the last to stop
@@ -363,7 +368,6 @@
                                        "Backup node can't connect to the live node as the data differs");
          }
       }
-
    }
 
    /**
@@ -411,8 +415,9 @@
             }
             // files should be already in place.
             filesReservedForSync.remove(jc);
-            getJournal(jc.typeByte).stop();
             registerJournal(jc.typeByte, journal);
+            journal.stop();
+            journal.start();
             journal.loadInternalOnly();
          }
          finally
@@ -540,23 +545,23 @@
          return;
       }
 
-
       final Journal journal = journalsHolder.get(packet.getJournalContentType());
       synchronized (this)
       {
          if (!started)
                return;
-      Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
+         Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
          log.info("Journal " + packet.getJournalContentType() + ". Reserving fileIDs for synchronization: " +
                   Arrays.toString(packet.getFileIds()));
 
-      for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
-      {
-         mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
+         for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
+         {
+            mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
+         }
+         FileWrapperJournal syncJournal = new FileWrapperJournal(journal);
+         registerJournal(packet.getJournalContentType().typeByte, syncJournal);
       }
-      registerJournal(packet.getJournalContentType().typeByte, new FileWrapperJournal(journal));
-      }
-   }
+     }
 
    private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
    {

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-10-10 13:35:23 UTC (rev 11502)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-10-10 13:35:57 UTC (rev 11503)
@@ -470,8 +470,8 @@
       {
          SequentialFile file = jf.getFile().copy();
          try {
-         log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
-         sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
+            log.info("Replication: sending " + jf + " (size=" + file.size() + ") to backup. " + file);
+            sendLargeFile(content, null, jf.getFileID(), file, Long.MAX_VALUE);
          }
          finally
          {

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-10-10 13:35:23 UTC (rev 11502)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-10-10 13:35:57 UTC (rev 11503)
@@ -215,8 +215,8 @@
    private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
 
    private final Object initialiseLock = new Object();
-
    private boolean initialised;
+   private final Object startUpLock = new Object();
 
    /**
     * Only applicable to 'remote backup servers'. If this flag is false the backup may not become
@@ -602,8 +602,6 @@
             serverLocator.close();
             replicationEndpoint.stop();
 
-            if (!started)
-               return;
             if (!isRemoteBackupUpToDate())
             {
                /*
@@ -614,16 +612,20 @@
             }
 
             configuration.setBackup(false);
-            storageManager.start();
+            synchronized (startUpLock)
+            {
+               if (!started)
+                  return;
+               storageManager.start();
+               initialisePart2();
+               clusterManager.activate();
+            }
 
-            initialisePart2();
-            clusterManager.activate();
-
          }
          catch (Exception e)
          {
-            if (e instanceof InterruptedException && !started)
-               // do not log errors if the server is being stopped.
+            if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !started)
+               // do not log these errors if the server is being stopped.
                return;
             log.error("Failure in initialisation", e);
             e.printStackTrace();
@@ -796,6 +798,9 @@
 
       synchronized (this)
       {
+         synchronized (startUpLock)
+         {
+
          // Stop the deployers
          if (configuration.isFileDeploymentEnabled())
          {
@@ -917,6 +922,8 @@
 
          started = false;
          initialised = false;
+         }
+
          // to display in the log message
          SimpleString tempNodeID = getNodeID();
 

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java	2011-10-10 13:35:23 UTC (rev 11502)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java	2011-10-10 13:35:57 UTC (rev 11503)
@@ -17,7 +17,6 @@
 import java.util.Map;
 
 import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.journal.impl.JournalFilesRepository;
 import org.hornetq.core.server.HornetQComponent;
 
 /**
@@ -181,7 +180,5 @@
 
    SequentialFileFactory getFileFactory();
 
-   JournalFilesRepository getFilesRepository();
-
    int getFileSize();
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-10-10 13:35:23 UTC (rev 11502)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/FileWrapperJournal.java	2011-10-10 13:35:57 UTC (rev 11503)
@@ -15,7 +15,6 @@
 import org.hornetq.core.journal.LoaderCallback;
 import org.hornetq.core.journal.PreparedTransactionInfo;
 import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
@@ -34,9 +33,10 @@
 public class FileWrapperJournal extends JournalBase implements Journal
 {
    private final ReentrantLock lockAppend = new ReentrantLock();
-   // private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
 
    private final ConcurrentMap<Long, AtomicInteger> transactions = new ConcurrentHashMap<Long, AtomicInteger>();
+   private final JournalImpl journal;
+   protected volatile JournalFile currentFile;
 
    /**
     * @param journal
@@ -44,24 +44,21 @@
     */
    public FileWrapperJournal(Journal journal) throws Exception
    {
-      super(journal.getFileFactory(), journal.getFilesRepository(), journal.getFileSize());
-      setUpCurrentFile(JournalImpl.SIZE_HEADER);
+      super(journal.getFileFactory().isSupportsCallbacks(), journal.getFileSize());
+      this.journal = (JournalImpl)journal;
+      currentFile = this.journal.setUpCurrentFile(JournalImpl.SIZE_HEADER);
    }
 
    @Override
    public void start() throws Exception
    {
-      throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+      throw new UnsupportedOperationException();
    }
 
    @Override
    public void stop() throws Exception
    {
-      SequentialFile seqFile = currentFile.getFile();
-      long pos = seqFile.position();
-      seqFile.close();
-      seqFile.open();
-      seqFile.position(pos);
+      currentFile.getFile().close();
    }
 
    @Override
@@ -97,7 +94,7 @@
          {
             callback.storeLineUp();
          }
-         switchFileIfNecessary(encoder.getEncodeSize());
+         currentFile = journal.switchFileIfNecessary(encoder.getEncodeSize());
          encoder.setFileID(currentFile.getRecordID());
 
          if (callback != null)
@@ -312,9 +309,4 @@
       throw new UnsupportedOperationException();
    }
 
-   @Override
-   public JournalFilesRepository getFilesRepository()
-   {
-      throw new UnsupportedOperationException();
-   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java	2011-10-10 13:35:23 UTC (rev 11502)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java	2011-10-10 13:35:57 UTC (rev 11503)
@@ -3,34 +3,25 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.IOCompletion;
-import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.hornetq.core.logging.Logger;
 
 abstract class JournalBase
 {
 
-   protected final JournalFilesRepository filesRepository;
-   protected final SequentialFileFactory fileFactory;
-   protected volatile JournalFile currentFile;
    protected final int fileSize;
+   private final boolean supportsCallback;
 
    private static final Logger log = Logger.getLogger(JournalBase.class);
    private static final boolean trace = log.isTraceEnabled();
 
-   public JournalBase(SequentialFileFactory fileFactory, JournalFilesRepository journalFilesRepository, int fileSize)
+   public JournalBase(boolean supportsCallback, int fileSize)
    {
       if (fileSize < JournalImpl.MIN_FILE_SIZE)
       {
          throw new IllegalArgumentException("File size cannot be less than " + JournalImpl.MIN_FILE_SIZE + " bytes");
       }
-      if (fileSize % fileFactory.getAlignment() != 0)
-      {
-         throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " +
-                  fileFactory.getAlignment());
-      }
-      this.fileFactory = fileFactory;
-      this.filesRepository = journalFilesRepository;
+      this.supportsCallback = supportsCallback;
       this.fileSize = fileSize;
    }
 
@@ -198,56 +189,11 @@
       }
    }
 
-   /**
-    * @param size
-    * @throws Exception
-    */
-   protected void switchFileIfNecessary(int size) throws Exception
-   {
-      // We take into account the fileID used on the Header
-      if (size > fileSize - currentFile.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER))
-      {
-         throw new IllegalArgumentException("Record is too large to store " + size);
-      }
-
-      if (!currentFile.getFile().fits(size))
-      {
-         moveNextFile(true);
-
-         // The same check needs to be done at the new file also
-         if (!currentFile.getFile().fits(size))
-         {
-            // Sanity check, this should never happen
-            throw new IllegalStateException("Invalid logic on buffer allocation");
-         }
-      }
-   }
-
    abstract void scheduleReclaim();
 
-   // You need to guarantee lock.acquire() before calling this method
-   protected void moveNextFile(final boolean scheduleReclaim) throws Exception
-   {
-      filesRepository.closeFile(currentFile);
-
-      currentFile = filesRepository.openFile();
-
-      if (scheduleReclaim)
-      {
-         scheduleReclaim();
-      }
-
-      if (trace)
-      {
-         log.info("moveNextFile: " + currentFile);
-      }
-
-      fileFactory.activateBuffer(currentFile.getFile());
-   }
-
    protected SyncIOCompletion getSyncCallback(final boolean sync)
    {
-      if (fileFactory.isSupportsCallbacks())
+      if (supportsCallback)
       {
          if (sync)
          {
@@ -279,38 +225,6 @@
       }
    }
 
-   /**
-    * @param lastDataPos
-    * @throws Exception
-    */
-   protected void setUpCurrentFile(int lastDataPos) throws Exception
-   {
-      // Create any more files we need
-
-      filesRepository.ensureMinFiles();
-
-      // The current file is the last one that has data
-
-      currentFile = filesRepository.pollLastDataFile();
-
-      if (currentFile != null)
-      {
-         currentFile.getFile().open();
-
-         currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
-      }
-      else
-      {
-         currentFile = filesRepository.getFreeFile();
-
-         filesRepository.openFile(currentFile, true);
-      }
-
-      fileFactory.activateBuffer(currentFile.getFile());
-
-      filesRepository.pushOpenedFile();
-   }
-
    public int getFileSize()
    {
       return fileSize;

Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-10-10 13:35:23 UTC (rev 11502)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java	2011-10-10 13:35:57 UTC (rev 11503)
@@ -15,7 +15,6 @@
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -177,6 +176,9 @@
    private final float compactPercentage;
 
    private final int compactMinFiles;
+   private final JournalFilesRepository filesRepository;
+   private final SequentialFileFactory fileFactory;
+   private volatile JournalFile currentFile;
 
    // Compacting may replace this structure
    private final ConcurrentMap<Long, JournalRecord> records = new ConcurrentHashMap<Long, JournalRecord>();
@@ -223,8 +225,12 @@
                       final String fileExtension,
                       final int maxAIO)
    {
-      super(fileFactory, new JournalFilesRepository(fileFactory, filePrefix, fileExtension, 0, maxAIO, fileSize,
-                                                    minFiles), fileSize);
+      super(fileFactory.isSupportsCallbacks(), fileSize);
+      if (fileSize % fileFactory.getAlignment() != 0)
+      {
+         throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " +
+                  fileFactory.getAlignment());
+      }
       if (minFiles < 2)
       {
          throw new IllegalArgumentException("minFiles cannot be less than 2");
@@ -242,7 +248,10 @@
       {
          this.compactPercentage = compactPercentage / 100f;
       }
+      this.fileFactory = fileFactory;
 
+      this.filesRepository =
+               new JournalFilesRepository(fileFactory, filePrefix, fileExtension, 0, maxAIO, fileSize, minFiles);
       this.compactMinFiles = compactMinFiles;
       this.minFiles = minFiles;
       this.userVersion = 0;
@@ -1706,6 +1715,7 @@
       filesRepository.clear();
 
       transactions.clear();
+      currentFile = null;
 
       final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
 
@@ -2366,13 +2376,16 @@
 
    public synchronized void stop() throws Exception
    {
-      JournalImpl.trace("Stopping the journal");
-
       if (state == JournalState.STOPPED)
       {
          throw new IllegalStateException("Journal is already stopped");
       }
 
+      JournalImpl.trace("Stopping the journal " + this);
+
+      journalLock.writeLock().lock();
+      try
+      {
       lockAppend.lock();
 
       try
@@ -2412,7 +2425,12 @@
       finally
       {
          lockAppend.unlock();
+         }
       }
+      finally
+      {
+         journalLock.writeLock().unlock();
+      }
    }
 
    public int getNumberOfRecords()
@@ -3018,7 +3036,6 @@
       try
       {
          Map<Long, JournalFile> map = new HashMap<Long, JournalFile>();
-         log.info("Reserving fileIDs before synchronization: " + Arrays.toString(fileIds));
          long maxID = -1;
          for (long id : fileIds)
          {
@@ -3040,9 +3057,84 @@
       return fileFactory;
    }
 
-   @Override
-   public JournalFilesRepository getFilesRepository()
+   /**
+    * @param lastDataPos
+    * @return
+    * @throws Exception
+    */
+   protected JournalFile setUpCurrentFile(int lastDataPos) throws Exception
    {
-      return filesRepository;
+      // Create any more files we need
+
+      filesRepository.ensureMinFiles();
+
+      // The current file is the last one that has data
+
+      currentFile = filesRepository.pollLastDataFile();
+      if (currentFile != null)
+      {
+         if (!currentFile.getFile().isOpen())
+         currentFile.getFile().open();
+
+         currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
+      }
+      else
+      {
+         currentFile = filesRepository.getFreeFile();
+
+         filesRepository.openFile(currentFile, true);
+      }
+
+      fileFactory.activateBuffer(currentFile.getFile());
+
+      filesRepository.pushOpenedFile();
+      return currentFile;
    }
+
+   /**
+    * @param size
+    * @return
+    * @throws Exception
+    */
+   protected JournalFile switchFileIfNecessary(int size) throws Exception
+   {
+      // We take into account the fileID used on the Header
+      if (size > fileSize - currentFile.getFile().calculateBlockStart(JournalImpl.SIZE_HEADER))
+      {
+         throw new IllegalArgumentException("Record is too large to store " + size);
+      }
+
+      if (!currentFile.getFile().fits(size))
+      {
+         moveNextFile(true);
+
+         // The same check needs to be done at the new file also
+         if (!currentFile.getFile().fits(size))
+         {
+            // Sanity check, this should never happen
+            throw new IllegalStateException("Invalid logic on buffer allocation");
+         }
+      }
+      return currentFile;
+   }
+
+   /** You need to guarantee lock.acquire() before calling this method! */
+   private void moveNextFile(final boolean scheduleReclaim) throws Exception
+   {
+      filesRepository.closeFile(currentFile);
+
+      currentFile = filesRepository.openFile();
+
+      if (scheduleReclaim)
+      {
+         scheduleReclaim();
+      }
+
+      if (trace)
+      {
+         log.info("moveNextFile: " + currentFile);
+      }
+
+      fileFactory.activateBuffer(currentFile.getFile());
+   }
 }

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2011-10-10 13:35:23 UTC (rev 11502)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2011-10-10 13:35:57 UTC (rev 11503)
@@ -45,8 +45,7 @@
 
    public void testNodeID() throws Exception
    {
-      backupServer.start();
-      waitForComponent(backupServer, 5);
+      startBackupFinishSyncing();
       assertTrue("must be running", backupServer.isStarted());
       assertEquals("backup and live should have the same nodeID", liveServer.getServer().getNodeID(),
                    backupServer.getServer().getNodeID());
@@ -113,8 +112,8 @@
 
    protected void startBackupFinishSyncing() throws Exception
    {
-      backupServer.start();
       syncDelay.deliverUpToDateMsg();
+      backupServer.start();
       waitForBackup(sessionFactory, BACKUP_WAIT_TIME, true);
    }
 

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-10-10 13:35:23 UTC (rev 11502)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/util/BackupSyncDelay.java	2011-10-10 13:35:57 UTC (rev 11503)
@@ -41,9 +41,9 @@
 
    public void deliverUpToDateMsg()
    {
+      live.removeInterceptor(this);
       if (backup.isStarted())
          handler.deliver();
-      live.removeInterceptor(this);
    }
 
    public BackupSyncDelay(TestableServer backup, TestableServer live)

Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-10-10 13:35:23 UTC (rev 11502)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java	2011-10-10 13:35:57 UTC (rev 11503)
@@ -54,7 +54,6 @@
 import org.hornetq.core.journal.SequentialFileFactory;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.journal.impl.JournalFile;
-import org.hornetq.core.journal.impl.JournalFilesRepository;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;
 import org.hornetq.core.paging.PagingStore;
@@ -891,12 +890,6 @@
       }
 
       @Override
-      public JournalFilesRepository getFilesRepository()
-      {
-         return null;
-      }
-
-      @Override
       public int getFileSize()
       {
          return 0;



More information about the hornetq-commits mailing list