[hornetq-commits] JBoss hornetq SVN: r8121 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Oct 16 22:25:11 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-16 22:25:10 -0400 (Fri, 16 Oct 2009)
New Revision: 8121

Modified:
   trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   trunk/src/main/org/hornetq/core/replication/ReplicationContext.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
   trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/tests/src/org/hornetq/tests/
   trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-125 - Replication stop on backup failure

Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-10-17 02:25:10 UTC (rev 8121)
@@ -54,10 +54,6 @@
       return 1;
    }
 
-   public void flush()
-   {
-   }
-
    public int calculateBlockStart(final int position) throws Exception
    {
       return position;

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-10-17 02:25:10 UTC (rev 8121)
@@ -1764,7 +1764,7 @@
    private class FinishPageMessageOperation implements TransactionOperation
    {
 
-      public void afterCommit(final Transaction tx) throws Exception
+      public void afterCommit(final Transaction tx)
       {
          // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
          // transaction until all the messages were added to the queue

Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-10-17 02:25:10 UTC (rev 8121)
@@ -1100,7 +1100,7 @@
          return Collections.emptySet();
       }
 
-      public void afterCommit(final Transaction tx) throws Exception
+      public void afterCommit(final Transaction tx)
       {
          // If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
          // transaction until all the messages were added to the queue
@@ -1214,7 +1214,7 @@
          this.refs = refs;
       }
 
-      public void afterCommit(Transaction tx) throws Exception
+      public void afterCommit(Transaction tx)
       {        
          for (MessageReference ref : refs)
          {

Modified: trunk/src/main/org/hornetq/core/replication/ReplicationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/ReplicationContext.java	2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/replication/ReplicationContext.java	2009-10-17 02:25:10 UTC (rev 8121)
@@ -34,5 +34,8 @@
    
    /** To be called when there are no more operations pending */
    void complete();
+   
+   /** Flush all pending callbacks on the Context */
+   void flush();
 
 }

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java	2009-10-17 02:25:10 UTC (rev 8121)
@@ -48,17 +48,17 @@
 
    private final ReplicationManager replicationManager;
 
-   private final Journal replicatedJournal;
+   private final Journal localJournal;
 
    private final byte journalID;
 
    public ReplicatedJournal(final byte journaID,
-                                final Journal replicatedJournal,
+                                final Journal localJournal,
                                 final ReplicationManager replicationManager)
    {
       super();
       journalID = journaID;
-      this.replicatedJournal = replicatedJournal;
+      this.localJournal = localJournal;
       this.replicationManager = replicationManager;
    }
 
@@ -100,7 +100,7 @@
          trace("Append record id = " + id + " recordType = " + recordType);
       }
       replicationManager.appendAddRecord(journalID, id, recordType, record);
-      replicatedJournal.appendAddRecord(id, recordType, record, sync);
+      localJournal.appendAddRecord(id, recordType, record, sync);
    }
 
    /**
@@ -134,7 +134,7 @@
          trace("Append record TXid = " + id + " recordType = " + recordType);
       }
       replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
-      replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
+      localJournal.appendAddRecordTransactional(txID, id, recordType, record);
    }
 
    /**
@@ -150,7 +150,7 @@
          trace("AppendCommit " + txID);
       }
       replicationManager.appendCommitRecord(journalID, txID);
-      replicatedJournal.appendCommitRecord(txID, sync);
+      localJournal.appendCommitRecord(txID, sync);
    }
 
    /**
@@ -166,7 +166,7 @@
          trace("AppendDelete " + id);
       }
       replicationManager.appendDeleteRecord(journalID, id);
-      replicatedJournal.appendDeleteRecord(id, sync);
+      localJournal.appendDeleteRecord(id, sync);
    }
 
    /**
@@ -195,7 +195,7 @@
          trace("AppendDelete txID=" + txID + " id=" + id);
       }
       replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
-      replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
+      localJournal.appendDeleteRecordTransactional(txID, id, record);
    }
 
    /**
@@ -211,7 +211,7 @@
          trace("AppendDelete (noencoding) txID=" + txID + " id=" + id);
       }
       replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
-      replicatedJournal.appendDeleteRecordTransactional(txID, id);
+      localJournal.appendDeleteRecordTransactional(txID, id);
    }
 
    /**
@@ -240,7 +240,7 @@
          trace("AppendPrepare txID=" + txID);
       }
       replicationManager.appendPrepareRecord(journalID, txID, transactionData);
-      replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
+      localJournal.appendPrepareRecord(txID, transactionData, sync);
    }
 
    /**
@@ -256,7 +256,7 @@
          trace("AppendRollback " + txID);
       }
       replicationManager.appendRollbackRecord(journalID, txID);
-      replicatedJournal.appendRollbackRecord(txID, sync);
+      localJournal.appendRollbackRecord(txID, sync);
    }
 
    /**
@@ -287,7 +287,7 @@
          trace("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
       }
       replicationManager.appendUpdateRecord(journalID, id, recordType, record);
-      replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
+      localJournal.appendUpdateRecord(id, recordType, record, sync);
    }
 
    /**
@@ -324,7 +324,7 @@
          trace("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
       }
       replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
-      replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+      localJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
    }
 
    /**
@@ -339,7 +339,7 @@
                     final List<PreparedTransactionInfo> preparedTransactions,
                     final TransactionFailureCallback transactionFailure) throws Exception
    {
-      return replicatedJournal.load(committedRecords, preparedTransactions, transactionFailure);
+      return localJournal.load(committedRecords, preparedTransactions, transactionFailure);
    }
 
    /**
@@ -350,7 +350,7 @@
     */
    public long load(final LoaderCallback reloadManager) throws Exception
    {
-      return replicatedJournal.load(reloadManager);
+      return localJournal.load(reloadManager);
    }
 
    /**
@@ -360,7 +360,7 @@
     */
    public void perfBlast(final int pages) throws Exception
    {
-      replicatedJournal.perfBlast(pages);
+      localJournal.perfBlast(pages);
    }
 
    /**
@@ -369,7 +369,7 @@
     */
    public void start() throws Exception
    {
-      replicatedJournal.start();
+      localJournal.start();
    }
 
    /**
@@ -378,7 +378,7 @@
     */
    public void stop() throws Exception
    {
-      replicatedJournal.stop();
+      localJournal.stop();
    }
 
    /* (non-Javadoc)
@@ -386,7 +386,7 @@
     */
    public int getAlignment() throws Exception
    {
-      return replicatedJournal.getAlignment();
+      return localJournal.getAlignment();
    }
 
    /* (non-Javadoc)
@@ -394,7 +394,7 @@
     */
    public boolean isStarted()
    {
-      return replicatedJournal.isStarted();
+      return localJournal.isStarted();
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java	2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationContextImpl.java	2009-10-17 02:25:10 UTC (rev 8121)
@@ -53,14 +53,22 @@
    {
       if (--pendings == 0)
       {
-         if (tasks != null)
+         flush();
+      }
+   }
+
+   /**
+    * 
+    */
+   public void flush()
+   {
+      if (tasks != null)
+      {
+         for (Runnable run : tasks)
          {
-            for (Runnable run : tasks)
-            {
-               executor.execute(run);
-            }
-            tasks.clear();
+            executor.execute(run);
          }
+         tasks.clear();
       }
    }
    

Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2009-10-17 02:25:10 UTC (rev 8121)
@@ -19,11 +19,13 @@
 import java.util.concurrent.Executor;
 
 import org.hornetq.core.client.impl.FailoverManager;
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
+import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.Packet;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
@@ -40,8 +42,8 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.ReplicationContext;
 import org.hornetq.core.replication.ReplicationManager;
-import org.hornetq.core.replication.ReplicationContext;
 import org.hornetq.utils.ConcurrentHashSet;
 import org.hornetq.utils.SimpleString;
 
@@ -79,11 +81,11 @@
 
    private final Executor executor;
 
-   private final ThreadLocal<ReplicationContext> repliToken = new ThreadLocal<ReplicationContext>();
+   private final ThreadLocal<ReplicationContext> tlReplicationContext = new ThreadLocal<ReplicationContext>();
 
    private final Queue<ReplicationContext> pendingTokens = new ConcurrentLinkedQueue<ReplicationContext>();
 
-   private final ConcurrentHashSet<ReplicationContext> activeTokens = new ConcurrentHashSet<ReplicationContext>();
+   private final ConcurrentHashSet<ReplicationContext> activeContexts = new ConcurrentHashSet<ReplicationContext>();
 
    // Static --------------------------------------------------------
 
@@ -255,7 +257,7 @@
          sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
       }
    }
