[hornetq-commits] JBoss hornetq SVN: r10774 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Jun 5 23:12:34 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-06-05 23:12:33 -0400 (Sun, 05 Jun 2011)
New Revision: 10774

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
   branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
HORNETQ-714 - fixing out-of-order issue after compacting

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-06-03 15:21:19 UTC (rev 10773)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-06-06 03:12:33 UTC (rev 10774)
@@ -42,6 +42,10 @@
    private static final Logger log = Logger.getLogger(JournalFilesRepository.class);
 
    private static final boolean trace = JournalFilesRepository.log.isTraceEnabled();
+   
+   // Used to debug the consistency of the journal ordering.
+   // This is meant to be false as these extra checks would cause performance issues
+   private static final boolean CHECK_CONSISTENCE = false;
 
    // This method exists just to make debug easier.
    // I could replace log.trace by log.info temporarily while I was debugging
@@ -56,6 +60,8 @@
    // Attributes ----------------------------------------------------
 
    private final SequentialFileFactory fileFactory;
+   
+   private final JournalImpl journal;
 
    private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
 
@@ -78,12 +84,30 @@
    private final int userVersion;
 
    private Executor openFilesExecutor;
+   
+   private Runnable pushOpenRunnable = new Runnable()
+   {
+      public void run()
+      {
+         try
+         {
+            pushOpenedFile();
+         }
+         catch (Exception e)
+         {
+            JournalFilesRepository.log.error(e.getMessage(), e);
+         }
+      }
+   };
 
+
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
-
+   
    public JournalFilesRepository(final SequentialFileFactory fileFactory,
+                                 final JournalImpl journal,
                                  final String filePrefix,
                                  final String fileExtension,
                                  final int userVersion,
@@ -98,6 +122,7 @@
       this.minFiles = minFiles;
       this.fileSize = fileSize;
       this.userVersion = userVersion;
+      this.journal = journal;
    }
 
    // Public --------------------------------------------------------
@@ -233,11 +258,96 @@
    public void addDataFileOnTop(final JournalFile file)
    {
       dataFiles.addFirst(file);
+      
+      if (CHECK_CONSISTENCE)
+      {
+      	checkDataFiles();
+      }
    }
+   
+   public String debugFiles()
+   {
+      StringBuffer buffer = new StringBuffer();
+      
+      buffer.append("**********\nCurrent File = "  + journal.getCurrentFile() + "\n");
+      buffer.append("**********\nDataFiles:\n");
+      for (JournalFile file : dataFiles)
+      {
+         buffer.append(file.toString() + "\n");
+      }
+      buffer.append("*********\nFreeFiles:\n");
+      for (JournalFile file : freeFiles)
+      {
+         buffer.append(file.toString() + "\n");
+      }
+      return buffer.toString();
+   }
+   
+   public synchronized void checkDataFiles()
+   {
+      long seq = -1;
+      for (JournalFile file : dataFiles)
+      {
+         if (file.getFileID() <= seq)
+         {
+            log.info("CheckDataFiles:");
+            log.info(debugFiles());
+            log.info("Sequence out of order on journal");
+            System.exit(-1);
+         }
+         
+         if (journal.getCurrentFile() != null && journal.getCurrentFile().getFileID() <= file.getFileID())
+         {
+            log.info("CheckDataFiles:");
+            log.info(debugFiles());
+            log.info("CurrentFile on the journal is <= the sequence file.getFileID=" + file.getFileID() + " on the dataFiles");
+            log.info("Currentfile.getFileId=" + journal.getCurrentFile().getFileID() + " while the file.getFileID()=" + file.getFileID());
+            log.info("IsSame = (" + (journal.getCurrentFile() == file) + ")");
+            
+           // throw new RuntimeException ("Check failure!");
+         }
+         
+         if (journal.getCurrentFile() == file)
+         {
+            throw new RuntimeException ("Check failure! Current file listed as data file!");
+         }
+         
+         seq = file.getFileID();
+      }
+      
+      long lastFreeId = -1;
+      for (JournalFile file : freeFiles)
+      {
+         if (file.getFileID() <= lastFreeId)
+         {
+            log.info("CheckDataFiles:");
+            log.info(debugFiles());
+            log.info("FreeFileID out of order ");
+            
+            throw new RuntimeException ("Check failure!");
+         }
+         
+         lastFreeId= file.getFileID();
+         
+         if (file.getFileID() < seq)
+         {
+            log.info("CheckDataFiles:");
+            log.info(debugFiles());
+            log.info("A FreeFile is less then the maximum data");
+            
+           // throw new RuntimeException ("Check failure!");
+         }
+      }
+   }
 
    public void addDataFileOnBottom(final JournalFile file)
    {
       dataFiles.add(file);
+      
+      if (CHECK_CONSISTENCE)
+      {
+      	checkDataFiles();
+      }
    }
 
    // Free File Operations ==========================================
