[hornetq-commits] JBoss hornetq SVN: r11765 - in trunk/hornetq-core/src/main/java/org/hornetq/core: persistence/impl/journal and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 25 10:00:31 EST 2011


Author: borges
Date: 2011-11-25 10:00:31 -0500 (Fri, 25 Nov 2011)
New Revision: 11765

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 HORNETQ-768 Reset state at JournalStorageManager in case of errors.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java	2011-11-25 14:54:32 UTC (rev 11764)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java	2011-11-25 15:00:31 UTC (rev 11765)
@@ -269,4 +269,11 @@
       ServerMessage message,
       RoutingContext ctx,
       RouteContextList listCtx) throws Exception;
+
+   /**
+    * Stops the replication of data from the live to the backup.
+    * <p>
+    * Typical scenario is a broken connection.
+    */
+   void stopReplication();
 }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-11-25 14:54:32 UTC (rev 11764)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-11-25 15:00:31 UTC (rev 11765)
@@ -194,9 +194,11 @@
         }
     }
 
-    private Journal messageJournal;
-    private Journal bindingsJournal;
-    private final SequentialFileFactory largeMessagesFactory;
+   private Journal messageJournal;
+   private Journal bindingsJournal;
+   private final Journal originalMessageJournal;
+   private final Journal originalBindingsJournal;
+   private final SequentialFileFactory largeMessagesFactory;
 
     private volatile boolean started;
 
@@ -235,18 +237,10 @@
    public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory,
                                 final IOCriticalErrorListener criticalErrorListener)
    {
-      this(config, executorFactory, null, criticalErrorListener);
-   }
-
-   public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory,
-                                final ReplicationManager replicator, final IOCriticalErrorListener criticalErrorListener)
-   {
       this.executorFactory = executorFactory;
 
       executor = executorFactory.getExecutor();
 
-      this.replicator = replicator;
-
       if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
       {
          throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
@@ -274,19 +268,13 @@
                                                 "bindings",
                                                 1);
 
-        if (replicator != null)
-            {
-                bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
-            }
-        else
-            {
-                bindingsJournal = localBindings;
-            }
+      bindingsJournal = localBindings;
+      originalBindingsJournal = localBindings;
 
-        if (journalDir == null)
-            {
-                throw new NullPointerException("journal-dir is null");
-            }
+      if (journalDir == null)
+      {
+         throw new NullPointerException("journal-dir is null");
+      }
 
         createJournalDir = config.isCreateJournalDir();
 
@@ -332,14 +320,8 @@
                                                config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
                                                : config.getJournalMaxIO_NIO());
 
-      if (replicator != null)
-      {
-         messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
-      }
-      else
-      {
-         messageJournal = localMessage;
-      }
+      messageJournal = localMessage;
+      originalMessageJournal = localMessage;
 
       largeMessagesDirectory = config.getLargeMessagesDirectory();
       largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false, criticalErrorListener);
@@ -380,8 +362,8 @@
       JournalFile[] messageFiles = null;
       JournalFile[] bindingsFiles = null;
 
-      final Journal localMessageJournal = messageJournal;
-      final Journal localBindingsJournal = bindingsJournal;
+      try
+      {
 
       Map<String, Long> largeMessageFilesToSync;
       Map<SimpleString, Collection<Integer>> pageFilesToSync;
@@ -389,15 +371,15 @@
       try
       {
          replicator = replicationManager;
-         localMessageJournal.synchronizationLock();
-         localBindingsJournal.synchronizationLock();
+         originalMessageJournal.synchronizationLock();
+         originalBindingsJournal.synchronizationLock();
          try
          {
             pagingManager.lock();
             try
             {
-               messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES, nodeID);
-               bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS, nodeID);
+               messageFiles = prepareJournalForCopy(originalMessageJournal, JournalContent.MESSAGES, nodeID);
+               bindingsFiles = prepareJournalForCopy(originalBindingsJournal, JournalContent.BINDINGS, nodeID);
                pageFilesToSync = getPageInformationForSync(pagingManager);
                largeMessageFilesToSync = getLargeMessageInformation();
             }
@@ -408,11 +390,11 @@
          }
          finally
          {
-            localMessageJournal.synchronizationUnlock();
-            localBindingsJournal.synchronizationUnlock();
+            originalMessageJournal.synchronizationUnlock();
+            originalBindingsJournal.synchronizationUnlock();
          }
-         bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
-         messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
+         bindingsJournal = new ReplicatedJournal(((byte)0), originalBindingsJournal, replicator);
+         messageJournal = new ReplicatedJournal((byte)1, originalMessageJournal, replicator);
       }
       finally
       {
@@ -433,11 +415,36 @@
       }
       finally
       {
+            storageManagerLock.writeLock().unlock();
+         }
+      }
+      catch (Exception e)
+      {
+         stopReplication();
+         throw e;
+      }
+   }
+
+   /**
+    * Stops replication: restores the original (unreplicated) journals and clears the replicator.
+    */
+   @Override
+   public void stopReplication()
+   {
+      // Hold the storage manager write lock while the three replication-related fields are reset.
+      storageManagerLock.writeLock().lock();
+      try
+      {
+         bindingsJournal = originalBindingsJournal;
+         messageJournal = originalMessageJournal;
+         replicator = null;
+      }
+      finally
+      {
          storageManagerLock.writeLock().unlock();
       }
    }
 