-   
+
    /* (non-Javadoc)
     * @see org.hornetq.core.replication.ReplicationManager#largeMessageBegin(byte[])
     */
@@ -300,8 +302,6 @@
       }
    }
 
-   
-
    /* (non-Javadoc)
     * @see org.hornetq.core.server.HornetQComponent#isStarted()
     */
@@ -330,6 +330,22 @@
 
       mainChannel.sendBlocking(replicationStartPackage);
 
+      failoverManager.addFailureListener(new FailureListener()
+      {
+         public void connectionFailed(HornetQException me)
+         {
+            log.warn("Connection to the backup node failed, removing replication now");
+            try
+            {
+               stop();
+            }
+            catch (Exception e)
+            {
+               log.warn(e.getMessage(), e);
+            }
+         }
+      });
+
       started = true;
 
       enabled = true;
@@ -340,6 +356,16 @@
     */
    public void stop() throws Exception
    {
+      enabled = false;
+      
+      for (ReplicationContext ctx : activeContexts)
+      {
+         ctx.complete();
+         ctx.flush();
+      }
+      
+      activeContexts.clear();
+      
       if (replicatingChannel != null)
       {
          replicatingChannel.close();
@@ -353,16 +379,18 @@
       }
 
       connection = null;
+
+      started = false;
    }
 
    public ReplicationContext getContext()
    {
-      ReplicationContext token = repliToken.get();
+      ReplicationContext token = tlReplicationContext.get();
       if (token == null)
       {
          token = new ReplicationContextImpl(executor);
-         activeTokens.add(token);
-         repliToken.set(token);
+         activeContexts.add(token);
+         tlReplicationContext.set(token);
       }
       return token;
    }
