[jboss-cvs] JBoss Messaging SVN: r7167 - in trunk/src/main/org/jboss/messaging/core: journal/impl and 1 other directory.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 2 16:21:38 EDT 2009


Author: timfox
Date: 2009-06-02 16:21:38 -0400 (Tue, 02 Jun 2009)
New Revision: 7167

Modified:
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
Log:
fix timing on aio journal

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-02 18:42:25 UTC (rev 7166)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-02 20:21:38 UTC (rev 7167)
@@ -25,17 +25,13 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.jboss.messaging.core.asyncio.AIOCallback;
 import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.JBMThreadFactory;
 
 /**
  * A TimedBuffer
@@ -56,21 +52,19 @@
 
    private final CheckTimer timerRunnable = new CheckTimer();
 
-   private volatile ScheduledFuture<?> futureTimerRunnable;
-
    private final long timeout;
 
    private final int bufferSize;
 
    private final ByteBuffer currentBuffer;
 
-   private volatile List<AIOCallback> callbacks;
+   private List<AIOCallback> callbacks;
 
-   private volatile long timeLastWrite = 0;
+   private final Lock lock = new ReentrantReadWriteLock().writeLock();
 
-   private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
+   private final Timer timer;
 
-   private Lock lock = new ReentrantReadWriteLock().writeLock();
+   private long lastFlushTime;
 
    // Static --------------------------------------------------------
 
@@ -78,46 +72,26 @@
 
    // Public --------------------------------------------------------
 
-   // private byte[] data;
-
    public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
    {
-      bufferSize = size;
+      bufferSize = size;      
       this.bufferObserver = bufferObserver;
       this.timeout = timeout;
       this.currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
       this.currentBuffer.limit(0);
       this.callbacks = new ArrayList<AIOCallback>();
+      this.timer = new Timer("jbm-timed-buffer", true);
+      
+      this.timer.schedule(timerRunnable, timeout, timeout);
    }
-
-   public int position()
+   
+   public synchronized void close()
    {
-      if (currentBuffer == null)
-      {
-         return 0;
-      }
-      else
-      {
-         return currentBuffer.position();
-      }
+      timerRunnable.cancel();
+      
+      timer.cancel();
    }
 
-   public void checkTimer()
-   {
-      if (System.currentTimeMillis() - timeLastWrite > timeout)
-      {
-         lock.lock();
-         try
-         {
-            flush();
-         }
-         finally
-         {
-            lock.unlock();
-         }
-      }
-   }
-
    public void lock()
    {
       lock.lock();
@@ -141,13 +115,12 @@
                                          ") on the journal");
       }
 
-      
-      if (currentBuffer.limit() == 0 ||  currentBuffer.position() + sizeChecked > currentBuffer.limit())
+      if (currentBuffer.limit() == 0 || currentBuffer.position() + sizeChecked > currentBuffer.limit())
       {
          flush();
 
          final int remaining = bufferObserver.getRemainingBytes();
-         
+
          if (sizeChecked > remaining)
          {
             return false;
@@ -171,13 +144,6 @@
       currentBuffer.put(bytes);
       callbacks.add(callback);
 
-      if (futureTimerRunnable == null)
-      {
-         futureTimerRunnable = schedule.scheduleAtFixedRate(timerRunnable, timeout, timeout, TimeUnit.MILLISECONDS);
-      }
-
-      timeLastWrite = System.currentTimeMillis();
-
       if (currentBuffer.position() == currentBuffer.capacity())
       {
          flush();
@@ -191,22 +157,17 @@
          ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, currentBuffer.position());
 
          currentBuffer.flip();
-         
+
          directBuffer.put(currentBuffer);
 
          bufferObserver.flushBuffer(directBuffer, callbacks);
-         
+
          callbacks = new ArrayList<AIOCallback>();
       }
 
-      if (futureTimerRunnable != null)
-      {
-         futureTimerRunnable.cancel(false);
-         futureTimerRunnable = null;
-      }
-
-      timeLastWrite = 0;
       currentBuffer.limit(0);
+
+      this.lastFlushTime = System.currentTimeMillis();
    }
 
    // Package protected ---------------------------------------------
@@ -214,32 +175,45 @@
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-   class CheckTimer implements Runnable
+   
+   private void checkTimer()
    {
-      public void run()
+      if (System.currentTimeMillis() - lastFlushTime >= timeout)
       {
-         checkTimer();
+         lock.lock();
+         try
+         {
+            flush();
+         }
+         finally
+         {
+            lock.unlock();
+         }
       }
    }
 
-   // TODO: is there a better place to get this schedule service from?
-   static class ScheduledSingleton
+
+   // Inner classes -------------------------------------------------
+
+   class CheckTimer extends TimerTask
    {
-      private static ScheduledExecutorService scheduleService;
+      private boolean cancelled;
 
-      private static synchronized ScheduledExecutorService getScheduledService()
+      @Override
+      public synchronized void run()
       {
-         if (scheduleService == null)
+         if (!cancelled)
          {
-            ThreadFactory factory = new JBMThreadFactory("JBM-buffer-scheduled-control", true);
-
-            scheduleService = Executors.newScheduledThreadPool(2, factory);
+            checkTimer();
          }
+      }
 
-         return scheduleService;
+      @Override
+      public synchronized boolean cancel()
+      {
+         cancelled = true;
+
+         return super.cancel();
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-02 18:42:25 UTC (rev 7166)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-02 20:21:38 UTC (rev 7167)
@@ -144,16 +144,15 @@
    {
       timedBuffer.unlock();
    }
-
-
    
    public synchronized void close() throws Exception
    {
       checkOpened();
       opened = false;
-      
-      
+            
       timedBuffer.flush();
+      
+      timedBuffer.close();
 
       final CountDownLatch donelatch = new CountDownLatch(1);
 
@@ -172,7 +171,7 @@
       }
 
       aioFile.close();
-      aioFile = null;
+      aioFile = null;           
    }
 
    public void delete() throws Exception




More information about the jboss-cvs-commits mailing list