@@ -254,6 +364,11 @@
    public void addFreeFileNoInit(final JournalFile file)
    {
       freeFiles.add(file);
+      
+      if (CHECK_CONSISTENCE)
+      {
+      	checkDataFiles();
+      }
    }
 
    /**
@@ -302,6 +417,11 @@
       {
          file.getFile().delete();
       }
+      
+      if (CHECK_CONSISTENCE)
+      {
+      	checkDataFiles();
+      }
    }
 
    public Collection<JournalFile> getFreeFiles()
@@ -333,28 +453,13 @@
          JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
       }
 
-      Runnable run = new Runnable()
-      {
-         public void run()
-         {
-            try
-            {
-               pushOpenedFile();
-            }
-            catch (Exception e)
-            {
-               JournalFilesRepository.log.error(e.getMessage(), e);
-            }
-         }
-      };
-
       if (openFilesExecutor == null)
       {
-         run.run();
+         pushOpenRunnable.run();
       }
       else
       {
-         openFilesExecutor.execute(run);
+         openFilesExecutor.execute(pushOpenRunnable);
       }
 
       JournalFile nextFile = null;

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-06-03 15:21:19 UTC (rev 10773)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-06-06 03:12:33 UTC (rev 10774)
@@ -292,6 +292,7 @@
       this.fileFactory = fileFactory;
 
       filesRepository = new JournalFilesRepository(fileFactory,
+                                                   this,
                                                    filePrefix,
                                                    fileExtension,
                                                    userVersion,
@@ -1484,7 +1485,7 @@
    public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
                                                    final List<PreparedTransactionInfo> preparedTransactions,
                                                    final TransactionFailureCallback failureCallback,
-                                                   final boolean fixBadTX) throws Exception
+                                                   final boolean changeData) throws Exception
    {
       final Set<Long> recordsToDelete = new HashSet<Long>();
       // ArrayList was taking too long to delete elements on checkDeleteSize
@@ -1554,7 +1555,7 @@
                failureCallback.failedTransaction(transactionID, records, recordsToDelete);
             }
          }
-      }, fixBadTX);
+      }, changeData);
 
       for (RecordInfo record : records)
       {
@@ -1640,16 +1641,8 @@
 
       try
       {
-         if (JournalImpl.trace)
-         {
-            JournalImpl.trace("Starting compacting operation on journal");
-         }
+         log.debug("Starting compacting operation on journal");
 
-         if (JournalImpl.TRACE_RECORDS)
-         {
-            JournalImpl.traceRecord("Starting compacting operation on journal");
-         }
-
          onCompactStart();
 
          // We need to guarantee that the journal is frozen for this short time
@@ -1806,16 +1799,8 @@
          renameFiles(dataFilesToProcess, newDatafiles);
          deleteControlFile(controlFile);
 
-         if (JournalImpl.trace)
-         {
-            trace("Finished compacting on journal");
-         }
-
-         if (JournalImpl.TRACE_RECORDS)
-         {
-            JournalImpl.traceRecord("Finished compacting on journal");
-         }
-
+         log.debug("Finished compacting on journal");
+         
       }
       finally
       {
@@ -1879,7 +1864,7 @@
       return load(loadManager, true);
    }
    
-   public synchronized JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions) throws Exception
+   public synchronized JournalLoadInformation load(final LoaderCallback loadManager, final boolean changeData) throws Exception
    {
       if (state != JournalImpl.STATE_STARTED)
       {
@@ -2167,8 +2152,11 @@
          }
          else
          {
-            // Empty dataFiles with no data
-            filesRepository.addFreeFileNoInit(file);
+            if (changeData)
+            {
+               // Empty dataFiles with no data
+               filesRepository.addFreeFile(file, false);
+            }
          }
       }
 
@@ -2206,7 +2194,7 @@
             JournalImpl.log.warn("Uncommitted transaction with id " + transaction.transactionID +
                                  " found and discarded");
 
-            if (fixFailingTransactions)
+            if (changeData)
             {
                // I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
                this.appendRollbackRecord(transaction.transactionID, false);
@@ -2998,7 +2986,7 @@
 
       if (JournalImpl.trace)
       {
-         JournalImpl.trace("moveNextFile: " + currentFile);
+         log.trace("moveNextFile: " + currentFile);
       }
 
       fileFactory.activateBuffer(currentFile.getFile());

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-06-03 15:21:19 UTC (rev 10773)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-06-06 03:12:33 UTC (rev 10774)
@@ -910,11 +910,13 @@
 
                if (message == null)
                {
-                  throw new IllegalStateException("Cannot find message " + record.id);
+                  log.error("Cannot find message " + record.id);
                }
+               else
+               {
+                  queueMessages.put(messageID, new AddMessageRecord(message));
+               }
 
-               queueMessages.put(messageID, new AddMessageRecord(message));
-
                break;
             }
             case ACKNOWLEDGE_REF:
@@ -929,14 +931,16 @@
 
                if (queueMessages == null)
                {
-                  throw new IllegalStateException("Cannot find queue messages " + encoding.queueID);
+                  log.error("Cannot find queue messages for queueID=" + encoding.queueID + " on ack for messageID=" + messageID);
                }
-
-               AddMessageRecord rec = queueMessages.remove(messageID);
-
-               if (rec == null)
+               else
                {
-                  throw new IllegalStateException("Cannot find message " + messageID);
+                  AddMessageRecord rec = queueMessages.remove(messageID);
+   
+                  if (rec == null)
+                  {
+                     log.error("Cannot find message " + messageID);
+                  }
                }
 
                break;
@@ -1008,18 +1012,23 @@
 
                if (queueMessages == null)
                {
-                  throw new IllegalStateException("Cannot find queue messages " + encoding.queueID);
+                  log.error("Cannot find queue messages " + encoding.queueID + " for message " + messageID + " while processing scheduled messages");
                }
-
-               AddMessageRecord rec = queueMessages.get(messageID);
-
-               if (rec == null)
+               else
                {
-                  throw new IllegalStateException("Cannot find message " + messageID);
+   
+                  AddMessageRecord rec = queueMessages.get(messageID);
+   
+                  if (rec == null)
+                  {
+                     log.error("Cannot find message " + messageID);
+                  }
+                  else
+                  {
+                     rec.scheduledDeliveryTime = encoding.scheduledDeliveryTime;
+                  }
                }
 
-               rec.scheduledDeliveryTime = encoding.scheduledDeliveryTime;
-
                break;
             }
             case DUPLICATE_ID:

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-06-03 15:21:19 UTC (rev 10773)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java	2011-06-06 03:12:33 UTC (rev 10774)
@@ -3677,11 +3677,24 @@
       }
    }
    
+   public void testLoop() throws Exception
+   {
+      for (int i = 0 ; i < 1000; i++)
+      {
+         log.warn("#test " + i);
+         testDLAOnLargeMessageAndPaging();
+         tearDown();
+         setUp();
+      }
+      
+   }
+   
    public void testDLAOnLargeMessageAndPaging() throws Exception
    {
       clearData();
 
       Configuration config = createDefaultConfig();
+      config.setThreadPoolMaxSize(5);
 
       config.setJournalSyncNonTransactional(false);
 
@@ -3776,12 +3789,14 @@
             assertNotNull("Message " + i + " wasn't received", message);
             message.acknowledge();
             
+            final AtomicInteger bytesOutput = new AtomicInteger(0);
+            
             message.setOutputStream(new OutputStream()
             {
                @Override
                public void write(int b) throws IOException
                {
-
+                  bytesOutput.incrementAndGet();
                }
             });
 
@@ -3795,8 +3810,10 @@
             }
             catch (Throwable e)
             {
+               log.info("output bytes = " + bytesOutput);
                log.info(threadDump("dump"));
-               fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") + " with messageID=" + message.getMessageID());
+               fail("Couldn't finish large message receiving for id=" + 
+                    message.getStringProperty("id") + " with messageID=" + message.getMessageID());
             }
 
          }

Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2011-06-03 15:21:19 UTC (rev 10773)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java	2011-06-06 03:12:33 UTC (rev 10774)
@@ -779,8 +779,10 @@
             }
          }
       }
+      
+      long lastId = idGenerator.generateID();
 
-      add(idGenerator.generateID());
+      add(lastId);
 
       if (createControlFile && deleteControlFile && renameFilesAfterCompacting)
       {
@@ -791,6 +793,15 @@
       createJournal();
       startJournal();
       loadAndCheck();
+  
+      journal.forceMoveNextFile();
+      update(lastId);
+      
+      stopJournal();
+      createJournal();
+      startJournal();
+      loadAndCheck();
+      
 
    }
 



More information about the hornetq-commits mailing list