[jboss-cvs] JBoss Messaging SVN: r4701 - in trunk: src/main/org/jboss/messaging/core/journal and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Mon Jul 21 19:06:11 EDT 2008


Author: clebert.suconic at jboss.com
Date: 2008-07-21 19:06:10 -0400 (Mon, 21 Jul 2008)
New Revision: 4701

Modified:
   trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
   trunk/src/main/org/jboss/messaging/core/journal/Journal.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
   trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
Log:


Modified: trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java
===================================================================
--- trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/examples/jms/src/org/jboss/jms/example/PerfExample.java	2008-07-21 23:06:10 UTC (rev 4701)
@@ -219,7 +219,13 @@
          if (perfParams.isDrainQueue())
          {
             drainQueue(messageConsumer);
+            if (perfParams.isSessionTransacted())
+            {
+               log.info("commit on drainQueue");
+               session.commit();
+            }
          }
+         
 
          log.info("READY!!!");
 

Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java	2008-07-21 23:06:10 UTC (rev 4701)
@@ -75,13 +75,6 @@
 
    long load(LoadManager reloadManager) throws Exception;
 
-	
-	// Start and stop reclaimer
-	
-	void startReclaimer();
-	
-	void stopReclaimer();
-	
 	int getAlignment() throws Exception;
 	
 }

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2008-07-21 23:06:10 UTC (rev 4701)
@@ -287,6 +287,7 @@
 
       public void onError(int errorCode, String errorMessage)
       {
+         log.warn("Error on writing data!" + errorMessage + " code - " + errorCode, new Exception (errorMessage));
       }	   
 	}
 	

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2008-07-21 23:06:10 UTC (rev 4701)
@@ -175,14 +175,8 @@
    
    private final ConcurrentMap<Long, TransactionCallback> transactionCallbacks = new ConcurrentHashMap<Long, TransactionCallback>();
    
-   private ExecutorService closingExecutor = null;
+   private ExecutorService filesExecutor = null;
    