@@ -380,17 +408,17 @@
     */
    public void closeContext()
    {
-      final ReplicationContext token = repliToken.get();
+      final ReplicationContext token = tlReplicationContext.get();
       if (token != null)
       {
          // Disassociate thread local
-         repliToken.set(null);
+         tlReplicationContext.set(null);
          // Remove from pending tokens as soon as this is complete
          token.addReplicationAction(new Runnable()
          {
             public void run()
             {
-               activeTokens.remove(token);
+               activeContexts.remove(token);
             }
          });
       }
@@ -401,7 +429,7 @@
     */
    public Set<ReplicationContext> getActiveTokens()
    {
-      return activeTokens;
+      return activeContexts;
    }
 
    private void sendReplicatePacket(final Packet packet)

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-10-17 02:25:10 UTC (rev 8121)
@@ -192,9 +192,9 @@
    private boolean initialised;
 
    private FailoverManager replicationFailoverManager;
-   
+
    private ReplicationManager replicationManager;
-   
+
    private ReplicationEndpoint replicationEndpoint;
 
    private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
@@ -251,7 +251,7 @@
 
       addressSettingsRepository.setDefault(new AddressSettings());
 
-     // this.managementConnectorID = managementConnectorSequence.decrementAndGet();
+      // this.managementConnectorID = managementConnectorSequence.decrementAndGet();
    }
 
    // lifecycle methods
@@ -351,7 +351,7 @@
       {
          storageManager.stop();
       }
-      
+
       if (replicationEndpoint != null)
       {
          replicationEndpoint.stop();
@@ -407,7 +407,7 @@
       {
          memoryManager.stop();
       }
-      
+
       pagingManager = null;
       securityStore = null;
       resourceManager = null;
@@ -603,22 +603,21 @@
 
       return new CreateSessionResponseMessage(version.getIncrementingVersion());
    }
