[jboss-cvs] JBoss Messaging SVN: r7164 - in trunk: src/main/org/jboss/messaging/core/asyncio/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jun 2 13:17:30 EDT 2009
Author: timfox
Date: 2009-06-02 13:17:29 -0400 (Tue, 02 Jun 2009)
New Revision: 7164
Added:
trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java
Removed:
trunk/src/main/org/jboss/messaging/utils/timedbuffer/TimedBuffer.java
trunk/src/main/org/jboss/messaging/utils/timedbuffer/TimedBufferObserver.java
Modified:
trunk/native/src/disktest.cpp
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
Log:
perf fix on AIO
Modified: trunk/native/src/disktest.cpp
===================================================================
--- trunk/native/src/disktest.cpp 2009-06-02 14:31:13 UTC (rev 7163)
+++ trunk/native/src/disktest.cpp 2009-06-02 17:17:29 UTC (rev 7164)
@@ -130,15 +130,15 @@
for (long position = 0 ; position < fileSize; position += bufferSize)
{
writes++;
- struct iocb * iocb = new struct iocb();
- ::io_prep_pwrite(iocb, handle, preAllocBuffer, bufferSize, position);
- iocb->data = (void *)position;
-
- if (io_submit(aioContext, 1, &iocb) < 0)
- {
- fprintf (stderr, "Error on submitting AIO\n");
- exit(-1);
- }
+ struct iocb * iocb = new struct iocb();
+ ::io_prep_pwrite(iocb, handle, preAllocBuffer, bufferSize, position);
+ iocb->data = (void *)position;
+
+ if (io_submit(aioContext, 1, &iocb) < 0)
+ {
+ fprintf (stderr, "Error on submitting AIO\n");
+ exit(-1);
+ }
}
int writesReceived = 0;
Copied: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java (from rev 7160, trunk/src/main/org/jboss/messaging/utils/timedbuffer/TimedBuffer.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-02 17:17:29 UTC (rev 7164)
@@ -0,0 +1,233 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.asyncio.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.utils.JBMThreadFactory;
+
+/**
+ * A TimedBuffer
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TimedBuffer
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(TimedBuffer.class);
+
+ // Attributes ----------------------------------------------------
+
+ private final TimedBufferObserver bufferObserver;
+
+ private final CheckTimer timerRunnable = new CheckTimer();
+
+ private volatile ScheduledFuture<?> futureTimerRunnable;
+
+ private final long timeout;
+
+ private final int bufferSize;
+
+ private final ByteBuffer currentBuffer;
+
+ private final List<AIOCallback> callbacks;
+
+ private volatile long timeLastWrite = 0;
+
+ private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
+
+ private Lock lock = new ReentrantReadWriteLock().writeLock();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ //private byte[] data;
+
+ public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
+ {
+ bufferSize = size;
+ this.bufferObserver = bufferObserver;
+ this.timeout = timeout;
+ this.currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
+ this.callbacks = new ArrayList<AIOCallback>();
+ }
+
+ public int position()
+ {
+ if (currentBuffer == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return currentBuffer.position();
+ }
+ }
+
+ public void checkTimer()
+ {
+ if (System.currentTimeMillis() - timeLastWrite > timeout)
+ {
+ lock.lock();
+ try
+ {
+ flush();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
+
+ /**
+ * Verify if the size fits the buffer, if it fits we lock the buffer to avoid a flush until add is called
+ * @param sizeChecked
+ * @return
+ */
+ public synchronized boolean checkSize(final int sizeChecked)
+ {
+ if (sizeChecked > bufferSize)
+ {
+ flush();
+
+ currentBuffer.rewind();
+ }
+ else
+ {
+ // We verify against the currentBuffer.capacity as the observer may return a smaller buffer
+ if (currentBuffer.position() + sizeChecked > currentBuffer.limit())
+ {
+ flush();
+
+ currentBuffer.rewind();
+ }
+ }
+
+ return true;
+ }
+
+ public synchronized void addBytes(final ByteBuffer bytes, final AIOCallback callback)
+ {
+ currentBuffer.put(bytes);
+ callbacks.add(callback);
+
+ if (futureTimerRunnable == null)
+ {
+ futureTimerRunnable = schedule.scheduleAtFixedRate(timerRunnable, timeout, timeout, TimeUnit.MILLISECONDS);
+ }
+
+ timeLastWrite = System.currentTimeMillis();
+
+ if (currentBuffer.position() == currentBuffer.capacity())
+ {
+ flush();
+ }
+ }
+
+ public synchronized void flush()
+ {
+ if (currentBuffer != null)
+ {
+ ByteBuffer directBuffer = bufferObserver.newBuffer(currentBuffer.capacity(), currentBuffer.capacity());
+
+ directBuffer.put(currentBuffer);
+
+ bufferObserver.flushBuffer(directBuffer, callbacks);
+
+ currentBuffer.rewind();
+
+ callbacks.clear();
+ }
+
+ if (futureTimerRunnable != null)
+ {
+ futureTimerRunnable.cancel(false);
+ futureTimerRunnable = null;
+ }
+
+ timeLastWrite = 0;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+ class CheckTimer implements Runnable
+ {
+ public void run()
+ {
+ checkTimer();
+ }
+ }
+
+ // TODO: is there a better place to get this schedule service from?
+ static class ScheduledSingleton
+ {
+ private static ScheduledExecutorService scheduleService;
+
+ private static synchronized ScheduledExecutorService getScheduledService()
+ {
+ if (scheduleService == null)
+ {
+ ThreadFactory factory = new JBMThreadFactory("JBM-buffer-scheduled-control", true);
+
+ scheduleService = Executors.newScheduledThreadPool(2, factory);
+ }
+
+ return scheduleService;
+ }
+ }
+
+}
Copied: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java (from rev 7160, trunk/src/main/org/jboss/messaging/utils/timedbuffer/TimedBufferObserver.java)
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBufferObserver.java 2009-06-02 17:17:29 UTC (rev 7164)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.asyncio.impl;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+
+/**
+ * A TimedBufferObserver
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface TimedBufferObserver
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks);
+
+
+ /** Return a buffer, with any bufferSize up to bufferSize, as long as it fits the current file */
+ public ByteBuffer newBuffer(int minSize, int maxSize);
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-02 14:31:13 UTC (rev 7163)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-02 17:17:29 UTC (rev 7164)
@@ -33,14 +33,14 @@
import org.jboss.messaging.core.asyncio.AIOCallback;
import org.jboss.messaging.core.asyncio.AsynchronousFile;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.asyncio.impl.TimedBuffer;
+import org.jboss.messaging.core.asyncio.impl.TimedBufferObserver;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.timedbuffer.TimedBuffer;
-import org.jboss.messaging.utils.timedbuffer.TimedBufferObserver;
/**
*
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-06-02 14:31:13 UTC (rev 7163)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-06-02 17:17:29 UTC (rev 7164)
@@ -206,7 +206,7 @@
// if a buffer is bigger than the configured-bufferSize, we just create a new
// buffer.
if (size > bufferSize)
- {
+ {
return AsynchronousFileImpl.newBuffer(size);
}
else
@@ -223,10 +223,10 @@
// if empty create a new one.
buffer = AsynchronousFileImpl.newBuffer(bufferSize);
- buffer.limit(alignedSize);
+ buffer.limit(alignedSize);
}
else
- {
+ {
clearBuffer(buffer);
// set the limit of the buffer to the bufferSize being required
Deleted: trunk/src/main/org/jboss/messaging/utils/timedbuffer/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/timedbuffer/TimedBuffer.java 2009-06-02 14:31:13 UTC (rev 7163)
+++ trunk/src/main/org/jboss/messaging/utils/timedbuffer/TimedBuffer.java 2009-06-02 17:17:29 UTC (rev 7164)
@@ -1,239 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.utils.timedbuffer;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.utils.JBMThreadFactory;
-
-/**
- * A TimedBuffer
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class TimedBuffer
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private final TimedBufferObserver bufferObserver;
-
- private final CheckTimer timerRunnable = new CheckTimer();
-
- private volatile ScheduledFuture<?> futureTimerRunnable;
-
- private final long timeout;
-
- private final int bufferSize;
-
- private volatile ByteBuffer currentBuffer;
-
- private volatile List<AIOCallback> callbacks;
-
- private volatile long timeLastWrite = 0;
-
- private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
-
- private Lock lock = new ReentrantReadWriteLock().writeLock();
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
- {
- bufferSize = size;
- this.bufferObserver = bufferObserver;
- this.timeout = timeout;
- }
-
- public int position()
- {
- if (currentBuffer == null)
- {
- return 0;
- }
- else
- {
- return currentBuffer.position();
- }
- }
-
- public void checkTimer()
- {
- if (System.currentTimeMillis() - timeLastWrite > timeout)
- {
- lock.lock();
- try
- {
- flush();
- }
- finally
- {
- lock.unlock();
- }
- }
-
- }
-
-
- public void lock()
- {
- lock.lock();
- }
-
- public void unlock()
- {
- lock.unlock();
- }
-
- /**
- * Verify if the size fits the buffer, if it fits we lock the buffer to avoid a flush until add is called
- * @param sizeChecked
- * @return
- */
- public synchronized boolean checkSize(final int sizeChecked)
- {
- final boolean fits;
- if (sizeChecked > bufferSize)
- {
- flush();
-
- // We transfer the bytes, as the bufferObserver has special alignment restrictions on the buffer addressing
- currentBuffer = bufferObserver.newBuffer(sizeChecked, sizeChecked);
-
- fits = currentBuffer != null;
- }
- else
- {
- // We verify against the currentBuffer.capacity as the observer may return a smaller buffer
- if (currentBuffer == null || currentBuffer.position() + sizeChecked > currentBuffer.limit())
- {
- flush();
- newBuffer(sizeChecked);
- }
-
- fits = currentBuffer != null;
- }
-
- return fits;
- }
-
- public synchronized void addBytes(final ByteBuffer bytes, final AIOCallback callback)
- {
- if (currentBuffer == null)
- {
- newBuffer(0);
- }
-
- currentBuffer.put(bytes);
- callbacks.add(callback);
-
- if (futureTimerRunnable == null)
- {
- futureTimerRunnable = schedule.scheduleAtFixedRate(timerRunnable, timeout, timeout, TimeUnit.MILLISECONDS);
- }
-
- timeLastWrite = System.currentTimeMillis();
-
- if (currentBuffer.position() == currentBuffer.capacity())
- {
- flush();
- }
- }
-
- public synchronized void flush()
- {
- if (currentBuffer != null)
- {
- bufferObserver.flushBuffer(currentBuffer, callbacks);
- currentBuffer = null;
- callbacks = null;
- }
-
- if (futureTimerRunnable != null)
- {
- futureTimerRunnable.cancel(false);
- futureTimerRunnable = null;
- }
-
- timeLastWrite = 0;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- private void newBuffer(final int minSize)
- {
- currentBuffer = bufferObserver.newBuffer(minSize, bufferSize);
- callbacks = new ArrayList<AIOCallback>();
- }
-
- // Inner classes -------------------------------------------------
-
- class CheckTimer implements Runnable
- {
- public void run()
- {
- checkTimer();
- }
- }
-
- // TODO: is there a better place to get this schedule service from?
- static class ScheduledSingleton
- {
- private static ScheduledExecutorService scheduleService;
-
- private static synchronized ScheduledExecutorService getScheduledService()
- {
- if (scheduleService == null)
- {
- ThreadFactory factory = new JBMThreadFactory("JBM-buffer-scheduled-control", true);
-
- scheduleService = Executors.newScheduledThreadPool(2, factory);
- }
-
- return scheduleService;
- }
- }
-
-}
Deleted: trunk/src/main/org/jboss/messaging/utils/timedbuffer/TimedBufferObserver.java
===================================================================
--- trunk/src/main/org/jboss/messaging/utils/timedbuffer/TimedBufferObserver.java 2009-06-02 14:31:13 UTC (rev 7163)
+++ trunk/src/main/org/jboss/messaging/utils/timedbuffer/TimedBufferObserver.java 2009-06-02 17:17:29 UTC (rev 7164)
@@ -1,66 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-
-package org.jboss.messaging.utils.timedbuffer;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import org.jboss.messaging.core.asyncio.AIOCallback;
-
-/**
- * A TimedBufferObserver
- *
- * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface TimedBufferObserver
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks);
-
-
- /** Return a buffer, with any bufferSize up to bufferSize, as long as it fits the current file */
- public ByteBuffer newBuffer(int minSize, int maxSize);
-
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java 2009-06-02 14:31:13 UTC (rev 7163)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/util/timedbuffer/TimedBufferTest.java 2009-06-02 17:17:29 UTC (rev 7164)
@@ -29,9 +29,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.impl.TimedBuffer;
+import org.jboss.messaging.core.asyncio.impl.TimedBufferObserver;
import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.utils.timedbuffer.TimedBuffer;
-import org.jboss.messaging.utils.timedbuffer.TimedBufferObserver;
/**
* A TimedBufferTest
More information about the jboss-cvs-commits
mailing list