[jboss-cvs] JBoss Messaging SVN: r7167 - in trunk/src/main/org/jboss/messaging/core: journal/impl and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 2 16:21:38 EDT 2009
Author: timfox
Date: 2009-06-02 16:21:38 -0400 (Tue, 02 Jun 2009)
New Revision: 7167
Modified:
trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
Log:
fix timing on aio journal
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-02 18:42:25 UTC (rev 7166)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-02 20:21:38 UTC (rev 7167)
@@ -25,17 +25,13 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.messaging.core.asyncio.AIOCallback;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.JBMThreadFactory;
/**
* A TimedBuffer
@@ -56,21 +52,19 @@
private final CheckTimer timerRunnable = new CheckTimer();
- private volatile ScheduledFuture<?> futureTimerRunnable;
-
private final long timeout;
private final int bufferSize;
private final ByteBuffer currentBuffer;
- private volatile List<AIOCallback> callbacks;
+ private List<AIOCallback> callbacks;
- private volatile long timeLastWrite = 0;
+ private final Lock lock = new ReentrantReadWriteLock().writeLock();
- private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
+ private final Timer timer;
- private Lock lock = new ReentrantReadWriteLock().writeLock();
+ private long lastFlushTime;
// Static --------------------------------------------------------
@@ -78,46 +72,26 @@
// Public --------------------------------------------------------
- // private byte[] data;
-
public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
{
- bufferSize = size;
+ bufferSize = size;
this.bufferObserver = bufferObserver;
this.timeout = timeout;
this.currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
this.currentBuffer.limit(0);
this.callbacks = new ArrayList<AIOCallback>();
+ this.timer = new Timer("jbm-timed-buffer", true);
+
+ this.timer.schedule(timerRunnable, timeout, timeout);
}
-
- public int position()
+
+ public synchronized void close()
{
- if (currentBuffer == null)
- {
- return 0;
- }
- else
- {
- return currentBuffer.position();
- }
+ timerRunnable.cancel();
+
+ timer.cancel();
}
- public void checkTimer()
- {
- if (System.currentTimeMillis() - timeLastWrite > timeout)
- {
- lock.lock();
- try
- {
- flush();
- }
- finally
- {
- lock.unlock();
- }
- }
- }
-
public void lock()
{
lock.lock();
@@ -141,13 +115,12 @@
") on the journal");
}
-
- if (currentBuffer.limit() == 0 || currentBuffer.position() + sizeChecked > currentBuffer.limit())
+ if (currentBuffer.limit() == 0 || currentBuffer.position() + sizeChecked > currentBuffer.limit())
{
flush();
final int remaining = bufferObserver.getRemainingBytes();
-
+
if (sizeChecked > remaining)
{
return false;
@@ -171,13 +144,6 @@
currentBuffer.put(bytes);
callbacks.add(callback);
- if (futureTimerRunnable == null)
- {
- futureTimerRunnable = schedule.scheduleAtFixedRate(timerRunnable, timeout, timeout, TimeUnit.MILLISECONDS);
- }
-
- timeLastWrite = System.currentTimeMillis();
-
if (currentBuffer.position() == currentBuffer.capacity())
{
flush();
@@ -191,22 +157,17 @@
ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, currentBuffer.position());
currentBuffer.flip();
-
+
directBuffer.put(currentBuffer);
bufferObserver.flushBuffer(directBuffer, callbacks);
-
+
callbacks = new ArrayList<AIOCallback>();
}
- if (futureTimerRunnable != null)
- {
- futureTimerRunnable.cancel(false);
- futureTimerRunnable = null;
- }
-
- timeLastWrite = 0;
currentBuffer.limit(0);
+
+ this.lastFlushTime = System.currentTimeMillis();
}
// Package protected ---------------------------------------------
@@ -214,32 +175,45 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
- class CheckTimer implements Runnable
+
+ private void checkTimer()
{
- public void run()
+ if (System.currentTimeMillis() - lastFlushTime >= timeout)
{
- checkTimer();
+ lock.lock();
+ try
+ {
+ flush();
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
}
- // TODO: is there a better place to get this schedule service from?
- static class ScheduledSingleton
+
+ // Inner classes -------------------------------------------------
+
+ class CheckTimer extends TimerTask
{
- private static ScheduledExecutorService scheduleService;
+ private boolean cancelled;
- private static synchronized ScheduledExecutorService getScheduledService()
+ @Override
+ public synchronized void run()
{
- if (scheduleService == null)
+ if (!cancelled)
{
- ThreadFactory factory = new JBMThreadFactory("JBM-buffer-scheduled-control", true);
-
- scheduleService = Executors.newScheduledThreadPool(2, factory);
+ checkTimer();
}
+ }
- return scheduleService;
+ @Override
+ public synchronized boolean cancel()
+ {
+ cancelled = true;
+
+ return super.cancel();
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-02 18:42:25 UTC (rev 7166)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-02 20:21:38 UTC (rev 7167)
@@ -144,16 +144,15 @@
{
timedBuffer.unlock();
}
-
-
public synchronized void close() throws Exception
{
checkOpened();
opened = false;
-
-
+
timedBuffer.flush();
+
+ timedBuffer.close();
final CountDownLatch donelatch = new CountDownLatch(1);
@@ -172,7 +171,7 @@
}
aioFile.close();
- aioFile = null;
+ aioFile = null;
}
public void delete() throws Exception
More information about the jboss-cvs-commits
mailing list