[jboss-cvs] JBoss Messaging SVN: r7168 - in trunk: src/main/org/jboss/messaging/core/journal and 3 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jun 2 18:18:04 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-06-02 18:18:03 -0400 (Tue, 02 Jun 2009)
New Revision: 7168

Modified:
   trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
   trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
   trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
   trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
Log:
Fixes

Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java	2009-06-02 22:18:03 UTC (rev 7168)
@@ -25,13 +25,17 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.jboss.messaging.core.asyncio.AIOCallback;
 import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.JBMThreadFactory;
 
 /**
  * A TimedBuffer
@@ -52,6 +56,8 @@
 
    private final CheckTimer timerRunnable = new CheckTimer();
 
+   private volatile ScheduledFuture<?> futureTimerRunnable;
+
    private final long timeout;
 
    private final int bufferSize;
@@ -62,10 +68,14 @@
 
    private final Lock lock = new ReentrantReadWriteLock().writeLock();
 
-   private final Timer timer;
+   private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
 
-   private long lastFlushTime;
+   // used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
+   private volatile long timeLastAdd = 0;
 
+   // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
+   private volatile long timeLastSync = 0;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
@@ -74,23 +84,13 @@
 
    public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
    {
-      bufferSize = size;      
+      bufferSize = size;
       this.bufferObserver = bufferObserver;
       this.timeout = timeout;
-      this.currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
-      this.currentBuffer.limit(0);
-      this.callbacks = new ArrayList<AIOCallback>();
-      this.timer = new Timer("jbm-timed-buffer", true);
-      
-      this.timer.schedule(timerRunnable, timeout, timeout);
+      currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
+      currentBuffer.limit(0);
+      callbacks = new ArrayList<AIOCallback>();
    }
-   
-   public synchronized void close()
-   {
-      timerRunnable.cancel();
-      
-      timer.cancel();
-   }
 
    public void lock()
    {
@@ -102,8 +102,17 @@
       lock.unlock();
    }
 
+   /** used to determine that a sync happened, and we should schedule flush to not take more than timeout no matter the activity on the buffer */
+   public synchronized void sync()
+   {
+      if (timeLastSync == 0)
+      {
+         timeLastSync = System.currentTimeMillis();
+      }
+   }
+
    /**
-    * Verify if the size fits the buffer, if it fits we lock the buffer to avoid a flush until add is called
+    * Verify if the size fits the buffer
     * @param sizeChecked
     * @return
     */
@@ -139,11 +148,22 @@
 
    }
 
-   public synchronized void addBytes(final ByteBuffer bytes, final AIOCallback callback)
+   public synchronized void addBytes(final ByteBuffer bytes, final boolean sync, final AIOCallback callback)
    {
+      timeLastAdd = System.currentTimeMillis();
+      if (sync)
+      {
+         this.timeLastSync = timeLastAdd;
+      }
+
       currentBuffer.put(bytes);
       callbacks.add(callback);
 
+      if (futureTimerRunnable == null)
+      {
+         futureTimerRunnable = schedule.scheduleAtFixedRate(timerRunnable, timeout, timeout, TimeUnit.MILLISECONDS);
+      }
+      
       if (currentBuffer.position() == currentBuffer.capacity())
       {
          flush();
@@ -163,11 +183,19 @@
          bufferObserver.flushBuffer(directBuffer, callbacks);
 
          callbacks = new ArrayList<AIOCallback>();
+
       }
 
+      if (futureTimerRunnable != null)
+      {
+         futureTimerRunnable.cancel(false);
+         futureTimerRunnable = null;
+      }
+
+      timeLastAdd = 0;
+      timeLastSync = 0;
+
       currentBuffer.limit(0);
-
-      this.lastFlushTime = System.currentTimeMillis();
    }
 
    // Package protected ---------------------------------------------
@@ -175,10 +203,14 @@
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------
-   
+
    private void checkTimer()
    {
-      if (System.currentTimeMillis() - lastFlushTime >= timeout)
+      final long now = System.currentTimeMillis();
+
+      // if inactive for more than the timeout
+      // of if a sync happened at more than the the timeout ago
+      if (now - timeLastAdd >= timeout || timeLastSync != 0 && now - timeLastSync >= timeout)
       {
          lock.lock();
          try
@@ -192,28 +224,31 @@
       }
    }
 