-   /** 
-    * We have a separated executor for open, as if we used the same executor this would still represent
-    * a point of wait between the closing and open.
-    * */
-   private ExecutorService openExecutor = null;
-   
    /*
     * We use a semaphore rather than synchronized since it performs better when
     * contended
@@ -200,8 +194,6 @@
    
    private Reclaimer reclaimer = new Reclaimer();
    
-   private Thread shutdownHook = null;
-   
    // Static --------------------------------------------------------
    
    private static final Logger log = Logger.getLogger(JournalImpl.class);
@@ -682,8 +674,6 @@
          throw new IllegalStateException("Journal must be in started state");
       }
       
-      addShutdownHook();
-      
       Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
       
       List<JournalFile> orderedFiles = orderFiles();
@@ -1200,12 +1190,12 @@
          callback.waitCompletion(aioTimeout);
       }
       
-      if (closingExecutor != null && !closingExecutor.isShutdown())
+      if (filesExecutor != null && !filesExecutor.isShutdown())
       {
          // Send something to the closingExecutor, just to make sure we went until its end
          final CountDownLatch latch = new CountDownLatch(1);
          
-         this.closingExecutor.execute(new Runnable()
+         this.filesExecutor.execute(new Runnable()
          {
             public void run()
             {
@@ -1216,25 +1206,9 @@
          latch.await();
       }
       
-      if (openExecutor != null && !openExecutor.isShutdown())
-      {
-         // Send something to the closingExecutor, just to make sure we went until its end
-         final CountDownLatch latch = new CountDownLatch(1);
-         
-         this.openExecutor.execute(new Runnable()
-         {
-            public void run()
-            {
-               latch.countDown();
-            }
-         });
-         
-         latch.await();
-      }
-      
    }
    
-   public synchronized void checkAndReclaimFiles() throws Exception
+   public void checkAndReclaimFiles() throws Exception
    {
       checkReclaimStatus();
       
@@ -1386,8 +1360,7 @@
          throw new IllegalStateException("Journal is not stopped");
       }
       
-      this.openExecutor =  Executors.newSingleThreadExecutor();
-      this.closingExecutor = Executors.newSingleThreadExecutor();
+      this.filesExecutor =  Executors.newSingleThreadExecutor();
       
       state = STATE_STARTED;
 
@@ -1395,28 +1368,18 @@
    
    public synchronized void stop() throws Exception
    {
-      clearShutdownHook();
-
       if (state == STATE_STOPPED)
       {
          throw new IllegalStateException("Journal is already stopped");
       }
       
-      stopReclaimer();
-      
-      closingExecutor.shutdown();
-      if (!closingExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
-      {
-         throw new IllegalStateException("Time out waiting for closing executor to finish");
-      }
-      
       if (currentFile != null)
       {
          currentFile.getFile().close();
       }
       
-      openExecutor.shutdown();
-      if (!openExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
+      filesExecutor.shutdown();
+      if (!filesExecutor.awaitTermination(aioTimeout, TimeUnit.MILLISECONDS))
       {
          throw new IllegalStateException("Time out waiting for open executor to finish");
       }
@@ -1437,22 +1400,6 @@
       state = STATE_STOPPED;
    }
    
-   public void startReclaimer()
-   {
-      if (state == STATE_STOPPED)
-      {
-         throw new IllegalStateException("Journal is stopped");
-      }
-   }
-   
-   public void stopReclaimer()
-   {
-      if (state == STATE_STOPPED)
-      {
-         throw new IllegalStateException("Journal is already stopped");
-      }
-   }
-   
    // Public -----------------------------------------------------------------------------
    
    // Private -----------------------------------------------------------------------------
@@ -1636,46 +1583,6 @@
       return orderedFiles;
    }
    
-   private void clearShutdownHook()
-   {
-      if (shutdownHook != null)
-      {
-         try
-         {
-            Runtime.getRuntime().removeShutdownHook(shutdownHook);
-         }
-         catch (Throwable e)
-         {
-         }
-         shutdownHook = null;
-      }
-   }
-   
-   private void addShutdownHook()
-   {
-      
-      clearShutdownHook();
-      
-      
-      shutdownHook = new Thread()
-      {
-        public void run()
-        {
-           try
-           {
-              log.info("Journal being stopped");
-              JournalImpl.this.stop();
-           }
-           catch (Exception e)
-           {
-              log.warn(e, e);
-           }
-        }
-      };
-      Runtime.getRuntime().addShutdownHook(shutdownHook);
-
-   }
-   
    private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
    {
       lock.acquire();
@@ -1787,7 +1694,7 @@
    private JournalFile enqueueOpenFile() throws InterruptedException
    {
       if (trace) log.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
-      openExecutor.execute(new Runnable()
+      filesExecutor.execute(new Runnable()
       {
          public void run()
          {
@@ -1803,7 +1710,7 @@
       });
       if (autoReclaim)
       {
-         openExecutor.execute(new Runnable()
+         filesExecutor.execute(new Runnable()
          {
             public void run()
             {
@@ -1860,7 +1767,7 @@
    
    private void closeFile(final JournalFile file)
    {
-      this.closingExecutor.execute(new Runnable() { public void run()
+      this.filesExecutor.execute(new Runnable() { public void run()
       {
          try
          {

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java	2008-07-21 23:06:10 UTC (rev 4701)
@@ -561,12 +561,8 @@
 		
 		bindingsJournal.start();
 		
-		bindingsJournal.startReclaimer();
-		
 		messageJournal.start();
 		
-		messageJournal.startReclaimer();
-		
 		started = true;
 	}
 

Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java	2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/impl/JournalImplTestUnit.java	2008-07-21 23:06:10 UTC (rev 4701)
@@ -146,8 +146,6 @@
       startJournal();
       load();
       
-      journal.startReclaimer();
-      
       long start = System.currentTimeMillis();
       
                   

Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java	2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java	2008-07-21 23:06:10 UTC (rev 4701)
@@ -143,8 +143,6 @@
       startJournal();
       load();
       
-      journal.startReclaimer();
-      
       long start = System.currentTimeMillis();
       
                   

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java	2008-07-21 21:25:52 UTC (rev 4700)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/journal/JournalStorageManagerTest.java	2008-07-21 23:06:10 UTC (rev 4701)
@@ -708,9 +708,7 @@
       
       assertFalse(jsm.isStarted());
       bindingsJournal.start();
-      bindingsJournal.startReclaimer();
       messageJournal.start();
-      messageJournal.startReclaimer();
       
       EasyMock.replay(messageJournal, bindingsJournal);
       




More information about the jboss-cvs-commits mailing list