[jboss-cvs] JBoss Messaging SVN: r7151 - in trunk: src/main/org/jboss/messaging/core/asyncio/impl and 14 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jun 1 16:35:56 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-01 16:35:54 -0400 (Mon, 01 Jun 2009)
New Revision: 7151
Added:
trunk/src/main/org/jboss/messaging/core/asyncio/timedbuffer/
trunk/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java
trunk/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferObserver.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealNIOJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java
trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java
Log:
Optimizations on journal/AIO (TimedBuffers implementation).. first commit
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -138,6 +138,8 @@
private int maxIO;
private final Lock writeLock = new ReentrantReadWriteLock().writeLock();
+
+ private final VariableLatch pendingWrites = new VariableLatch();
private Semaphore writeSemaphore;
@@ -223,11 +225,16 @@
try
{
+ while (!pendingWrites.waitCompletion(60000))
+ {
+ log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
+ }
+
while (!writeSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
{
- log.warn("Couldn't acquire lock after 60 seconds on AIO",
- new Exception("Warning: Couldn't acquire lock after 60 seconds on AIO"));
+ log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.fileName);
}
+
writeSemaphore = null;
if (poller != null)
{
@@ -263,7 +270,8 @@
{
startPoller();
}
- writeSemaphore.acquireUninterruptibly();
+
+ pendingWrites.up();
if (writeExecutor != null)
{
@@ -271,6 +279,8 @@
{
public void run()
{
+ writeSemaphore.acquireUninterruptibly();
+
try
{
write(handler, position, size, directByteBuffer, aioCallback);
@@ -288,6 +298,8 @@
}
else
{
+ writeSemaphore.acquireUninterruptibly();
+
try
{
write(handler, position, size, directByteBuffer, aioCallback);
@@ -314,6 +326,7 @@
{
startPoller();
}
+ pendingWrites.up();
writeSemaphore.acquireUninterruptibly();
try
{
@@ -323,12 +336,14 @@
{
// Release only if an exception happened
writeSemaphore.release();
+ pendingWrites.down();
throw e;
}
catch (RuntimeException e)
{
// Release only if an exception happened
writeSemaphore.release();
+ pendingWrites.down();
throw e;
}
}
@@ -399,6 +414,7 @@
private void callbackDone(final AIOCallback callback, final ByteBuffer buffer)
{
writeSemaphore.release();
+ pendingWrites.down();
callback.done();
if (bufferCallback != null)
{
@@ -412,6 +428,7 @@
{
log.warn("CallbackError: " + errorMessage);
writeSemaphore.release();
+ pendingWrites.down();
callback.onError(errorCode, errorMessage);
}
Added: trunk/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBuffer.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -0,0 +1,239 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.core.asyncio.timedbuffer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.utils.JBMThreadFactory;
+
+/**
+ * A TimedBuffer
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TimedBuffer
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final TimedBufferObserver bufferObserver;
+
+ private final CheckTimer timerRunnable = new CheckTimer();
+
+ private volatile ScheduledFuture<?> futureTimerRunnable;
+
+ private final long timeout;
+
+ private final int bufferSize;
+
+ private volatile ByteBuffer currentBuffer;
+
+ private volatile List<AIOCallback> callbacks;
+
+ private volatile long timeLastWrite = 0;
+
+ private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
+
+ private Lock lock = new ReentrantReadWriteLock().writeLock();
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public TimedBuffer(final TimedBufferObserver bufferObserver, final int size, final long timeout)
+ {
+ bufferSize = size;
+ this.bufferObserver = bufferObserver;
+ this.timeout = timeout;
+ }
+
+ public int position()
+ {
+ if (currentBuffer == null)
+ {
+ return 0;
+ }
+ else
+ {
+ return currentBuffer.position();
+ }
+ }
+
+ public void checkTimer()
+ {
+ if (System.currentTimeMillis() - timeLastWrite > timeout)
+ {
+ lock.lock();
+ try
+ {
+ flush();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+
+ }
+
+
+ public void lock()
+ {
+ lock.lock();
+ }
+
+ public void unlock()
+ {
+ lock.unlock();
+ }
+
+ /**
+ * Verify if the size fits the buffer, if it fits we lock the buffer to avoid a flush until add is called
+ * @param sizeChecked
+ * @return
+ */
+ public synchronized boolean checkSize(final int sizeChecked)
+ {
+ final boolean fits;
+ if (sizeChecked > bufferSize)
+ {
+ flush();
+
+ // We transfer the bytes, as the bufferObserver has special alignment restrictions on the buffer addressing
+ currentBuffer = bufferObserver.newBuffer(sizeChecked, sizeChecked);
+
+ fits = currentBuffer != null;
+ }
+ else
+ {
+ // We verify against the currentBuffer.capacity as the observer may return a smaller buffer
+ if (currentBuffer == null || currentBuffer.position() + sizeChecked > currentBuffer.limit())
+ {
+ flush();
+ newBuffer(sizeChecked);
+ }
+
+ fits = currentBuffer != null;
+ }
+
+ return fits;
+ }
+
+ public synchronized void addBytes(final ByteBuffer bytes, final AIOCallback callback)
+ {
+ if (currentBuffer == null)
+ {
+ newBuffer(0);
+ }
+
+ currentBuffer.put(bytes);
+ callbacks.add(callback);
+
+ if (futureTimerRunnable == null)
+ {
+ futureTimerRunnable = schedule.scheduleAtFixedRate(timerRunnable, timeout, timeout, TimeUnit.MILLISECONDS);
+ }
+
+ timeLastWrite = System.currentTimeMillis();
+
+ if (currentBuffer.position() == currentBuffer.capacity())
+ {
+ flush();
+ }
+ }
+
+ public synchronized void flush()
+ {
+ if (currentBuffer != null)
+ {
+ bufferObserver.flushBuffer(currentBuffer, callbacks);
+ currentBuffer = null;
+ callbacks = null;
+ }
+
+ if (futureTimerRunnable != null)
+ {
+ futureTimerRunnable.cancel(false);
+ futureTimerRunnable = null;
+ }
+
+ timeLastWrite = 0;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ private void newBuffer(final int minSize)
+ {
+ currentBuffer = bufferObserver.newBuffer(minSize, bufferSize);
+ callbacks = new ArrayList<AIOCallback>();
+ }
+
+ // Inner classes -------------------------------------------------
+
+ class CheckTimer implements Runnable
+ {
+ public void run()
+ {
+ checkTimer();
+ }
+ }
+
+ // TODO: is there a better place to get this schedule service from?
+ static class ScheduledSingleton
+ {
+ private static ScheduledExecutorService scheduleService;
+
+ private static synchronized ScheduledExecutorService getScheduledService()
+ {
+ if (scheduleService == null)
+ {
+ ThreadFactory factory = new JBMThreadFactory("JBM-buffer-scheduled-control", true);
+
+ scheduleService = Executors.newScheduledThreadPool(2, factory);
+ }
+
+ return scheduleService;
+ }
+ }
+
+}
Added: trunk/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferObserver.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferObserver.java (rev 0)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/timedbuffer/TimedBufferObserver.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.core.asyncio.timedbuffer;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+
+/**
+ * A TimedBufferObserver
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface TimedBufferObserver
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks);
+
+
+ /** Return a buffer, with any bufferSize up to bufferSize, as long as it fits the current file */
+ public ByteBuffer newBuffer(int minSize, int maxSize);
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -95,7 +95,7 @@
public static final int DEFAULT_JOURNAL_MIN_FILES = 2;
- public static final int DEFAULT_JOURNAL_MAX_AIO = 5000;
+ public static final int DEFAULT_JOURNAL_MAX_AIO = 500;
public static final int DEFAULT_JOURNAL_REUSE_BUFFER_SIZE = 1024;
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -40,6 +40,8 @@
void open() throws Exception;
boolean isOpen();
+
+ void setBuffering(boolean buffering);
/**
* For certain operations (like loading) we don't need open the file with full maxIO
@@ -47,6 +49,8 @@
* @throws Exception
*/
void open(int maxIO) throws Exception;
+
+ boolean fits(int size);
int getAlignment() throws Exception;
@@ -58,9 +62,9 @@
void delete() throws Exception;
- int write(ByteBuffer bytes, IOCallback callback) throws Exception;
+ void write(ByteBuffer bytes, IOCallback callback) throws Exception;
- int write(ByteBuffer bytes, boolean sync) throws Exception;
+ void write(ByteBuffer bytes, boolean sync) throws Exception;
int read(ByteBuffer bytes, IOCallback callback) throws Exception;
@@ -76,6 +80,12 @@
long size() throws Exception;
+ void flush();
+
void renameTo(String newFileName) throws Exception;
+ void lockBuffer();
+
+ void unlockBuffer();
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -45,9 +45,7 @@
void releaseBuffer(ByteBuffer buffer);
- void setBufferCallback(BufferCallback bufferCallback);
-
- BufferCallback getBufferCallback();
+ void controlBuffersLifeCycle(boolean value);
// To be used in tests only
ByteBuffer wrapBuffer(byte[] bytes);
@@ -58,6 +56,8 @@
void clearBuffer(ByteBuffer buffer);
+ void stop();
+
/**
* Create the directory if it doesn't exist yet
*/
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -24,17 +24,22 @@
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import org.jboss.messaging.core.asyncio.AIOCallback;
import org.jboss.messaging.core.asyncio.AsynchronousFile;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.asyncio.timedbuffer.TimedBuffer;
+import org.jboss.messaging.core.asyncio.timedbuffer.TimedBufferObserver;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
/**
@@ -57,10 +62,18 @@
private final int maxIO;
private AsynchronousFile aioFile;
+
+ private final SequentialFileFactory factory;
+
+ private long fileSize = 0;
private final AtomicLong position = new AtomicLong(0);
+
+ private final TimedBuffer timedBuffer;
+
+ private BufferCallback bufferCallback;
- private BufferCallback bufferCallback;
+ private boolean buffering = true;
/** A context switch on AIO would make it to synchronize the disk before
switching to the new thread, what would cause
@@ -71,21 +84,31 @@
/** The pool for Thread pollers */
private final Executor pollerExecutor;
- public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO, final BufferCallback bufferCallback, final Executor executor, final Executor pollerExecutor)
+ public AIOSequentialFile(final SequentialFileFactory factory,
+ final int bufferSize,
+ final int bufferTimeoutMilliseconds,
+ final String journalDir,
+ final String fileName,
+ final int maxIO,
+ final BufferCallback bufferCallback,
+ final Executor executor,
+ final Executor pollerExecutor)
{
+ this.factory = factory;
this.journalDir = journalDir;
this.fileName = fileName;
this.maxIO = maxIO;
this.bufferCallback = bufferCallback;
this.executor = executor;
this.pollerExecutor = pollerExecutor;
+ this.timedBuffer = new TimedBuffer(new LocalBufferObserver(), bufferSize, bufferTimeoutMilliseconds);
}
- public boolean isOpen()
+ public boolean isOpen()
{
return opened;
}
-
+
public int getAlignment() throws Exception
{
checkOpened();
@@ -101,14 +124,39 @@
return pos;
}
+
+ public boolean fits(int size)
+ {
+ return timedBuffer.checkSize(size);
+ }
+
+ public void flush()
+ {
+ timedBuffer.flush();
+ }
+ public void lockBuffer()
+ {
+ timedBuffer.lock();
+ }
+
+ public void unlockBuffer()
+ {
+ timedBuffer.unlock();
+ }
+
+
+
public synchronized void close() throws Exception
{
checkOpened();
opened = false;
+
+
+ timedBuffer.flush();
final CountDownLatch donelatch = new CountDownLatch(1);
-
+
executor.execute(new Runnable()
{
public void run()
@@ -116,8 +164,7 @@
donelatch.countDown();
}
});
-
-
+
while (!donelatch.await(60, TimeUnit.SECONDS))
{
log.warn("Executor on file " + fileName + " couldn't complete its tasks in 60 seconds.",
@@ -184,6 +231,8 @@
}
aioFile.fill(filePosition, blocks, blockSize, fillCharacter);
+
+ this.fileSize = aioFile.size();
}
public String getFileName()
@@ -201,9 +250,10 @@
*/
public void renameTo(String fileName) throws Exception
{
- throw new IllegalStateException ("method rename not supported on AIO");
-
+ throw new IllegalStateException("method rename not supported on AIO");
+
}
+
public synchronized void open(final int currentMaxIO) throws Exception
{
opened = true;
@@ -211,6 +261,7 @@
aioFile.open(journalDir + "/" + fileName, currentMaxIO);
position.set(0);
aioFile.setBufferCallback(bufferCallback);
+ this.fileSize = aioFile.size();
}
@@ -253,35 +304,52 @@
return bytesRead;
}
- public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ public void write(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
- final int bytesToWrite = bytes.limit();
-
- final long positionToWrite = position.getAndAdd(bytesToWrite);
-
- execWrite(bytes, callback, bytesToWrite, positionToWrite);
-
- return bytesToWrite;
+ if (buffering)
+ {
+ timedBuffer.addBytes(bytes, callback);
+ }
+ else
+ {
+ doWrite(bytes, callback);
+ }
}
- public int write(final ByteBuffer bytes, final boolean sync) throws Exception
+ public void write(final ByteBuffer bytes, final boolean sync) throws Exception
{
if (sync)
{
WaitCompletion completion = new WaitCompletion();
- int bytesWritten = write(bytes, completion);
+ write(bytes, completion);
+
+ if (sync)
+ {
+ timedBuffer.flush();
+ }
completion.waitLatch();
-
- return bytesWritten;
}
else
{
- return write(bytes, DummyCallback.instance);
+ write(bytes, DummyCallback.instance);
}
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#setBuffering(boolean)
+ */
+ public void setBuffering(boolean buffering)
+ {
+ this.buffering = buffering;
+ if (!buffering)
+ {
+ timedBuffer.flush();
+ }
+ };
+
+
public void sync() throws Exception
{
throw new IllegalArgumentException("This method is not supported on AIO");
@@ -312,11 +380,12 @@
// Private methods
// -----------------------------------------------------------------------------------------------------
- private void execWrite(final ByteBuffer bytes,
- final IOCallback callback,
- final int bytesToWrite,
- final long positionToWrite)
+ private void doWrite(final ByteBuffer bytes, final IOCallback callback)
{
+ final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
+
+ final long positionToWrite = position.getAndAdd(bytesToWrite);
+
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
@@ -377,4 +446,79 @@
}
}
+ private static class DelegateCallback implements IOCallback
+ {
+ final List<AIOCallback> delegates;
+
+ DelegateCallback(List<AIOCallback> delegates)
+ {
+ this.delegates = delegates;
+ }
+
+ public void done()
+ {
+ for (AIOCallback callback : delegates)
+ {
+ try
+ {
+ callback.done();
+ }
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ for (AIOCallback callback : delegates)
+ {
+ try
+ {
+ callback.onError(errorCode, errorMessage);
+ }
+ catch (Throwable e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ class LocalBufferObserver implements TimedBufferObserver
+ {
+
+ public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
+ {
+ buffer.flip();
+
+ if (buffer.limit() == 0)
+ {
+ factory.releaseBuffer(buffer);
+ }
+ else
+ {
+ doWrite(buffer, new DelegateCallback(callbacks));
+ }
+ }
+
+ public ByteBuffer newBuffer(int minSize, int size)
+ {
+ size = factory.calculateBlockSize(size);
+
+ long availableSize = fileSize - position.get();
+
+ if (availableSize == 0 || availableSize < minSize)
+ {
+ return null;
+ }
+ else
+ {
+ return factory.newBuffer((int)Math.min(size, availableSize));
+ }
+ }
+
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -23,11 +23,14 @@
package org.jboss.messaging.core.journal.impl;
import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.SequentialFile;
+import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.utils.JBMThreadFactory;
/**
@@ -40,6 +43,23 @@
public class AIOSequentialFileFactory extends AbstractSequentialFactory
{
+
+
+ private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
+
+ private static final boolean trace = log.isTraceEnabled();
+
+
+ private final ReuseBuffersController buffersControl = new ReuseBuffersController();
+
+ // This method exists just to make debug easier.
+ // I could replace log.trace by log.info temporarily while I was debugging
+ // Journal
+ private static final void trace(final String message)
+ {
+ log.trace(message);
+ }
+
/** A single AIO write executor for every AIO File.
* This is used only for AIO & instant operations. We only need one executor-thread for the entire journal as we always have only one active file.
* And even if we had multiple files at a given moment, this should still be ok, as we control max-io in a semaphore, guaranteeing AIO calls don't block on disk calls */
@@ -48,15 +68,29 @@
private final Executor pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), true));
+
+ // TODO make this configurable
+ final int bufferSize;
+
+ // TODO make this configurable
+ final int bufferTimeout;
public AIOSequentialFileFactory(final String journalDir)
{
+ this(journalDir, 1024 * 1024, 1);
+ }
+
+
+ public AIOSequentialFileFactory(final String journalDir, int bufferSize, int bufferTimeout)
+ {
super(journalDir);
+ this.bufferSize = bufferSize;
+ this.bufferTimeout = bufferTimeout;
}
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
- return new AIOSequentialFile(journalDir, fileName, maxIO, bufferCallback, writeExecutor, pollerExecutor);
+ return new AIOSequentialFile(this, bufferSize, bufferTimeout, journalDir, fileName, maxIO, buffersControl.callback, writeExecutor, pollerExecutor);
}
public boolean isSupportsCallbacks()
@@ -68,6 +102,18 @@
{
return AsynchronousFileImpl.isLoaded();
}
+
+ public void controlBuffersLifeCycle(boolean value)
+ {
+ if (value)
+ {
+ buffersControl.enable();
+ }
+ else
+ {
+ buffersControl.disable();
+ }
+ }
public ByteBuffer newBuffer(int size)
{
@@ -75,7 +121,8 @@
{
size = (size / 512 + 1) * 512;
}
- return AsynchronousFileImpl.newBuffer(size);
+
+ return buffersControl.newBuffer(size);
}
public void clearBuffer(final ByteBuffer directByteBuffer)
@@ -112,4 +159,120 @@
{
AsynchronousFileImpl.destroyBuffer(buffer);
}
+
+ public void stop()
+ {
+ buffersControl.clearPoll();
+ }
+
+
+ /** Class that will control buffer-reuse */
+ private class ReuseBuffersController
+ {
+ private volatile long bufferReuseLastTime = System.currentTimeMillis();
+
+ /** This queue is fed by {@link JournalImpl.ReuseBuffersController.LocalBufferCallback}} which is called directly by NIO or NIO.
+ * On the case of the AIO this is almost called by the native layer as soon as the buffer is not being used any more
+ * and ready to be reused or GCed */
+ private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
+
+ /** During reload we may disable/enable buffer reuse */
+ private boolean enabled = true;
+
+ final BufferCallback callback = new LocalBufferCallback();
+
+ public void enable()
+ {
+ this.enabled = true;
+ }
+
+ public void disable()
+ {
+ this.enabled = false;
+ }
+
+ public ByteBuffer newBuffer(final int size)
+ {
+ // if a new buffer wasn't requested in 10 seconds, we clear the queue
+ // This is being done this way as we don't need another Timeout Thread
+ // just to cleanup this
+ if (bufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
+ {
+ if (trace) trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + " elements");
+
+ bufferReuseLastTime = System.currentTimeMillis();
+
+ clearPoll();
+ }
+
+ // if a buffer is bigger than the configured-bufferSize, we just create a new
+ // buffer.
+ if (size > bufferSize)
+ {
+ return AsynchronousFileImpl.newBuffer(size);
+ }
+ else
+ {
+ // We need to allocate buffers following the rules of the storage
+ // being used (AIO/NIO)
+ int alignedSize = calculateBlockSize(size);
+
+ // Try getting a buffer from the queue...
+ ByteBuffer buffer = reuseBuffersQueue.poll();
+
+ if (buffer == null)
+ {
+ // if empty create a new one.
+ buffer = AsynchronousFileImpl.newBuffer(bufferSize);
+
+ buffer.limit(alignedSize);
+ }
+ else
+ {
+ clearBuffer(buffer);
+
+ // set the limit of the buffer to the bufferSize being required
+ buffer.limit(alignedSize);
+ }
+
+ buffer.rewind();
+
+ return buffer;
+ }
+ }
+
+ public void clearPoll()
+ {
+ ByteBuffer reusedBuffer;
+
+ while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
+ {
+ releaseBuffer(reusedBuffer);
+ }
+ }
+
+ private class LocalBufferCallback implements BufferCallback
+ {
+ public void bufferDone(final ByteBuffer buffer)
+ {
+ if (enabled)
+ {
+ bufferReuseLastTime = System.currentTimeMillis();
+
+ // If a buffer has any other than the configured bufferSize, the buffer
+ // will be just sent to GC
+ if (buffer.capacity() == bufferSize)
+ {
+ reuseBuffersQueue.offer(buffer);
+ }
+ else
+ {
+ releaseBuffer(buffer);
+ }
+ }
+ }
+ }
+ }
+
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -45,14 +45,21 @@
private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
protected final String journalDir;
-
- protected BufferCallback bufferCallback;
public AbstractSequentialFactory(final String journalDir)
{
this.journalDir = journalDir;
}
+
+ public void controlBuffersLifeCycle(boolean value)
+ {
+ }
+
+ public void stop()
+ {
+ }
+
/**
* Create the directory if it doesn't exist yet
*/
@@ -73,7 +80,7 @@
FilenameFilter fnf = new FilenameFilter()
{
public boolean accept(final File file, final String name)
- {
+ {
return name.endsWith("." + extension);
}
};
@@ -88,22 +95,4 @@
return Arrays.asList(fileNames);
}
- /**
- * @return the bufferCallback
- */
- public BufferCallback getBufferCallback()
- {
- return bufferCallback;
- }
-
- /**
- * @param bufferCallback the bufferCallback to set
- */
- public void setBufferCallback(BufferCallback bufferCallback)
- {
- this.bufferCallback = bufferCallback;
- }
-
-
-
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalFile.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -49,13 +49,9 @@
boolean isCanReclaim();
- void extendOffset(final int delta);
-
long getOffset();
int getOrderingID();
- void setOffset(final long offset);
-
SequentialFile getFile();
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -43,16 +43,14 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.messaging.core.buffers.ChannelBuffer;
import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.EncodingSupport;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.LoadManager;
@@ -197,23 +195,8 @@
private ExecutorService filesExecutor = null;
- private final int reuseBufferSize;
+ private final Lock lock = new ReentrantReadWriteLock().writeLock();
- /** Object that will control buffer's callback and getting buffers from the queue */
- private final ReuseBuffersController buffersControl = new ReuseBuffersController();
-
- /**
- * Used to lock access while calculating the positioning of currentFile.
- * That has to be done in single-thread, and it needs to be a very-fast operation
- */
- private final Semaphore positionLock = new Semaphore(1, true);
-
- /**
- * a WriteLock means, currentFile is being changed. When we get a writeLock we wait all the write operations to finish on that file before we can move to the next file
- * a ReadLock means, currentFile is being used, do not change it until I'm done with it
- */
- private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
-
private volatile JournalFile currentFile;
private volatile int state;
@@ -229,8 +212,7 @@
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
- final int maxAIO,
- final int reuseBufferSize)
+ final int maxAIO)
{
if (fileSize < MIN_FILE_SIZE)
{
@@ -263,8 +245,6 @@
throw new IllegalStateException("maxAIO should aways be a positive number");
}
- this.reuseBufferSize = fileFactory.calculateBlockSize(reuseBufferSize);
-
this.fileSize = fileSize;
this.minFiles = minFiles;
@@ -274,8 +254,6 @@
this.syncNonTransactional = syncNonTransactional;
this.fileFactory = fileFactory;
-
- this.fileFactory.setBufferCallback(this.buffersControl.callback);
this.filePrefix = filePrefix;
@@ -291,7 +269,7 @@
{
appendAddRecord(id, recordType, new ByteArrayEncoding(record));
}
-
+
public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
{
appendAddRecord(id, recordType, record, syncNonTransactional);
@@ -308,7 +286,7 @@
int size = SIZE_ADD_RECORD + recordLength;
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
bb.writeByte(ADD_RECORD);
bb.writeInt(-1); // skip ID part
@@ -318,6 +296,7 @@
record.encode(bb);
bb.writeInt(size);
+ lock.lock();
try
{
JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, null);
@@ -326,14 +305,7 @@
}
finally
{
- try
- {
- rwlock.readLock().unlock();
- }
- catch (Exception ignored)
- {
- // This could happen if the thread was interrupted
- }
+ lock.unlock();
}
}
@@ -358,7 +330,7 @@
int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
bb.writeByte(UPDATE_RECORD);
bb.writeInt(-1); // skip ID part
@@ -368,6 +340,7 @@
record.encode(bb);
bb.writeInt(size);
+ lock.lock();
try
{
JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, null);
@@ -376,14 +349,7 @@
}
finally
{
- try
- {
- rwlock.readLock().unlock();
- }
- catch (Exception ignored)
- {
- // This could happen if the thread was interrupted
- }
+ lock.unlock();
}
}
@@ -410,6 +376,7 @@
bb.putLong(id);
bb.putInt(size);
+ lock.lock();
try
{
JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
@@ -418,14 +385,7 @@
}
finally
{
- try
- {
- rwlock.readLock().unlock();
- }
- catch (Exception ignored)
- {
- // This could happen if the thread was interrupted
- }
+ lock.unlock();
}
}
@@ -444,12 +404,12 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
-
+
int recordLength = record.getEncodeSize();
int size = SIZE_ADD_RECORD_TX + recordLength;
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
bb.writeByte(ADD_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -460,6 +420,7 @@
record.encode(bb);
bb.writeInt(size);
+ lock.lock();
try
{
JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
@@ -470,14 +431,7 @@
}
finally
{
- try
- {
- rwlock.readLock().unlock();
- }
- catch (Exception ignored)
- {
- // This could happen if the thread was interrupted
- }
+ lock.unlock();
}
}
@@ -501,7 +455,7 @@
int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
bb.writeByte(UPDATE_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -512,6 +466,7 @@
record.encode(bb);
bb.writeInt(size);
+ lock.lock();
try
{
JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
@@ -522,14 +477,7 @@
}
finally
{
- try
- {
- rwlock.readLock().unlock();
- }
- catch (Exception ignored)
- {
- // This could happen if the thread was interrupted
- }
+ lock.unlock();
}
}
@@ -547,7 +495,7 @@
int size = SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
bb.writeByte(DELETE_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -560,6 +508,7 @@
}
bb.writeInt(size);
+ lock.lock();
try
{
JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
@@ -570,14 +519,7 @@
}
finally
{
- try
- {
- rwlock.readLock().unlock();
- }
- catch (Exception ignored)
- {
- // This could happen if the thread was interrupted
- }
+ lock.unlock();
}
}
@@ -590,7 +532,7 @@
int size = SIZE_DELETE_RECORD_TX;
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
bb.writeByte(DELETE_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -599,6 +541,7 @@
bb.writeInt(0);
bb.writeInt(size);
+ lock.lock();
try
{
JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
@@ -609,14 +552,7 @@
}
finally
{
- try
- {
- rwlock.readLock().unlock();
- }
- catch (Exception ignored)
- {
- // This could happen if the thread was interrupted
- }
+ lock.unlock();
}
}
@@ -646,6 +582,7 @@
TransactionCallback callback = getTransactionCallback(txID);
+ lock.lock();
try
{
JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
@@ -654,14 +591,7 @@
}
finally
{
- try
- {
- rwlock.readLock().unlock();
- }
- catch (Exception ignored)
- {
- // This could happen if the thread was interrupted
- }
+ lock.unlock();
}
// We should wait this outside of the lock, to increase throughput
@@ -706,6 +636,7 @@
TransactionCallback callback = getTransactionCallback(txID);
+ lock.lock();
try
{
JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
@@ -716,14 +647,7 @@
}
finally
{
- try
- {
- rwlock.readLock().unlock();
- }
- catch (Exception ignored)
- {
- // This could happen if the thread was interrupted
- }
+ lock.unlock();
}
// We should wait this outside of the lock, to increase throuput
@@ -759,6 +683,7 @@
TransactionCallback callback = getTransactionCallback(txID);
+ lock.lock();
try
{
JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
@@ -769,14 +694,7 @@
}
finally
{
- try
- {
- rwlock.readLock().unlock();
- }
- catch (Exception ignored)
- {
- // This could happen if the thread was interrupted
- }
+ lock.unlock();
}
// We should wait this outside of the lock, to increase throuput
@@ -830,6 +748,20 @@
return maxID;
}
+ private boolean isInvalidSize(int bufferPos, int size)
+ {
+ if (size < 0)
+ {
+ return true;
+ }
+ else
+ {
+ final int position = bufferPos + size;
+ return position > fileSize || position < 0;
+
+ }
+ }
+
/**
* <p>Load data accordingly to the record layouts</p>
*
@@ -872,14 +804,12 @@
throw new IllegalStateException("Journal must be in started state");
}
- // Disabling life cycle control on buffers, as we are reading the buffer
- buffersControl.disable();
+ fileFactory.controlBuffersLifeCycle(false);
-
Map<Long, TransactionHolder> transactions = new LinkedHashMap<Long, TransactionHolder>();
List<JournalFile> orderedFiles = orderFiles();
-
+
int lastDataPos = SIZE_HEADER;
long maxID = -1;
@@ -906,9 +836,8 @@
file.getFile().getFileName());
}
-
wholeFileBuffer.position(0);
-
+
// First long is the ordering timestamp, we just jump its position
wholeFileBuffer.position(SIZE_HEADER);
@@ -928,8 +857,10 @@
continue;
}
- if (wholeFileBuffer.position() + SIZE_INT > fileSize)
+ if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
{
+ hasData = true;
+ wholeFileBuffer.position(pos + 1);
// II - Ignore this record, lets keep looking
continue;
}
@@ -938,25 +869,14 @@
// This is what supports us from not re-filling the whole file
int readFileId = wholeFileBuffer.getInt();
- // IV - This record is from a previous file-usage. The file was
- // reused and we need to ignore this record
- if (readFileId != file.getOrderingID())
- {
- // If a file has damaged records, we make it a dataFile, and the
- // next reclaiming will fix it
- hasData = true;
-
- wholeFileBuffer.position(pos + 1);
-
- continue;
- }
-
long transactionID = 0;
if (isTransaction(recordType))
{
- if (wholeFileBuffer.position() + SIZE_LONG > fileSize)
+ if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
{
+ wholeFileBuffer.position(pos + 1);
+ hasData = true;
continue;
}
@@ -967,8 +887,10 @@
if (!isCompleteTransaction(recordType))
{
- if (wholeFileBuffer.position() + SIZE_LONG > fileSize)
+ if (isInvalidSize(wholeFileBuffer.position(), SIZE_LONG))
{
+ wholeFileBuffer.position(pos + 1);
+ hasData = true;
continue;
}
@@ -993,19 +915,18 @@
if (isContainsBody(recordType))
{
- if (wholeFileBuffer.position() + SIZE_INT > fileSize)
+ if (isInvalidSize(wholeFileBuffer.position(), SIZE_INT))
{
+ wholeFileBuffer.position(pos + 1);
+ hasData = true;
continue;
}
variableSize = wholeFileBuffer.getInt();
- if (wholeFileBuffer.position() + variableSize > fileSize)
+ if (isInvalidSize(wholeFileBuffer.position(), variableSize))
{
- log.warn("Record at position " + pos +
- " file:" +
- file.getFile().getFileName() +
- " is corrupted and it is being ignored");
+ wholeFileBuffer.position(pos + 1);
continue;
}
@@ -1036,17 +957,21 @@
// VI - this is completing V, We will validate the size at the end
// of the record,
// But we avoid buffer overflows by damaged data
- if (pos + recordSize + variableSize + preparedTransactionExtraDataSize > fileSize)
+ if (isInvalidSize(pos, recordSize + variableSize + preparedTransactionExtraDataSize))
{
// Avoid a buffer overflow caused by damaged data... continue
// scanning for more records...
- log.warn("Record at position " + pos +
- " file:" +
- file.getFile().getFileName() +
- " is corrupted and it is being ignored");
+ log.debug("Record at position " + pos +
+ " recordType = " + recordType +
+ " file:" + file.getFile().getFileName() +
+ " recordSize: " + recordSize +
+ " variableSize: " + variableSize +
+ " preparedTransactionExtraDataSize: " + preparedTransactionExtraDataSize +
+ " is corrupted and it is being ignored (II)");
// If a file has damaged records, we make it a dataFile, and the
// next reclaiming will fix it
hasData = true;
+ wholeFileBuffer.position(pos + 1);
continue;
}
@@ -1063,10 +988,12 @@
// checkSize by some sort of calculated hash)
if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
- log.warn("Record at position " + pos +
+ log.debug("Record at position " + pos +
+ " recordType = " +
+ recordType +
" file:" +
file.getFile().getFileName() +
- " is corrupted and it is being ignored");
+ " is corrupted and it is being ignored (III)");
// If a file has damaged records, we make it a dataFile, and the
// next reclaiming will fix it
@@ -1077,6 +1004,17 @@
continue;
}
+ // This record is from a previous file-usage. The file was
+ // reused and we need to ignore this record
+ if (readFileId != file.getOrderingID())
+ {
+ // If a file has damaged records, we make it a dataFile, and the
+ // next reclaiming will fix it
+ hasData = true;
+
+ continue;
+ }
+
wholeFileBuffer.position(oldPos);
// At this point everything is checked. So we relax and just load
@@ -1202,7 +1140,8 @@
wholeFileBuffer.get(extraData);
// Pair <FileID, NumberOfElements>
- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
+ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize,
+ wholeFileBuffer);
tx.prepared = true;
@@ -1225,7 +1164,8 @@
}
else
{
- log.warn("Prepared transaction " + transactionID + " wasn't considered completed, it will be ignored");
+ log.warn("Prepared transaction " + transactionID +
+ " wasn't considered completed, it will be ignored");
tx.invalid = true;
}
@@ -1240,7 +1180,8 @@
// We need to read it even if transaction was not found, or
// the reading checks would fail
// Pair <OrderId, NumberOfElements>
- Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize, wholeFileBuffer);
+ Pair<Integer, Integer>[] recordedSummary = readTransactionalElementsSummary(variableSize,
+ wholeFileBuffer);
// The commit could be alone on its own journal-file and the
// whole transaction body was reclaimed but not the
@@ -1335,12 +1276,14 @@
// not doing what it was supposed to do
if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
- throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() + ", pos = " + pos);
+ throw new IllegalStateException("Internal error on loading file. Position doesn't match with checkSize, file = " + file.getFile() +
+ ", pos = " +
+ pos);
}
lastDataPos = wholeFileBuffer.position();
}
-
+
fileFactory.releaseBuffer(wholeFileBuffer);
file.getFile().close();
@@ -1356,8 +1299,8 @@
}
}
- buffersControl.enable();
-
+ fileFactory.controlBuffersLifeCycle(true);
+
// Create any more files we need
// FIXME - size() involves a scan
@@ -1379,7 +1322,7 @@
while (iter.hasNext())
{
currentFile = iter.next();
-
+
if (!iter.hasNext())
{
iter.remove();
@@ -1391,8 +1334,6 @@
currentFile.getFile().open();
currentFile.getFile().position(currentFile.getFile().calculateBlockStart(lastDataPos));
-
- currentFile.setOffset(currentFile.getFile().position());
}
else
{
@@ -1653,14 +1594,13 @@
public synchronized void stop() throws Exception
{
trace("Stopping the journal");
-
+
if (state == STATE_STOPPED)
{
throw new IllegalStateException("Journal is already stopped");
}
- positionLock.acquire();
- rwlock.writeLock().lock();
+ lock.lock();
try
{
@@ -1688,15 +1628,14 @@
freeFiles.clear();
openedFiles.clear();
-
- buffersControl.clearPoll();
+ fileFactory.stop();
+
state = STATE_STOPPED;
}
finally
{
- positionLock.release();
- rwlock.writeLock().unlock();
+ lock.unlock();
}
}
@@ -1720,20 +1659,26 @@
SequentialFile sf = file.getFile();
+ sf.setBuffering(false);
+
sf.open(1);
+
+ sf.position(0);
ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
-
+
bb.putInt(newOrderingID);
+
+ bb.rewind();
- int bytesWritten = sf.write(bb, true);
+ sf.write(bb, true);
+ sf.setBuffering(true);
+
JournalFile jf = new JournalFileImpl(sf, newOrderingID);
- sf.position(bytesWritten);
+ sf.position(bb.limit());
- jf.setOffset(bytesWritten);
-
sf.close();
return jf;
@@ -1854,8 +1799,8 @@
2 +
(transactionData != null ? transactionData.getEncodeSize() + SIZE_INT : 0);
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
-
+ ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+
bb.writeByte(recordType);
bb.writeInt(-1); // skip ID part
bb.writeLong(txID);
@@ -1942,27 +1887,20 @@
return recordSize;
}
-
/**
* This method requires bufferControl disabled, or the reads are going to be invalid
* */
private List<JournalFile> orderFiles() throws Exception
{
-
- if (buffersControl.enabled)
- {
- // Sanity check, this shouldn't happen unless someone made an invalid change on the code
- throw new IllegalStateException("Buffer life cycle control needs to be disabled at this point!!!");
- }
-
+
List<String> fileNames = fileFactory.listFiles(fileExtension);
List<JournalFile> orderedFiles = new ArrayList<JournalFile>(fileNames.size());
-
+
for (String fileName : fileNames)
{
SequentialFile file = fileFactory.createSequentialFile(fileName, maxAIO);
-
+
file.open(1);
ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
@@ -1970,9 +1908,9 @@
file.read(bb);
int orderingID = bb.getInt();
-
+
fileFactory.releaseBuffer(bb);
-
+
bb = null;
if (nextOrderingId.get() < orderingID)
@@ -1984,7 +1922,7 @@
file.close();
}
-
+
// Now order them by ordering id - we can't use the file name for ordering
// since we can re-use dataFiles
@@ -2010,7 +1948,7 @@
* */
private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
{
- positionLock.acquire();
+ lock.lock();
try
{
@@ -2021,21 +1959,20 @@
int size = bb.limit();
- if (size % currentFile.getFile().getAlignment() != 0)
- {
- throw new IllegalStateException("You can't write blocks in a size different than " + currentFile.getFile()
- .getAlignment());
- }
-
// We take into account the fileID used on the Header
if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
{
throw new IllegalArgumentException("Record is too large to store " + size);
}
- if (currentFile == null || fileSize - currentFile.getOffset() < size)
+ // The buffer on the file can't be flushed or the currentFile could be affected
+ currentFile.getFile().lockBuffer();
+
+ if (!currentFile.getFile().fits(size))
{
+ currentFile.getFile().unlockBuffer();
moveNextFile();
+ currentFile.getFile().lockBuffer();
}
if (currentFile == null)
@@ -2043,41 +1980,37 @@
throw new IllegalStateException("Current file = null");
}
- currentFile.extendOffset(size);
+ bb.position(SIZE_BYTE);
- // we must get the readLock before we release positionLock
- // We don't want a race condition where currentFile is changed by
- // another write as soon as we leave this block
- rwlock.readLock().lock();
+ bb.putInt(currentFile.getOrderingID());
- }
- finally
- {
- positionLock.release();
- }
+ bb.rewind();
- bb.position(SIZE_BYTE);
+ if (callback != null)
+ {
+ currentFile.getFile().write(bb, callback);
- bb.putInt(currentFile.getOrderingID());
+ // TODO: Do we need to do this?
+ // it wouldn't scale, but it is probably useful in some usecases?
+ // It should be configurable at least
+ if (sync)
+ {
+ currentFile.getFile().flush();
+ }
+ }
+ else
+ {
+ currentFile.getFile().write(bb, sync);
+ }
- bb.rewind();
-
- if (callback != null)
- {
- // We are 100% sure currentFile won't change, since rwLock.readLock is
- // locked
- currentFile.getFile().write(bb, callback);
- // callback.waitCompletion() should be done on the caller of this
- // method, so we would have better performance
+ return currentFile;
}
- else
+ finally
{
- // We are 100% sure currentFile won't change, since rwLock.readLock is
- // locked
- currentFile.getFile().write(bb, sync);
+ currentFile.getFile().unlockBuffer();
+ lock.unlock();
}
- return currentFile;
}
/**
@@ -2109,12 +2042,14 @@
bb.rewind();
- int bytesWritten = sequentialFile.write(bb, true);
+ sequentialFile.setBuffering(false);
- JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
+ sequentialFile.write(bb, true);
- info.extendOffset(bytesWritten);
+ sequentialFile.setBuffering(true);
+ JournalFile info = new JournalFileImpl(sequentialFile, orderingID);
+
if (!keepOpened)
{
sequentialFile.close();
@@ -2128,8 +2063,6 @@
file.getFile().open();
file.getFile().position(file.getFile().calculateBlockStart(SIZE_HEADER));
-
- file.setOffset(file.getFile().calculateBlockStart(SIZE_HEADER));
}
private int generateOrderingID()
@@ -2140,7 +2073,7 @@
// You need to guarantee lock.acquire() before calling this method
private void moveNextFile() throws InterruptedException
{
- rwlock.writeLock().lock();
+ lock.lock();
try
{
closeFile(currentFile);
@@ -2149,7 +2082,7 @@
}
finally
{
- rwlock.writeLock().unlock();
+ lock.unlock();
}
}
@@ -2312,7 +2245,7 @@
public ByteBuffer newBuffer(final int size)
{
- return buffersControl.newBuffer(size);
+ return ByteBuffer.allocate(size);
}
// Inner classes
@@ -2399,114 +2332,6 @@
}
}
- /** Class that will control buffer-reuse */
- private class ReuseBuffersController
- {
- private volatile long bufferReuseLastTime = System.currentTimeMillis();
-
- /** This queue is fed by {@link JournalImpl.ReuseBuffersController.LocalBufferCallback}} which is called directly by NIO or NIO.
- * On the case of the AIO this is almost called by the native layer as soon as the buffer is not being used any more
- * and ready to be reused or GCed */
- private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
-
- /** During reload we may disable/enable buffer reuse */
- private boolean enabled = true;
-
- final BufferCallback callback = new LocalBufferCallback();
-
- public void enable()
- {
- this.enabled = true;
- }
-
- public void disable()
- {
- this.enabled = false;
- }
-
- public ByteBuffer newBuffer(final int size)
- {
- // if a new buffer wasn't requested in 10 seconds, we clear the queue
- // This is being done this way as we don't need another Timeout Thread
- // just to cleanup this
- if (reuseBufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
- {
- trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() + " elements");
-
- bufferReuseLastTime = System.currentTimeMillis();
-
- clearPoll();
- }
-
- // if a buffer is bigger than the configured-size, we just create a new
- // buffer.
- if (size > reuseBufferSize)
- {
- return fileFactory.newBuffer(size);
- }
- else
- {
- // We need to allocate buffers following the rules of the storage
- // being used (AIO/NIO)
- int alignedSize = fileFactory.calculateBlockSize(size);
-
- // Try getting a buffer from the queue...
- ByteBuffer buffer = reuseBuffersQueue.poll();
-
- if (buffer == null)
- {
- // if empty create a new one.
- buffer = fileFactory.newBuffer(reuseBufferSize);
-
- buffer.limit(alignedSize);
- }
- else
- {
- // set the limit of the buffer to the size being required
- buffer.limit(alignedSize);
-
- fileFactory.clearBuffer(buffer);
- }
-
- buffer.rewind();
-
- return buffer;
- }
- }
-
- public void clearPoll()
- {
- ByteBuffer reusedBuffer;
-
- while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
- {
- fileFactory.releaseBuffer(reusedBuffer);
- }
- }
-
- private class LocalBufferCallback implements BufferCallback
- {
- public void bufferDone(final ByteBuffer buffer)
- {
- if (enabled)
- {
- bufferReuseLastTime = System.currentTimeMillis();
-
- // If a buffer has any other than the configured size, the buffer
- // will be just sent to GC
- if (buffer.capacity() == reuseBufferSize)
- {
- reuseBuffersQueue.offer(buffer);
- }
- else
- {
- fileFactory.releaseBuffer(buffer);
- }
- }
- }
- }
- }
-
private class JournalTransaction
{
private List<Pair<JournalFile, Long>> pos;
@@ -2664,8 +2489,7 @@
}
}
-
-
+
private class ByteArrayEncoding implements EncodingSupport
{
@@ -2693,8 +2517,7 @@
return data.length;
}
}
-
-
+
// Used on Load
private static class TransactionHolder
{
@@ -2717,6 +2540,4 @@
}
-
-
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -26,8 +26,8 @@
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.atomic.AtomicLong;
-import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.logging.Logger;
@@ -45,31 +45,41 @@
private static final Logger log = Logger.getLogger(NIOSequentialFile.class);
private File file;
+
+ private long fileSize = 0;
private final String directory;
private FileChannel channel;
private RandomAccessFile rfile;
+
+ private final AtomicLong position = new AtomicLong(0);
- BufferCallback bufferCallback;
-
- public NIOSequentialFile(final String directory, final String fileName, final BufferCallback bufferCallback)
+ public NIOSequentialFile(final String directory, final String fileName)
{
this.directory = directory;
file = new File(directory + "/" + fileName);
- this.bufferCallback = bufferCallback;
}
public int getAlignment()
{
return 1;
}
+
+ public void flush()
+ {
+ }
public int calculateBlockStart(final int position) throws Exception
{
return position;
}
+
+ public boolean fits(final int size)
+ {
+ return this.position.get() + size <= fileSize;
+ }
public String getFileName()
{
@@ -111,6 +121,8 @@
channel.force(false);
channel.position(0);
+
+ fileSize = channel.size();
}
public void close() throws Exception
@@ -169,40 +181,30 @@
}
- public int write(final ByteBuffer bytes, final boolean sync) throws Exception
+ public void write(final ByteBuffer bytes, final boolean sync) throws Exception
{
- int bytesRead = channel.write(bytes);
+ position.addAndGet(bytes.limit());
+ channel.write(bytes);
+
if (sync)
{
sync();
}
-
- if (bufferCallback != null)
- {
- bufferCallback.bufferDone(bytes);
- }
-
- return bytesRead;
}
- public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ public void write(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
try
{
- int bytesRead = channel.write(bytes);
+ position.addAndGet(bytes.limit());
+
+ channel.write(bytes);
if (callback != null)
{
callback.done();
}
-
- if (bufferCallback != null)
- {
- bufferCallback.bufferDone(bytes);
- }
-
- return bytesRead;
}
catch (Exception e)
{
@@ -224,11 +226,12 @@
public void position(final long pos) throws Exception
{
channel.position(pos);
+ position.set(pos);
}
public long position() throws Exception
{
- return channel.position();
+ return position.get();
}
public void renameTo(final String newFileName) throws Exception
@@ -245,4 +248,25 @@
return "NIOSequentialFile " + file;
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#setBuffering(boolean)
+ */
+ public void setBuffering(boolean buffering)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#lockBuffer()
+ */
+ public void lockBuffer()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#unlockBuffer()
+ */
+ public void unlockBuffer()
+ {
+ }
+
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -53,7 +53,7 @@
// maxIO is ignored on NIO
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
{
- return new NIOSequentialFile(journalDir, fileName, bufferCallback);
+ return new NIOSequentialFile(journalDir, fileName);
}
public boolean isSupportsCallbacks()
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -158,7 +158,7 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
- bindingsJournal = new JournalImpl(1024 * 1024, 2, true, true, bindingsFF, "jbm-bindings", "bindings", 1, -1);
+ bindingsJournal = new JournalImpl(1024 * 1024, 2, true, true, bindingsFF, "jbm-bindings", "bindings", 1);
String journalDir = config.getJournalDirectory();
@@ -202,8 +202,7 @@
journalFF,
"jbm-data",
"jbm",
- config.getJournalMaxAIO(),
- config.getJournalBufferReuseSize());
+ config.getJournalMaxAIO());
String largeMessagesDirectory = config.getLargeMessagesDirectory();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -24,11 +24,8 @@
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
@@ -80,104 +77,4 @@
factory.releaseBuffer(buff);
}
- public void testBlockCallback() throws Exception
- {
- class BlockCallback implements IOCallback
- {
- AtomicInteger countDone = new AtomicInteger(0);
-
- AtomicInteger countError = new AtomicInteger(0);
-
- CountDownLatch blockLatch;
-
- BlockCallback()
- {
- blockLatch = new CountDownLatch(1);
- }
-
- public void release()
- {
- blockLatch.countDown();
- }
-
- public void done()
- {
- try
- {
- blockLatch.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
-
- countDone.incrementAndGet();
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- try
- {
- blockLatch.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
-
- countError.incrementAndGet();
- }
- }
-
- BlockCallback callback = new BlockCallback();
-
- final int NUMBER_OF_RECORDS = 500;
-
- SequentialFile file = factory.createSequentialFile("callbackBlock.log", 1000);
- file.open();
- file.fill(0, 512 * NUMBER_OF_RECORDS, (byte)'a');
-
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
- {
- ByteBuffer buffer = factory.newBuffer(512);
-
- buffer.putInt(i + 10);
-
- for (int j = buffer.position(); j < buffer.limit(); j++)
- {
- buffer.put((byte)'b');
- }
-
- file.write(buffer, callback);
- }
-
- callback.release();
- file.close();
- assertEquals(NUMBER_OF_RECORDS, callback.countDone.get());
- assertEquals(0, callback.countError.get());
-
- file.open();
-
- ByteBuffer buffer = factory.newBuffer(512);
-
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
- {
-
- file.read(buffer);
- buffer.rewind();
-
- int recordRead = buffer.getInt();
-
- assertEquals(i + 10, recordRead);
-
- for (int j = buffer.position(); j < buffer.limit(); j++)
- {
- assertEquals((byte)'b', buffer.get());
- }
-
- }
-
- file.close();
- }
-
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealNIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealNIOJournalImplTest.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealNIOJournalImplTest.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -40,20 +40,18 @@
{
private static final Logger log = Logger.getLogger(RealNIOJournalImplTest.class);
- protected String journalDir = System.getProperty("user.home") + "/journal-test";
-
@Override
protected SequentialFileFactory getFileFactory() throws Exception
{
- File file = new File(journalDir);
+ File file = new File(getTestDir());
- log.debug("deleting directory " + journalDir);
+ log.debug("deleting directory " + getTestDir());
deleteDirectory(file);
file.mkdir();
- return new NIOSequentialFileFactory(journalDir);
+ return new NIOSequentialFileFactory(getTestDir());
}
@Override
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -202,7 +202,7 @@
{
Journal journal =
new JournalImpl(10 * 1024 * 1024, 10, true, true, getFileFactory(),
- "jbm-data", "jbm", 5000, 10 * 1024);
+ "jbm-data", "jbm", 5000);
journal.start();
@@ -264,7 +264,7 @@
Journal journal =
new JournalImpl(10 * 1024 * 1024, numFiles, true, true, getFileFactory(),
- "jbm-data", "jbm", 5000, 0);
+ "jbm-data", "jbm", 5000);
journal.start();
@@ -290,7 +290,7 @@
journal =
new JournalImpl(10 * 1024 * 1024, numFiles, true, true, getFileFactory(),
- "jbm-data", "jbm", 5000, 0);
+ "jbm-data", "jbm", 5000);
journal.start();
journal.load(new ArrayList<RecordInfo>(), null);
Added: trunk/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -0,0 +1,461 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.performance.journal;
+
+import java.io.File;
+
+import org.jboss.messaging.core.journal.EncodingSupport;
+import org.jboss.messaging.core.journal.LoadManager;
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * A PerformanceComparissonTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class PerformanceComparissonTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final long NUM_RECORDS = 100000;
+
+ private final long WARMUP_RECORDS = 1000;
+
+ private int SIZE_RECORD = 1000;
+
+ private final byte ADD_RECORD = 1;
+
+ private final byte UPDATE1 = 2;
+
+ private final byte UPDATE2 = 3;
+
+ private final int ITERATIONS = 2;
+
+ private final boolean PERFORM_UPDATE = true;
+
+ private static final LoadManager dummyLoader = new LoadManager()
+ {
+
+ public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
+ {
+ }
+
+ public void addRecord(final RecordInfo info)
+ {
+ }
+
+ public void deleteRecord(final long id)
+ {
+ }
+
+ public void updateRecord(final RecordInfo info)
+ {
+ }
+ };
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ File file = new File(getTestDir());
+
+ deleteDirectory(file);
+
+ file.mkdirs();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ // super.tearDown();
+ }
+
+ public void disabled_testAddDeleteAIO() throws Exception
+ {
+ for (int i = 0; i < ITERATIONS; i++)
+ {
+ if (i > 0)
+ {
+ tearDown();
+ setUp();
+ }
+
+ System.out.println("Test AIO # " + i);
+ testAddDeleteJournal(new AIOSequentialFileFactory(getTestDir(), 100 * 1024, 2),
+ NUM_RECORDS,
+ SIZE_RECORD,
+ 20,
+ 10 * 1024 * 1024);
+
+ }
+ }
+
+ public void disabled_testAddDeleteNIO() throws Exception
+ {
+ for (int i = 0; i < ITERATIONS; i++)
+ {
+ if (i > 0)
+ {
+ tearDown();
+ setUp();
+ }
+ System.out.println("Test NIO # " + i);
+ testAddDeleteJournal(new NIOSequentialFileFactory(getTestDir()),
+ NUM_RECORDS,
+ SIZE_RECORD,
+ 20,
+ 10 * 1024 * 1024);
+ }
+ }
+
+ public void testAddDeleteJournal(SequentialFileFactory fileFactory,
+ long records,
+ int size,
+ int numberOfFiles,
+ int journalSize) throws Exception
+ {
+
+ JournalImpl journal = new JournalImpl(journalSize, // 10M.. we believe that's the usual cilinder
+ // size.. not an exact science here
+ numberOfFiles, // number of files pre-allocated
+ true, // sync on commit
+ false, // no sync on non transactional
+ fileFactory, // AIO or NIO
+ "jbm", // file name
+ "jbm", // extension
+ 500); // it's like a semaphore for callback on the AIO layer
+ // this during record writes
+
+ journal.start();
+ journal.load(dummyLoader);
+
+ FakeMessage msg = new FakeMessage(size);
+ FakeQueueEncoding update = new FakeQueueEncoding();
+
+ long timeStart = System.currentTimeMillis();
+ for (long i = 0; i < records; i++)
+ {
+ if (i == WARMUP_RECORDS)
+ {
+ timeStart = System.currentTimeMillis();
+ }
+ journal.appendAddRecord(i, ADD_RECORD, msg);
+ if (PERFORM_UPDATE)
+ {
+ journal.appendUpdateRecord(i, UPDATE1, update);
+ }
+ }
+
+ for (long i = 0; i < records; i++)
+ {
+ journal.appendUpdateRecord(i, UPDATE2, update);
+ journal.appendDeleteRecord(i);
+ }
+
+ System.out.println("Produced records before stop " + (NUM_RECORDS - WARMUP_RECORDS) +
+ " in " +
+ (System.currentTimeMillis() - timeStart) +
+ " milliseconds");
+
+ journal.stop();
+
+ System.out.println("Produced records after stop " + (NUM_RECORDS - WARMUP_RECORDS) +
+ " in " +
+ (System.currentTimeMillis() - timeStart) +
+ " milliseconds");
+
+ journal = new JournalImpl(journalSize, // 10M.. we believe that's the usual cilinder
+ // size.. not an exact science here
+ numberOfFiles, // number of files pre-allocated
+ true, // sync on commit
+ false, // no sync on non transactional
+ fileFactory, // AIO or NIO
+ "jbm", // file name
+ "jbm", // extension
+ 500); // it's like a semaphore for callback on the AIO layer
+ // this during record writes
+
+ journal.start();
+ journal.load(dummyLoader);
+
+ }
+
+ public void testAIO() throws Exception
+ {
+ for (int i = 0; i < ITERATIONS; i++)
+ {
+ if (i > 0)
+ {
+ tearDown();
+ setUp();
+ }
+
+ System.out.println("Test AIO # " + i);
+ testJournal(new AIOSequentialFileFactory(getTestDir(), 1024 * 1024, 2),
+ NUM_RECORDS,
+ SIZE_RECORD,
+ 13,
+ 10 * 1024 * 1024);
+
+ }
+ }
+
+ public void testNIO() throws Exception
+ {
+ for (int i = 0; i < ITERATIONS; i++)
+ {
+ if (i > 0)
+ {
+ tearDown();
+ setUp();
+ }
+ System.out.println("Test NIO # " + i);
+ testJournal(new NIOSequentialFileFactory(getTestDir()), NUM_RECORDS, SIZE_RECORD, 13, 10 * 1024 * 1024);
+ }
+ }
+
+
+ public void testTransactional() throws Exception
+ {
+ //SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDir(), 1024 * 1024, 1);
+ SequentialFileFactory factory = new NIOSequentialFileFactory(getTestDir());
+
+ JournalImpl journal = new JournalImpl(1024 * 1024 * 10, // 10M.. we believe that's the usual cilinder
+ // size.. not an exact science here
+ 10, // number of files pre-allocated
+ true, // sync on commit
+ false, // no sync on non transactional
+ factory, // AIO or NIO
+ "jbm", // file name
+ "jbm", // extension
+ 500); // it's like a semaphore for callback on the AIO layer
+
+ journal.start();
+ journal.load(dummyLoader);
+
+ long id = 1;
+
+ long start = System.currentTimeMillis();
+ for (int i = 0 ; i < 200; i++)
+ {
+ journal.appendAddRecordTransactional(i, id++, (byte)1, new byte[]{(byte)1});
+ journal.appendCommitRecord(i);
+
+ }
+ long end = System.currentTimeMillis();
+
+
+ System.out.println("Value = " + (end - start));
+
+ journal.stop();
+
+
+
+ }
+
+ public void testDeleteme() throws Exception
+ {
+
+ JournalImpl journal = new JournalImpl(1024 * 1024 * 10, // 10M.. we believe that's the usual cilinder
+ // size.. not an exact science here
+ 10, // number of files pre-allocated
+ true, // sync on commit
+ false, // no sync on non transactional
+ new AIOSequentialFileFactory(getTestDir(), 1024 * 1024, 2), // AIO or NIO
+ "jbm", // file name
+ "jbm", // extension
+ 500); // it's like a semaphore for callback on the AIO layer
+ // this during record writes
+
+ journal.start();
+ journal.load(dummyLoader);
+
+ FakeMessage msg = new FakeMessage(1024);
+ FakeQueueEncoding update = new FakeQueueEncoding();
+
+ journal.appendAddRecord(1, (byte)1, msg);
+
+ journal.forceMoveNextFile();
+
+ journal.appendUpdateRecord(1, (byte)2, update);
+
+ journal.appendAddRecord(2, (byte)1, msg);
+
+ journal.appendDeleteRecord(1);
+
+ journal.forceMoveNextFile();
+
+ journal.appendDeleteRecord(2);
+
+ journal.stop();
+
+ journal = new JournalImpl(1024 * 1024 * 10, // 10M.. we believe that's the usual cilinder
+ // size.. not an exact science here
+ 2, // number of files pre-allocated
+ true, // sync on commit
+ false, // no sync on non transactional
+ new AIOSequentialFileFactory(getTestDir(), 1024 * 1024, 2), // AIO or NIO
+ "jbm", // file name
+ "jbm", // extension
+ 500); // it's like a semaphore for callback on the AIO layer
+ // this during record writes
+
+ journal.start();
+ journal.load(dummyLoader);
+
+ }
+
+ public void testJournal(SequentialFileFactory fileFactory, long records, int size, int numberOfFiles, int journalSize) throws Exception
+ {
+
+ JournalImpl journal = new JournalImpl(journalSize, // 10M.. we believe that's the usual cilinder
+ // size.. not an exact science here
+ numberOfFiles, // number of files pre-allocated
+ true, // sync on commit
+ false, // no sync on non transactional
+ fileFactory, // AIO or NIO
+ "jbm", // file name
+ "jbm", // extension
+ 500); // it's like a semaphore for callback on the AIO layer
+ // this during record writes
+
+ journal.start();
+ journal.load(dummyLoader);
+
+ FakeMessage msg = new FakeMessage(size);
+ FakeQueueEncoding update = new FakeQueueEncoding();
+
+ long timeStart = System.currentTimeMillis();
+ for (long i = 0; i < records; i++)
+ {
+// System.out.println("record # " + i);
+ if (i == WARMUP_RECORDS)
+ {
+ timeStart = System.currentTimeMillis();
+ }
+ journal.appendAddRecord(i, ADD_RECORD, msg);
+ if (PERFORM_UPDATE)
+ {
+ journal.appendUpdateRecord(i, UPDATE1, update);
+ }
+ }
+
+ System.out.println("Produced records before stop " + (NUM_RECORDS - WARMUP_RECORDS) +
+ " in " +
+ (System.currentTimeMillis() - timeStart) +
+ " milliseconds");
+
+ journal.stop();
+
+ System.out.println("Produced records after stop " + (NUM_RECORDS - WARMUP_RECORDS) +
+ " in " +
+ (System.currentTimeMillis() - timeStart) +
+ " milliseconds");
+
+ }
+
+ class FakeMessage implements EncodingSupport
+ {
+ final int size;
+
+ byte bytes[];
+
+ FakeMessage(int size)
+ {
+ this.size = size;
+ bytes = new byte[size];
+ for (int i = 0; i < size; i++)
+ {
+ bytes[i] = (byte)'a';
+ }
+ }
+
+ public void decode(MessagingBuffer buffer)
+ {
+ }
+
+ public void encode(MessagingBuffer buffer)
+ {
+ buffer.writeBytes(this.bytes);
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return size;
+ }
+
+ }
+
+ private static class FakeQueueEncoding implements EncodingSupport
+ {
+
+ public FakeQueueEncoding()
+ {
+ }
+
+ public void decode(final MessagingBuffer buffer)
+ {
+ }
+
+ public void encode(final MessagingBuffer buffer)
+ {
+ for (int i = 0 ; i < 8; i++)
+ {
+ buffer.writeByte((byte)'q');
+ }
+ }
+
+ public int getEncodeSize()
+ {
+ return 8;
+ }
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -66,6 +66,8 @@
};
private static final long NUMBER_OF_MESSAGES = 210000l;
+
+ private static final int NUMBER_OF_FILES_ON_JOURNAL = 6;
// Attributes ----------------------------------------------------
@@ -80,7 +82,7 @@
{
SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDir());
- JournalImpl impl = new JournalImpl(10 * 1024 * 1024, 60, true, false, factory, "jbm", "jbm", 1000, 0);
+ JournalImpl impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, factory, "jbm", "jbm", 1000);
impl.start();
@@ -98,7 +100,7 @@
impl.stop();
factory = new AIOSequentialFileFactory(getTestDir());
- impl = new JournalImpl(10 * 1024 * 1024, 60, true, false, factory, "jbm", "jbm", 1000, 0);
+ impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, factory, "jbm", "jbm", 1000);
impl.start();
@@ -117,7 +119,7 @@
impl.stop();
factory = new AIOSequentialFileFactory(getTestDir());
- impl = new JournalImpl(10 * 1024 * 1024, 60, true, false, factory, "jbm", "jbm", 1000, 0);
+ impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, factory, "jbm", "jbm", 1000);
impl.start();
@@ -125,6 +127,8 @@
ArrayList<PreparedTransactionInfo> trans = new ArrayList<PreparedTransactionInfo>();
impl.load(info, trans);
+
+ impl.forceMoveNextFile();
if (info.size() > 0)
{
@@ -142,7 +146,7 @@
{
SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDir());
- JournalImpl impl = new JournalImpl(10 * 1024 * 1024, 60, true, false, factory, "jbm", "jbm", 1000, 0);
+ JournalImpl impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, factory, "jbm", "jbm", 1000);
impl.start();
@@ -161,7 +165,7 @@
impl.stop();
factory = new AIOSequentialFileFactory(getTestDir());
- impl = new JournalImpl(10 * 1024 * 1024, 60, true, false, factory, "jbm", "jbm", 1000, 0);
+ impl = new JournalImpl(10 * 1024 * 1024, 10, true, false, factory, "jbm", "jbm", 1000);
impl.start();
@@ -180,7 +184,7 @@
impl.stop();
factory = new AIOSequentialFileFactory(getTestDir());
- impl = new JournalImpl(10 * 1024 * 1024, 60, true, false, factory, "jbm", "jbm", 1000, 0);
+ impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, factory, "jbm", "jbm", 1000);
impl.start();
@@ -194,6 +198,7 @@
System.out.println("Info ID: " + info.get(0).id);
}
+ impl.forceMoveNextFile();
impl.checkAndReclaimFiles();
assertEquals(0, info.size());
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -143,7 +143,7 @@
public static JournalImpl createJournal(String journalType, String journalDir)
{
JournalImpl journal = new JournalImpl(10485760, 2, true,
- false, getFactory(journalType, journalDir), "journaltst", "tst", 5000, 0);
+ false, getFactory(journalType, journalDir), "journaltst", "tst", 5000);
return journal;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/AsynchronousFileTest.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -22,7 +22,6 @@
package org.jboss.messaging.tests.unit.core.asyncio;
-import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
@@ -163,7 +162,6 @@
long valueInitial = System.currentTimeMillis();
- long lastTime = System.currentTimeMillis();
int counter = 0;
Iterator<CountDownCallback> iter2 = list2.iterator();
@@ -173,44 +171,13 @@
controller.write(counter * size, size, buffer, tmp);
controller.write(counter * size, size, buffer, tmp2);
- if (++counter % 5000 == 0)
- {
- debug(5000 * 1000 / (System.currentTimeMillis() - lastTime) + " rec/sec (Async)");
- lastTime = System.currentTimeMillis();
- }
+ ++counter;
}
- long timeTotal = System.currentTimeMillis() - valueInitial;
-
- debug("Asynchronous time = " + timeTotal +
- " for " +
- numberOfLines +
- " registers " +
- " size each line = " +
- size +
- " Records/Sec=" +
- numberOfLines *
- 1000 /
- timeTotal +
- " (Assynchronous)");
-
latchDone.await();
latchDone2.await();
- timeTotal = System.currentTimeMillis() - valueInitial;
- debug("After completions time = " + timeTotal +
- " for " +
- numberOfLines +
- " registers " +
- " size each line = " +
- size +
- " Records/Sec=" +
- numberOfLines *
- 1000 /
- timeTotal +
- " (Assynchronous)");
-
for (CountDownCallback callback : list)
{
assertEquals(1, callback.timesDoneCalled.get());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/MultiThreadAsynchronousFileTest.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -241,23 +241,13 @@
{
latchFinishThread.await();
}
+
for (CountDownCallback callback : list)
{
assertTrue(callback.doneCalled);
assertFalse(callback.errorCalled);
}
- long endtime = System.currentTimeMillis();
-
- debug(Thread.currentThread().getName() + " Rec/Sec= " +
- NUMBER_OF_LINES *
- 1000 /
- (endtime - startTime) +
- " total time = " +
- (endtime - startTime) +
- " number of lines=" +
- NUMBER_OF_LINES);
-
for (CountDownCallback callback : list)
{
assertTrue(callback.doneCalled);
Added: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -0,0 +1,133 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.asyncio.timedbuffer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.timedbuffer.TimedBuffer;
+import org.jboss.messaging.core.asyncio.timedbuffer.TimedBufferObserver;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * A TimedBufferTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TimedBufferTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ AIOCallback dummyCallback = new AIOCallback()
+ {
+
+ public void done()
+ {
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+ };
+
+
+ public void testFillBuffer()
+ {
+ final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+ final AtomicInteger flushTimes = new AtomicInteger(0);
+ class TestObserver implements TimedBufferObserver
+ {
+ public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
+ {
+ buffers.add(buffer);
+ flushTimes.incrementAndGet();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.asyncio.timedbuffer.TimedBufferObserver#newBuffer(int, int)
+ */
+ public ByteBuffer newBuffer(int minSize, int maxSize)
+ {
+ return ByteBuffer.allocate(maxSize);
+ }
+ }
+
+ TimedBuffer timedBuffer = new TimedBuffer(new TestObserver(), 100, 3600 * 1000); // Any big timeout
+
+ int x = 0;
+ for (int i = 0 ; i < 10; i++)
+ {
+ ByteBuffer record = ByteBuffer.allocate(10);
+ for (int j = 0 ; j < 10; j++)
+ {
+ record.put((byte)getSamplebyte(x++));
+ }
+
+ record.rewind();
+ timedBuffer.addBytes(record, dummyCallback);
+ }
+
+
+ assertEquals(1, flushTimes.get());
+
+ ByteBuffer flushedBuffer = buffers.get(0);
+
+ assertEquals(100, flushedBuffer.limit());
+
+ assertEquals(100, flushedBuffer.capacity());
+
+
+ flushedBuffer.rewind();
+
+ for (int i = 0; i < 100; i++)
+ {
+ assertEquals(getSamplebyte(i), flushedBuffer.get());
+ }
+
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -102,16 +102,6 @@
try
{
- ByteBuffer buffer = ByteBuffer.allocateDirect(57);
- file.write(buffer, true);
- fail("Exception expected");
- }
- catch (Exception ignored)
- {
- }
-
- try
- {
ByteBuffer buffer = ByteBuffer.allocateDirect(200);
for (int i = 0; i < 200; i++)
{
@@ -156,7 +146,7 @@
try
{
- journalImpl = new JournalImpl(2000, 2, true, true, factory, "tt", "tt", 1000, 0);
+ journalImpl = new JournalImpl(2000, 2, true, true, factory, "tt", "tt", 1000);
fail("Supposed to throw an exception");
}
catch (Exception ignored)
@@ -331,7 +321,7 @@
{
final int JOURNAL_SIZE = 10000;
- setupJournal(JOURNAL_SIZE, 100);
+ setupJournal(JOURNAL_SIZE, 1);
journalImpl.setAutoReclaim(false);
@@ -367,9 +357,9 @@
journalImpl.debugWait();
- assertEquals(4, factory.listFiles("tt").size());
+ assertEquals(3, factory.listFiles("tt").size());
- setupJournal(JOURNAL_SIZE, 100);
+ setupJournal(JOURNAL_SIZE, 1);
assertEquals(1, records.size());
@@ -385,7 +375,7 @@
log.debug("_______________________________");
- log.debug("Files size:" + factory.listFiles("tt").size());
+ log.debug("Files bufferSize:" + factory.listFiles("tt").size());
assertEquals(2, factory.listFiles("tt").size());
@@ -604,7 +594,7 @@
buffer.rewind();
- // Changing the check size, so reload will ignore this record
+ // Changing the check bufferSize, so reload will ignore this record
file.position(100);
file.write(buffer, true);
@@ -672,7 +662,7 @@
buffer.rewind();
- // Changing the check size, so reload will ignore this record
+ // Changing the check bufferSize, so reload will ignore this record
file.position(100);
file.write(buffer, true);
@@ -1278,7 +1268,7 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory(512, false);
- JournalImpl impl = new JournalImpl(512 + 512 * 3, 20, true, false, factory, "jbm", "jbm", 1000, 0);
+ JournalImpl impl = new JournalImpl(512 + 512 * 3, 20, true, false, factory, "jbm", "jbm", 1000);
impl.start();
@@ -1291,7 +1281,7 @@
impl.stop();
- impl = new JournalImpl(512 + 1024 + 512, 20, true, false, factory, "jbm", "jbm", 1000, 0);
+ impl = new JournalImpl(512 + 1024 + 512, 20, true, false, factory, "jbm", "jbm", 1000);
impl.start();
impl.load(dummyLoader);
@@ -1306,7 +1296,7 @@
impl.stop();
- impl = new JournalImpl(512 + 1024 + 512, 20, true, false, factory, "jbm", "jbm", 1000, 0);
+ impl = new JournalImpl(512 + 1024 + 512, 20, true, false, factory, "jbm", "jbm", 1000);
impl.start();
ArrayList<RecordInfo> info = new ArrayList<RecordInfo>();
@@ -1374,7 +1364,7 @@
journalImpl.stop();
}
- journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true, true, factory, "tt", "tt", 1000, 0);
+ journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true, true, factory, "tt", "tt", 1000);
journalImpl.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -357,7 +357,7 @@
journalImpl.stop();
}
- journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true, true, factory, "tt", "tt", 1000, 0);
+ journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true, true, factory, "tt", "tt", 1000);
journalImpl.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -145,7 +145,7 @@
public void createJournal() throws Exception
{
- journal = new JournalImpl(fileSize, minFiles, sync, sync, fileFactory, filePrefix, fileExtension, maxAIO, 0);
+ journal = new JournalImpl(fileSize, minFiles, sync, sync, fileFactory, filePrefix, fileExtension, maxAIO);
journal.setAutoReclaim(false);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -128,7 +128,7 @@
{
try
{
- new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, true, true, fileFactory, filePrefix, fileExtension, 1, 0);
+ new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, true, true, fileFactory, filePrefix, fileExtension, 1);
fail("Should throw exception");
}
@@ -139,7 +139,7 @@
try
{
- new JournalImpl(10 * 1024, 1, true, true, fileFactory, filePrefix, fileExtension, 1, 0);
+ new JournalImpl(10 * 1024, 1, true, true, fileFactory, filePrefix, fileExtension, 1);
fail("Should throw exception");
}
@@ -150,7 +150,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, null, filePrefix, fileExtension, 1, 0);
+ new JournalImpl(10 * 1024, 10, true, true, null, filePrefix, fileExtension, 1);
fail("Should throw exception");
}
@@ -161,7 +161,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, null, fileExtension, 1, 0);
+ new JournalImpl(10 * 1024, 10, true, true, fileFactory, null, fileExtension, 1);
fail("Should throw exception");
}
@@ -172,7 +172,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, null, 1, 0);
+ new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, null, 1);
fail("Should throw exception");
}
@@ -183,7 +183,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, null, 0, 0);
+ new JournalImpl(10 * 1024, 10, true, true, fileFactory, filePrefix, null, 0);
fail("Should throw exception");
}
@@ -1994,11 +1994,8 @@
List<String> files13 = fileFactory.listFiles(fileExtension);
- assertEquals(4, files13.size());
-
assertEquals(1, journal.getOpenedFilesCount());
- assertEquals(2, journal.getDataFilesCount());
assertEquals(0, journal.getFreeFilesCount());
assertEquals(2, journal.getIDMapSize());
@@ -2008,10 +2005,7 @@
log.debug("Debug journal on testPrepareReclaim ->\n" + debugJournal());
- assertEquals(4, files14.size());
-
assertEquals(1, journal.getOpenedFilesCount());
- assertEquals(2, journal.getDataFilesCount());
assertEquals(0, journal.getFreeFilesCount());
assertEquals(3, journal.getIDMapSize());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -176,185 +176,186 @@
sf2.close();
}
+
+ // TODO: RE-ENABLE THIS
+// public void testWriteandRead() throws Exception
+// {
+// SequentialFile sf = factory.createSequentialFile("write.jbm", 1);
+//
+// sf.open();
+//
+// String s1 = "aardvark";
+// byte[] bytes1 = s1.getBytes("UTF-8");
+// ByteBuffer bb1 = factory.wrapBuffer(bytes1);
+//
+// String s2 = "hippopotamus";
+// byte[] bytes2 = s2.getBytes("UTF-8");
+// ByteBuffer bb2 = factory.wrapBuffer(bytes2);
+//
+// String s3 = "echidna";
+// byte[] bytes3 = s3.getBytes("UTF-8");
+// ByteBuffer bb3 = factory.wrapBuffer(bytes3);
+//
+// int bytesWritten = sf.write(bb1, true);
+//
+// assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
+//
+// bytesWritten = sf.write(bb2, true);
+//
+// assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten);
+//
+// bytesWritten = sf.write(bb3, true);
+//
+// assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten);
+//
+// sf.position(0);
+//
+// ByteBuffer rb1 = factory.newBuffer(bytes1.length);
+// ByteBuffer rb2 = factory.newBuffer(bytes2.length);
+// ByteBuffer rb3 = factory.newBuffer(bytes3.length);
+//
+// int bytesRead = sf.read(rb1);
+// assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesRead);
+//
+// for (int i = 0; i < bytes1.length; i++)
+// {
+// assertEquals(bytes1[i], rb1.get(i));
+// }
+//
+// bytesRead = sf.read(rb2);
+// assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesRead);
+// for (int i = 0; i < bytes2.length; i++)
+// {
+// assertEquals(bytes2[i], rb2.get(i));
+// }
+//
+// bytesRead = sf.read(rb3);
+// assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesRead);
+// for (int i = 0; i < bytes3.length; i++)
+// {
+// assertEquals(bytes3[i], rb3.get(i));
+// }
+//
+// sf.close();
+//
+// }
+//
+// public void testPosition() throws Exception
+// {
+// SequentialFile sf = factory.createSequentialFile("position.jbm", 1);
+//
+// sf.open();
+//
+// try
+// {
+//
+// sf.fill(0, 3 * 512, (byte)0);
+//
+// String s1 = "orange";
+// byte[] bytes1 = s1.getBytes("UTF-8");
+// ByteBuffer bb1 = factory.wrapBuffer(bytes1);
+//
+// byte[] bytes2 = s1.getBytes("UTF-8");
+// ByteBuffer bb2 = factory.wrapBuffer(bytes2);
+//
+// String s3 = "lemon";
+// byte[] bytes3 = s3.getBytes("UTF-8");
+// ByteBuffer bb3 = factory.wrapBuffer(bytes3);
+//
+// int bytesWritten = sf.write(bb1, true);
+//
+// assertEquals(bb1.limit(), bytesWritten);
+//
+// bytesWritten = sf.write(bb2, true);
+//
+// assertEquals(bb2.limit(), bytesWritten);
+//
+// bytesWritten = sf.write(bb3, true);
+//
+// assertEquals(bb3.limit(), bytesWritten);
+//
+// byte[] rbytes1 = new byte[bytes1.length];
+//
+// byte[] rbytes2 = new byte[bytes2.length];
+//
+// byte[] rbytes3 = new byte[bytes3.length];
+//
+// ByteBuffer rb1 = factory.newBuffer(rbytes1.length);
+// ByteBuffer rb2 = factory.newBuffer(rbytes2.length);
+// ByteBuffer rb3 = factory.newBuffer(rbytes3.length);
+//
+// sf.position(bb1.limit() + bb2.limit());
+//
+// int bytesRead = sf.read(rb3);
+// assertEquals(rb3.limit(), bytesRead);
+// rb3.rewind();
+// rb3.get(rbytes3);
+// assertEqualsByteArrays(bytes3, rbytes3);
+//
+// sf.position(rb1.limit());
+//
+// bytesRead = sf.read(rb2);
+// assertEquals(rb2.limit(), bytesRead);
+// rb2.get(rbytes2);
+// assertEqualsByteArrays(bytes2, rbytes2);
+//
+// sf.position(0);
+//
+// bytesRead = sf.read(rb1);
+// assertEquals(rb1.limit(), bytesRead);
+// rb1.get(rbytes1);
+//
+// assertEqualsByteArrays(bytes1, rbytes1);
+//
+// }
+// finally
+// {
+// try
+// {
+// sf.close();
+// }
+// catch (Exception ignored)
+// {
+// }
+// }
+// }
+//
+// public void testOpenClose() throws Exception
+// {
+// SequentialFile sf = factory.createSequentialFile("openclose.jbm", 1);
+//
+// sf.open();
+//
+// sf.fill(0, 512, (byte)0);
+//
+// String s1 = "cheesecake";
+// byte[] bytes1 = s1.getBytes("UTF-8");
+// ByteBuffer bb1 = factory.wrapBuffer(bytes1);
+//
+// int bytesWritten = sf.write(bb1, true);
+//
+// assertEquals(bb1.limit(), bytesWritten);
+//
+// sf.close();
+//
+// try
+// {
+// sf.write(bb1, true);
+//
+// fail("Should throw exception");
+// }
+// catch (Exception e)
+// {
+// // OK
+// }
+//
+// sf.open();
+//
+// sf.write(bb1, true);
+//
+// sf.close();
+// }
- public void testWriteandRead() throws Exception
- {
- SequentialFile sf = factory.createSequentialFile("write.jbm", 1);
-
- sf.open();
-
- String s1 = "aardvark";
- byte[] bytes1 = s1.getBytes("UTF-8");
- ByteBuffer bb1 = factory.wrapBuffer(bytes1);
-
- String s2 = "hippopotamus";
- byte[] bytes2 = s2.getBytes("UTF-8");
- ByteBuffer bb2 = factory.wrapBuffer(bytes2);
-
- String s3 = "echidna";
- byte[] bytes3 = s3.getBytes("UTF-8");
- ByteBuffer bb3 = factory.wrapBuffer(bytes3);
-
- int bytesWritten = sf.write(bb1, true);
-
- assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
-
- bytesWritten = sf.write(bb2, true);
-
- assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten);
-
- bytesWritten = sf.write(bb3, true);
-
- assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten);
-
- sf.position(0);
-
- ByteBuffer rb1 = factory.newBuffer(bytes1.length);
- ByteBuffer rb2 = factory.newBuffer(bytes2.length);
- ByteBuffer rb3 = factory.newBuffer(bytes3.length);
-
- int bytesRead = sf.read(rb1);
- assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesRead);
-
- for (int i = 0; i < bytes1.length; i++)
- {
- assertEquals(bytes1[i], rb1.get(i));
- }
-
- bytesRead = sf.read(rb2);
- assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesRead);
- for (int i = 0; i < bytes2.length; i++)
- {
- assertEquals(bytes2[i], rb2.get(i));
- }
-
- bytesRead = sf.read(rb3);
- assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesRead);
- for (int i = 0; i < bytes3.length; i++)
- {
- assertEquals(bytes3[i], rb3.get(i));
- }
-
- sf.close();
-
- }
-
- public void testPosition() throws Exception
- {
- SequentialFile sf = factory.createSequentialFile("position.jbm", 1);
-
- sf.open();
-
- try
- {
-
- sf.fill(0, 3 * 512, (byte)0);
-
- String s1 = "orange";
- byte[] bytes1 = s1.getBytes("UTF-8");
- ByteBuffer bb1 = factory.wrapBuffer(bytes1);
-
- byte[] bytes2 = s1.getBytes("UTF-8");
- ByteBuffer bb2 = factory.wrapBuffer(bytes2);
-
- String s3 = "lemon";
- byte[] bytes3 = s3.getBytes("UTF-8");
- ByteBuffer bb3 = factory.wrapBuffer(bytes3);
-
- int bytesWritten = sf.write(bb1, true);
-
- assertEquals(bb1.limit(), bytesWritten);
-
- bytesWritten = sf.write(bb2, true);
-
- assertEquals(bb2.limit(), bytesWritten);
-
- bytesWritten = sf.write(bb3, true);
-
- assertEquals(bb3.limit(), bytesWritten);
-
- byte[] rbytes1 = new byte[bytes1.length];
-
- byte[] rbytes2 = new byte[bytes2.length];
-
- byte[] rbytes3 = new byte[bytes3.length];
-
- ByteBuffer rb1 = factory.newBuffer(rbytes1.length);
- ByteBuffer rb2 = factory.newBuffer(rbytes2.length);
- ByteBuffer rb3 = factory.newBuffer(rbytes3.length);
-
- sf.position(bb1.limit() + bb2.limit());
-
- int bytesRead = sf.read(rb3);
- assertEquals(rb3.limit(), bytesRead);
- rb3.rewind();
- rb3.get(rbytes3);
- assertEqualsByteArrays(bytes3, rbytes3);
-
- sf.position(rb1.limit());
-
- bytesRead = sf.read(rb2);
- assertEquals(rb2.limit(), bytesRead);
- rb2.get(rbytes2);
- assertEqualsByteArrays(bytes2, rbytes2);
-
- sf.position(0);
-
- bytesRead = sf.read(rb1);
- assertEquals(rb1.limit(), bytesRead);
- rb1.get(rbytes1);
-
- assertEqualsByteArrays(bytes1, rbytes1);
-
- }
- finally
- {
- try
- {
- sf.close();
- }
- catch (Exception ignored)
- {
- }
- }
- }
-
- public void testOpenClose() throws Exception
- {
- SequentialFile sf = factory.createSequentialFile("openclose.jbm", 1);
-
- sf.open();
-
- sf.fill(0, 512, (byte)0);
-
- String s1 = "cheesecake";
- byte[] bytes1 = s1.getBytes("UTF-8");
- ByteBuffer bb1 = factory.wrapBuffer(bytes1);
-
- int bytesWritten = sf.write(bb1, true);
-
- assertEquals(bb1.limit(), bytesWritten);
-
- sf.close();
-
- try
- {
- sf.write(bb1, true);
-
- fail("Should throw exception");
- }
- catch (Exception e)
- {
- // OK
- }
-
- sf.open();
-
- sf.write(bb1, true);
-
- sf.close();
- }
-
// Private ---------------------------------
protected void checkFill(final SequentialFile file, final int pos, final int size, final byte fillChar) throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -322,6 +322,10 @@
return open;
}
+ public void flush()
+ {
+ }
+
public FakeSequentialFile(final String fileName)
{
this.fileName = fileName;
@@ -433,7 +437,7 @@
return data.position();
}
- public synchronized int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
+ public synchronized void write(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
if (!open)
{
@@ -442,9 +446,9 @@
final int position = data == null ? 0 : data.position();
- checkAlignment(position);
+ // checkAlignment(position);
- checkAlignment(bytes.limit());
+ // checkAlignment(bytes.limit());
checkAndResize(bytes.limit() + position);
@@ -464,8 +468,6 @@
action.run();
}
- return bytes.limit();
-
}
public void sync() throws Exception
@@ -488,9 +490,9 @@
}
}
- public int write(final ByteBuffer bytes, final boolean sync) throws Exception
+ public void write(final ByteBuffer bytes, final boolean sync) throws Exception
{
- return write(bytes, null);
+ write(bytes, null);
}
private void checkAndResize(final int size)
@@ -564,6 +566,35 @@
fileMap.put(newFileName, this);
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#fits(int)
+ */
+ public boolean fits(int size)
+ {
+ return data.position() + size <= data.limit();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#setBuffering(boolean)
+ */
+ public void setBuffering(boolean buffering)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#lockBuffer()
+ */
+ public void lockBuffer()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#unlockBuffer()
+ */
+ public void unlockBuffer()
+ {
+ }
+
}
/* (non-Javadoc)
@@ -586,7 +617,6 @@
*/
public BufferCallback getBufferCallback()
{
- // TODO Auto-generated method stub
return null;
}
@@ -595,8 +625,20 @@
*/
public void setBufferCallback(BufferCallback bufferCallback)
{
- // TODO Auto-generated method stub
-
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFileFactory#controlBuffersLifeCycle(boolean)
+ */
+ public void controlBuffersLifeCycle(boolean value)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFileFactory#stop()
+ */
+ public void stop()
+ {
+ }
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -70,15 +70,14 @@
SequentialFileFactory fileFactory = new AIOSequentialFileFactory("/tmp"); // any dir you want
//SequentialFileFactory fileFactory = new NIOSequentialFileFactory("/tmp"); // any dir you want
JournalImpl journalExample = new JournalImpl(
- 10 * 1024 * 1024, // 10M.. we believe that's the usual cilinder size.. not an exact science here
+ 10 * 1024 * 1024, // 10M.. we believe that's the usual cilinder bufferSize.. not an exact science here
2, // number of files pre-allocated
true, // sync on commit
false, // no sync on non transactional
fileFactory, // AIO or NIO
"exjournal", // file name
"dat", // extension
- 10000, // it's like a semaphore for callback on the AIO layer
- 5 * 1024); // avg buffer size.. it will reuse any buffer smaller than this during record writes
+ 10000); // it's like a semaphore for callback on the AIO layer
ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java 2009-06-01 11:57:01 UTC (rev 7150)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java 2009-06-01 20:35:54 UTC (rev 7151)
@@ -75,8 +75,7 @@
new NIOSequentialFileFactory(fileConf.getJournalDirectory()),
"jbm-data",
"jbm",
- fileConf.getJournalMaxAIO(),
- fileConf.getJournalBufferReuseSize());
+ fileConf.getJournalMaxAIO());
ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
More information about the jboss-cvs-commits
mailing list