[jboss-cvs] JBoss Messaging SVN: r7165 - in trunk: src/main/org/jboss/messaging/core/asyncio/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 2 14:33:56 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-02 14:33:56 -0400 (Tue, 02 Jun 2009)
New Revision: 7165
Added:
trunk/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java
trunk/src/main/org/jboss/messaging/core/journal/impl/SimpleWaitIOCallback.java
Modified:
trunk/native/src/LibAIOController.cpp
trunk/native/src/disktest.cpp
trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java
trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java
trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
Log:
AIO performance fixes
Modified: trunk/native/src/LibAIOController.cpp
===================================================================
--- trunk/native/src/LibAIOController.cpp 2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/native/src/LibAIOController.cpp 2009-06-02 18:33:56 UTC (rev 7165)
@@ -88,7 +88,7 @@
if (buffer == 0)
{
- throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Direct Buffer used");
+ throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
return;
}
@@ -117,7 +117,7 @@
if (buffer == 0)
{
- throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Direct Buffer used");
+ throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
return;
}
@@ -176,7 +176,7 @@
if (buffer == 0)
{
- throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Direct Buffer used");
+ throwException(env, NATIVE_ERROR_INVALID_BUFFER, "Invalid Buffer used, libaio requires NativeBuffer instead of Java ByteBuffer");
return;
}
Modified: trunk/native/src/disktest.cpp
===================================================================
--- trunk/native/src/disktest.cpp 2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/native/src/disktest.cpp 2009-06-02 18:33:56 UTC (rev 7165)
@@ -21,6 +21,10 @@
}
+/**
+ * Authored by Clebert Suconic @ redhat . com
+ * Licensed under LGPL
+ */
int main(int arg, char * param[])
{
char * directory;
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-02 18:33:56 UTC (rev 7165)
@@ -47,7 +47,7 @@
public class TimedBuffer
{
// Constants -----------------------------------------------------
-
+
private static final Logger log = Logger.getLogger(TimedBuffer.class);
// Attributes ----------------------------------------------------
@@ -64,12 +64,12 @@
private final ByteBuffer currentBuffer;
- private final List<AIOCallback> callbacks;
+ private volatile List<AIOCallback> callbacks;
private volatile long timeLastWrite = 0;
private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
-
+
private Lock lock = new ReentrantReadWriteLock().writeLock();
// Static --------------------------------------------------------
@@ -78,14 +78,15 @@
// Public --------------------------------------------------------
- //private byte[] data;
-
+ // private byte[] data;
+
public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
{
bufferSize = size;
this.bufferObserver = bufferObserver;
- this.timeout = timeout;
+ this.timeout = timeout;
this.currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
+ this.currentBuffer.limit(0);
this.callbacks = new ArrayList<AIOCallback>();
}
@@ -116,12 +117,12 @@
}
}
}
-
+
public void lock()
{
lock.lock();
}
-
+
public void unlock()
{
lock.unlock();
@@ -136,22 +137,33 @@
{
if (sizeChecked > bufferSize)
{
+ throw new IllegalStateException("Can't write records bigger than the bufferSize(" + bufferSize +
+ ") on the journal");
+ }
+
+
+ if (currentBuffer.limit() == 0 || currentBuffer.position() + sizeChecked > currentBuffer.limit())
+ {
flush();
+
+ final int remaining = bufferObserver.getRemainingBytes();
- currentBuffer.rewind();
- }
- else
- {
- // We verify against the currentBuffer.capacity as the observer may return a smaller buffer
- if (currentBuffer.position() + sizeChecked > currentBuffer.limit())
+ if (sizeChecked > remaining)
{
- flush();
-
+ return false;
+ }
+ else
+ {
currentBuffer.rewind();
+ currentBuffer.limit(Math.min(remaining, bufferSize));
+ return true;
}
}
+ else
+ {
+ return true;
+ }
- return true;
}
public synchronized void addBytes(final ByteBuffer bytes, final AIOCallback callback)
@@ -174,17 +186,17 @@
public synchronized void flush()
{
- if (currentBuffer != null)
+ if (currentBuffer.limit() > 0)
{
- ByteBuffer directBuffer = bufferObserver.newBuffer(currentBuffer.capacity(), currentBuffer.capacity());
+ ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, currentBuffer.position());
+
+ currentBuffer.flip();
directBuffer.put(currentBuffer);
-
+
bufferObserver.flushBuffer(directBuffer, callbacks);
-
- currentBuffer.rewind();
- callbacks.clear();
+ callbacks = new ArrayList<AIOCallback>();
}
if (futureTimerRunnable != null)
@@ -194,6 +206,7 @@
}
timeLastWrite = 0;
+ currentBuffer.limit(0);
}
// Package protected ---------------------------------------------
@@ -201,7 +214,7 @@
// Protected -----------------------------------------------------
// Private -------------------------------------------------------
-
+
// Inner classes -------------------------------------------------
class CheckTimer implements Runnable
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java 2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java 2009-06-02 18:33:56 UTC (rev 7165)
@@ -51,9 +51,12 @@
public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks);
- /** Return a buffer, with any bufferSize up to bufferSize, as long as it fits the current file */
- public ByteBuffer newBuffer(int minSize, int maxSize);
+ /** Return the number of remaining bytes that still fit on the observer (file) */
+ public int getRemainingBytes();
+
+ public ByteBuffer newBuffer(int size, int limit);
+
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java 2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java 2009-06-02 18:33:56 UTC (rev 7165)
@@ -34,5 +34,5 @@
*/
public interface IOCallback extends AIOCallback
{
-
+ void waitCompletion() throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-02 18:33:56 UTC (rev 7165)
@@ -295,11 +295,11 @@
public int read(final ByteBuffer bytes) throws Exception
{
- WaitCompletion waitCompletion = new WaitCompletion();
+ IOCallback waitCompletion = SimpleWaitIOCallback.getInstance();
int bytesRead = read(bytes, waitCompletion);
- waitCompletion.waitLatch();
+ waitCompletion.waitCompletion();
return bytesRead;
}
@@ -320,11 +320,11 @@
{
if (sync)
{
- WaitCompletion completion = new WaitCompletion();
+ IOCallback completion = SimpleWaitIOCallback.getInstance();
write(bytes, completion);
- completion.waitLatch();
+ completion.waitCompletion();
}
else
{
@@ -391,56 +391,8 @@
throw new IllegalStateException("File not opened");
}
}
-
- private static class DummyCallback implements IOCallback
- {
- static DummyCallback instance = new DummyCallback();
-
- public void done()
- {
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- log.warn("Error on writing data!" + errorMessage + " code - " + errorCode, new Exception(errorMessage));
- }
- }
-
- private static class WaitCompletion implements IOCallback
- {
- private final CountDownLatch latch = new CountDownLatch(1);
-
- private volatile String errorMessage;
-
- private volatile int errorCode = 0;
-
- public void done()
- {
- latch.countDown();
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- this.errorCode = errorCode;
-
- this.errorMessage = errorMessage;
-
- log.warn("Error Message " + errorMessage);
-
- latch.countDown();
- }
-
- public void waitLatch() throws Exception
- {
- latch.await();
- if (errorMessage != null)
- {
- throw new MessagingException(errorCode, errorMessage);
- }
- return;
- }
- }
-
+
+
private static class DelegateCallback implements IOCallback
{
final List<AIOCallback> delegates;
@@ -479,6 +431,10 @@
}
}
}
+
+ public void waitCompletion() throws Exception
+ {
+ }
}
class LocalBufferObserver implements TimedBufferObserver
@@ -498,19 +454,25 @@
}
}
- public ByteBuffer newBuffer(int minSize, int size)
+ public ByteBuffer newBuffer(int size, int limit)
{
size = factory.calculateBlockSize(size);
-
- long availableSize = fileSize - position.get();
-
- if (availableSize == 0 || availableSize < minSize)
+ limit = factory.calculateBlockSize(limit);
+
+ ByteBuffer buffer = factory.newBuffer(size);
+ buffer.limit(limit);
+ return buffer;
+ }
+
+ public int getRemainingBytes()
+ {
+ if (fileSize - position.get() > Integer.MAX_VALUE)
{
- return null;
+ return Integer.MAX_VALUE;
}
else
{
- return factory.newBuffer((int)Math.min(size, availableSize));
+ return (int)(fileSize - position.get());
}
}
Added: trunk/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/DummyCallback.java 2009-06-02 18:33:56 UTC (rev 7165)
@@ -0,0 +1,60 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.journal.impl;
+
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * A no-op {@link IOCallback}.
+ * <p>
+ * Successful completion is ignored, errors are only logged, and
+ * {@link #waitCompletion()} returns immediately without blocking.
+ * The class holds no per-operation state, so a single shared instance is handed out
+ * by {@link #getInstance()}.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class DummyCallback implements IOCallback
+{
+   // The log category must be this class, so that warnings raised here are attributed to DummyCallback.
+   private static final Logger log = Logger.getLogger(DummyCallback.class);
+
+   // Shared instance. Left package-visible (other classes in this package may read the field directly),
+   // but final so the singleton can never be replaced after class initialization.
+   static final DummyCallback instance = new DummyCallback();
+
+   /**
+    * @return the shared instance of this callback
+    */
+   public static IOCallback getInstance()
+   {
+      return instance;
+   }
+
+   /** Called on successful completion; intentionally does nothing. */
+   public void done()
+   {
+   }
+
+   /**
+    * Logs the failure as a warning, attaching a freshly created exception for its stack trace.
+    * Nothing is thrown and nothing is recorded.
+    *
+    * @param errorCode the error code reported for the failed operation
+    * @param errorMessage the error message reported for the failed operation
+    */
+   public void onError(final int errorCode, final String errorMessage)
+   {
+      log.warn("Error on writing data!" + errorMessage + " code - " + errorCode, new Exception(errorMessage));
+   }
+
+   /** Returns immediately: this callback never waits for anything. */
+   public void waitCompletion() throws Exception
+   {
+   }
+}
+
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-02 18:33:56 UTC (rev 7165)
@@ -273,7 +273,7 @@
public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
{
- appendAddRecord(id, recordType, new ByteArrayEncoding(record));
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), syncNonTransactional);
}
public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
@@ -301,11 +301,13 @@
bb.writeByte(recordType);
record.encode(bb);
bb.writeInt(size);
+
+ IOCallback callback = getSyncCallback(sync);
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, null);
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, callback);
posFilesMap.put(id, new PosFiles(usedFile));
}
@@ -313,6 +315,11 @@
{
lock.unlock();
}
+
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
}
public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
@@ -346,10 +353,13 @@
record.encode(bb);
bb.writeInt(size);
+
+ IOCallback callback = getSyncCallback(syncNonTransactional);
+
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, null);
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, callback);
posFiles.addUpdateFile(usedFile);
}
@@ -357,6 +367,11 @@
{
lock.unlock();
}
+
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
}
public void appendDeleteRecord(final long id) throws Exception
@@ -381,11 +396,13 @@
bb.putInt(-1); // skip ID part
bb.putLong(id);
bb.putInt(size);
+
+ IOCallback callback = getSyncCallback(syncNonTransactional);
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+ JournalFile usedFile = appendRecord(bb, syncNonTransactional, callback);
posFiles.addDelete(usedFile);
}
@@ -393,6 +410,11 @@
{
lock.unlock();
}
+
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
}
public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
@@ -586,7 +608,7 @@
ByteBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
- TransactionCallback callback = getTransactionCallback(txID);
+ IOCallback callback = getTransactionCallback(txID);
lock.lock();
try
@@ -640,7 +662,7 @@
ByteBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
- TransactionCallback callback = getTransactionCallback(txID);
+ IOCallback callback = getTransactionCallback(txID);
lock.lock();
try
@@ -687,7 +709,7 @@
bb.putLong(txID);
bb.putInt(size);
- TransactionCallback callback = getTransactionCallback(txID);
+ IOCallback callback = getTransactionCallback(txID);
lock.lock();
try
@@ -1952,7 +1974,7 @@
* Note: This method will perform rwlock.readLock.lock();
* The method caller should always unlock that readLock
* */
- private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
+ private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final IOCallback callback) throws Exception
{
lock.lock();
@@ -1979,6 +2001,13 @@
currentFile.getFile().unlockBuffer();
moveNextFile();
currentFile.getFile().lockBuffer();
+
+ // The same check needs to be done at the new file also
+ if (!currentFile.getFile().fits(size))
+ {
+ // Sanity check, this should never happen
+ throw new IllegalStateException("Invalid logic on buffer allocation");
+ }
}
if (currentFile == null)
@@ -1996,6 +2025,7 @@
{
currentFile.getFile().write(bb, callback);
+ // This defaults to false. When set, the user is telling us not to wait for the buffer timeout when a commit or sync is called
if (flushOnSync && sync)
{
currentFile.getFile().flush();
@@ -2212,8 +2242,30 @@
return tx;
}
+
+
+   /**
+    * Chooses the callback to pass along with a non-transactional append.
+    *
+    * @param sync whether the caller intends to wait for the write to complete
+    * @return {@code null} when the file factory does not support callbacks; otherwise a new
+    *         {@link SimpleWaitIOCallback} when {@code sync} is true, or the shared
+    *         {@link DummyCallback} when it is false
+    */
+   private IOCallback getSyncCallback(boolean sync)
+   {
+      if (!fileFactory.isSupportsCallbacks())
+      {
+         return null;
+      }
+
+      return sync ? SimpleWaitIOCallback.getInstance() : DummyCallback.getInstance();
+   }
- private TransactionCallback getTransactionCallback(final long transactionId) throws MessagingException
+
+
+ private IOCallback getTransactionCallback(final long transactionId) throws MessagingException
{
if (fileFactory.isSupportsCallbacks() && syncTransactional)
{
Added: trunk/src/main/org/jboss/messaging/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/SimpleWaitIOCallback.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/SimpleWaitIOCallback.java 2009-06-02 18:33:56 UTC (rev 7165)
@@ -0,0 +1,80 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.journal.impl;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ * An {@link IOCallback} that lets a thread block until a single operation has finished.
+ * <p>
+ * Both {@link #done()} and {@link #onError(int, String)} release the waiter;
+ * {@link #waitCompletion()} then either returns normally or rethrows the recorded
+ * error as a {@link MessagingException}.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ */
+public class SimpleWaitIOCallback implements IOCallback
+{
+   private static final Logger log = Logger.getLogger(SimpleWaitIOCallback.class);
+
+   /** Released exactly once, by whichever of done()/onError() is invoked. */
+   private final CountDownLatch completed = new CountDownLatch(1);
+
+   /** Message passed to onError(); stays null when no error was reported. */
+   private volatile String failureMessage;
+
+   /** Code passed to onError(); stays 0 when no error was reported. */
+   private volatile int failureCode;
+
+   /**
+    * @return a new callback on every call, since each instance tracks one operation
+    */
+   public static IOCallback getInstance()
+   {
+      return new SimpleWaitIOCallback();
+   }
+
+   /** Signals successful completion and releases any thread in {@link #waitCompletion()}. */
+   public void done()
+   {
+      completed.countDown();
+   }
+
+   /**
+    * Records the failure, logs it as a warning and releases any thread in {@link #waitCompletion()}.
+    *
+    * @param errorCode the error code reported for the failed operation
+    * @param errorMessage the error message reported for the failed operation
+    */
+   public void onError(final int errorCode, final String errorMessage)
+   {
+      failureCode = errorCode;
+
+      failureMessage = errorMessage;
+
+      log.warn("Error Message " + errorMessage);
+
+      completed.countDown();
+   }
+
+   /**
+    * Blocks until {@link #done()} or {@link #onError(int, String)} has been called.
+    *
+    * @throws MessagingException if an error with a non-null message was recorded
+    * @throws Exception if the wait on the latch fails (for example, on interruption)
+    */
+   public void waitCompletion() throws Exception
+   {
+      completed.await();
+
+      if (failureMessage == null)
+      {
+         return;
+      }
+
+      throw new MessagingException(failureCode, failureMessage);
+   }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-02 18:33:56 UTC (rev 7165)
@@ -119,6 +119,7 @@
// exceptions)
for (int i = 0; i < 100; i++)
{
+ System.out.println("i = " + i);
journal.appendAddRecord(1, (byte)1, new SimpleEncoding(2, (byte)'a'));
}
stopJournal();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java 2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeCallback.java 2009-06-02 18:33:56 UTC (rev 7165)
@@ -63,4 +63,8 @@
latch.await();
}
+ public void waitCompletion() throws Exception
+ {
+ }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java 2009-06-02 17:17:29 UTC (rev 7164)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java 2009-06-02 18:33:56 UTC (rev 7165)
@@ -72,6 +72,7 @@
final AtomicInteger flushTimes = new AtomicInteger(0);
class TestObserver implements TimedBufferObserver
{
+ //TODO: fix the test
public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
{
buffers.add(buffer);
@@ -85,6 +86,11 @@
{
return ByteBuffer.allocate(maxSize);
}
+
+ public int getRemainingBytes()
+ {
+ return 1024*1024;
+ }
}
TimedBuffer timedBuffer = new TimedBuffer(new TestObserver(), 100, 3600 * 1000); // Any big timeout
More information about the jboss-cvs-commits
mailing list