[jboss-cvs] JBoss Messaging SVN: r7300 - in trunk: src/main/org/jboss/messaging/core/journal and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 10 18:27:05 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-06-10 18:27:05 -0400 (Wed, 10 Jun 2009)
New Revision: 7300
Modified:
trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Improvements on journal
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-10 22:27:05 UTC (rev 7300)
@@ -31,7 +31,9 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.buffers.ChannelBuffers;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.VariableLatch;
/**
@@ -52,7 +54,7 @@
private TimedBufferObserver bufferObserver;
// Some kernels don't have good resolutions on timers.. I've set this to disabled.. we may decide later
- private static final boolean USE_NATIVE_TIMERS = false;
+ private static final boolean USE_NATIVE_TIMERS = true;
// This is used to pause and resume the timer
// This is a reusable Latch, that uses java.util.concurrent base classes
@@ -62,15 +64,17 @@
private final int bufferSize;
- private final ByteBuffer currentBuffer;
+ private final MessagingBuffer buffer;
+ private int bufferLimit = 0;
+
private List<AIOCallback> callbacks;
private final Lock lock = new ReentrantReadWriteLock().writeLock();
// used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
private volatile boolean active = false;
-
+
private final long timeout;
// used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
@@ -81,19 +85,19 @@
private volatile boolean started;
private final boolean flushOnSync;
-
+
// for logging write rates
-
+
private final boolean logRates;
-
+
private volatile long bytesFlushed;
-
+
private Timer logRatesTimer;
-
+
private TimerTask logRatesTimerTask;
-
+
private long lastExecution;
-
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -109,7 +113,7 @@
this.logRatesTimer = new Timer(true);
}
// Setting the interval for nano-sleeps
-
+
// We are keeping this disabled for now until we figure out what to do.
// I've found a few problems with nano-sleep depending on the version of the kernel:
// http://fixunix.com/unix/552033-problem-nanosleep.html
@@ -117,9 +121,11 @@
{
AsynchronousFileImpl.setNanoSleepInterval((int)timeout);
}
-
- currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
- currentBuffer.limit(0);
+
+ buffer = ChannelBuffers.buffer(bufferSize);
+ buffer.clear();
+ bufferLimit = 0;
+
callbacks = new ArrayList<AIOCallback>();
this.flushOnSync = flushOnSync;
latchTimer.up();
@@ -138,11 +144,11 @@
timerThread = new Thread(timerRunnable, "jbm-aio-timer");
timerThread.start();
-
+
if (logRates)
{
logRatesTimerTask = new LogRatesTimerTask();
-
+
logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
}
@@ -155,15 +161,15 @@
{
return;
}
-
+
this.flush();
-
+
this.bufferObserver = null;
-
+
latchTimer.down();
timerRunnable.close();
-
+
if (logRates)
{
logRatesTimerTask.cancel();
@@ -216,7 +222,7 @@
") on the journal");
}
- if (currentBuffer.limit() == 0 || currentBuffer.position() + sizeChecked > currentBuffer.limit())
+ if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit)
{
flush();
@@ -228,8 +234,8 @@
}
else
{
- currentBuffer.rewind();
- currentBuffer.limit(Math.min(remaining, bufferSize));
+ buffer.clear();
+ bufferLimit = Math.min(remaining, bufferSize);
return true;
}
}
@@ -239,15 +245,16 @@
}
}
- public synchronized void addBytes(final ByteBuffer bytes, final boolean sync, final AIOCallback callback)
+ public synchronized void addBytes(final byte[] bytes, final boolean sync, final AIOCallback callback)
{
- if (currentBuffer.position() == 0)
+ if (buffer.writerIndex() == 0)
{
// Resume latch
latchTimer.down();
}
-
- currentBuffer.put(bytes);
+
+ buffer.writeBytes(bytes);
+
callbacks.add(callback);
active = true;
@@ -268,7 +275,7 @@
}
}
- if (currentBuffer.position() == currentBuffer.capacity())
+ if (buffer.writerIndex() == bufferLimit)
{
flush();
}
@@ -276,34 +283,33 @@
public synchronized void flush()
{
- if (currentBuffer.limit() > 0)
+ if (buffer.writerIndex() > 0)
{
latchTimer.up();
-
- int pos = currentBuffer.position();
-
+
+ int pos = buffer.writerIndex();
+
if (logRates)
{
- bytesFlushed += pos;
+ bytesFlushed += pos;
}
ByteBuffer directBuffer = bufferObserver.newBuffer(bufferSize, pos);
// Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
- // Using directBuffer.put(currentBuffer) would make several append calls for each byte
-
- //TODO we could optimise this further by making currentBuffer a simple byte[] and using System.arrayCopy to add bytes to it
- //then we wouldn't need the array() call
- directBuffer.put(currentBuffer.array(), 0, pos);
+ // Using directBuffer.put(buffer) would make several append calls for each byte
+ directBuffer.put(buffer.array(), 0, pos);
+
bufferObserver.flushBuffer(directBuffer, callbacks);
callbacks = new ArrayList<AIOCallback>();
active = false;
pendingSync = false;
-
- currentBuffer.limit(0);
+
+ buffer.clear();
+ bufferLimit = 0;
}
}
@@ -342,35 +348,34 @@
private class LogRatesTimerTask extends TimerTask
{
private boolean closed;
-
+
@Override
public synchronized void run()
{
if (!closed)
{
long now = System.currentTimeMillis();
-
+
if (lastExecution != 0)
- {
- double rate = 1000 * ((double)bytesFlushed) / ( now - lastExecution);
-
- log.info("Write rate = " + rate + " bytes / sec");
+ {
+ double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
+ log.info("Write rate = " + rate + " bytes / sec or " + (long)(rate / (1024 * 1024)) + " MiB / sec");
}
lastExecution = now;
-
+
bytesFlushed = 0;
}
}
-
+
public synchronized boolean cancel()
{
closed = true;
-
+
return super.cancel();
}
}
-
+
private class CheckTimer implements Runnable
{
private volatile boolean closed = false;
@@ -381,7 +386,7 @@
{
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
}
-
+
while (!closed)
{
try
@@ -391,7 +396,7 @@
catch (InterruptedException ignored)
{
}
-
+
sleep();
checkTimer();
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-06-10 22:27:05 UTC (rev 7300)
@@ -24,6 +24,8 @@
import java.nio.ByteBuffer;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+
/**
*
* A SequentialFile
@@ -60,6 +62,10 @@
void delete() throws Exception;
+ void write(MessagingBuffer bytes, boolean sync, IOCallback callback) throws Exception;
+
+ void write(MessagingBuffer bytes, boolean sync) throws Exception;
+
void write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
void write(ByteBuffer bytes, boolean sync) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-10 22:27:05 UTC (rev 7300)
@@ -40,6 +40,7 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
*
@@ -301,11 +302,43 @@
return bytesRead;
}
+ public void write(final MessagingBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.addBytes(bytes.array(), sync, callback);
+ }
+ else
+ {
+ ByteBuffer buffer = factory.newBuffer(bytes.capacity());
+ buffer.put(bytes.array());
+ doWrite(buffer, callback);
+ }
+ }
+
+ public void write(final MessagingBuffer bytes, final boolean sync) throws Exception
+ {
+ if (sync)
+ {
+ IOCallback completion = SimpleWaitIOCallback.getInstance();
+
+ write(bytes, true, completion);
+
+ completion.waitCompletion();
+ }
+ else
+ {
+ write(bytes, false, DummyCallback.instance);
+ }
+ }
+
+
public void write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
{
if (timedBuffer != null)
{
- timedBuffer.addBytes(bytes, sync, callback);
+ // sanity check.. it shouldn't happen
+ throw new IllegalStateException("Illegal buffered usage. Can't use ByteBuffer write while buffer SequentialFile");
}
else
{
@@ -329,6 +362,7 @@
}
}
+
public void sync() throws Exception
{
throw new IllegalArgumentException("This method is not supported on AIO");
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-06-10 22:27:05 UTC (rev 7300)
@@ -204,9 +204,14 @@
public void stop()
{
- buffersControl.clearPoll();
+ buffersControl.stop();
timedBuffer.stop();
}
+
+ protected void finalize()
+ {
+ this.stop();
+ }
/** Class that will control buffer-reuse */
private class ReuseBuffersController
@@ -220,6 +225,8 @@
/** During reload we may disable/enable buffer reuse */
private boolean enabled = true;
+
+ private boolean stopped = false;
final BufferCallback callback = new LocalBufferCallback();
@@ -284,8 +291,14 @@
}
}
- public void clearPoll()
+ public synchronized void stop()
{
+ stopped = true;
+ clearPoll();
+ }
+
+ public synchronized void clearPoll()
+ {
ByteBuffer reusedBuffer;
while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
@@ -298,19 +311,31 @@
{
public void bufferDone(final ByteBuffer buffer)
{
- if (enabled)
+ synchronized (ReuseBuffersController.this)
{
- bufferReuseLastTime = System.currentTimeMillis();
-
- // If a buffer has any other than the configured bufferSize, the buffer
- // will be just sent to GC
- if (buffer.capacity() == bufferSize)
+ if (stopped)
{
- reuseBuffersQueue.offer(buffer);
+ System.out.println("Releasing buffer after stopped");
+ releaseBuffer(buffer);
}
else
{
- releaseBuffer(buffer);
+
+ if (enabled)
+ {
+ bufferReuseLastTime = System.currentTimeMillis();
+
+ // If a buffer has any other than the configured bufferSize, the buffer
+ // will be just sent to GC
+ if (buffer.capacity() == bufferSize)
+ {
+ reuseBuffersQueue.offer(buffer);
+ }
+ else
+ {
+ releaseBuffer(buffer);
+ }
+ }
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-10 22:27:05 UTC (rev 7300)
@@ -43,10 +43,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.messaging.core.buffers.ChannelBuffer;
import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -191,7 +190,7 @@
private ExecutorService filesExecutor = null;
- private final Lock lock = new ReentrantReadWriteLock().writeLock();
+ private final Semaphore lock = new Semaphore(1);
private volatile JournalFile currentFile;
@@ -271,7 +270,7 @@
int size = SIZE_ADD_RECORD + recordLength;
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = newBuffer(size);
bb.writeByte(ADD_RECORD);
bb.writeInt(-1); // skip ID part
@@ -283,16 +282,16 @@
IOCallback callback = getSyncCallback(sync);
- lock.lock();
+ lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, callback);
+ JournalFile usedFile = appendRecord(bb, sync, callback);
posFilesMap.put(id, new PosFiles(usedFile));
}
finally
{
- lock.unlock();
+ lock.release();
}
if (callback != null)
@@ -322,7 +321,7 @@
int size = SIZE_UPDATE_RECORD + record.getEncodeSize();
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = newBuffer(size);
bb.writeByte(UPDATE_RECORD);
bb.writeInt(-1); // skip ID part
@@ -335,16 +334,16 @@
IOCallback callback = getSyncCallback(sync);
- lock.lock();
+ lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, callback);
+ JournalFile usedFile = appendRecord(bb, sync, callback);
posFiles.addUpdateFile(usedFile);
}
finally
{
- lock.unlock();
+ lock.release();
}
if (callback != null)
@@ -369,16 +368,16 @@
int size = SIZE_DELETE_RECORD;
- ByteBuffer bb = newBuffer(size);
+ ChannelBuffer bb = newBuffer(size);
- bb.put(DELETE_RECORD);
- bb.putInt(-1); // skip ID part
- bb.putLong(id);
- bb.putInt(size);
+ bb.writeByte(DELETE_RECORD);
+ bb.writeInt(-1); // skip ID part
+ bb.writeLong(id);
+ bb.writeInt(size);
IOCallback callback = getSyncCallback(sync);
- lock.lock();
+ lock.acquire();
try
{
JournalFile usedFile = appendRecord(bb, sync, callback);
@@ -387,7 +386,7 @@
}
finally
{
- lock.unlock();
+ lock.release();
}
if (callback != null)
@@ -418,7 +417,7 @@
int size = SIZE_ADD_RECORD_TX + recordLength;
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = newBuffer(size);
bb.writeByte(ADD_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -429,10 +428,10 @@
record.encode(bb);
bb.writeInt(size);
- lock.lock();
+ lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
JournalTransaction tx = getTransactionInfo(txID);
@@ -440,7 +439,7 @@
}
finally
{
- lock.unlock();
+ lock.release();
}
}
@@ -466,7 +465,7 @@
int size = SIZE_UPDATE_RECORD_TX + record.getEncodeSize();
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = newBuffer(size);
bb.writeByte(UPDATE_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -477,10 +476,10 @@
record.encode(bb);
bb.writeInt(size);
- lock.lock();
+ lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
JournalTransaction tx = getTransactionInfo(txID);
@@ -488,7 +487,7 @@
}
finally
{
- lock.unlock();
+ lock.release();
}
}
@@ -508,7 +507,7 @@
int size = SIZE_DELETE_RECORD_TX + (record != null ? record.getEncodeSize() : 0);
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = newBuffer(size);
bb.writeByte(DELETE_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -521,10 +520,10 @@
}
bb.writeInt(size);
- lock.lock();
+ lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
JournalTransaction tx = getTransactionInfo(txID);
@@ -532,7 +531,7 @@
}
finally
{
- lock.unlock();
+ lock.release();
}
}
@@ -546,7 +545,7 @@
int size = SIZE_DELETE_RECORD_TX;
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = newBuffer(size);
bb.writeByte(DELETE_RECORD_TX);
bb.writeInt(-1); // skip ID part
@@ -555,10 +554,10 @@
bb.writeInt(0);
bb.writeInt(size);
- lock.lock();
+ lock.acquire();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
+ JournalFile usedFile = appendRecord(bb, false, getTransactionCallback(txID, sync));
JournalTransaction tx = getTransactionInfo(txID);
@@ -566,7 +565,7 @@
}
finally
{
- lock.unlock();
+ lock.release();
}
}
@@ -592,11 +591,11 @@
JournalTransaction tx = getTransactionInfo(txID);
- ByteBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
+ ChannelBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
IOCallback callback = getTransactionCallback(txID, sync);
- lock.lock();
+ lock.acquire();
try
{
JournalFile usedFile = appendRecord(bb, sync, callback);
@@ -605,7 +604,7 @@
}
finally
{
- lock.unlock();
+ lock.release();
}
// We should wait this outside of the lock, to increase throughput
@@ -646,11 +645,11 @@
throw new IllegalStateException("Cannot find tx with id " + txID);
}
- ByteBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
+ ChannelBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
IOCallback callback = getTransactionCallback(txID, sync);
- lock.lock();
+ lock.acquire();
try
{
JournalFile usedFile = appendRecord(bb, sync, callback);
@@ -661,7 +660,7 @@
}
finally
{
- lock.unlock();
+ lock.release();
}
// We should wait this outside of the lock, to increase throuput
@@ -688,16 +687,16 @@
int size = SIZE_ROLLBACK_RECORD;
- ByteBuffer bb = newBuffer(size);
+ ChannelBuffer bb = newBuffer(size);
- bb.put(ROLLBACK_RECORD);
- bb.putInt(-1); // skip ID part
- bb.putLong(txID);
- bb.putInt(size);
+ bb.writeByte(ROLLBACK_RECORD);
+ bb.writeInt(-1); // skip ID part
+ bb.writeLong(txID);
+ bb.writeInt(size);
IOCallback callback = getTransactionCallback(txID, sync);
- lock.lock();
+ lock.acquire();
try
{
JournalFile usedFile = appendRecord(bb, sync, callback);
@@ -708,7 +707,7 @@
}
finally
{
- lock.unlock();
+ lock.release();
}
// We should wait this outside of the lock, to increase throuput
@@ -1573,8 +1572,16 @@
// In some tests we need to force the journal to move to a next file
public void forceMoveNextFile() throws Exception
{
- moveNextFile();
- debugWait();
+ lock.acquire();
+ try
+ {
+ moveNextFile();
+ debugWait();
+ }
+ finally
+ {
+ lock.release();
+ }
}
public void perfBlast(final int pages) throws Exception
@@ -1613,7 +1620,7 @@
throw new IllegalStateException("Journal is already stopped");
}
- lock.lock();
+ lock.acquire();
try
{
@@ -1648,7 +1655,7 @@
}
finally
{
- lock.unlock();
+ lock.release();
}
}
@@ -1798,7 +1805,7 @@
* @return
* @throws Exception
*/
- private ByteBuffer writeTransaction(final byte recordType,
+ private ChannelBuffer writeTransaction(final byte recordType,
final long txID,
final JournalTransaction tx,
final EncodingSupport transactionData) throws Exception
@@ -1808,7 +1815,7 @@
2 +
(transactionData != null ? transactionData.getEncodeSize() + SIZE_INT : 0);
- ChannelBuffer bb = ChannelBuffers.wrappedBuffer(newBuffer(size));
+ ChannelBuffer bb = newBuffer(size);
bb.writeByte(recordType);
bb.writeInt(-1); // skip ID part
@@ -1834,7 +1841,7 @@
bb.writeInt(size);
- return bb.toByteBuffer();
+ return bb;
}
private boolean isTransaction(final byte recordType)
@@ -1953,13 +1960,10 @@
/**
- * Note: This method will perform rwlock.readLock.lock();
- * The method caller should aways unlock that readLock
+ * Note: You should aways guarantee locking the semaphore lock.
* */
- private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final IOCallback callback) throws Exception
+ private JournalFile appendRecord(final MessagingBuffer bb, final boolean sync, final IOCallback callback) throws Exception
{
- lock.lock();
-
try
{
if (state != STATE_LOADED)
@@ -1967,7 +1971,7 @@
throw new IllegalStateException("The journal was stopped");
}
- int size = bb.limit();
+ int size = bb.capacity();
// We take into account the fileID used on the Header
if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
@@ -1997,12 +2001,10 @@
throw new IllegalStateException("Current file = null");
}
- bb.position(SIZE_BYTE);
+ bb.writerIndex(SIZE_BYTE);
- bb.putInt(currentFile.getOrderingID());
+ bb.writeInt(currentFile.getOrderingID());
- bb.rewind();
-
if (callback != null)
{
currentFile.getFile().write(bb, sync, callback);
@@ -2017,7 +2019,6 @@
finally
{
currentFile.getFile().unlockBuffer();
- lock.unlock();
}
}
@@ -2078,20 +2079,11 @@
// You need to guarantee lock.acquire() before calling this method
private void moveNextFile() throws InterruptedException
{
- lock.lock();
- try
- {
- closeFile(currentFile);
+ closeFile(currentFile);
- currentFile = enqueueOpenFile();
-
- fileFactory.activate(currentFile.getFile());
-
- }
- finally
- {
- lock.unlock();
- }
+ currentFile = enqueueOpenFile();
+
+ fileFactory.activate(currentFile.getFile());
}
/**
@@ -2272,9 +2264,9 @@
}
}
- public ByteBuffer newBuffer(final int size)
+ public ChannelBuffer newBuffer(final int size)
{
- return ByteBuffer.wrap(new byte[size]);
+ return ChannelBuffers.buffer(size);
}
// Inner classes
@@ -2582,22 +2574,16 @@
{
try
{
- lock.lock();
+ lock.acquire();
- byte[] bytes = new byte[128 * 1024];
+ MessagingBuffer bb = newBuffer(128 * 1024);
for (int i = 0; i < pages; i++)
{
- ByteBuffer bb = ByteBuffer.wrap(bytes);
-
- bb.limit(bytes.length);
-
- bb.position(0);
-
appendRecord(bb, false, null);
}
- lock.unlock();
+ lock.release();
}
catch (Exception e)
{
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-06-10 22:27:05 UTC (rev 7300)
@@ -31,6 +31,7 @@
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
*
@@ -183,6 +184,17 @@
}
+
+ public void write(final MessagingBuffer bytes, final boolean sync) throws Exception
+ {
+ write(ByteBuffer.wrap(bytes.array()), sync);
+ }
+
+ public void write(final MessagingBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+ {
+ write(ByteBuffer.wrap(bytes.array()), sync, callback);
+ }
+
public void write(final ByteBuffer bytes, final boolean sync) throws Exception
{
position.addAndGet(bytes.limit());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java 2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/asyncio/TimedBufferTest.java 2009-06-10 22:27:05 UTC (rev 7300)
@@ -99,15 +99,14 @@
int x = 0;
for (int i = 0 ; i < 10; i++)
{
- ByteBuffer record = ByteBuffer.allocate(10);
+ byte[] bytes = new byte[10];
for (int j = 0 ; j < 10; j++)
{
- record.put((byte)getSamplebyte(x++));
+ bytes[j] = getSamplebyte(x++);
}
timedBuffer.checkSize(10);
- record.rewind();
- timedBuffer.addBytes(record, false, dummyCallback);
+ timedBuffer.addBytes(bytes, false, dummyCallback);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-10 18:12:08 UTC (rev 7299)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-06-10 22:27:05 UTC (rev 7300)
@@ -33,6 +33,7 @@
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
*
@@ -595,6 +596,23 @@
{
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#write(org.jboss.messaging.core.remoting.spi.MessagingBuffer, boolean, org.jboss.messaging.core.journal.IOCallback)
+ */
+ public void write(MessagingBuffer bytes, boolean sync, IOCallback callback) throws Exception
+ {
+ write(ByteBuffer.wrap(bytes.array()), sync, callback);
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#write(org.jboss.messaging.core.remoting.spi.MessagingBuffer, boolean)
+ */
+ public void write(MessagingBuffer bytes, boolean sync) throws Exception
+ {
+ write(ByteBuffer.wrap(bytes.array()), sync);
+ }
+
}
/* (non-Javadoc)
More information about the jboss-cvs-commits
mailing list