-   
+
    public synchronized ReplicationEndpoint createReplicationEndpoint(final Channel channel) throws Exception
    {
       if (!configuration.isBackup())
       {
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connected server is not a backup server");
       }
-      
+
       if (replicationEndpoint == null)
       {
          replicationEndpoint = new ReplicationEndpointImpl(this);
          replicationEndpoint.setChannel(channel);
          replicationEndpoint.start();
       }
-      
-      
+
       return replicationEndpoint;
    }
 
@@ -660,81 +659,6 @@
       }
    }
 
-   // public void initialiseBackup(final UUID theUUID, final long liveUniqueID) throws Exception
-   // {
-   // if (theUUID == null)
-   // {
-   // throw new IllegalArgumentException("node id is null");
-   // }
-   //
-   // synchronized (initialiseLock)
-   // {
-   // if (initialised)
-   // {
-   // throw new IllegalStateException("Server is already initialised");
-   // }
-   //
-   // this.uuid = theUUID;
-   //
-   // this.nodeID = new SimpleString(uuid.toString());
-   //
-   // initialisePart2();
-   //
-   // long backupID = storageManager.getCurrentUniqueID();
-   //
-   // if (liveUniqueID != backupID)
-   // {
-   // initialised = false;
-   //
-   // throw new IllegalStateException("Live and backup unique ids different (" + liveUniqueID +
-   // ":" +
-   // backupID +
-   // "). You're probably trying to restart a live backup pair after a crash");
-   // }
-   //
-   // log.info("Backup server is now operational");
-   // }
-   // }
-
-   private boolean startReplication() throws Exception
-   {
-      String backupConnectorName = configuration.getBackupConnectorName();
-
-      if (backupConnectorName != null)
-      {
-         TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
-
-         if (backupConnector == null)
-         {
-            log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
-         }
-         else
-         {
-            
-            replicationFailoverManager = new FailoverManagerImpl((ClientSessionFactory)null,
-                                                          backupConnector,
-                                                          null,
-                                                          false,
-                                                          ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
-                                                          ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
-                                                          ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
-                                                          0,
-                                                          1.0d,
-                                                          0,
-                                                          1,
-                                                          threadPool,
-                                                          scheduledPool,
-                                                          null);
-  
-            
-            this.replicationManager = new ReplicationManagerImpl(replicationFailoverManager, this.executorFactory.getExecutor());
-            replicationManager.start();
-         }
-      }
-
-      return true;
-   }
-
    public HornetQServerControlImpl getHornetQServerControl()
    {
       return messagingServerControl;
@@ -841,6 +765,30 @@
    // Protected
    // ------------------------------------------------------------------------------------
 
+   /**
+    * Protected so tests can change this behaviour
+    * @param backupConnector
+    */
+   protected FailoverManagerImpl createBackupConnection(final TransportConfiguration backupConnector,
+                                                        final ExecutorService threadPool,
+                                                        final ScheduledExecutorService scheduledPool)
+   {
+      return new FailoverManagerImpl((ClientSessionFactory)null,
+                                     backupConnector,
+                                     null,
+                                     false,
+                                     ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                     ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                     ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+                                     0,
+                                     1.0d,
+                                     0,
+                                     1,
+                                     threadPool,
+                                     scheduledPool,
+                                     null);
+   }
+
    protected PagingManager createPagingManager()
    {
       return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(), executorFactory),
@@ -873,7 +821,34 @@
 
    // Private
    // --------------------------------------------------------------------------------------
+   
+   private boolean startReplication() throws Exception
+   {
+      String backupConnectorName = configuration.getBackupConnectorName();
 
+      if (backupConnectorName != null)
+      {
+         TransportConfiguration backupConnector = configuration.getConnectorConfigurations().get(backupConnectorName);
+
+         if (backupConnector == null)
+         {
+            log.warn("connector with name '" + backupConnectorName + "' is not defined in the configuration.");
+         }
+         else
+         {
+
+            replicationFailoverManager = createBackupConnection(backupConnector, threadPool, scheduledPool);
+
+            this.replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
+                                                                 this.executorFactory.getExecutor());
+            replicationManager.start();
+         }
+      }
+
+      return true;
+   }
+
+
    private synchronized void callActivateCallbacks()
    {
       for (ActivateCallback callback : activateCallbacks)
@@ -895,10 +870,10 @@
                log.warn("There is no replication endpoint, can't activate this backup server");
                throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
             }
