[hornetq-commits] JBoss hornetq SVN: r10379 - in trunk: tests/src/org/hornetq/tests/integration/journal and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Mar 27 00:58:44 EDT 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-27 00:58:43 -0400 (Sun, 27 Mar 2011)
New Revision: 10379

Modified:
   trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
   trunk/tests/src/org/hornetq/tests/integration/journal/NIOBufferedJournalCompactTest.java
Log:
-r10376:10378 from branch_2_2_eap

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-03-27 04:55:20 UTC (rev 10378)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java	2011-03-27 04:58:43 UTC (rev 10379)
@@ -59,8 +59,6 @@
 
    private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
 
-   private final BlockingQueue<JournalFile> pendingCloseFiles = new LinkedBlockingDeque<JournalFile>();
-
    private final ConcurrentLinkedQueue<JournalFile> freeFiles = new ConcurrentLinkedQueue<JournalFile>();
 
    private final BlockingQueue<JournalFile> openedFiles = new LinkedBlockingQueue<JournalFile>();
@@ -80,8 +78,6 @@
    private final int userVersion;
 
    private Executor openFilesExecutor;
-   
-   private Executor closeFilesExecutor;
 
    // Static --------------------------------------------------------
 
@@ -106,18 +102,15 @@
 
    // Public --------------------------------------------------------
 
-   public void setExecutor(final Executor fileExecutor, final Executor closeExecutor)
+   public void setExecutor(final Executor fileExecutor)
    {
       this.openFilesExecutor = fileExecutor;
-      this.closeFilesExecutor = closeExecutor;
    }
 
-   public void clear()
+   public void clear() throws Exception
    {
       dataFiles.clear();
 
-      drainClosedFiles();
-
       freeFiles.clear();
 
       for (JournalFile file : openedFiles)
@@ -269,8 +262,19 @@
     */
    public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
    {
-      if (file.getFile().size() != fileSize)
+      long calculatedSize = 0;
+      try
       {
+         calculatedSize = file.getFile().size();
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         System.out.println("Can't get file size on " + file);
+         System.exit(-1);
+      }
+      if (calculatedSize != fileSize)
+      {
          JournalFilesRepository.log.warn("Deleting " + file + ".. as it doesn't have the configured size");
          file.getFile().delete();
       }
@@ -317,23 +321,6 @@
       return openedFiles.size();
    }
 
-   public void drainClosedFiles()
-   {
-      JournalFile file;
-      try
-      {
-         while ((file = pendingCloseFiles.poll()) != null)
-         {
-            file.getFile().close();
-         }
-      }
-      catch (Exception e)
-      {
-         JournalFilesRepository.log.warn(e.getMessage(), e);
-      }
-
-   }
-
    /** 
     * <p>This method will instantly return the opened file, and schedule opening and reclaiming.</p>
     * <p>In case there are no cached opened files, this method will block until the file was opened,
@@ -406,31 +393,11 @@
       openedFiles.offer(nextOpenedFile);
    }
 
-   public void closeFile(final JournalFile file)
+   public void closeFile(final JournalFile file) throws Exception
    {
       fileFactory.deactivateBuffer();
-      pendingCloseFiles.add(file);
+      file.getFile().close();
       dataFiles.add(file);
-
-      Runnable run = new Runnable()
-      {
-         public void run()
-         {
-            drainClosedFiles();
-         }
-      };
-
-      // We can't close files while the compactor is running
-      // as we may be closing files that are being read by the compactor
-      if (closeFilesExecutor == null)
-      {
-         run.run();
-      }
-      else
-      {
-         closeFilesExecutor.execute(run);
-      }
-
    }
 
    /**

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-03-27 04:55:20 UTC (rev 10378)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2011-03-27 04:58:43 UTC (rev 10379)
@@ -1655,8 +1655,6 @@
             // We need to move to the next file, as we need a clear start for negatives and positives counts
             moveNextFile(false);
 
-            filesRepository.drainClosedFiles();
-
             // Take the snapshots and replace the structures
 
             dataFilesToProcess.addAll(filesRepository.getDataFiles());
@@ -2544,7 +2542,7 @@
          }
       });
 
-      filesRepository.setExecutor(filesExecutor, compactorExecutor);
+      filesRepository.setExecutor(filesExecutor);
 
       fileFactory.start();
 
@@ -2576,7 +2574,7 @@
 
          filesExecutor.shutdown();
 
-         filesRepository.setExecutor(null, null);
+         filesRepository.setExecutor(null);
 
          if (!filesExecutor.awaitTermination(60, TimeUnit.SECONDS))
          {
@@ -2590,8 +2588,6 @@
             currentFile.getFile().close();
          }
 
-         filesRepository.drainClosedFiles();
-
          filesRepository.clear();
 
          fileFactory.stop();
@@ -2947,14 +2943,6 @@
             callback = null;
          }
 
-         if (sync && !compactorRunning.get())
-         {
-            // In an edge case the transaction could still have pending data from previous files.
-            // This shouldn't cause any blocking issues, as this is here to guarantee we cover all possibilities
-            // on guaranteeing the data is on the disk
-            tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
-         }
-
          // We need to add the number of records on currentFile if prepare or commit
          if (completeTransaction)
          {
@@ -2983,7 +2971,7 @@
    }
 
    // You need to guarantee lock.acquire() before calling this method
-   private void moveNextFile(final boolean scheduleReclaim) throws InterruptedException
+   private void moveNextFile(final boolean scheduleReclaim) throws Exception
    {
       filesRepository.closeFile(currentFile);
 
@@ -3017,7 +3005,6 @@
             {
                try
                {
-                  filesRepository.drainClosedFiles();
                   if (!checkReclaimStatus())
                   {
                      checkCompact();

Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2011-03-27 04:55:20 UTC (rev 10378)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java	2011-03-27 04:58:43 UTC (rev 10379)
@@ -193,38 +193,6 @@
       data.setNumberOfRecords(getCounter(currentFile));
    }
 
-   /** 99.99 % of the times previous files will be already synced, since they are scheduled to be closed.
-    *  Because of that, this operation should be almost very fast.*/
-   public void syncPreviousFiles(final boolean callbacks, final JournalFile currentFile) throws Exception
-   {
-      if (callbacks)
-      {
-         if (callbackList != null)
-         {
-            for (Map.Entry<JournalFile, TransactionCallback> entry : callbackList.entrySet())
-            {
-               if (entry.getKey() != currentFile)
-               {
-                  entry.getValue().waitCompletion();
-               }
-            }
-         }
-      }
-      else
-      {
-         if (pendingFiles != null)
-         {
-            for (JournalFile file : pendingFiles)
-            {
-               if (file != currentFile)
-               {
-                  file.getFile().waitForClose();
-               }
-            }
-         }
-      }
-   }
-
    public TransactionCallback getCallback(final JournalFile file) throws Exception
    {
       if (callbackList == null)

Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOBufferedJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOBufferedJournalCompactTest.java	2011-03-27 04:55:20 UTC (rev 10378)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOBufferedJournalCompactTest.java	2011-03-27 04:58:43 UTC (rev 10379)
@@ -39,6 +39,18 @@
    // Public --------------------------------------------------------
 
    // Package protected ---------------------------------------------
+   
+   public void testLoop() throws Exception
+   {
+      int i = 0 ;
+      while (true)
+      {
+         System.out.println("#test " + (i++));
+         testOnRollback();
+         tearDown();
+         setUp();
+      }
+   }
 
    // Protected -----------------------------------------------------
    @Override



More information about the hornetq-commits mailing list