[jboss-cvs] JBoss Messaging SVN: r7168 - in trunk: src/main/org/jboss/messaging/core/journal and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 2 18:18:04 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-02 18:18:03 -0400 (Tue, 02 Jun 2009)
New Revision: 7168
Modified:
trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
Log:
Fixes
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-02 22:18:03 UTC (rev 7168)
@@ -25,13 +25,17 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.messaging.core.asyncio.AIOCallback;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.JBMThreadFactory;
/**
* A TimedBuffer
@@ -52,6 +56,8 @@
private final CheckTimer timerRunnable = new CheckTimer();
+ private volatile ScheduledFuture<?> futureTimerRunnable;
+
private final long timeout;
private final int bufferSize;
@@ -62,10 +68,14 @@
private final Lock lock = new ReentrantReadWriteLock().writeLock();
- private final Timer timer;
+ private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
- private long lastFlushTime;
+ // used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
+ private volatile long timeLastAdd = 0;
+ // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
+ private volatile long timeLastSync = 0;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -74,23 +84,13 @@
public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
{
- bufferSize = size;
+ bufferSize = size;
this.bufferObserver = bufferObserver;
this.timeout = timeout;
- this.currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
- this.currentBuffer.limit(0);
- this.callbacks = new ArrayList<AIOCallback>();
- this.timer = new Timer("jbm-timed-buffer", true);
-
- this.timer.schedule(timerRunnable, timeout, timeout);
+ currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
+ currentBuffer.limit(0);
+ callbacks = new ArrayList<AIOCallback>();
}
-
- public synchronized void close()
- {
- timerRunnable.cancel();
-
- timer.cancel();
- }
public void lock()
{
@@ -102,8 +102,17 @@
lock.unlock();
}
+ /** used to determine that a sync happened, and we should schedule flush to not take more than timeout no matter the activity on the buffer */
+ public synchronized void sync()
+ {
+ if (timeLastSync == 0)
+ {
+ timeLastSync = System.currentTimeMillis();
+ }
+ }
+
/**
- * Verify if the size fits the buffer, if it fits we lock the buffer to avoid a flush until add is called
+ * Verify if the size fits the buffer
* @param sizeChecked
* @return
*/
@@ -139,11 +148,22 @@
}
- public synchronized void addBytes(final ByteBuffer bytes, final AIOCallback callback)
+ public synchronized void addBytes(final ByteBuffer bytes, final boolean sync, final AIOCallback callback)
{
+ timeLastAdd = System.currentTimeMillis();
+ if (sync)
+ {
+ this.timeLastSync = timeLastAdd;
+ }
+
currentBuffer.put(bytes);
callbacks.add(callback);
+ if (futureTimerRunnable == null)
+ {
+ futureTimerRunnable = schedule.scheduleAtFixedRate(timerRunnable, timeout, timeout, TimeUnit.MILLISECONDS);
+ }
+
if (currentBuffer.position() == currentBuffer.capacity())
{
flush();
@@ -163,11 +183,19 @@
bufferObserver.flushBuffer(directBuffer, callbacks);
callbacks = new ArrayList<AIOCallback>();
+
}
+ if (futureTimerRunnable != null)
+ {
+ futureTimerRunnable.cancel(false);
+ futureTimerRunnable = null;
+ }
+
+ timeLastAdd = 0;
+ timeLastSync = 0;
+
currentBuffer.limit(0);
-
- this.lastFlushTime = System.currentTimeMillis();
}
// Package protected ---------------------------------------------
@@ -175,10 +203,14 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
-
+
private void checkTimer()
{
- if (System.currentTimeMillis() - lastFlushTime >= timeout)
+ final long now = System.currentTimeMillis();
+
+ // if inactive for more than the timeout
+ // of if a sync happened at more than the the timeout ago
+ if (now - timeLastAdd >= timeout || timeLastSync != 0 && now - timeLastSync >= timeout)
{
lock.lock();
try
@@ -192,28 +224,31 @@
}
}
-
// Inner classes -------------------------------------------------
- class CheckTimer extends TimerTask
+ class CheckTimer implements Runnable
{
- private boolean cancelled;
+ public void run()
+ {
+ checkTimer();
+ }
+ }
- @Override
- public synchronized void run()
+ // TODO: is there a better place to get this schedule service from?
+ static class ScheduledSingleton
+ {
+ private static ScheduledExecutorService scheduleService;
+
+ private static synchronized ScheduledExecutorService getScheduledService()
{
- if (!cancelled)
+ if (scheduleService == null)
{
- checkTimer();
+ ThreadFactory factory = new JBMThreadFactory("JBM-buffer-scheduled-control", true);
+
+ scheduleService = Executors.newSingleThreadScheduledExecutor(factory);
}
- }
- @Override
- public synchronized boolean cancel()
- {
- cancelled = true;
-
- return super.cancel();
+ return scheduleService;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-06-02 22:18:03 UTC (rev 7168)
@@ -62,7 +62,7 @@
void delete() throws Exception;
- void write(ByteBuffer bytes, IOCallback callback) throws Exception;
+ void write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
void write(ByteBuffer bytes, boolean sync) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-02 22:18:03 UTC (rev 7168)
@@ -151,8 +151,6 @@
opened = false;
timedBuffer.flush();
-
- timedBuffer.close();
final CountDownLatch donelatch = new CountDownLatch(1);
@@ -303,11 +301,11 @@
return bytesRead;
}
- public void write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ public void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
{
if (buffering)
{
- timedBuffer.addBytes(bytes, callback);
+ timedBuffer.addBytes(bytes, sync, callback);
}
else
{
@@ -321,13 +319,13 @@
{
IOCallback completion = SimpleWaitIOCallback.getInstance();
- write(bytes, completion);
+ write(bytes, true, completion);
completion.waitCompletion();
}
else
{
- write(bytes, DummyCallback.instance);
+ write(bytes, false, DummyCallback.instance);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-02 22:18:03 UTC (rev 7168)
@@ -1477,6 +1477,11 @@
* It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
public void debugWait() throws Exception
{
+ if (currentFile != null)
+ {
+ currentFile.getFile().flush();
+ }
+
for (TransactionCallback callback : transactionCallbacks.values())
{
callback.waitCompletion();
@@ -2023,7 +2028,7 @@
if (callback != null)
{
- currentFile.getFile().write(bb, callback);
+ currentFile.getFile().write(bb, sync, callback);
// This is defaulted to false. The user is telling us to not wait the buffer timeout when a commit or sync is called
if (flushOnSync && sync)
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-06-02 22:18:03 UTC (rev 7168)
@@ -195,13 +195,18 @@
}
}
- public void write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ public void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
{
try
{
position.addAndGet(bytes.limit());
channel.write(bytes);
+
+ if (sync)
+ {
+ sync();
+ }
if (callback != null)
{
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-02 22:18:03 UTC (rev 7168)
@@ -437,7 +437,7 @@
return data.position();
}
- public synchronized void write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ public synchronized void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
{
if (!open)
{
@@ -492,7 +492,7 @@
public void write(final ByteBuffer bytes, final boolean sync) throws Exception
{
- write(bytes, null);
+ write(bytes, sync, null);
}
private void checkAndResize(final int size)
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java 2009-06-02 20:21:38 UTC (rev 7167)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java 2009-06-02 22:18:03 UTC (rev 7168)
@@ -104,8 +104,9 @@
record.put((byte)getSamplebyte(x++));
}
+ timedBuffer.checkSize(10);
record.rewind();
- timedBuffer.addBytes(record, dummyCallback);
+ timedBuffer.addBytes(record, false, dummyCallback);
}
More information about the jboss-cvs-commits
mailing list