-
    // Inner classes -------------------------------------------------
 
-   class CheckTimer extends TimerTask
+   class CheckTimer implements Runnable
    {
-      private boolean cancelled;
+      public void run()
+      {
+         checkTimer();
+      }
+   }
 
-      @Override
-      public synchronized void run()
+   // TODO: is there a better place to get this schedule service from?
+   static class ScheduledSingleton
+   {
+      private static ScheduledExecutorService scheduleService;
+
+      private static synchronized ScheduledExecutorService getScheduledService()
       {
-         if (!cancelled)
+         if (scheduleService == null)
          {
-            checkTimer();
+            ThreadFactory factory = new JBMThreadFactory("JBM-buffer-scheduled-control", true);
+
+            scheduleService = Executors.newSingleThreadScheduledExecutor(factory);
          }
-      }
 
-      @Override
-      public synchronized boolean cancel()
-      {
-         cancelled = true;
-
-         return super.cancel();
+         return scheduleService;
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java	2009-06-02 22:18:03 UTC (rev 7168)
@@ -62,7 +62,7 @@
 
    void delete() throws Exception;
 
-   void write(ByteBuffer bytes, IOCallback callback) throws Exception;
+   void write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
 
    void write(ByteBuffer bytes, boolean sync) throws Exception;
 

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java	2009-06-02 22:18:03 UTC (rev 7168)
@@ -151,8 +151,6 @@
       opened = false;
             
       timedBuffer.flush();
-      
-      timedBuffer.close();
 
       final CountDownLatch donelatch = new CountDownLatch(1);
 
@@ -303,11 +301,11 @@
       return bytesRead;
    }
 
-   public void write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+   public void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
    {
       if (buffering)
       {
-         timedBuffer.addBytes(bytes, callback);
+         timedBuffer.addBytes(bytes, sync, callback);
       }
       else
       {
@@ -321,13 +319,13 @@
       {
          IOCallback completion = SimpleWaitIOCallback.getInstance();
 
-         write(bytes, completion);
+         write(bytes, true, completion);
          
          completion.waitCompletion();
       }
       else
       {
-         write(bytes, DummyCallback.instance);
+         write(bytes, false, DummyCallback.instance);
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java	2009-06-02 22:18:03 UTC (rev 7168)
@@ -1477,6 +1477,11 @@
     *  It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
    public void debugWait() throws Exception
    {
+      if (currentFile != null)
+      {
+         currentFile.getFile().flush();
+      }
+      
       for (TransactionCallback callback : transactionCallbacks.values())
       {
          callback.waitCompletion();
@@ -2023,7 +2028,7 @@
 
          if (callback != null)
          {
-            currentFile.getFile().write(bb, callback);
+            currentFile.getFile().write(bb, sync, callback);
 
             // This is defaulted to false. The user is telling us to not wait the buffer timeout when a commit or sync is called
             if (flushOnSync && sync)

Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java	2009-06-02 22:18:03 UTC (rev 7168)
@@ -195,13 +195,18 @@
       }
    }
 
-   public void write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+   public void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
    {
       try
       {
          position.addAndGet(bytes.limit());
          
          channel.write(bytes);
+         
+         if (sync)
+         {
+            sync();
+         }
 
          if (callback != null)
          {

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java	2009-06-02 22:18:03 UTC (rev 7168)
@@ -437,7 +437,7 @@
          return data.position();
       }
 
-      public synchronized void write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+      public synchronized void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
       {
          if (!open)
          {
@@ -492,7 +492,7 @@
 
       public void write(final ByteBuffer bytes, final boolean sync) throws Exception
       {
-         write(bytes, null);
+         write(bytes, sync, null);
       }
 
       private void checkAndResize(final int size)

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java	2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java	2009-06-02 22:18:03 UTC (rev 7168)
@@ -104,8 +104,9 @@
             record.put((byte)getSamplebyte(x++));
          }
          
+         timedBuffer.checkSize(10);
          record.rewind();
-         timedBuffer.addBytes(record, dummyCallback);
+         timedBuffer.addBytes(record, false, dummyCallback);
       }
       
       




More information about the jboss-cvs-commits mailing list