[hornetq-commits] JBoss hornetq SVN: r11501 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Oct 10 09:35:11 EDT 2011


Author: borges
Date: 2011-10-10 09:35:11 -0400 (Mon, 10 Oct 2011)
New Revision: 11501

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
HORNETQ-720 Make ReplicationEndpoint.stop() more robust and ensure it closes all open files

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-10-10 13:35:00 UTC (rev 11500)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-10-10 13:35:11 UTC (rev 11501)
@@ -18,6 +18,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
@@ -91,7 +92,7 @@
    /** Files reserved in each journal for synchronization of existing data from the 'live' server. */
    private final Map<JournalContent, Map<Long, JournalSyncFile>> filesReservedForSync =
             new HashMap<JournalContent, Map<Long, JournalSyncFile>>();
-   private Map<Long, LargeServerMessage> largeMessagesOnSync = new HashMap<Long, LargeServerMessage>();
+   private final Map<Long, LargeServerMessage> largeMessagesOnSync = new HashMap<Long, LargeServerMessage>();
 
    /**
     * Used to hold the real Journals before the backup is synchronized. This field should be
@@ -144,6 +145,7 @@
     */
    public void handlePacket(final Packet packet)
    {
+
       PacketImpl response = new ReplicationResponseMessage();
       final byte type=packet.getType();
 
@@ -230,7 +232,7 @@
       return started;
    }
 
-   public void start() throws Exception
+   public synchronized void start() throws Exception
    {
       Configuration config = server.getConfiguration();
 
@@ -263,22 +265,19 @@
 
    }
 
-   /* (non-Javadoc)
-    * @see org.hornetq.core.server.HornetQComponent#stop()
-    */
-   public void stop() throws Exception
+   public synchronized void stop() throws Exception
    {
-      if(!started)
+      if (!started)
       {
           return;
       }
+
       // This could be null if the backup server is being
       // shut down without any live server connecting here
       if (channel != null)
       {
          channel.close();
       }
-      storage.stop();
 
       for (ConcurrentMap<Integer, Page> map : pageIndex.values())
       {
@@ -301,11 +300,28 @@
       {
          largeMessage.releaseResources();
       }
-
       largeMessages.clear();
 
+      for (LargeServerMessage largeMessage : largeMessagesOnSync.values())
+      {
+         largeMessage.releaseResources();
+      }
+      largeMessagesOnSync.clear();
+
+      for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry : filesReservedForSync.entrySet())
+      {
+         for (JournalSyncFile filesReserved : entry.getValue().values())
+         {
+            filesReserved.close();
+         }
+      }
+
+      filesReservedForSync.clear();
       pageManager.stop();
 
+      // Storage needs to be the last to stop
+      storage.stop();
+
       started = false;
    }
 
@@ -437,9 +453,9 @@
             }
             largeMessages.clear();
             largeMessages.putAll(largeMessagesOnSync);
+            largeMessagesOnSync.clear();
          }
       }
-      largeMessagesOnSync = null;
       journalsHolder = null;
       server.setRemoteBackupUpToDate();
       log.info("Backup server " + server + " is synchronized with live-server.");
@@ -524,15 +540,23 @@
          return;
       }
 
+
       final Journal journal = journalsHolder.get(packet.getJournalContentType());
+      synchronized (this)
+      {
+         if (!started)
+               return;
+      Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
+         log.info("Journal " + packet.getJournalContentType() + ". Reserving fileIDs for synchronization: " +
+                  Arrays.toString(packet.getFileIds()));
 
-      Map<Long, JournalSyncFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
       for (Entry<Long, JournalFile> entry : journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
       {
          mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
       }
       registerJournal(packet.getJournalContentType().typeByte, new FileWrapperJournal(journal));
-     }
+      }
+   }
 
    private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
    {
@@ -601,9 +625,7 @@
          log.warn("Large MessageID " + messageId +
                                           "  is not available on backup server. Ignoring replication message");
       }
-
       return message;
-
    }
 
    /**
@@ -845,7 +867,8 @@
 
       void close() throws IOException
       {
-         channel.close();
+         if (channel != null)
+            channel.close();
       }
 
       @Override



More information about the hornetq-commits mailing list