[jboss-cvs] JBoss Messaging SVN: r7137 - in branches/Branch_JBM2_Perf_Clebert: src/main/org/jboss/messaging/core/journal/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri May 29 23:42:18 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-05-29 23:42:18 -0400 (Fri, 29 May 2009)
New Revision: 7137
Removed:
branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferController.java
Modified:
branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java
branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
Log:
Timers Implementation
Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java 2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java 2009-05-30 03:42:18 UTC (rev 7137)
@@ -25,8 +25,16 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.utils.JBMThreadFactory;
/**
* A TimedBuffer
@@ -44,32 +52,37 @@
// Attributes ----------------------------------------------------
- final TimedBufferObserver bufferObserver;
-
- final TimedBufferController controller;
+ private final TimedBufferObserver bufferObserver;
- final long timeout;
+ private final CheckTimer timerRunnable = new CheckTimer();
- final int bufferSize;
+ private volatile ScheduledFuture<?> futureTimerRunnable;
- volatile ByteBuffer currentBuffer;
+ private final long timeout;
- volatile List<AIOCallback> callbacks;
+ private final int bufferSize;
+
+ private volatile ByteBuffer currentBuffer;
+
+ private volatile List<AIOCallback> callbacks;
+
+ private volatile long timeLastWrite = 0;
- volatile long writeLastTime = 0;
+ private final Lock lock = new ReentrantReadWriteLock().writeLock();
+ private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
- public TimedBuffer(final TimedBufferController controller, final TimedBufferObserver bufferObserver, final int size, final long timeout)
+ public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
{
- this.bufferSize = size;
+ bufferSize = size;
this.bufferObserver = bufferObserver;
this.timeout = timeout;
- this.controller = controller;
}
public int position()
@@ -84,44 +97,88 @@
}
}
- public synchronized boolean checkSize(int sizeChecked)
+ public void checkTimer()
{
+ if (System.currentTimeMillis() - timeLastWrite > timeout)
+ {
+ lock.lock();
+ try
+ {
+ flush();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ }
+
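+ /**
+ * Verifies whether the given size fits in the buffer. If it fits, the lock is acquired so that no flush can happen until addBytes is called.
+ * @param sizeChecked the size to check against the buffer
+ * @return true if the size fits (in which case the lock is held on return), false otherwise
+ */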
+ public synchronized boolean checkSize(final int sizeChecked)
+ {
+ final boolean fits;
if (sizeChecked > bufferSize)
{
flush();
// We transfer the bytes, as the bufferObserver has special alignment restrictions on the buffer addressing
currentBuffer = bufferObserver.newBuffer(sizeChecked, sizeChecked);
-
- return currentBuffer != null;
+
+ fits = currentBuffer != null;
}
else
{
// We verify against the currentBuffer.capacity as the observer may return a smaller buffer
- if (currentBuffer == null || (currentBuffer.position() + sizeChecked) > currentBuffer.limit())
+ if (currentBuffer == null || currentBuffer.position() + sizeChecked > currentBuffer.limit())
{
flush();
newBuffer(sizeChecked);
}
-
- return currentBuffer != null;
-
+
+ fits = currentBuffer != null;
}
+
+ // If the size fits, we take the lock, as no flush may happen until addBytes is called
+ if (fits)
+ {
+ lock.lock();
+ }
+
+ return fits;
}
public synchronized void addBytes(final ByteBuffer bytes, final AIOCallback callback)
{
- if (currentBuffer == null)
+ try
{
- newBuffer(0);
+ if (currentBuffer == null)
+ {
+ newBuffer(0);
+ }
+
+ currentBuffer.put(bytes);
+ callbacks.add(callback);
+
+ if (futureTimerRunnable == null)
+ {
+ futureTimerRunnable = schedule.scheduleAtFixedRate(timerRunnable, timeout, timeout, TimeUnit.MILLISECONDS);
+ }
+
+ timeLastWrite = System.currentTimeMillis();
+
+ if (currentBuffer.position() == currentBuffer.capacity())
+ {
+ flush();
+ }
}
-
- currentBuffer.put(bytes);
- callbacks.add(callback);
-
- if (currentBuffer.position() == currentBuffer.capacity())
+ finally
{
- flush();
+ lock.unlock();
}
}
@@ -133,6 +190,14 @@
currentBuffer = null;
callbacks = null;
}
+
+ if (futureTimerRunnable != null)
+ {
+ futureTimerRunnable.cancel(false);
+ futureTimerRunnable = null;
+ }
+
+ timeLastWrite = 0;
}
// Package protected ---------------------------------------------
@@ -141,7 +206,7 @@
// Private -------------------------------------------------------
- private void newBuffer(int minSize)
+ private void newBuffer(final int minSize)
{
currentBuffer = bufferObserver.newBuffer(minSize, bufferSize);
callbacks = new ArrayList<AIOCallback>();
@@ -149,4 +214,30 @@
// Inner classes -------------------------------------------------
+ class CheckTimer implements Runnable
+ {
+ public void run()
+ {
+ checkTimer();
+ }
+ }
+
+ // TODO: is there a better place to get this schedule service from?
+ static class ScheduledSingleton
+ {
+ private static ScheduledExecutorService scheduleService;
+
+ private static synchronized ScheduledExecutorService getScheduledService()
+ {
+ if (scheduleService == null)
+ {
+ ThreadFactory factory = new JBMThreadFactory("JBM-buffer-scheduled-control", true);
+
+ scheduleService = Executors.newScheduledThreadPool(2, factory);
+ }
+
+ return scheduleService;
+ }
+ }
+
}
Deleted: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferController.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferController.java 2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferController.java 2009-05-30 03:42:18 UTC (rev 7137)
@@ -1,54 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.core.asyncio.timedbuffer;
-
-/**
- * A TimedBufferController
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class TimedBufferController
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-05-30 03:42:18 UTC (rev 7137)
@@ -101,7 +101,7 @@
this.bufferCallback = bufferCallback;
this.executor = executor;
this.pollerExecutor = pollerExecutor;
- this.timedBuffer = new TimedBuffer(null, new LocalBufferObserver(), bufferSize, bufferTimeoutMilliseconds);
+ this.timedBuffer = new TimedBuffer(new LocalBufferObserver(), bufferSize, bufferTimeoutMilliseconds);
}
public boolean isOpen()
Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-05-30 03:42:18 UTC (rev 7137)
@@ -77,7 +77,7 @@
public AIOSequentialFileFactory(final String journalDir)
{
- this(journalDir, 1024 * 1024, 2);
+ this(journalDir, 1024 * 1024, 1);
}
Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-05-30 03:42:18 UTC (rev 7137)
@@ -1868,6 +1868,12 @@
if (!currentFile.getFile().fits(size))
{
moveNextFile();
+
+ // Needs to do it again to guarantee locking on timed buffers on AIO
+ if (!currentFile.getFile().fits(size))
+ {
+ throw new IllegalStateException("Inconsistence on sizes on journal");
+ }
}
if (currentFile == null)
@@ -1933,9 +1939,13 @@
bb.putInt(orderingID);
bb.rewind();
+
+ sequentialFile.setBuffering(false);
sequentialFile.write(bb, true);
+ sequentialFile.setBuffering(true);
+
JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
if (!keepOpened)
Modified: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java 2009-05-30 02:05:46 UTC (rev 7136)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java 2009-05-30 03:42:18 UTC (rev 7137)
@@ -249,6 +249,33 @@
testJournal(new NIOSequentialFileFactory(getTestDir()), NUM_RECORDS, SIZE_RECORD, 13, 10 * 1024 * 1024);
}
}
+
+
+ public void testTransactional() throws Exception
+ {
+ SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDir(), 1024 * 1024, 1);
+
+ JournalImpl journal = new JournalImpl(1024 * 1024 * 10, // 10M.. we believe that's the usual cylinder
+ // size.. not an exact science here
+ 10, // number of files pre-allocated
+ true, // sync on commit
+ false, // no sync on non transactional
+ factory, // AIO or NIO
+ "jbm", // file name
+ "jbm", // extension
+ 500); // it's like a semaphore for callback on the AIO layer
+
+ journal.start();
+ journal.load(dummyLoader);
+
+ journal.appendAddRecordTransactional(1, 1, (byte)1, new byte[]{(byte)1});
+ journal.appendCommitRecord(1);
+
+ journal.stop();
+
+
+
+ }
public void disabled_testDeleteme() throws Exception
{
More information about the jboss-cvs-commits
mailing list