[jboss-cvs] JBoss Messaging SVN: r4128 - in branches/trunk_tmp_aio: src/main/org/jboss/messaging/core/asyncio/impl and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Apr 28 15:03:12 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-04-28 15:03:12 -0400 (Mon, 28 Apr 2008)
New Revision: 4128
Modified:
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java
branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java
Log:
Fixing concurrent broken usage
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-04-28 19:03:12 UTC (rev 4128)
@@ -18,7 +18,7 @@
public interface AsynchronousFile
{
- void close();
+ void close() throws Exception;
/**
*
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-04-28 19:03:12 UTC (rev 4128)
@@ -8,6 +8,7 @@
package org.jboss.messaging.core.asyncio.impl;
import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -29,14 +30,14 @@
private String fileName;
private Thread poller;
private static boolean loaded = true;
+ private int maxIO;
+ Semaphore writeSemaphore;
+
ReadWriteLock lock = new ReentrantReadWriteLock();
Lock writeLock = lock.writeLock();
- Lock readLock = lock.readLock();
- ReadWriteLock lockPoller = new ReentrantReadWriteLock();
- Lock readPollerLock = lockPoller.readLock();
- Lock writePollerLock = lockPoller.writeLock();
+ Semaphore pollerSemaphore = new Semaphore(1);
/**
* Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -70,6 +71,10 @@
try
{
writeLock.lock();
+ this.maxIO = maxIO;
+
+ this.writeSemaphore = new Semaphore(maxIO);
+
if (opened)
{
throw new IllegalStateException("AsynchronousFile is already opened");
@@ -96,26 +101,26 @@
// informing caller that this thread already has the lock
try
{
- pollEvents();
+ pollEvents();
}
finally
{
- readPollerLock.unlock();
+ pollerSemaphore.release();
}
}
}
- public void close()
+ public void close() throws Exception
{
checkOpened();
writeLock.lock();
+ writeSemaphore.acquire(maxIO);
stopPoller(handler);
-
// We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
- writePollerLock.lock();
try
{
+ pollerSemaphore.acquire();
closeInternal(handler);
opened = false;
handler = 0;
@@ -123,7 +128,7 @@
finally
{
writeLock.unlock();
- writePollerLock.unlock();
+ pollerSemaphore.release();
}
}
@@ -131,14 +136,14 @@
public void write(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
{
checkOpened();
- readLock.lock();
+ this.writeSemaphore.acquireUninterruptibly();
try
{
write (handler, position, size, directByteBuffer, aioPackage);
}
catch (RuntimeException e)
{
- readLock.unlock();
+ writeSemaphore.release();
throw e;
}
@@ -147,14 +152,14 @@
public void read(long position, long size, ByteBuffer directByteBuffer, AIOCallback aioPackage)
{
checkOpened();
- readLock.lock();
+ this.writeSemaphore.acquireUninterruptibly();
try
{
read (handler, position, size, directByteBuffer, aioPackage);
}
catch (RuntimeException e)
{
- readLock.unlock();
+ writeSemaphore.release();
throw e;
}
@@ -183,14 +188,14 @@
@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
private void callbackDone(AIOCallback callback)
{
- readLock.unlock();
+ writeSemaphore.release();
callback.done();
}
@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
private void callbackError(AIOCallback callback, int errorCode, String errorMessage)
{
- readLock.unlock();
+ writeSemaphore.release();
callback.onError(errorCode, errorMessage);
}
@@ -206,12 +211,12 @@
private synchronized void startPoller()
{
checkOpened();
- readPollerLock.lock();
poller = new PollerThread();
try
{
- poller.start();
+ this.pollerSemaphore.acquire();
+ poller.start();
}
catch (Exception ex)
{
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-04-28 19:03:12 UTC (rev 4128)
@@ -111,7 +111,7 @@
public void open() throws Exception
{
aioFile = new AsynchronousFileImpl();
- aioFile.open(journalDir + "/" + fileName, 500);
+ aioFile.open(journalDir + "/" + fileName, 1000);
position.set(0);
}
Modified: branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-04-28 19:03:12 UTC (rev 4128)
@@ -963,7 +963,9 @@
for (int i = 0; i < filesToCreate; i++)
{
- freeFiles.add(createFile());
+ // Keeping all files opened can be very costly (mainly on AIO)
+ JournalFile createdFile = createFile();
+ freeFiles.add(createdFile);
}
//The current file is the last one
@@ -991,6 +993,7 @@
else
{
currentFile = freeFiles.remove();
+ currentFile.getFile().open();
}
for (RecordInfo record: records)
@@ -1263,6 +1266,8 @@
info.extendOffset(bytesWritten);
+ info.getFile().close();
+
return info;
}
@@ -1311,10 +1316,12 @@
if (!freeFiles.isEmpty())
{
currentFile = freeFiles.remove();
+ currentFile.getFile().open();
}
else
{
currentFile = createFile();
+ currentFile.getFile().open();
}
}
}
Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java 2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/asyncio/impl/test/integration/SingleThreadWriteNativeTest.java 2008-04-28 19:03:12 UTC (rev 4128)
@@ -166,7 +166,7 @@
public void testAddAsyncData() throws Exception
{
- asyncData(150000,1024,20000);
+ asyncData(500000,1024,30000);
}
public void testValidateData() throws Exception
@@ -599,7 +599,7 @@
for (LocalAIO tmp: list)
{
controller.write(counter * size, size, block, tmp);
- if (++counter % 5000 == 0)
+ if (++counter % 20000 == 0)
{
System.out.println(5000*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Async)");
lastTime = System.currentTimeMillis();
Modified: branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java
===================================================================
--- branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java 2008-04-28 15:56:47 UTC (rev 4127)
+++ branches/trunk_tmp_aio/tests/src/org/jboss/messaging/core/journal/impl/test/timing/RealJournalImplAIOTest.java 2008-04-28 19:03:12 UTC (rev 4128)
@@ -65,15 +65,20 @@
public void testSpeedNonTransactional() throws Exception
{
+ final int numMessages = 3000000;
+
+ long numFiles = (int)(((numMessages * 1024 + 512) / (10 * 1024 * 1024)) * 1.1);
+
+ log.info("num Files=" + numFiles);
+
Journal journal =
- new JournalImpl(10 * 1024 * 1024, 10, true, new AIOSequentialFileFactory(journalDir),
+ new JournalImpl(10 * 1024 * 1024, 100, true, new AIOSequentialFileFactory(journalDir),
5000, "jbm-data", "jbm");
journal.start();
journal.load(new ArrayList<RecordInfo>(), null);
- final int numMessages = 50000;
final CountDownLatch latch = new CountDownLatch(numMessages);
@@ -121,12 +126,9 @@
long start = System.currentTimeMillis();
- ArrayList<LocalCallback> callbacks = new ArrayList<LocalCallback>();
-
+ LocalCallback callback = new LocalCallback(1, latch);
for (int i = 0; i < numMessages; i++)
{
- LocalCallback callback = new LocalCallback(i, latch);
- callbacks.add(callback);
journal.appendAddRecord(i, data, callback);
}
@@ -138,21 +140,6 @@
boolean failed = false;
- for (LocalCallback callback: callbacks)
- {
- if (callback.message != null)
- {
- fail(callback.message);
- }
-
- if (!callback.done)
- {
- System.out.println("callback i=" + callback.i + " was not received!");
- failed = true;
- }
- }
-
-
// If this fails it is probably because JournalImpl it is closing the files without waiting all the completes to arrive first
assertFalse(failed);
More information about the jboss-cvs-commits
mailing list