-            
+
             replicationEndpoint.stop();
          }
-         
+
          // Complete the startup procedure
 
          log.info("Activating server");
@@ -933,11 +908,7 @@
 
       managementService = new ManagementServiceImpl(mbeanServer, configuration);
 
-      remotingService = new RemotingServiceImpl(configuration,
-                                                this,
-                                                managementService,
-                                                threadPool,
-                                                scheduledPool);
+      remotingService = new RemotingServiceImpl(configuration, this, managementService, threadPool, scheduledPool);
 
       if (configuration.getMemoryMeasureInterval() != -1)
       {
@@ -964,7 +935,6 @@
          deploymentManager = new FileDeploymentManager(configuration.getFileDeployerScanPeriod());
       }
 
-      
       startReplication();
 
       this.storageManager = createStorageManager();
@@ -1009,7 +979,7 @@
                                                                 this,
                                                                 queueFactory,
                                                                 scheduledPool,
-                                                                pagingManager, 
+                                                                pagingManager,
                                                                 configuration.isBackup());
 
       // Address settings need to deployed initially, since they're require on paging manager.start()


Property changes on: trunk/tests/src/org/hornetq/tests
___________________________________________________________________
Name: svn:ignore
   + svnignored


Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-16 12:01:15 UTC (rev 8120)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-17 02:25:10 UTC (rev 8121)
@@ -15,6 +15,7 @@
 
 import static org.hornetq.tests.util.RandomUtil.randomString;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -24,6 +25,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.client.ClientSessionFactory;
@@ -46,6 +48,9 @@
 import org.hornetq.core.paging.impl.PagingManagerImpl;
 import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
 import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.replication.impl.ReplicatedJournal;
@@ -79,8 +84,6 @@
 
    private ExecutorService executor;
 
-   private FailoverManager connectionManager;
-
    private ScheduledExecutorService scheduledExecutor;
 
    // Static --------------------------------------------------------
@@ -98,11 +101,13 @@
 
       HornetQServer server = new HornetQServerImpl(config);
 
+      FailoverManager failoverManager = createFailoverManager();
+
       server.start();
 
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
          manager.start();
          manager.stop();
       }
@@ -123,9 +128,12 @@
 
       server.start();
 
+      FailoverManager failoverManager = createFailoverManager();
+
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
+
          try
          {
             manager.start();
@@ -154,9 +162,11 @@
 
       server.start();
 
+      FailoverManager failoverManager = createFailoverManager();
+
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
          manager.start();
 
          Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -212,12 +222,12 @@
                                                          server.getConfiguration(),
                                                          server.getExecutorFactory(),
                                                          server.getAddressSettingsRepository());
-         
+
          PagingStore store = pagingManager.getPageStore(dummy);
          store.start();
          assertEquals(5, store.getNumberOfPages());
          store.stop();
-         
+
          manager.pageDeleted(dummy, 1);
          manager.pageDeleted(dummy, 2);
          manager.pageDeleted(dummy, 3);
@@ -226,25 +236,24 @@
          manager.pageDeleted(dummy, 6);
 
          blockOnReplication(manager);
-         
+
          ServerMessageImpl serverMsg = new ServerMessageImpl();
          serverMsg.setMessageID(500);
          serverMsg.setDestination(new SimpleString("tttt"));
-         
-         
+
          HornetQBuffer buffer = ChannelBuffers.dynamicBuffer(100);
          serverMsg.encodeProperties(buffer);
-         
+
          manager.largeMessageBegin(500);
 
          manager.largeMessageWrite(500, new byte[1024]);
-         
+
          manager.largeMessageEnd(500);
-         
+
          blockOnReplication(manager);
-         
+
          store.start();
-         
+
          assertEquals(0, store.getNumberOfPages());
 
          manager.stop();
@@ -255,26 +264,49 @@
       }
    }
 
