[jboss-cvs] JBoss Messaging SVN: r4331 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 2 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 28 18:01:09 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-05-28 18:01:09 -0400 (Wed, 28 May 2008)
New Revision: 4331
Modified:
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
Log:
Refactoring of the Journal: removed the sync flag from the callback-based SequentialFile.write().
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-05-28 22:01:09 UTC (rev 4331)
@@ -48,7 +48,7 @@
void delete() throws Exception;
- int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception;
+ int write(ByteBuffer bytes, IOCallback callback) throws Exception;
int write(ByteBuffer bytes, boolean sync) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-05-28 22:01:09 UTC (rev 4331)
@@ -43,7 +43,9 @@
private final long timeout;
- private AsynchronousFile aioFile;
+ private final boolean sync;
+
+ private AsynchronousFile aioFile;
private AtomicLong position = new AtomicLong(0);
@@ -51,12 +53,13 @@
// serious performance problems. Because of that we make all the writes on AIO using a single thread.
private ExecutorService executor;
- public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO, final long timeout) throws Exception
+ public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO, final long timeout, final boolean sync) throws Exception
{
this.journalDir = journalDir;
this.fileName = fileName;
this.maxIO = maxIO;
this.timeout = timeout;
+ this.sync = sync;
}
public int getAlignment() throws Exception
@@ -188,18 +191,13 @@
int bytesRead = read (bytes, waitCompletion);
- waitCompletion.waitLatch();
+ waitCompletion.waitLatch(timeout);
- if (waitCompletion.errorMessage != null)
- {
- throw new MessagingException(waitCompletion.errorCode, waitCompletion.errorMessage);
- }
-
return bytesRead;
}
- public int write(final ByteBuffer bytes, boolean sync, final IOCallback callback) throws Exception
+ public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
final int bytesToWrite = bytes.limit();
@@ -210,34 +208,52 @@
return bytesToWrite;
}
+ public int write(final ByteBuffer bytes, final boolean sync) throws Exception
+ {
+ if (sync && this.sync)
+ {
+ WaitCompletion completion = new WaitCompletion();
+
+ int bytesWritten = write(bytes, completion);
+
+ completion.waitLatch(timeout);
+
+ return bytesWritten;
+ }
+ else
+ {
+ return write (bytes, DummyCallback.instance);
+ }
+
+ }
+
+
+ // Private methods
+ // -----------------------------------------------------------------------------------------------------
+
private void execWrite(final ByteBuffer bytes, final IOCallback callback,
- final int bytesToWrite, final long positionToWrite)
+ final int bytesToWrite, final long positionToWrite)
{
executor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
- }
- catch (Exception e)
- {
- log.warn (e.getMessage(), e);
- if (callback != null)
- {
- callback.onError(-1, e.getMessage());
- }
- }
- }
- });
+ {
+ public void run()
+ {
+ try
+ {
+ aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
+ } catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ if (callback != null)
+ {
+ callback.onError(-1, e.getMessage());
+ }
+ }
+ }
+ });
}
+
- public int write(final ByteBuffer bytes, final boolean sync) throws Exception
- {
- return write (bytes, sync, DummyCallback.instance);
- }
-
private void checkOpened() throws Exception
{
if (aioFile == null || !opened)
@@ -281,9 +297,23 @@
latch.countDown();
}
- public void waitLatch() throws Exception
+ public boolean waitLatch(long timeout) throws Exception
{
- latch.await();
+ if (latch.await(timeout, TimeUnit.MILLISECONDS))
+ {
+ if (errorMessage != null)
+ {
+ throw new MessagingException(errorCode, errorMessage);
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+
+
+
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-05-28 22:01:09 UTC (rev 4331)
@@ -28,7 +28,7 @@
public SequentialFile createSequentialFile(final String fileName, final boolean sync, final int maxIO, final long timeout) throws Exception
{
- return new AIOSequentialFile(journalDir, fileName, maxIO, timeout);
+ return new AIOSequentialFile(journalDir, fileName, maxIO, timeout, sync);
}
public boolean supportsCallbacks()
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-28 22:01:09 UTC (rev 4331)
@@ -288,7 +288,7 @@
if (shouldUseCallback)
{
SimpleCallback callback = new SimpleCallback();
- usedFile = appendRecord(bb.getBuffer(), true, callback);
+ usedFile = appendRecord(bb.getBuffer(), callback);
callback.waitCompletion(aioTimeout);
}
else
@@ -323,7 +323,7 @@
if (shouldUseCallback)
{
SimpleCallback callback = new SimpleCallback();
- usedFile = appendRecord(bb, true, callback);
+ usedFile = appendRecord(bb, callback);
callback.waitCompletion(aioTimeout);
}
else
@@ -364,7 +364,7 @@
if (shouldUseCallback)
{
SimpleCallback callback = new SimpleCallback();
- usedFile = appendRecord(bb, true, callback);
+ usedFile = appendRecord(bb, callback);
callback.waitCompletion(aioTimeout);
}
else
@@ -403,7 +403,7 @@
if (shouldUseCallback)
{
SimpleCallback callback = new SimpleCallback();
- appendRecord(bb, true, callback);
+ appendRecord(bb, callback);
callback.waitCompletion(aioTimeout);
}
else
@@ -447,7 +447,7 @@
{
TransactionCallback callback = getTransactionCallback(txID);
callback.countUp();
- usedFile = appendRecord(bb.getBuffer(), false, callback);
+ usedFile = appendRecord(bb.getBuffer(), callback);
}
else
{
@@ -483,7 +483,7 @@
bb.put(DONE);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, false, callback);
+ JournalFile usedFile = appendRecord(bb, callback);
TransactionNegPos tx = getTransactionInfo(txID);
@@ -514,7 +514,7 @@
bb.put(DONE);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, false, callback);
+ JournalFile usedFile = appendRecord(bb, callback);
TransactionNegPos tx = getTransactionInfo(txID);
@@ -541,7 +541,7 @@
bb.put(DONE);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, false, callback);
+ JournalFile usedFile = appendRecord(bb, callback);
TransactionNegPos tx = getTransactionInfo(txID);
@@ -574,7 +574,7 @@
bb.put(DONE);
bb.rewind();
- JournalFile usedFile = appendRecord(bb, true, callback);
+ JournalFile usedFile = appendRecord(bb, callback);
tx.prepare(usedFile);
}
@@ -607,7 +607,7 @@
{
TransactionCallback callback = getTransactionCallback(txID);
callback.countUp();
- usedFile = appendRecord(bb, true, callback);
+ usedFile = appendRecord(bb, callback);
callback.waitCompletion(aioTimeout);
}
else
@@ -648,7 +648,7 @@
if (shouldUseCallback)
{
SimpleCallback callback = new SimpleCallback();
- usedFile = appendRecord(bb, true, callback);
+ usedFile = appendRecord(bb, callback);
callback.waitCompletion(aioTimeout);
}
else
@@ -1459,7 +1459,7 @@
}
}
- private JournalFile appendRecord(final ByteBuffer bb, final boolean sync, final IOCallback callback) throws Exception
+ private JournalFile appendRecord(final ByteBuffer bb, final IOCallback callback) throws Exception
{
lock.acquire();
@@ -1468,7 +1468,7 @@
try
{
checkFile(size);
- currentFile.getFile().write(bb, sync, callback);
+ currentFile.getFile().write(bb, callback);
currentFile.extendOffset(size);
return currentFile;
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-05-28 22:01:09 UTC (rev 4331)
@@ -158,24 +158,34 @@
public int write(final ByteBuffer bytes, final boolean sync) throws Exception
{
- return write(bytes, sync, null);
+ int bytesRead = channel.write(bytes);
+
+ if (sync && this.sync)
+ {
+ channel.force(false);
+ }
+
+ return bytesRead;
}
- public int write(final ByteBuffer bytes, final boolean sync, final IOCallback callback) throws Exception
+ public int write(final ByteBuffer bytes, final IOCallback callback) throws Exception
{
- int bytesRead = channel.write(bytes);
-
- if (sync && this.sync)
- {
- channel.force(false);
- }
-
- if (callback != null)
- {
- callback.done();
- }
-
- return bytesRead;
+ try
+ {
+ int bytesRead = channel.write(bytes);
+
+ if (callback != null)
+ {
+ callback.done();
+ }
+
+ return bytesRead;
+ }
+ catch (Exception e)
+ {
+ callback.onError(-1, e.getMessage());
+ throw e;
+ }
}
public void position(final int pos) throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java 2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java 2008-05-28 22:01:09 UTC (rev 4331)
@@ -138,7 +138,7 @@
buffer.put((byte)'b');
}
- file.write(buffer, true, callback);
+ file.write(buffer, callback);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2008-05-28 22:01:09 UTC (rev 4331)
@@ -190,21 +190,15 @@
byte[] bytes3 = s3.getBytes("UTF-8");
ByteBuffer bb3 = factory.wrapBuffer(bytes3);
- FakeCallback callback = new FakeCallback();
- int bytesWritten = sf.write(bb1, true, callback);
- callback.waitComplete();
+ int bytesWritten = sf.write(bb1, true);
assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten);
- callback = new FakeCallback();
- bytesWritten = sf.write(bb2, true, callback);
- callback.waitComplete();
+ bytesWritten = sf.write(bb2, true);
assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten);
- callback = new FakeCallback();
- bytesWritten = sf.write(bb3, true, callback);
- callback.waitComplete();
+ bytesWritten = sf.write(bb3, true);
assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten);
@@ -214,9 +208,7 @@
ByteBuffer rb2 = factory.newBuffer(bytes2.length);
ByteBuffer rb3 = factory.newBuffer(bytes3.length);
- callback = new FakeCallback();
- int bytesRead = sf.read(rb1, callback);
- callback.waitComplete();
+ int bytesRead = sf.read(rb1);
assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesRead);
for (int i=0; i<bytes1.length; i++)
@@ -224,9 +216,7 @@
assertEquals(bytes1[i], rb1.get(i));
}
- callback = new FakeCallback();
- bytesRead = sf.read(rb2, callback);
- callback.waitComplete();
+ bytesRead = sf.read(rb2);
assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesRead);
for (int i=0; i<bytes2.length; i++)
{
@@ -234,9 +224,7 @@
}
- callback = new FakeCallback();
- bytesRead = sf.read(rb3, callback);
- callback.waitComplete();
+ bytesRead = sf.read(rb3);
assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesRead);
for (int i=0; i<bytes3.length; i++)
{
@@ -265,21 +253,15 @@
byte[] bytes3 = s3.getBytes("UTF-8");
ByteBuffer bb3 = factory.wrapBuffer(bytes3);
- FakeCallback callback = new FakeCallback();
- int bytesWritten = sf.write(bb1, true, callback);
- callback.waitComplete();
+ int bytesWritten = sf.write(bb1, true);
assertEquals(bb1.limit(), bytesWritten);
- callback = new FakeCallback();
- bytesWritten = sf.write(bb2, true, callback);
- callback.waitComplete();
+ bytesWritten = sf.write(bb2, true);
assertEquals(bb2.limit(), bytesWritten);
- callback = new FakeCallback();
- bytesWritten = sf.write(bb3, true, callback);
- callback.waitComplete();
+ bytesWritten = sf.write(bb3, true);
assertEquals(bb3.limit(), bytesWritten);
@@ -329,9 +311,7 @@
byte[] bytes1 = s1.getBytes("UTF-8");
ByteBuffer bb1 = factory.wrapBuffer(bytes1);
- FakeCallback callback = new FakeCallback();
- int bytesWritten = sf.write(bb1, true, callback);
- callback.waitComplete();
+ int bytesWritten = sf.write(bb1, true);
assertEquals(bb1.limit(), bytesWritten);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-05-28 19:21:15 UTC (rev 4330)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-05-28 22:01:09 UTC (rev 4331)
@@ -229,7 +229,7 @@
data.position(pos);
}
- public int write(ByteBuffer bytes, boolean sync, IOCallback callback) throws Exception
+ public int write(ByteBuffer bytes, IOCallback callback) throws Exception
{
if (!open)
{
@@ -252,7 +252,7 @@
public int write(ByteBuffer bytes, boolean sync) throws Exception
{
- return write(bytes, sync, null);
+ return write(bytes, null);
}
private void checkAndResize(int size)
More information about the jboss-cvs-commits mailing list