[jboss-cvs] JBoss Messaging SVN: r4257 - in trunk: src/main/org/jboss/messaging/core/asyncio/impl and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue May 20 21:51:25 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-05-20 21:51:25 -0400 (Tue, 20 May 2008)
New Revision: 4257
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/config/ConfigurationTest-config.xml
trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
Log:
JBMESSAGING-1283 - Few improvements on AIO:
i - reverting usage of Semaphores to control MaxAIO
ii- Adding MaxAIO as a configuration parameter
iii - Few testcases fixes
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/config/jbm-configuration.xml 2008-05-21 01:51:25 UTC (rev 4257)
@@ -78,6 +78,10 @@
<journal-min-files>10</journal-min-files>
+ <!-- Maximum of simultaneous asynchronous writes accepted by the native layer.
+ (parameter ignored on NIO) -->
+ <journal-max-aio>5000</journal-max-aio>
+
<journal-task-period>5000</journal-task-period>
<security-enabled>true</security-enabled>
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-05-21 01:51:25 UTC (rev 4257)
@@ -8,6 +8,7 @@
package org.jboss.messaging.core.asyncio.impl;
import java.nio.ByteBuffer;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -101,6 +102,7 @@
private VariableLatch writeLatch = new VariableLatch();
private ReadWriteLock lock = new ReentrantReadWriteLock();
private Lock writeLock = lock.writeLock();
+ private Semaphore writeSemaphore;
/**
* Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -117,7 +119,8 @@
try
{
writeLock.lock();
- this.maxIO = maxIO;
+ this.maxIO = maxIO;
+ writeSemaphore = new Semaphore(this.maxIO);
if (opened)
{
@@ -125,8 +128,8 @@
}
opened = true;
this.fileName=fileName;
- handler = init (fileName, maxIO, log);
- addMax(maxIO);
+ handler = init (fileName, this.maxIO, log);
+ addMax(this.maxIO);
startPoller();
}
finally
@@ -139,14 +142,16 @@
{
checkOpened();
- writeLock.lock();
- writeLatch.waitCompletion();
- stopPoller(handler);
- // We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
- poller.join();
try
{
- closeInternal(handler);
+ writeLock.lock();
+ writeLatch.waitCompletion(120);
+ writeSemaphore = null;
+ stopPoller(handler);
+ // We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
+ poller.join();
+
+ closeInternal(handler);
addMax(maxIO * -1);
opened = false;
handler = 0;
@@ -161,12 +166,14 @@
{
checkOpened();
writeLatch.up();
+ writeSemaphore.acquireUninterruptibly();
try
{
write (handler, position, size, directByteBuffer, aioPackage);
}
catch (RuntimeException e)
{
+ writeSemaphore.release();
writeLatch.down();
throw e;
}
@@ -177,12 +184,14 @@
{
checkOpened();
writeLatch.up();
+ writeSemaphore.acquireUninterruptibly();
try
{
read (handler, position, size, directByteBuffer, aioPackage);
}
catch (RuntimeException e)
{
+ writeSemaphore.release();
writeLatch.down();
throw e;
}
@@ -213,6 +222,7 @@
@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
private void callbackDone(AIOCallback callback)
{
+ writeSemaphore.release();
writeLatch.down();
callback.done();
}
@@ -220,6 +230,7 @@
@SuppressWarnings("unused") // Called by the JNI layer.. just ignore the warning
private void callbackError(AIOCallback callback, int errorCode, String errorMessage)
{
+ writeSemaphore.release();
writeLatch.down();
callback.onError(errorCode, errorMessage);
}
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-05-21 01:51:25 UTC (rev 4257)
@@ -106,6 +106,10 @@
int getJournalFileSize();
int getJournalMinFiles();
+
+ int getJournalMaxAIO();
+
+ void setJournalMaxAIO(int max);
long getJournalTaskPeriod();
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-05-21 01:51:25 UTC (rev 4257)
@@ -51,6 +51,7 @@
public static final int DEFAULT_REQRES_TIMEOUT = 5; // in seconds
public static final boolean DEFAULT_INVM_DISABLED = false;
public static final boolean DEFAULT_SSL_ENABLED = false;
+ public static final int DEFAULT_MAX_AIO = 3000;
protected List<String> defaultInterceptors = new ArrayList<String>();
@@ -82,6 +83,8 @@
protected int journalMinFiles;
+ protected int maxAIO;
+
protected long journalTaskPeriod;
protected boolean securityEnabled = true;
@@ -405,6 +408,16 @@
return journalFileSize;
}
+ public int getJournalMaxAIO()
+ {
+ return maxAIO;
+ }
+
+ public void setJournalMaxAIO(int max)
+ {
+ this.maxAIO = max;
+ }
+
public int getJournalMinFiles()
{
return journalMinFiles;
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-05-21 01:51:25 UTC (rev 4257)
@@ -136,6 +136,8 @@
this.journalTaskPeriod = getLong(e, "journal-task-period", 5000L);
+ this.maxAIO = getInteger(e, "journal-max-aio", DEFAULT_MAX_AIO);
+
this.securityEnabled = getBoolean(e, "security-enabled", true);
NodeList defaultInterceptors = e.getElementsByTagName("default-interceptors-config");
Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2008-05-21 01:51:25 UTC (rev 4257)
@@ -5,6 +5,7 @@
* A TestableJournal
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public interface TestableJournal extends Journal
@@ -17,6 +18,10 @@
int getIDMapSize();
+ String debug() throws Exception;
+
+ void debugWait() throws Exception;
+
//void dump();
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-21 01:51:25 UTC (rev 4257)
@@ -40,7 +40,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
@@ -1187,8 +1186,8 @@
reclaimer.scan(dataFiles.toArray(files));
}
-
- public String debug() throws Exception
+
+ public String debug() throws Exception
{
this.checkReclaimStatus();
@@ -1209,6 +1208,28 @@
return builder.toString();
}
+
+ /** Method for use on testcases.
+ * It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
+ public void debugWait() throws Exception
+ {
+ for (TransactionCallback callback: transactionCallbacks.values())
+ {
+ callback.waitCompletion();
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ this.closingExecutor.execute(new Runnable(){
+ public void run()
+ {
+ latch.countDown();
+ }
+ });
+
+ // just to make sure the closing thread is empty
+ latch.await();
+ }
// TestableJournal implementation --------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-05-21 01:51:25 UTC (rev 4257)
@@ -147,7 +147,7 @@
messageJournal = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(), config.isJournalSync(), journalFF,
- config.getJournalTaskPeriod(), "jbm-data", "jbm", 2000);
+ config.getJournalTaskPeriod(), "jbm-data", "jbm", config.getJournalMaxAIO());
}
public long generateMessageID()
Modified: trunk/tests/config/ConfigurationTest-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-config.xml 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/tests/config/ConfigurationTest-config.xml 2008-05-21 01:51:25 UTC (rev 4257)
@@ -50,7 +50,9 @@
<remoting-ssl-keystore-path>messaging.keystore</remoting-ssl-keystore-path>
<remoting-ssl-keystore-password>secureexample keystore</remoting-ssl-keystore-password>
<remoting-ssl-truststore-path>messaging.truststore</remoting-ssl-truststore-path>
- <remoting-ssl-truststore-password>secureexample truststore</remoting-ssl-truststore-password>
+ <remoting-ssl-truststore-password>secureexample truststore</remoting-ssl-truststore-password>
+
+ <journal-max-aio>123</journal-max-aio>
</configuration>
Modified: trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageTestBase.java
===================================================================
--- trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageTestBase.java 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/tests/jms-tests/src/org/jboss/test/messaging/jms/message/MessageTestBase.java 2008-05-21 01:51:25 UTC (rev 4257)
@@ -93,7 +93,8 @@
log.debug("Message sent");
- Message r = queueCons.receive();
+ Message r = queueCons.receive(2000);
+ assertNotNull(r);
log.debug("Message received");
@@ -110,7 +111,8 @@
queueProd.send(message);
- Message r = queueCons.receive();
+ Message r = queueCons.receive(1000);
+ assertNotNull(r);
assertEquals(DeliveryMode.PERSISTENT, r.getJMSDeliveryMode());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java 2008-05-21 01:51:25 UTC (rev 4257)
@@ -126,6 +126,11 @@
assertEquals("org.jboss.tst2", configuration.getDefaultInterceptors().get(1));
}
+ public void testMaxAIO() throws Exception
+ {
+ assertEquals(123, configuration.getJournalMaxAIO());
+ }
+
//config is supposed to be immutable??
// public void testPropertyChangeListener() throws Exception
// {
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-05-21 01:51:25 UTC (rev 4257)
@@ -127,7 +127,7 @@
this.minFiles = minFreeFiles;
this.fileSize = fileSize;
this.sync = sync;
- this.maxAIO = 1000;
+ this.maxAIO = 50;
}
public void createJournal() throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-05-20 23:43:19 UTC (rev 4256)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-05-21 01:51:25 UTC (rev 4257)
@@ -739,6 +739,8 @@
addTx(1, i);
}
+ journal.debugWait();
+
assertEquals(calculateNumberOfFiles(fileSize , journal.getAlignment(), 100, recordLength), journal.getDataFilesCount());
assertEquals(0, journal.getFreeFilesCount());
assertEquals(0, journal.getIDMapSize());
@@ -776,6 +778,8 @@
updateTx(1, i);
}
+ journal.debugWait();
+
assertEquals(calculateNumberOfFiles(fileSize , journal.getAlignment(), 200, recordLength), journal.getDataFilesCount());
assertEquals(0, journal.getFreeFilesCount());
assertEquals(0, journal.getIDMapSize());
@@ -813,6 +817,7 @@
deleteTx(1, i);
}
+ journal.debugWait();
assertEquals(calculateNumberOfFiles(fileSize , journal.getAlignment(), 200, recordLength, 200, JournalImpl.SIZE_DELETE_RECORD_TX), journal.getDataFilesCount());
@@ -921,6 +926,8 @@
deleteTx(1, 1); // in file 1
+ journal.debugWait();
+
List<String> files2 = fileFactory.listFiles(fileExtension);
assertEquals(2, files2.size());
@@ -975,7 +982,7 @@
assertEquals(0, journal.getFreeFilesCount());
assertEquals(2, journal.getIDMapSize());
- Thread.sleep(1000);
+ journal.debugWait();
//Now restart
stopJournal();
@@ -1229,6 +1236,8 @@
addTx(1, 1);
+ journal.debugWait();
+
List<String> files2 = fileFactory.listFiles(fileExtension);
assertEquals(2, files2.size());
More information about the jboss-cvs-commits
mailing list