[jboss-cvs] JBoss Messaging SVN: r7134 - in branches/Branch_JBM2_Perf_Clebert: src/main/org/jboss/messaging/core/journal/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri May 29 17:44:57 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-05-29 17:44:57 -0400 (Fri, 29 May 2009)
New Revision: 7134
Modified:
branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java
branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-05-29 21:44:57 UTC (rev 7134)
@@ -40,6 +40,8 @@
void open() throws Exception;
boolean isOpen();
+
+ void setBuffering(boolean buffering);
/**
* For certain operations (like loading) we don't need open the file with full maxIO
Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-05-29 21:44:57 UTC (rev 7134)
@@ -72,6 +72,8 @@
private final TimedBuffer timedBuffer;
private BufferCallback bufferCallback;
+
+ private boolean buffering = true;
/** A context switch on AIO would make it to synchronize the disk before
switching to the new thread, what would cause
@@ -287,7 +289,14 @@
public void write(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
- timedBuffer.addBytes(bytes, callback);
+ if (buffering)
+ {
+ timedBuffer.addBytes(bytes, callback);
+ }
+ else
+ {
+ doWrite(bytes, callback);
+ }
}
public void write(final ByteBuffer bytes, final boolean sync) throws Exception
@@ -311,6 +320,19 @@
}
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#setBuffering(boolean)
+ */
+ public void setBuffering(boolean buffering)
+ {
+ this.buffering = buffering;
+ if (!buffering)
+ {
+ timedBuffer.flush();
+ }
+ };
+
+
public void sync() throws Exception
{
throw new IllegalArgumentException("This method is not supported on AIO");
@@ -341,11 +363,9 @@
// Private methods
// -----------------------------------------------------------------------------------------------------
- private void execWrite(final ByteBuffer bytes, final IOCallback callback)
+ private void doWrite(final ByteBuffer bytes, final IOCallback callback)
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
-
- bytes.limit(bytesToWrite);
final long positionToWrite = position.getAndAdd(bytesToWrite);
@@ -462,7 +482,7 @@
}
else
{
- execWrite(buffer, new DelegateCallback(callbacks));
+ doWrite(buffer, new DelegateCallback(callbacks));
}
}
@@ -482,7 +502,6 @@
}
}
- };
+ }
-
}
Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-05-29 21:44:57 UTC (rev 7134)
@@ -1560,6 +1560,8 @@
SequentialFile sf = file.getFile();
+ sf.setBuffering(false);
+
sf.open(1);
ByteBuffer bb = fileFactory.newBuffer(SIZE_INT);
@@ -1568,6 +1570,8 @@
sf.write(bb, true);
+ sf.setBuffering(true);
+
JournalFile jf = new JournalFileImpl(sf, newOrderingID);
sf.position(bb.limit());
Modified: branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2009-05-29 21:44:57 UTC (rev 7134)
@@ -244,4 +244,11 @@
return "NIOSequentialFile " + file;
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#setBuffering(boolean)
+ */
+ public void setBuffering(boolean buffering)
+ {
+ }
+
}
Modified: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java 2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/integration/journal/AIOSequentialFileFactoryTest.java 2009-05-29 21:44:57 UTC (rev 7134)
@@ -24,11 +24,8 @@
import java.io.File;
import java.nio.ByteBuffer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
@@ -80,104 +77,4 @@
factory.releaseBuffer(buff);
}
- public void testBlockCallback() throws Exception
- {
- class BlockCallback implements IOCallback
- {
- AtomicInteger countDone = new AtomicInteger(0);
-
- AtomicInteger countError = new AtomicInteger(0);
-
- CountDownLatch blockLatch;
-
- BlockCallback()
- {
- blockLatch = new CountDownLatch(1);
- }
-
- public void release()
- {
- blockLatch.countDown();
- }
-
- public void done()
- {
- try
- {
- blockLatch.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
-
- countDone.incrementAndGet();
- }
-
- public void onError(final int errorCode, final String errorMessage)
- {
- try
- {
- blockLatch.await();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
-
- countError.incrementAndGet();
- }
- }
-
- BlockCallback callback = new BlockCallback();
-
- final int NUMBER_OF_RECORDS = 500;
-
- SequentialFile file = factory.createSequentialFile("callbackBlock.log", 1000);
- file.open();
- file.fill(0, 512 * NUMBER_OF_RECORDS, (byte)'a');
-
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
- {
- ByteBuffer buffer = factory.newBuffer(512);
-
- buffer.putInt(i + 10);
-
- for (int j = buffer.position(); j < buffer.limit(); j++)
- {
- buffer.put((byte)'b');
- }
-
- file.write(buffer, callback);
- }
-
- callback.release();
- file.close();
- assertEquals(NUMBER_OF_RECORDS, callback.countDone.get());
- assertEquals(0, callback.countError.get());
-
- file.open();
-
- ByteBuffer buffer = factory.newBuffer(512);
-
- for (int i = 0; i < NUMBER_OF_RECORDS; i++)
- {
-
- file.read(buffer);
- buffer.rewind();
-
- int recordRead = buffer.getInt();
-
- assertEquals(i + 10, recordRead);
-
- for (int j = buffer.position(); j < buffer.limit(); j++)
- {
- assertEquals((byte)'b', buffer.get());
- }
-
- }
-
- file.close();
- }
-
}
Modified: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java 2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/performance/journal/PerformanceComparissonTest.java 2009-05-29 21:44:57 UTC (rev 7134)
@@ -120,7 +120,7 @@
testAddDeleteJournal(new AIOSequentialFileFactory(getTestDir(), 100 * 1024, 2),
NUM_RECORDS,
SIZE_RECORD,
- 13,
+ 20,
10 * 1024 * 1024);
}
@@ -139,7 +139,7 @@
testAddDeleteJournal(new NIOSequentialFileFactory(getTestDir()),
NUM_RECORDS,
SIZE_RECORD,
- 13,
+ 20,
10 * 1024 * 1024);
}
}
@@ -250,6 +250,58 @@
}
}
+ public void testDeleteme() throws Exception
+ {
+
+ JournalImpl journal = new JournalImpl(1024 * 1024 * 10, // 10M.. we believe that's the usual cilinder
+ // size.. not an exact science here
+ 10, // number of files pre-allocated
+ true, // sync on commit
+ false, // no sync on non transactional
+ new AIOSequentialFileFactory(getTestDir(), 1024 * 1024, 2), // AIO or NIO
+ "jbm", // file name
+ "jbm", // extension
+ 500); // it's like a semaphore for callback on the AIO layer
+ // this during record writes
+
+ journal.start();
+ journal.load(dummyLoader);
+
+ FakeMessage msg = new FakeMessage(1024);
+ FakeQueueEncoding update = new FakeQueueEncoding();
+
+ journal.appendAddRecord(1, (byte)1, msg);
+
+ journal.forceMoveNextFile();
+
+ journal.appendUpdateRecord(1, (byte)2, update);
+
+ journal.appendAddRecord(2, (byte)1, msg);
+
+ journal.appendDeleteRecord(1);
+
+ journal.forceMoveNextFile();
+
+ journal.appendDeleteRecord(2);
+
+ journal.stop();
+
+ journal = new JournalImpl(1024 * 1024 * 10, // 10M.. we believe that's the usual cilinder
+ // size.. not an exact science here
+ 2, // number of files pre-allocated
+ true, // sync on commit
+ false, // no sync on non transactional
+ new AIOSequentialFileFactory(getTestDir(), 1024 * 1024, 2), // AIO or NIO
+ "jbm", // file name
+ "jbm", // extension
+ 500); // it's like a semaphore for callback on the AIO layer
+ // this during record writes
+
+ journal.start();
+ journal.load(dummyLoader);
+
+ }
+
public void testJournal(SequentialFileFactory fileFactory, long records, int size, int numberOfFiles, int journalSize) throws Exception
{
@@ -302,12 +354,16 @@
{
final int size;
- byte buffer[];
+ byte bytes[];
FakeMessage(int size)
{
this.size = size;
- buffer = new byte[size];
+ bytes = new byte[size];
+ for (int i = 0; i < size; i++)
+ {
+ bytes[i] = (byte)'a';
+ }
}
public void decode(MessagingBuffer buffer)
@@ -316,7 +372,7 @@
public void encode(MessagingBuffer buffer)
{
- buffer.writeBytes(new byte[size]);
+ buffer.writeBytes(this.bytes);
}
/* (non-Javadoc)
@@ -331,27 +387,21 @@
private static class FakeQueueEncoding implements EncodingSupport
{
- long queueID;
- public FakeQueueEncoding(final long queueID)
- {
- super();
- this.queueID = queueID;
- }
-
public FakeQueueEncoding()
{
- super();
}
public void decode(final MessagingBuffer buffer)
{
- queueID = buffer.readLong();
}
public void encode(final MessagingBuffer buffer)
{
- buffer.writeLong(queueID);
+ for (int i = 0 ; i < 8; i++)
+ {
+ buffer.writeByte((byte)'q');
+ }
}
public int getEncodeSize()
Modified: branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-05-29 20:14:17 UTC (rev 7133)
+++ branches/Branch_JBM2_Perf_Clebert/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2009-05-29 21:44:57 UTC (rev 7134)
@@ -570,6 +570,13 @@
return data.position() + size <= data.limit();
}
+ /* (non-Javadoc)
+ * @see org.jboss.messaging.core.journal.SequentialFile#setBuffering(boolean)
+ */
+ public void setBuffering(boolean buffering)
+ {
+ }
+
}
/* (non-Javadoc)
@@ -592,7 +599,6 @@
*/
public BufferCallback getBufferCallback()
{
- // TODO Auto-generated method stub
return null;
}
More information about the jboss-cvs-commits
mailing list