[jboss-cvs] JBoss Messaging SVN: r7141 - in branches/Branch_JBM2_Perf_Clebert: tests/src/org/jboss/messaging/tests/performance/journal and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun May 31 00:00:53 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-05-31 00:00:52 -0400 (Sun, 31 May 2009)
New Revision: 7141
Added:
branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/
branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java
Modified:
branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
Log:
Journal work
Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-05-30 22:49:25 UTC (rev 7140)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-05-31 04:00:52 UTC (rev 7141)
@@ -43,9 +43,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.messaging.core.buffers.ChannelBuffer;
import org.jboss.messaging.core.buffers.ChannelBuffers;
@@ -194,11 +195,7 @@
private ExecutorService filesExecutor = null;
- /**
- * Used to lock access while calculating the positioning of currentFile.
- * That has to be done in single-thread, and it needs to be a very-fast operation
- */
- private final Semaphore lock = new Semaphore(1, true);
+ private final Lock lock = new ReentrantReadWriteLock().writeLock();
private volatile JournalFile currentFile;
@@ -219,11 +216,11 @@
{
if (fileSize < MIN_FILE_SIZE)
{
- throw new IllegalArgumentException("File bufferSize cannot be less than " + MIN_FILE_SIZE + " bytes");
+ throw new IllegalArgumentException("File size cannot be less than " + MIN_FILE_SIZE + " bytes");
}
if (fileSize % fileFactory.getAlignment() != 0)
{
- throw new IllegalArgumentException("Invalid journal-file-bufferSize " + fileSize +
+ throw new IllegalArgumentException("Invalid journal-file-size " + fileSize +
", It should be multiple of " +
fileFactory.getAlignment());
}
@@ -299,9 +296,17 @@
record.encode(bb);
bb.writeInt(size);
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, null);
+ lock.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, null);
- posFilesMap.put(id, new PosFiles(usedFile));
+ posFilesMap.put(id, new PosFiles(usedFile));
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
@@ -335,9 +340,17 @@
record.encode(bb);
bb.writeInt(size);
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, null);
+ lock.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, null);
- posFiles.addUpdateFile(usedFile);
+ posFiles.addUpdateFile(usedFile);
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
public void appendDeleteRecord(final long id) throws Exception
@@ -363,9 +376,17 @@
bb.putLong(id);
bb.putInt(size);
- JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
+ lock.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb, syncNonTransactional, null);
- posFiles.addDelete(usedFile);
+ posFiles.addDelete(usedFile);
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
@@ -399,11 +420,19 @@
record.encode(bb);
bb.writeInt(size);
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+ lock.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
- JournalTransaction tx = getTransactionInfo(txID);
+ JournalTransaction tx = getTransactionInfo(txID);
- tx.addPositive(usedFile, id);
+ tx.addPositive(usedFile, id);
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
public void appendUpdateRecordTransactional(final long txID,
@@ -437,11 +466,19 @@
record.encode(bb);
bb.writeInt(size);
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+ lock.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
- JournalTransaction tx = getTransactionInfo(txID);
+ JournalTransaction tx = getTransactionInfo(txID);
- tx.addPositive(usedFile, id);
+ tx.addPositive(usedFile, id);
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
@@ -469,14 +506,21 @@
{
record.encode(bb);
}
-
bb.writeInt(size);
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+ lock.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
- JournalTransaction tx = getTransactionInfo(txID);
+ JournalTransaction tx = getTransactionInfo(txID);
- tx.addNegative(usedFile, id);
+ tx.addNegative(usedFile, id);
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
@@ -497,11 +541,19 @@
bb.writeInt(0);
bb.writeInt(size);
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+ lock.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
- JournalTransaction tx = getTransactionInfo(txID);
+ JournalTransaction tx = getTransactionInfo(txID);
- tx.addNegative(usedFile, id);
+ tx.addNegative(usedFile, id);
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
/**
@@ -530,9 +582,17 @@
TransactionCallback callback = getTransactionCallback(txID);
- JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+ lock.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
- tx.prepare(usedFile);
+ tx.prepare(usedFile);
+ }
+ finally
+ {
+ lock.unlock();
+ }
// We should wait this outside of the lock, to increase throughput
if (callback != null)
@@ -576,11 +636,19 @@
TransactionCallback callback = getTransactionCallback(txID);
- JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+ lock.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
- transactionCallbacks.remove(txID);
+ transactionCallbacks.remove(txID);
- tx.commit(usedFile);
+ tx.commit(usedFile);
+ }
+ finally
+ {
+ lock.unlock();
+ }
// We should wait this outside of the lock, to increase throuput
if (callback != null)
@@ -615,11 +683,19 @@
TransactionCallback callback = getTransactionCallback(txID);
- JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+ lock.lock();
+ try
+ {
+ JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
- transactionCallbacks.remove(txID);
+ transactionCallbacks.remove(txID);
- tx.rollback(usedFile);
+ tx.rollback(usedFile);
+ }
+ finally
+ {
+ lock.unlock();
+ }
// We should wait this outside of the lock, to increase throuput
if (callback != null)
@@ -684,11 +760,11 @@
* <tr><td>RecordID</td><td>Long (8 bytes)</td></tr>
* <tr><td>BodySize(Add, update and delete)</td><td>Integer (4 bytes)</td></tr>
* <tr><td>UserDefinedRecordType (If add/update only)</td><td>Byte (1)</td</tr>
- * <tr><td>RecordBody</td><td>Byte Array (bufferSize=BodySize)</td></tr>
+ * <tr><td>RecordBody</td><td>Byte Array (size=BodySize)</td></tr>
* <tr><td>Check Size</td><td>Integer (4 bytes)</td></tr>
* </table>
*
- * <p> The check-bufferSize is used to validate if the record is valid and complete </p>
+ * <p> The check-size is used to validate if the record is valid and complete </p>
*
* <p>Commit/Prepare record layout:</p>
* <table border=1>
@@ -736,10 +812,10 @@
{
// FIXME - We should extract everything we can from this file
// and then we shouldn't ever reuse this file on reclaiming (instead
- // reclaim on different bufferSize files would aways throw the file away)
+ // reclaim on different size files would aways throw the file away)
// rather than throw ISE!
// We don't want to leave the user with an unusable system
- throw new IllegalStateException("File is wrong bufferSize " + bytesRead +
+ throw new IllegalStateException("File is wrong size " + bytesRead +
" expected " +
fileSize +
" : " +
@@ -777,19 +853,6 @@
// This is what supports us from not re-filling the whole file
int readFileId = wholeFileBuffer.getInt();
- // IV - This record is from a previous file-usage. The file was
- // reused and we need to ignore this record
- if (readFileId != file.getOrderingID())
- {
- // If a file has damaged records, we make it a dataFile, and the
- // next reclaiming will fix it
- hasData = true;
-
- wholeFileBuffer.position(pos + 1);
-
- continue;
- }
-
long transactionID = 0;
if (isTransaction(recordType))
@@ -816,9 +879,9 @@
maxID = Math.max(maxID, recordID);
}
- // We use the bufferSize of the record to validate the health of the
+ // We use the size of the record to validate the health of the
// record.
- // (V) We verify the bufferSize of the record
+ // (V) We verify the size of the record
// The variable record portion used on Updates and Appends
int variableSize = 0;
@@ -842,6 +905,8 @@
if (wholeFileBuffer.position() + variableSize > fileSize)
{
log.warn("Record at position " + pos +
+ " type = " +
+ recordType +
" file:" +
file.getFile().getFileName() +
" is corrupted and it is being ignored");
@@ -862,17 +927,17 @@
{
if (recordType == PREPARE_RECORD)
{
- // Add the variable bufferSize required for preparedTransactions
+ // Add the variable size required for preparedTransactions
preparedTransactionExtraDataSize = wholeFileBuffer.getInt();
}
// Both commit and record contain the recordSummary, and this is
- // used to calculate the record-bufferSize on both record-types
+ // used to calculate the record-size on both record-types
variableSize += wholeFileBuffer.getInt() * SIZE_INT * 2;
}
int recordSize = getRecordSize(recordType);
- // VI - this is completing V, We will validate the bufferSize at the end
+ // VI - this is completing V, We will validate the size at the end
// of the record,
// But we avoid buffer overflows by damaged data
if (pos + recordSize + variableSize + preparedTransactionExtraDataSize > fileSize)
@@ -880,9 +945,11 @@
// Avoid a buffer overflow caused by damaged data... continue
// scanning for more records...
log.warn("Record at position " + pos +
+ " recordType = " +
+ recordType +
" file:" +
file.getFile().getFileName() +
- " is corrupted and it is being ignored");
+ " is corrupted and it is being ignored (II)");
// If a file has damaged records, we make it a dataFile, and the
// next reclaiming will fix it
hasData = true;
@@ -896,16 +963,18 @@
int checkSize = wholeFileBuffer.getInt();
- // VII - The checkSize at the end has to match with the bufferSize
+ // VII - The checkSize at the end has to match with the size
// informed at the beggining.
// This is like testing a hash for the record. (We could replace the
// checkSize by some sort of calculated hash)
if (checkSize != variableSize + recordSize + preparedTransactionExtraDataSize)
{
log.warn("Record at position " + pos +
+ " recordType = " +
+ recordType +
" file:" +
file.getFile().getFileName() +
- " is corrupted and it is being ignored");
+ " is corrupted and it is being ignored (III)");
// If a file has damaged records, we make it a dataFile, and the
// next reclaiming will fix it
@@ -916,6 +985,17 @@
continue;
}
+ // This record is from a previous file-usage. The file was
+ // reused and we need to ignore this record
+ if (readFileId != file.getOrderingID())
+ {
+ // If a file has damaged records, we make it a dataFile, and the
+ // next reclaiming will fix it
+ hasData = true;
+
+ continue;
+ }
+
wholeFileBuffer.position(oldPos);
// At this point everything is checked. So we relax and just load
@@ -1204,7 +1284,7 @@
// Create any more files we need
- // FIXME - bufferSize() involves a scan
+ // FIXME - size() involves a scan
int filesToCreate = minFiles - (dataFiles.size() + freeFiles.size());
if (filesToCreate > 0)
@@ -1391,7 +1471,7 @@
dataFiles.remove(file);
- // FIXME - bufferSize() involves a scan!!!
+ // FIXME - size() involves a scan!!!
if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
{
// Re-initialise it
@@ -1501,7 +1581,7 @@
throw new IllegalStateException("Journal is already stopped");
}
- lock.acquire();
+ lock.lock();
try
{
@@ -1536,7 +1616,7 @@
}
finally
{
- lock.release();
+ lock.unlock();
}
}
@@ -1744,7 +1824,7 @@
private int getRecordSize(final byte recordType)
{
- // The record bufferSize (without the variable portion)
+ // The record size (without the variable portion)
int recordSize = 0;
switch (recordType)
{
@@ -1845,11 +1925,8 @@
* */
private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final TransactionCallback callback) throws Exception
{
- lock.acquire();
+ lock.lock();
- // TOOD: when we add the timer on AIO, we need to make sure this routine locks buffered timer somehow, as the
- // offSet verification can't happen in the middle of the buffered timer
-
try
{
if (state != STATE_LOADED)
@@ -1908,8 +1985,7 @@
finally
{
currentFile.getFile().unlockBuffer();
-
- lock.release();
+ lock.unlock();
}
}
@@ -1974,9 +2050,17 @@
// You need to guarantee lock.acquire() before calling this method
private void moveNextFile() throws InterruptedException
{
- closeFile(currentFile);
+ lock.lock();
+ try
+ {
+ closeFile(currentFile);
- currentFile = enqueueOpenFile();
+ currentFile = enqueueOpenFile();
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
/**
Modified: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java 2009-05-30 22:49:25 UTC (rev 7140)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java 2009-05-31 04:00:52 UTC (rev 7141)
@@ -289,7 +289,7 @@
}
- public void disabled_testDeleteme() throws Exception
+ public void testDeleteme() throws Exception
{
JournalImpl journal = new JournalImpl(1024 * 1024 * 10, // 10M.. we believe that's the usual cilinder
Added: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java (rev 0)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/asyncio/timedbuffer/TimedBufferTest.java 2009-05-31 04:00:52 UTC (rev 7141)
@@ -0,0 +1,133 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+
+package org.jboss.messaging.tests.unit.core.asyncio.timedbuffer;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.timedbuffer.TimedBuffer;
+import org.jboss.messaging.core.asyncio.timedbuffer.TimedBufferObserver;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * A TimedBufferTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class TimedBufferTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ AIOCallback dummyCallback = new AIOCallback()
+ {
+
+ public void done()
+ {
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+ };
+
+
+ public void testFillBuffer()
+ {
+ final ArrayList<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+ final AtomicInteger flushTimes = new AtomicInteger(0);
+ class TestObserver implements TimedBufferObserver
+ {
+ public void flushBuffer(ByteBuffer buffer, List<AIOCallback> callbacks)
+ {
+ buffers.add(buffer);
+ flushTimes.incrementAndGet();
+ }
+
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.asyncio.timedbuffer.TimedBufferObserver#newBuffer(int, int)
+ */
+ public ByteBuffer newBuffer(int minSize, int maxSize)
+ {
+ return ByteBuffer.allocate(maxSize);
+ }
+ }
+
+ TimedBuffer timedBuffer = new TimedBuffer(new TestObserver(), 100, 3600 * 1000); // Any big timeout
+
+ int x = 0;
+ for (int i = 0 ; i < 10; i++)
+ {
+ ByteBuffer record = ByteBuffer.allocate(10);
+ for (int j = 0 ; j < 10; j++)
+ {
+ record.put((byte)getSamplebyte(x++));
+ }
+
+ record.rewind();
+ timedBuffer.addBytes(record, dummyCallback);
+ }
+
+
+ assertEquals(1, flushTimes.get());
+
+ ByteBuffer flushedBuffer = buffers.get(0);
+
+ assertEquals(100, flushedBuffer.limit());
+
+ assertEquals(100, flushedBuffer.capacity());
+
+
+ flushedBuffer.rewind();
+
+ for (int i = 0; i < 100; i++)
+ {
+ assertEquals(getSamplebyte(i), flushedBuffer.get());
+ }
+
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list