[hornetq-commits] JBoss hornetq SVN: r9532 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 2 other directories.

do-not-reply at jboss.org
Thu Aug 12 15:35:54 EDT 2010


Author: clebert.suconic at jboss.com
Date: 2010-08-12 15:35:52 -0400 (Thu, 12 Aug 2010)
New Revision: 9532

Modified:
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
   trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
HORNETQ-482 Improving startup time on large journal files

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2010-08-12 19:35:52 UTC (rev 9532)
@@ -20,6 +20,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -32,6 +33,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,7 +67,6 @@
 import org.hornetq.core.journal.impl.dataformat.JournalRollbackRecordTX;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.utils.DataConstants;
-import org.hornetq.utils.concurrent.LinkedBlockingDeque;
 
 /**
  * 
@@ -385,8 +386,6 @@
       return compactor;
    }
 
-
-   
    /** This method is used internally only; however, tools may use it for maintenance. 
     *  It won't be part of the interface as the tools should be specific to the implementation */
    public List<JournalFile> orderFiles() throws Exception
@@ -400,11 +399,11 @@
          SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
 
          file.open(1, false);
-         
+
          try
          {
             long fileID = readFileHeader(file);
-   
+
             orderedFiles.add(new JournalFileImpl(file, fileID));
          }
          finally
@@ -420,10 +419,10 @@
 
       return orderedFiles;
    }
-   
+
    private void calculateNextfileID(List<JournalFile> files)
    {
-      
+
       for (JournalFile file : files)
       {
          long fileID = file.getFileID();
@@ -431,9 +430,9 @@
          {
             nextFileID.set(fileID);
          }
-   
+
          long fileNameID = getFileNameID(file.getFile().getFileName());
-   
+
          // The compactor could create a fileName but use a previously assigned ID.
          // Because of that we need to take both parts into account
          if (nextFileID.get() < fileNameID)
@@ -442,14 +441,8 @@
          }
       }
 
-
    }
 
-   
-
-
-
-
    /** This method is used internally only; however, tools may use it for maintenance. */
    public static int readJournalFile(final SequentialFileFactory fileFactory,
                                      final JournalFile file,
@@ -502,7 +495,6 @@
            // This is what saves us from having to re-fill the whole file
             int readFileId = wholeFileBuffer.getInt();
 
-            
             // This record is from a previous file-usage. The file was
             // reused and we need to ignore this record
             if (readFileId != file.getRecordID())
@@ -511,7 +503,6 @@
                continue;
             }
 
-
             long transactionID = 0;
 
             if (JournalImpl.isTransaction(recordType))
@@ -570,10 +561,10 @@
                {
                   if (JournalImpl.isInvalidSize(journalFileSize, wholeFileBuffer.position(), 1))
                   {
-                     wholeFileBuffer.position(pos +1);
+                     wholeFileBuffer.position(pos + 1);
                      continue;
                   }
-                  
+
                   userRecordType = wholeFileBuffer.get();
                }
 
@@ -585,8 +576,8 @@
 
                record = new byte[variableSize];
 
-                wholeFileBuffer.get(record);
-         }
+               wholeFileBuffer.get(record);
+            }
 
             // If this is a transaction, this will contain the number of pendingTransactions for the transaction on the
             // currentFile
@@ -660,7 +651,7 @@
             // checkSize by some sort of calculated hash)
             if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
             {
-               JournalImpl.trace("Record at position " + pos +
+              JournalImpl.trace("Record at position " + pos +
                                  " recordType = " +
                                  recordType +
                                  " possible transactionID = " + transactionID + 
@@ -1058,7 +1049,7 @@
       {
          if (JournalImpl.LOAD_TRACE)
          {
-            JournalImpl.trace("appendAddRecordTransactional txID " + txID +
+             JournalImpl.trace("appendAddRecordTransactional txID " + txID +
                               ", id = " +
                               id +
                               ", recordType = " +
@@ -1326,7 +1317,6 @@
             JournalImpl.trace("appendCommitRecord txID " + txID + ", compacting = " + (compactor != null));
          }
 
-
          if (tx == null)
          {
             throw new IllegalStateException("Cannot find tx with id " + txID);
@@ -1461,48 +1451,60 @@
                                                    final TransactionFailureCallback failureCallback) throws Exception
    {
       final Set<Long> recordsToDelete = new HashSet<Long>();
-      final List<RecordInfo> records = new ArrayList<RecordInfo>();
+      // ArrayList was taking too long to delete elements in checkDeleteSize
+      final List<RecordInfo> records = new LinkedList<RecordInfo>();
 
       final int DELETE_FLUSH = 20000;
 
       JournalLoadInformation info = load(new LoaderCallback()
       {
+         Runtime runtime = Runtime.getRuntime();
+
+         private void checkDeleteSize()
+         {
+            // HORNETQ-482 - Flush deletes only if free heap is running low (below 20% of max)
+            if (recordsToDelete.size() > DELETE_FLUSH && (runtime.freeMemory() < (runtime.maxMemory() * 0.2)))
+            {
+               log.debug("Flushing deletes during loading, deleteCount = " + recordsToDelete.size());
+               // Clean up when the list is too large, or it won't be possible to load large sets of files
+               // Done as part of JBMESSAGING-1678
+               Iterator<RecordInfo> iter = records.iterator();
+               while (iter.hasNext())
+               {
+                  RecordInfo record = iter.next();
+
+                  if (recordsToDelete.contains(record.id))
+                  {
+                     iter.remove();
+                  }
+               }
+
+               recordsToDelete.clear();
+            }
+         }
+
          public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
          {
             preparedTransactions.add(preparedTransaction);
+            checkDeleteSize();
          }
 
          public void addRecord(final RecordInfo info)
          {
             records.add(info);
+            checkDeleteSize();
          }
 
          public void updateRecord(final RecordInfo info)
          {
             records.add(info);
+            checkDeleteSize();
          }
 
          public void deleteRecord(final long id)
          {
             recordsToDelete.add(id);
-
-            // Clean up when the list is too large, or it won't be possible to load large sets of files
-            // Done as part of JBMESSAGING-1678
-            if (recordsToDelete.size() == DELETE_FLUSH)
-            {
-               Iterator<RecordInfo> iter = records.iterator();
-               while (iter.hasNext())
-               {
-                  RecordInfo record = iter.next();
-
-                  if (recordsToDelete.contains(record.id))
-                  {
-                     iter.remove();
-                  }
-               }
-
-               recordsToDelete.clear();
-            }
+            checkDeleteSize();
          }
 
          public void failedTransaction(final long transactionID,
@@ -1551,7 +1553,7 @@
             JournalImpl.trace("Starting compacting operation on journal");
          }
          JournalImpl.log.debug("Starting compacting operation on journal");
-         
+
          onCompactStart();
 
          // We need to guarantee that the journal is frozen for this short time
@@ -1774,7 +1776,7 @@
       final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<Long, TransactionHolder>();
 
       final List<JournalFile> orderedFiles = orderFiles();
-      
+
       calculateNextfileID(orderedFiles);
 
       int lastDataPos = JournalImpl.SIZE_HEADER;
@@ -2290,7 +2292,7 @@
       {
          return;
       }
- 
+
       compactingLock.readLock().lock();
 
       try
@@ -2307,7 +2309,7 @@
                JournalImpl.trace("Cleaning up file " + file);
             }
             JournalImpl.log.debug("Cleaning up file " + file);
-            
+
             if (file.getPosCount() == 0)
             {
                // nothing to be done
@@ -2327,7 +2329,7 @@
                   jrnFile.incPosCount(); // this file can't be reclaimed while cleanup is being done
                }
             }
-            
+
             currentFile.resetNegCount(file);
             currentFile.incPosCount();
             dependencies.add(currentFile);
@@ -2360,7 +2362,7 @@
 
          SequentialFile controlFile = createControlFile(null, null, new Pair<String, String>(tmpFileName,
                                                                                              cleanedFileName));
-         
+
          SequentialFile returningFile = fileFactory.createSequentialFile(file.getFile().getFileName(), maxAIO);
 
          returningFile.renameTo(renameExtensionFile(tmpFileName, ".cmp") + ".tmp");
@@ -2368,9 +2370,9 @@
          tmpFile.renameTo(cleanedFileName);
 
          controlFile.delete();
-         
+
          final JournalFile retJournalfile = new JournalFileImpl(returningFile, -1);
-         
+
          filesExecutor.execute(new Runnable()
          {
             public void run()
@@ -2395,7 +2397,7 @@
       }
 
    }
-   
+
    private boolean needsCompact() throws Exception
    {
       JournalFile[] dataFiles = getDataFiles();
@@ -2422,7 +2424,7 @@
          // compacting is disabled
          return;
       }
-      
+
       if (state != JournalImpl.STATE_LOADED)
       {
          return;
@@ -2598,7 +2600,7 @@
    {
       return maxAIO;
    }
-   
+
    public int getUserVersion()
    {
       return userVersion;
@@ -2675,14 +2677,14 @@
       {
 
          state = JournalImpl.STATE_STOPPED;
-         
+
          compactorExecutor.shutdown();
-         
+
          if (!compactorExecutor.awaitTermination(120, TimeUnit.SECONDS))
          {
             JournalImpl.log.warn("Couldn't stop compactor executor after 120 seconds");
          }
-         
+
          filesExecutor.shutdown();
 
          if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
@@ -2754,13 +2756,14 @@
    /** being protected as testcases can override this method */
    protected void renameFiles(final List<JournalFile> oldFiles, final List<JournalFile> newFiles) throws Exception
    {
-      
-      // addFreeFiles has to be called through filesExecutor, or the fileID on the orderedFiles may end up in a wrong order
-      // These files are already freed, and are described on the compactor file control. 
+
+      // addFreeFiles has to be called through filesExecutor, or the fileID on the orderedFiles may end up in the
+      // wrong order
+      // These files are already freed, and are recorded in the compactor's control file.
       // In case of a crash they will be cleared anyway
-      
+
       final CountDownLatch done = new CountDownLatch(1);
-      
+
       filesExecutor.execute(new Runnable()
       {
          public void run()
@@ -2773,13 +2776,13 @@
                }
                catch (Throwable e)
                {
-                  log.warn("Error reinitializing file "  + file, e);
+                  log.warn("Error reinitializing file " + file, e);
                }
             }
             done.countDown();
          }
       });
-      
+
       // need to wait for all old files to be freed
       // to avoid a race where the CTR file is deleted before the init for these files is done,
       // which could cause a duplicate in case of a crash after the CTR is deleted and before the file is initialized
@@ -2812,7 +2815,7 @@
    protected void onCompactDone()
    {
    }
-   
+
    // Private
    // -----------------------------------------------------------------------------
 
@@ -2824,7 +2827,7 @@
    {
       if (file.getFile().size() != this.getFileSize())
       {
-         log.warn("Deleting "  + file + ".. as it doesn't have the configured size", new Exception ("trace"));
+         log.warn("Deleting " + file + ".. as it doesn't have the configured size", new Exception("trace"));
          file.getFile().delete();
       }
       else
@@ -2834,7 +2837,7 @@
          // Re-initialise it
 
          JournalFile jf = reinitializeFile(file);
-         
+
          if (renameTmp)
          {
             jf.getFile().renameTo(renameExtensionFile(jf.getFile().getFileName(), ".tmp"));
@@ -2959,20 +2962,19 @@
       file.read(bb);
 
       int journalVersion = bb.getInt();
-      
+
       if (journalVersion != FORMAT_VERSION)
       {
          throw new HornetQException(HornetQException.IO_ERROR, "Journal files version mismatch");
       }
-      
-      
+
       int readUserVersion = bb.getInt();
-      
+
       if (readUserVersion != userVersion)
       {
          throw new HornetQException(HornetQException.IO_ERROR, "Journal data belong to a different version");
       }
-      
+
       long fileID = bb.getLong();
 
       fileFactory.releaseBuffer(bb);
@@ -2986,15 +2988,18 @@
     * @param sequentialFile
     * @throws Exception
     */
-   public static int initFileHeader(final SequentialFileFactory fileFactory, final SequentialFile sequentialFile, final int userVersion, final long fileID) throws Exception
+   public static int initFileHeader(final SequentialFileFactory fileFactory,
+                                    final SequentialFile sequentialFile,
+                                    final int userVersion,
+                                    final long fileID) throws Exception
    {
       // We don't need to release buffers while writing.
       ByteBuffer bb = fileFactory.newBuffer(JournalImpl.SIZE_HEADER);
-      
+
       HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bb);
 
       writeHeader(buffer, userVersion, fileID);
-      
+
       bb.rewind();
 
       int bufferSize = bb.limit();
@@ -3014,12 +3019,12 @@
    public static void writeHeader(HornetQBuffer buffer, final int userVersion, final long fileID)
    {
       buffer.writeInt(FORMAT_VERSION);
-      
+
       buffer.writeInt(userVersion);
 
       buffer.writeLong(fileID);
    }
-   
+
    /** 
     * 
     * @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
@@ -3293,7 +3298,7 @@
                                  new Exception("Warning: Couldn't open a file in 60 Seconds"));
          }
       }
-      
+
       if (trace)
       {
          JournalImpl.trace("Returning file " + nextFile);
@@ -3337,7 +3342,7 @@
    private void pushOpenedFile() throws Exception
    {
       JournalFile nextOpenedFile = getFile(true, true, true, false);
-      
+
       if (trace)
       {
          JournalImpl.trace("pushing openFile " + nextOpenedFile);
@@ -3407,7 +3412,7 @@
       }
 
    }
-   
+
    private void drainClosedFiles()
    {
       JournalFile file;
@@ -3548,7 +3553,7 @@
       else
       {
          final int position = bufferPos + size;
-         
+
          return position > fileSize || position < 0;
 
       }
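
The change above restructures the loader's delete handling: the sweep that used to run
inline in deleteRecord() now lives in a checkDeleteSize() helper invoked from every
loader callback, it fires only when the pending-delete set is large and free heap is
running low, and the records list becomes a LinkedList so that iterator.remove()
relinks a node in O(1) instead of shifting the ArrayList tail, which made a large
sweep effectively quadratic. A minimal standalone sketch of that pattern follows; the
class and field names are illustrative, not the journal's real ones, and the
freeMemory()-based test is only a rough proxy for heap pressure:

    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Set;

    public class DeleteFlushSketch
    {
       private static final int DELETE_FLUSH = 20000;

       private final Runtime runtime = Runtime.getRuntime();

       private final Set<Long> pendingDeletes = new HashSet<Long>();

       // LinkedList: iterator.remove() relinks neighbouring nodes in O(1);
       // ArrayList shifts the tail on every removal, so a big sweep is O(n^2).
       private final List<Long> loadedRecords = new LinkedList<Long>();

       public void recordLoaded(long id)
       {
          loadedRecords.add(id);
          checkDeleteSize();
       }

       public void recordDeleted(long id)
       {
          pendingDeletes.add(id);
          checkDeleteSize();
       }

       private void checkDeleteSize()
       {
          // Sweep only when the set is large AND heap headroom is low; sweeping
          // unconditionally every DELETE_FLUSH deletes is what dominated startup
          // time on large journals.
          if (pendingDeletes.size() > DELETE_FLUSH && runtime.freeMemory() < runtime.maxMemory() * 0.2)
          {
             Iterator<Long> iter = loadedRecords.iterator();
             while (iter.hasNext())
             {
                if (pendingDeletes.contains(iter.next()))
                {
                   iter.remove();
                }
             }
             pendingDeletes.clear();
          }
       }
    }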

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2010-08-12 19:35:52 UTC (rev 9532)
@@ -16,12 +16,11 @@
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
@@ -775,9 +774,6 @@
 
       Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
       
-      // used to identify messages that are not referenced
-      Set<Long> referencedMessages = new HashSet<Long>();
-
       JournalLoadInformation info = messageJournal.load(records,
                                                         preparedTransactions,
                                                         new LargeMessageTXFailureCallback(messages));
@@ -786,8 +782,19 @@
 
       Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
 
-      for (RecordInfo record : records)
+      final int totalSize = records.size();
+      
+      for (int reccount = 0 ; reccount < totalSize; reccount++)
       {
+         // Progress is logged only for large journals (more than 1 million records)
+         if (reccount > 0 && reccount % 1000000 == 0)
+         {
+            long percent = (long)((((double)reccount) / ((double)totalSize)) * 100f);
+            
+            log.info(percent + "% loaded");
+         }
+         
+         RecordInfo record = records.get(reccount);
          byte[] data = record.data;
 
          HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
@@ -839,8 +846,6 @@
                {
                   throw new IllegalStateException("Cannot find message " + record.id);
                }
-               
-               referencedMessages.add(messageID);
 
                queueMessages.put(messageID, new AddMessageRecord(message));
 
@@ -969,7 +974,16 @@
                throw new IllegalStateException("Invalid record type " + recordType);
             }
          }
+         
+         // This frees memory sooner. The record is no longer needed, and keeping
+         // its byte array alive for the rest of the load would only consume heap
+         // and slow the remaining processing
+         records.set(reccount, null);
       }
+      
+      // Release the memory as soon as it is no longer needed
+      records.clear();
+      records = null;
 
       for (Map.Entry<Long, Map<Long, AddMessageRecord>> entry : queueMap.entrySet())
       {
@@ -978,8 +992,10 @@
          Map<Long, AddMessageRecord> queueRecords = entry.getValue();
 
          Queue queue = queues.get(queueID);
+         
+         Collection<AddMessageRecord> valueRecords = queueRecords.values();
 
-         for (AddMessageRecord record : queueRecords.values())
+         for (AddMessageRecord record : valueRecords)
          {
             long scheduledDeliveryTime = record.scheduledDeliveryTime;
 
@@ -1013,11 +1029,9 @@
       
       for (ServerMessage msg : messages.values())
       {
-         if (!referencedMessages.contains(msg.getMessageID()))
+         if (msg.getRefCount() == 0)
          {
             log.info("Deleting unreferenced message id=" + msg.getMessageID() + " from the journal");
-            // Something after routing could delete messages
-            // So we ignore eventual ignores
             try
             {
                deleteMessage(msg.getMessageID());
@@ -2128,6 +2142,8 @@
       long scheduledDeliveryTime;
 
       int deliveryCount;
+      
+      boolean referenced = false;
    }
 
    private class LargeMessageTXFailureCallback implements TransactionFailureCallback
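
Two changes carry the JournalStorageManager improvement. First, the load loop is
rewritten from a for-each into an indexed loop so that each processed slot can be
nulled out immediately via records.set(reccount, null), letting the GC reclaim record
payloads while the load is still running, with a percentage log line every million
records. Second, the referencedMessages HashSet is dropped; unreferenced messages are
now detected by asking each message for its reference count directly. A minimal
sketch of the loop pattern, assuming a random-access list such as ArrayList (the
names here are illustrative, not the class's real fields):

    import java.util.List;

    public class LoadLoopSketch
    {
       public static void process(List<byte[]> records)
       {
          final int totalSize = records.size();

          for (int reccount = 0; reccount < totalSize; reccount++)
          {
             // Log progress once per million records, so the message only
             // ever appears for very large journals.
             if (reccount > 0 && reccount % 1000000 == 0)
             {
                long percent = (long)(((double)reccount / (double)totalSize) * 100.0);
                System.out.println(percent + "% loaded");
             }

             byte[] data = records.get(reccount);
             // ... decode and dispatch 'data' here ...

             // Null the slot so this payload can be garbage-collected before
             // the loop finishes; otherwise every byte[] stays reachable
             // until the entire journal has been processed.
             records.set(reccount, null);
          }

          // Release the (now all-null) list itself as soon as possible.
          records.clear();
       }
    }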

Modified: trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java	2010-08-12 19:35:52 UTC (rev 9532)
@@ -52,7 +52,7 @@
    {
       long deliveryTime = ref.getScheduledDeliveryTime();
 
-      if (deliveryTime > 0 && scheduledExecutor != null)
+      if (deliveryTime > System.currentTimeMillis() && scheduledExecutor != null)
       {
          if (ScheduledDeliveryHandlerImpl.trace)
          {
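
This one-line change is the scheduled-delivery part of the startup fix: previously any
reference with deliveryTime > 0 was handed to the scheduled executor, even when that
time had already passed, which is the common case for scheduled messages reloaded from
the journal after downtime. Comparing against System.currentTimeMillis() lets past-due
references take the normal immediate delivery path instead of a round-trip through the
scheduler. A sketch of the decision under those assumptions (checkAndSchedule and its
parameters are illustrative, not the handler's real API):

    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class ScheduleSketch
    {
       /** Returns true only if the delivery was handed to the scheduler. */
       public static boolean checkAndSchedule(long deliveryTime,
                                              Runnable delivery,
                                              ScheduledExecutorService scheduledExecutor)
       {
          long now = System.currentTimeMillis();

          // Only schedule deliveries that are still in the future. A time of 0
          // (not scheduled) or one already in the past falls through, and the
          // caller delivers the reference immediately.
          if (deliveryTime > now && scheduledExecutor != null)
          {
             scheduledExecutor.schedule(delivery, deliveryTime - now, TimeUnit.MILLISECONDS);
             return true;
          }

          return false;
       }
    }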

Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2010-08-12 16:31:35 UTC (rev 9531)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2010-08-12 19:35:52 UTC (rev 9532)
@@ -36,7 +36,6 @@
 import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.HornetQServers;
-import org.hornetq.core.server.JournalType;
 import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.jms.client.HornetQBytesMessage;
 import org.hornetq.jms.client.HornetQTextMessage;