-
    /**
     * @param pageFilesToSync
     * @throws Exception
@@ -530,10 +537,10 @@
    private JournalFile[]
             prepareJournalForCopy(Journal journal, JournalContent contentType, String nodeID) throws Exception
     {
-        journal.forceMoveNextFile();
-        JournalFile[] datafiles = journal.getDataFiles();
+      journal.forceMoveNextFile();
+      JournalFile[] datafiles = journal.getDataFiles();
       replicator.sendStartSyncMessage(datafiles, contentType, nodeID);
-        return datafiles;
+      return datafiles;
     }
 
     public void waitOnOperations() throws Exception
@@ -4105,4 +4112,5 @@
 
    }
 
+
 }
\ No newline at end of file

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-11-25 14:54:32 UTC (rev 11764)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-11-25 15:00:31 UTC (rev 11765)
@@ -618,4 +618,10 @@
    {
       return false;
    }
+
+   @Override
+   public void stopReplication()
+   {
+      // no-op
+   }
  }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-11-25 14:54:32 UTC (rev 11764)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-11-25 15:00:31 UTC (rev 11765)
@@ -88,6 +88,7 @@
 import org.hornetq.core.protocol.core.Channel;
 import org.hornetq.core.protocol.core.CoreRemotingConnection;
 import org.hornetq.core.protocol.core.impl.ChannelImpl.CHANNEL_ID;
+import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
 import org.hornetq.core.replication.ReplicationEndpoint;
@@ -227,6 +228,7 @@
    private final Object initialiseLock = new Object();
    private boolean initialised;
    private final Object startUpLock = new Object();
+   private final Object replicationLock = new Object();
 
    /**
     * Only applicable to 'remote backup servers'. If this flag is false the backup may not become
@@ -1201,7 +1203,7 @@
    {
       if (configuration.isPersistenceEnabled())
       {
-         return new JournalStorageManager(configuration, executorFactory, replicationManager, shutdownOnCriticalIO);
+         return new JournalStorageManager(configuration, executorFactory, shutdownOnCriticalIO);
       }
       else
       {
@@ -2265,30 +2267,60 @@
          throw new HornetQException(HornetQException.ALREADY_REPLICATING);
       }
 
-      replicationManager = new ReplicationManagerImpl(rc, executorFactory);
-      try
+      if (!isStarted())
       {
-         replicationManager.start();
-         storageManager.startReplication(replicationManager, pagingManager, getNodeID().toString(), clusterConnection,
-                                         pair);
+         throw new IllegalStateException();
       }
-      catch (Exception e)
+
+      synchronized (replicationLock)
       {
-         /*
-          * The reasoning here is that the exception was either caused by (1) the (interaction with)
-          * the backup, or (2) by an IO Error at the storage. If (1), we can swallow the exception
-          * and ignore the replication request. If (2) the live will crash shortly.
-          */
-         log.warn("Exception when trying to start replication", e);
-         replicationManager = null;
-         if (e instanceof HornetQException)
+
+         if (replicationManager != null)
          {
-            throw (HornetQException)e;
+            throw new HornetQException(HornetQException.ALREADY_REPLICATING);
          }
-         else
+
+         rc.addFailureListener(new ReplicationFailureListener());
+         replicationManager = new ReplicationManagerImpl(rc, executorFactory);
+
+         try
          {
-            throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error trying to start replication", e);
+            replicationManager.start();
+            storageManager.startReplication(replicationManager, pagingManager, getNodeID().toString(),
+                                            clusterConnection, pair);
          }
+         catch (Exception e)
+         {
+            /*
+             * The reasoning here is that the exception was either caused by (1) the (interaction
+             * with) the backup, or (2) by an IO Error at the storage. If (1), we can swallow the
+             * exception and ignore the replication request. If (2) the live will crash shortly.
+             */
+            log.warn("Exception when trying to start replication", e);
+
+            try
+            {
+               if (replicationManager != null)
+                  replicationManager.stop();
+            }
+            catch (Exception hqe)
+            {
+               log.warn("Exception while trying to close replicationManager", hqe);
+            }
+            finally
+            {
+               replicationManager = null;
+            }
+
+            if (e instanceof HornetQException)
+            {
+               throw (HornetQException)e;
+            }
+            else
+            {
+               throw new HornetQException(HornetQException.INTERNAL_ERROR, "Error trying to start replication", e);
+            }
+         }
       }
    }
 
@@ -2309,4 +2341,28 @@
       nodeManager.setNodeID(nodeID);
       backupUpToDate = true;
    }
+
+   /** Resets storage to its unreplicated state, off the calling thread, when its connection fails. */
+   private final class ReplicationFailureListener implements FailureListener
+   {
+      @Override
+      public void connectionFailed(HornetQException exception, boolean failedOver)
+      {
+         // One-shot worker: shutdown() still runs the queued task, then lets the thread exit.
+         final java.util.concurrent.ExecutorService stopper = Executors.newSingleThreadExecutor();
+         stopper.execute(new Runnable()
+         {
+            public void run()
+            {
+               synchronized (replicationLock)
+               {
+                  if (replicationManager != null)
+                     storageManager.stopReplication();
+               }
+            }
+         });
+         stopper.shutdown();
+      }
+   }
+
 }



More information about the hornetq-commits mailing list