-
    public void testSendPacketsWithFailure() throws Exception
    {
 
       Configuration config = createDefaultConfig(false);
 
       config.setBackup(true);
+      
+      final AtomicBoolean returnIntercept = new AtomicBoolean(true);
 
+      final Interceptor intercept = new Interceptor()
+      {
+
+         public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+         {
+            if (returnIntercept.get())
+            {
+               System.out.println("Returning true");
+            }
+            return returnIntercept.get();
+         }
+
+      };
+
       HornetQServer server = new HornetQServerImpl(config);
 
       server.start();
 
+      final ArrayList<Interceptor> listInterceptor = new ArrayList<Interceptor>();
+      listInterceptor.add(intercept);
+
+      FailoverManager failoverManager = createFailoverManager(listInterceptor);
+
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
          manager.start();
 
          Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
 
-         for (int i = 0 ; i < 500; i++)
+         Thread.sleep(100);
+         returnIntercept.set(false);
+
+         for (int i = 0; i < 500; i++)
          {
             replicatedJournal.appendAddRecord(i, (byte)1, new FakeData(), false);
          }
@@ -287,10 +319,12 @@
                latch.countDown();
             }
          });
-         
+
          manager.closeContext();
          
-         assertTrue(latch.await(10, TimeUnit.SECONDS));
+         server.stop();
+
+         assertTrue(latch.await(50, TimeUnit.SECONDS));
       }
       finally
       {
@@ -314,7 +348,7 @@
          }
 
       });
-      
+
       assertTrue(latch.await(30, TimeUnit.SECONDS));
    }
 
@@ -329,9 +363,11 @@
 
       server.start();
 
+      FailoverManager failoverManager = createFailoverManager();
+
       try
       {
-         ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
+         ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager, executor);
          manager.start();
 
          Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
@@ -395,25 +431,7 @@
    }
 
    // Package protected ---------------------------------------------
-   /*class LocalRemotingServiceImpl extends RemotingServiceImpl
-   {
-      
-      public LocalRemotingServiceImpl(final Configuration config,
-                                 final HornetQServer server,
-                                 final ManagementService managementService,
-                                 final Executor threadPool,
-                                 final ScheduledExecutorService scheduledThreadPool)
-      {
-         super(config, server, managementService, threadPool, scheduledThreadPool);
-      }
 
-      protected ChannelHandler createHandler(RemotingConnection conn, Channel channel)
-      {
-         return super.createHandler(conn, channel);
-      }
-
-   }*/
-
    // Protected -----------------------------------------------------
 
    protected void setUp() throws Exception
@@ -426,25 +444,33 @@
 
       scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
 
+   }
+
+   private FailoverManagerImpl createFailoverManager()
+   {
+      return createFailoverManager(null);
+   }
+
+   private FailoverManagerImpl createFailoverManager(List<Interceptor> interceptors)
+   {
       TransportConfiguration connectorConfig = new TransportConfiguration(InVMConnectorFactory.class.getName(),
                                                                           new HashMap<String, Object>(),
                                                                           randomString());
 
-      connectionManager = new FailoverManagerImpl((ClientSessionFactory)null,
-                                                  connectorConfig,
-                                                      null,
-                                                      false,
-                                                      ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
-                                                      ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
-                                                      ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
-                                                      0,
-                                                      1.0d,
-                                                      0,
-                                                      1,
-                                                      executor,
-                                                      scheduledExecutor,
-                                                      null);
-
+      return new FailoverManagerImpl((ClientSessionFactory)null,
+                                     connectorConfig,
+                                     null,
+                                     false,
+                                     ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT,
+                                     ClientSessionFactoryImpl.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+                                     ClientSessionFactoryImpl.DEFAULT_CONNECTION_TTL,
+                                     0,
+                                     1.0d,
+                                     0,
+                                     1,
+                                     executor,
+                                     scheduledExecutor,
+                                     interceptors);
    }
 
    protected void tearDown() throws Exception
@@ -456,8 +482,6 @@
 
       tFactory = null;
 
-      connectionManager = null;
-
       scheduledExecutor = null;
 
       super.tearDown();



More information about the hornetq-commits mailing list