[jboss-cvs] JBoss Messaging SVN: r4277 - in trunk: src/config and 24 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 21 23:07:49 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-05-21 23:07:49 -0400 (Wed, 21 May 2008)
New Revision: 4277
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/AIOTestBase.java
trunk/tests/src/org/jboss/messaging/tests/performance/
trunk/tests/src/org/jboss/messaging/tests/performance/journal/
trunk/tests/src/org/jboss/messaging/tests/performance/journal/FakeJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/fakes/
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/fakes/FakeBinding.java
trunk/tests/src/org/jboss/messaging/tests/performance/persistence/fakes/FakePostOffice.java
trunk/tests/src/org/jboss/messaging/tests/performance/remoting/
trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureBase.java
trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureInVMTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureRemoteTest.java
Removed:
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureInVMTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureRemoteTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/fakes/FakeBinding.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/fakes/FakePostOffice.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/timing/StorageManagerTimingTest.java
Modified:
trunk/build-messaging.xml
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/asyncio/AIOCallback.java
trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/message/Message.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/config/ConfigurationTest-config.xml
trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/MultiThreadWriteNativeTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/RealJournalImplAIOTest.java
Log:
I - AIO Timeouts & configuration
II - Few tweaks on headers
III - Moving performance tests out of unit tests
Modified: trunk/build-messaging.xml
===================================================================
--- trunk/build-messaging.xml 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/build-messaging.xml 2008-05-22 03:07:49 UTC (rev 4277)
@@ -615,7 +615,8 @@
<fileset dir="${test.classes.dir}">
<include name="**/org/jboss/messaging/tests/integration/**/*${test-mask}.class"/>
<include name="**/org/jboss/messaging/tests/unit/**/*${test-mask}.class"/>
- <exclude name="**/org/jboss/messaging/tests/local/**/*${test-mask}.class"/>
+ <exclude name="**/org/jboss/messaging/tests/local/**/*${test-mask}.class"/>
+ <exclude name="**/org/jboss/messaging/tests/performance/**/*${test-mask}.class"/>
</fileset>
</batchtest>
</junit>
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/config/jbm-configuration.xml 2008-05-22 03:07:49 UTC (rev 4277)
@@ -82,6 +82,15 @@
(parameter ignored on NIO) -->
<journal-max-aio>5000</journal-max-aio>
+
+ <!-- Maximum time in seconds an AIO operation could take.
+ This includes:
+ - closing Asynchronous files
+ - Transaction awaits
+ - Awaits on non transactional writes
+ -->
+ <journal-aio-timeout>90</journal-aio-timeout>
+
<journal-task-period>5000</journal-task-period>
<security-enabled>true</security-enabled>
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/AIOCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/AIOCallback.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/AIOCallback.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -8,7 +8,7 @@
package org.jboss.messaging.core.asyncio;
/**
- *
+ * The interface used AIO Callbacks.
* @author clebert.suconic at jboss.com
*
*/
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/AsynchronousFile.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -24,7 +24,7 @@
* @param fileName
* @param maxIO The number of max concurrent asynchrnous IO operations. It has to be balanced between the size of your writes and the capacity of your disk.
*/
- void open(String fileName, int maxIO);
+ void open(String fileName, int maxIO, int timeout);
/**
* Warning: This function will perform a synchronous IO, probably translating to a fstat call
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/AsynchronousFileImpl.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -22,6 +22,8 @@
/**
*
+ * AsynchronousFile implementation
+ *
* @author clebert.suconic at jboss.com
* Warning: Case you refactor the name or the package of this class
* You need to make sure you also rename the C++ native calls
@@ -103,6 +105,7 @@
private ReadWriteLock lock = new ReentrantReadWriteLock();
private Lock writeLock = lock.writeLock();
private Semaphore writeSemaphore;
+ private int timeout;
/**
* Warning: Beware of the C++ pointer! It will bite you! :-)
@@ -114,11 +117,12 @@
// AsynchronousFile implementation
// ------------------------------------------------------------------------------------
- public void open(final String fileName, final int maxIO)
+ public void open(final String fileName, final int maxIO, final int timeout)
{
try
{
writeLock.lock();
+ this.timeout = timeout;
this.maxIO = maxIO;
writeSemaphore = new Semaphore(this.maxIO);
@@ -145,7 +149,7 @@
try
{
writeLock.lock();
- writeLatch.waitCompletion(120);
+ writeLatch.waitCompletion(timeout);
writeSemaphore = null;
stopPoller(handler);
// We need to make sure we won't call close until Poller is completely done, or we might get beautiful GPFs
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -109,6 +109,8 @@
int getJournalMaxAIO();
+ int getJournalAIOTimeout();
+
void setJournalMaxAIO(int max);
long getJournalTaskPeriod();
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -52,6 +52,7 @@
public static final boolean DEFAULT_INVM_DISABLED = false;
public static final boolean DEFAULT_SSL_ENABLED = false;
public static final int DEFAULT_MAX_AIO = 3000;
+ public static final int DEFAULT_AIO_TIMEOUT = 90;
protected List<String> defaultInterceptors = new ArrayList<String>();
@@ -83,8 +84,10 @@
protected int journalMinFiles;
- protected int maxAIO;
+ protected int journalMaxAIO;
+ protected int journalAIOTimeout;
+
protected long journalTaskPeriod;
protected boolean securityEnabled = true;
@@ -410,15 +413,20 @@
public int getJournalMaxAIO()
{
- return maxAIO;
+ return journalMaxAIO;
}
public void setJournalMaxAIO(int max)
{
- this.maxAIO = max;
+ this.journalMaxAIO = max;
}
- public int getJournalMinFiles()
+ public int getJournalAIOTimeout()
+ {
+ return journalAIOTimeout;
+ }
+
+ public int getJournalMinFiles()
{
return journalMinFiles;
}
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -136,8 +136,10 @@
this.journalTaskPeriod = getLong(e, "journal-task-period", 5000L);
- this.maxAIO = getInteger(e, "journal-max-aio", DEFAULT_MAX_AIO);
+ this.journalMaxAIO = getInteger(e, "journal-max-aio", DEFAULT_MAX_AIO);
+ this.journalAIOTimeout = getInteger(e, "journal-aio-timeout", DEFAULT_AIO_TIMEOUT);
+
this.securityEnabled = getBoolean(e, "security-enabled", true);
NodeList defaultInterceptors = e.getElementsByTagName("default-interceptors-config");
Modified: trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/journal/IOCallback.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -9,6 +9,14 @@
import org.jboss.messaging.core.asyncio.AIOCallback;
+/**
+ *
+ * This class is just a direct extention of AIOCallback.
+ * Just to avoid the direct dependency of org.jboss.messaging.core.asynciio.AIOCallback from the journal.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
public interface IOCallback extends AIOCallback
{
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -30,6 +30,7 @@
* A Journal
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public interface Journal extends MessagingComponent
Modified: trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/journal/RecordInfo.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -6,6 +6,7 @@
* A RecordInfo
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public class RecordInfo
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -28,6 +28,7 @@
* A SequentialFile
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public interface SequentialFile
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFileFactory.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -29,11 +29,12 @@
* A SequentialFileFactory
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public interface SequentialFileFactory
{
- SequentialFile createSequentialFile(String fileName, boolean sync, int maxIO) throws Exception;
+ SequentialFile createSequentialFile(String fileName, boolean sync, int maxIO, int timeout) throws Exception;
List<String> listFiles(String extension) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -41,6 +41,8 @@
private final int maxIO;
+ private final int timeout;
+
private AsynchronousFile aioFile;
private AtomicLong position = new AtomicLong(0);
@@ -49,11 +51,12 @@
// serious performance problems. Because of that we make all the writes on AIO using a single thread.
private ExecutorService executor;
- public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO) throws Exception
+ public AIOSequentialFile(final String journalDir, final String fileName, final int maxIO, final int timeout) throws Exception
{
this.journalDir = journalDir;
this.fileName = fileName;
this.maxIO = maxIO;
+ this.timeout = timeout;
}
public int getAlignment() throws Exception
@@ -146,7 +149,7 @@
opened = true;
executor = Executors.newSingleThreadExecutor();
aioFile = new AsynchronousFileImpl();
- aioFile.open(journalDir + "/" + fileName, maxIO);
+ aioFile.open(journalDir + "/" + fileName, maxIO, timeout);
position.set(0);
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -26,9 +26,9 @@
super(journalDir);
}
- public SequentialFile createSequentialFile(final String fileName, final boolean sync, final int maxIO) throws Exception
+ public SequentialFile createSequentialFile(final String fileName, final boolean sync, final int maxIO, final int timeout) throws Exception
{
- return new AIOSequentialFile(journalDir, fileName, maxIO);
+ return new AIOSequentialFile(journalDir, fileName, maxIO, timeout);
}
public boolean supportsCallbacks()
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AbstractSequentialFactory.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -15,6 +15,14 @@
import org.jboss.messaging.core.journal.SequentialFileFactory;
+/**
+ *
+ * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
+ *
+ */
public abstract class AbstractSequentialFactory implements SequentialFileFactory
{
protected final String journalDir;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -41,9 +41,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -140,6 +138,9 @@
// used for Asynchronous IO only (ignored on NIO).
private final int maxAIO;
+ // used for Asynchronous IO only (ignored on NIO).
+ private final int aioTimeout;
+
private final int fileSize;
private final int minFiles;
@@ -193,7 +194,7 @@
public JournalImpl(final int fileSize, final int minFiles,
final boolean sync, final SequentialFileFactory fileFactory, final long taskPeriod,
- final String filePrefix, final String fileExtension, final int maxAIO)
+ final String filePrefix, final String fileExtension, final int maxAIO, final int aioTimeout)
{
if (fileSize < MIN_FILE_SIZE)
{
@@ -223,6 +224,10 @@
{
throw new IllegalStateException("maxAIO should aways be a positive number");
}
+ if (aioTimeout < 1)
+ {
+ throw new IllegalStateException("aio-timeout cannot be less than 1 second");
+ }
this.fileSize = fileSize;
@@ -240,6 +245,8 @@
this.maxAIO = maxAIO;
+ this.aioTimeout = aioTimeout;
+
shouldUseCallback = fileFactory.supportsCallbacks() && sync;
}
@@ -650,7 +657,7 @@
for (String fileName: fileNames)
{
- SequentialFile file = fileFactory.createSequentialFile(fileName, sync, maxAIO);
+ SequentialFile file = fileFactory.createSequentialFile(fileName, sync, maxAIO, aioTimeout);
file.open();
@@ -1435,7 +1442,7 @@
if (trace) log.trace("Creating file " + fileName);
- SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync, maxAIO);
+ SequentialFile sequentialFile = fileFactory.createSequentialFile(fileName, sync, maxAIO, aioTimeout);
sequentialFile.open();
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFile.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -35,6 +35,7 @@
* A NIOSequentialFile
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public class NIOSequentialFile implements SequentialFile
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/NIOSequentialFileFactory.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -31,6 +31,7 @@
* A NIOSequentialFileFactory
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
*
*/
public class NIOSequentialFileFactory extends AbstractSequentialFactory implements SequentialFileFactory
@@ -40,7 +41,8 @@
super(journalDir);
}
- public SequentialFile createSequentialFile(final String fileName, final boolean sync, int maxIO)
+ // The timeout is ignored on NIO
+ public SequentialFile createSequentialFile(final String fileName, final boolean sync, int maxIO, int timeout)
{
return new NIOSequentialFile(journalDir, fileName, sync);
}
Modified: trunk/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/Message.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/message/Message.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -33,7 +33,8 @@
* The payload is opaque to the messaging system.
*
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
- * @author <a href="mailto:tim.fox"jboss.com">Tim Fox</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">ClebertSuconic</a>
* @version <tt>$Revision: 3341 $</tt>
*
* $Id: Message.java 3341 2007-11-19 14:34:57Z timfox $
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -42,6 +42,7 @@
*
* @author <a href="mailto:ovidiu at feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @version <tt>$Revision: 2740 $</tt>
*
* For normal message transportation serialization is not used
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -105,7 +105,7 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
- bindingsJournal = new JournalImpl(1024 * 1024, 2, true, bindingsFF, 10000, "jbm-bindings", "bindings", 1);
+ bindingsJournal = new JournalImpl(1024 * 1024, 2, true, bindingsFF, 10000, "jbm-bindings", "bindings", 1, 1);
String journalDir = config.getJournalDirectory();
@@ -147,7 +147,7 @@
messageJournal = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(), config.isJournalSync(), journalFF,
- config.getJournalTaskPeriod(), "jbm-data", "jbm", config.getJournalMaxAIO());
+ config.getJournalTaskPeriod(), "jbm-data", "jbm", config.getJournalMaxAIO(), config.getJournalAIOTimeout());
}
public long generateMessageID()
Modified: trunk/tests/config/ConfigurationTest-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-config.xml 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/config/ConfigurationTest-config.xml 2008-05-22 03:07:49 UTC (rev 4277)
@@ -53,7 +53,8 @@
<remoting-ssl-truststore-password>secureexample truststore</remoting-ssl-truststore-password>
<journal-max-aio>123</journal-max-aio>
-
+ <journal-aio-timeout>123</journal-aio-timeout>
+
</configuration>
</deployment>
\ No newline at end of file
Added: trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/AIOTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/AIOTestBase.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/AIOTestBase.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -0,0 +1,134 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.core.asyncio.impl;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * The base class for AIO Tests
+ * @author Clebert Suconic
+ *
+ */
+public abstract class AIOTestBase extends UnitTestCase
+{
+ protected String fileDir = System.getProperty("user.home") + "/journal-test";
+ protected String FILE_NAME = fileDir + "/fileUsedOnNativeTests.log";
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ File file = new File(fileDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ if (!AsynchronousFileImpl.isLoaded())
+ {
+ fail(String.format("libAIO is not loaded on %s %s %s", System
+ .getProperty("os.name"), System.getProperty("os.arch"), System
+ .getProperty("os.version")));
+ }
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
+ }
+
+ protected void encodeBufer(ByteBuffer buffer)
+ {
+ buffer.clear();
+ int size = buffer.limit();
+ for (int i = 0; i < size - 1; i++)
+ {
+ buffer.put((byte) ('a' + (i % 20)));
+ }
+
+ buffer.put((byte) '\n');
+
+ }
+
+ protected void preAlloc(AsynchronousFileImpl controller, long size)
+ {
+ System.out.println("Pre allocating");
+ System.out.flush();
+ long startPreAllocate = System.currentTimeMillis();
+ controller.fill(0l, 1, size, (byte) 0);
+ long endPreAllocate = System.currentTimeMillis() - startPreAllocate;
+ if (endPreAllocate != 0)
+ {
+ System.out.println("PreAllocated the file (size = " + size
+ + " bytes) in " + endPreAllocate + " Milliseconds, What means "
+ + (size / endPreAllocate) + " bytes per millisecond");
+ }
+ }
+
+ protected static class CountDownCallback implements AIOCallback
+ {
+
+ CountDownLatch latch;
+
+ public CountDownCallback(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+
+ boolean doneCalled = false;
+ boolean errorCalled = false;
+ int timesDoneCalled = 0;
+
+ public void done()
+ {
+ doneCalled = true;
+ timesDoneCalled++;
+ if (latch != null)
+ {
+ latch.countDown();
+ }
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ errorCalled = true;
+ if (latch != null)
+ {
+ // even thought an error happened, we need to inform the latch, or the test won't finish
+ latch.countDown();
+ }
+ System.out.println("Received an Error - " + errorCode + " message=" + errorMessage);
+
+ }
+
+ }
+
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/MultiThreadWriteNativeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/MultiThreadWriteNativeTest.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/MultiThreadWriteNativeTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -7,18 +7,14 @@
package org.jboss.messaging.tests.integration.core.asyncio.impl;
-import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
-import junit.framework.TestCase;
-
import org.jboss.messaging.core.asyncio.AIOCallback;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
import org.jboss.messaging.core.logging.Logger;
@@ -30,108 +26,33 @@
* I - Run->Open Run Dialog
* II - Find the class on the list (you will find it if you already tried running this testcase before)
* III - Add -Djava.library.path=<your project place>/native/src/.libs
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>.
* */
-public class MultiThreadWriteNativeTest extends TestCase
+public class MultiThreadWriteNativeTest extends AIOTestBase
{
static Logger log = Logger.getLogger(MultiThreadWriteNativeTest.class);
- static AtomicInteger position = new AtomicInteger(0);
+ AtomicInteger position = new AtomicInteger(0);
- String FILE_NAME="/tmp/libaio.log";
-
static final int SIZE = 1024;
- static final int NUMBER_OF_THREADS = 40;
- static final int NUMBER_OF_LINES = 5000;
+ static final int NUMBER_OF_THREADS = 10;
+ static final int NUMBER_OF_LINES = 1000;
// Executor exec
- static Executor executor = Executors.newSingleThreadExecutor();
+ Executor executor = Executors.newSingleThreadExecutor();
- static Semaphore semaphore = new Semaphore(1, false);
-
-
- static class ExecClass implements Runnable
- {
-
- AsynchronousFileImpl aio;
- ByteBuffer buffer;
- AIOCallback callback;
-
-
- public ExecClass(AsynchronousFileImpl aio, ByteBuffer buffer, AIOCallback callback)
- {
- this.aio = aio;
- this.buffer = buffer;
- this.callback = callback;
- }
-
- public void run()
- {
- try
- {
- aio.write(getNewPosition()*SIZE, SIZE, buffer, callback);
-
- }
- catch (Exception e)
- {
- callback.onError(-1, e.toString());
- e.printStackTrace();
- }
- finally
- {
- try { semaphore.release(); } catch (Exception ignored){}
- }
- }
-
- }
-
-
-
- private static void addData(AsynchronousFileImpl aio, ByteBuffer buffer, AIOCallback callback) throws Exception
- {
- //aio.write(getNewPosition()*SIZE, SIZE, buffer, callback);
- executor.execute(new ExecClass(aio, buffer, callback));
-
- //semaphore.acquire();
- //try
- //{
- //aio.write(getNewPosition()*SIZE, SIZE, buffer, callback);
- //}
- //finally
- //{
- // semaphore.release();
- //}
-
-
-
- }
-
-
-
-
protected void setUp() throws Exception
{
super.setUp();
- if (!AsynchronousFileImpl.isLoaded())
- {
- fail(String.format("libAIO is not loaded on %s %s %s",
- System.getProperty("os.name"),
- System.getProperty("os.arch"),
- System.getProperty("os.version")));
- }
-
- File file = new File(FILE_NAME);
- file.delete();
-
position.set(0);
}
protected void tearDown() throws Exception
{
super.tearDown();
-
- assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
}
public void testMultipleASynchronousWrites() throws Throwable
@@ -148,7 +69,7 @@
{
log.info(sync?"Sync test:":"Async test");
AsynchronousFileImpl jlibAIO = new AsynchronousFileImpl();
- jlibAIO.open(FILE_NAME, 21000);
+ jlibAIO.open(FILE_NAME, 21000, 120);
try
{
log.debug("Preallocating file");
@@ -196,12 +117,12 @@
- private static int getNewPosition()
+ private int getNewPosition()
{
return position.addAndGet(1);
}
- static class ThreadProducer extends Thread
+ class ThreadProducer extends Thread
{
Throwable failed = null;
@@ -247,13 +168,13 @@
if (!sync) latchFinishThread = new CountDownLatch(NUMBER_OF_LINES);
- LinkedList<LocalCallback> list = new LinkedList<LocalCallback>();
+ LinkedList<CountDownCallback> list = new LinkedList<CountDownCallback>();
for (int i=0;i<NUMBER_OF_LINES;i++)
{
if (sync) latchFinishThread = new CountDownLatch(1);
- LocalCallback callback = new LocalCallback(latchFinishThread, buffer, libaio);
+ CountDownCallback callback = new CountDownCallback(latchFinishThread);
if (!sync) list.add(callback);
addData(libaio, buffer,callback);
if (sync)
@@ -264,7 +185,7 @@
}
}
if (!sync) latchFinishThread.await();
- for (LocalCallback callback: list)
+ for (CountDownCallback callback: list)
{
assertTrue (callback.doneCalled);
assertFalse (callback.errorCalled);
@@ -277,7 +198,7 @@
libaio.destroyBuffer(buffer);
- for (LocalCallback callback: list)
+ for (CountDownCallback callback: list)
{
assertTrue (callback.doneCalled);
assertFalse (callback.errorCalled);
@@ -296,41 +217,45 @@
private static void addString(String str, ByteBuffer buffer)
{
byte bytes[] = str.getBytes();
- //buffer.putInt(bytes.length);
buffer.put(bytes);
- //CharBuffer charBuffer = CharBuffer.wrap(str);
- //UTF_8_ENCODER.encode(charBuffer, buffer, true);
-
}
- static class LocalCallback implements AIOCallback
+ private void addData(AsynchronousFileImpl aio, ByteBuffer buffer, AIOCallback callback) throws Exception
{
- boolean doneCalled = false;
- boolean errorCalled = false;
- CountDownLatch latchDone;
- ByteBuffer releaseMe;
- AsynchronousFileImpl libaio;
+ executor.execute(new WriteRunnable(aio, buffer, callback));
+ }
+
+ private class WriteRunnable implements Runnable
+ {
- public LocalCallback(CountDownLatch latchDone, ByteBuffer releaseMe, AsynchronousFileImpl libaio)
- {
- this.latchDone = latchDone;
- this.releaseMe = releaseMe;
- this.libaio = libaio;
- }
+ AsynchronousFileImpl aio;
+ ByteBuffer buffer;
+ AIOCallback callback;
- public void done()
+
+ public WriteRunnable(AsynchronousFileImpl aio, ByteBuffer buffer, AIOCallback callback)
{
- doneCalled=true;
- latchDone.countDown();
- //libaio.destroyBuffer(releaseMe);
+ this.aio = aio;
+ this.buffer = buffer;
+ this.callback = callback;
}
- public void onError(int errorCode, String errorMessage)
+ public void run()
{
- errorCalled=true;
- latchDone.countDown();
- libaio.destroyBuffer(releaseMe);
+ try
+ {
+ aio.write(getNewPosition()*SIZE, SIZE, buffer, callback);
+
+ }
+ catch (Exception e)
+ {
+ callback.onError(-1, e.toString());
+ e.printStackTrace();
+ }
}
}
+
+
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/asyncio/impl/SingleThreadWriteNativeTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -7,7 +7,6 @@
package org.jboss.messaging.tests.integration.core.asyncio.impl;
-import java.io.File;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
@@ -20,8 +19,6 @@
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
import org.jboss.messaging.core.logging.Logger;
-import junit.framework.TestCase;
-
/**
*
* you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
@@ -30,820 +27,628 @@
* II - Find the class on the list (you will find it if you already tried running this testcase before)
* III - Add -Djava.library.path=<your project place>/native/src/.libs
* */
-public class SingleThreadWriteNativeTest extends TestCase
+public class SingleThreadWriteNativeTest extends AIOTestBase
{
-
- private static final Logger log = Logger.getLogger(SingleThreadWriteNativeTest.class);
- private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8").newEncoder();
+ private static final Logger log = Logger
+ .getLogger(SingleThreadWriteNativeTest.class);
+ private static CharsetEncoder UTF_8_ENCODER = Charset.forName("UTF-8")
+ .newEncoder();
- byte commonBuffer[] = null;
+ byte commonBuffer[] = null;
- String FILE_NAME="/tmp/libaio.log";
-
@Override
protected void setUp() throws Exception
{
- super.setUp();
- if (!AsynchronousFileImpl.isLoaded())
- {
- fail(String.format("libAIO is not loaded on %s %s %s",
- System.getProperty("os.name"),
- System.getProperty("os.arch"),
- System.getProperty("os.version")));
- }
-
- LocalAIO.staticDone = 0;
- File file = new File(FILE_NAME);
- file.delete();
+ super.setUp();
}
protected void tearDown() throws Exception
{
super.tearDown();
- assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
}
-
- private void encodeBufer(ByteBuffer buffer)
+
+ /**
+ * Opening and closing a file immediately can lead to races on the native layer,
+ * creating crash conditions.
+ * */
+ public void testOpenClose() throws Exception
{
- buffer.clear();
- int size = buffer.limit();
- for (int i=0;i<size-1;i++)
- {
- buffer.put((byte)('a' + (i%20)));
- }
-
- buffer.put((byte)'\n');
-
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ for (int i = 0; i < 1000; i++)
+ {
+ controller.open(FILE_NAME, 10000, 120);
+ controller.close();
+
+ }
}
-
+
+ /**
+ * This test is validating if the AIO layer can open two different
+ * simultaneous files without loose any callbacks. This test made the native
+ * layer to crash at some point during development
+ */
public void testTwoFiles() throws Exception
{
final AsynchronousFileImpl controller = new AsynchronousFileImpl();
final AsynchronousFileImpl controller2 = new AsynchronousFileImpl();
- controller.open(FILE_NAME + ".1", 10000);
- controller2.open(FILE_NAME + ".2", 10000);
+ controller.open(FILE_NAME + ".1", 10000, 120);
+ controller2.open(FILE_NAME + ".2", 10000, 120);
- int numberOfLines = 100000;
+ int numberOfLines = 1000;
int size = 1024;
-
- try
- {
- System.out.println("++testDirectDataNoPage"); System.out.flush();
- CountDownLatch latchDone = new CountDownLatch(numberOfLines);
- CountDownLatch latchDone2 = new CountDownLatch(numberOfLines);
-
- ByteBuffer block = controller.newBuffer(size);
- encodeBufer(block);
-
- preAlloc(controller, numberOfLines * size);
- preAlloc(controller2, numberOfLines * size);
-
- ArrayList<LocalAIO> list = new ArrayList<LocalAIO>();
- ArrayList<LocalAIO> list2 = new ArrayList<LocalAIO>();
-
- for (int i=0; i<numberOfLines; i++)
- {
- list.add(new LocalAIO(latchDone));
- list2.add(new LocalAIO(latchDone2));
- }
-
-
- long valueInitial = System.currentTimeMillis();
-
- System.out.println("Adding data");
-
- long lastTime = System.currentTimeMillis();
- int counter = 0;
- Iterator<LocalAIO> iter2 = list2.iterator();
-
- for (LocalAIO tmp: list)
- {
- LocalAIO tmp2 = iter2.next();
-
- controller.write(counter * size, size, block, tmp);
- controller.write(counter * size, size, block, tmp2);
- if (++counter % 5000 == 0)
- {
- System.out.println(5000*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Async)");
- lastTime = System.currentTimeMillis();
- }
-
- }
-
- System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
-
-
- System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
- System.out.println("Flush now");
- System.out.println("Received " + LocalAIO.staticDone);
- long timeTotal = System.currentTimeMillis() - valueInitial;
-
- System.out.println("Asynchronous time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
-
- latchDone.await();
- latchDone2.await();
-
- timeTotal = System.currentTimeMillis() - valueInitial;
- System.out.println("After completions time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
-
- for (LocalAIO tmp: list)
- {
- assertEquals(1, tmp.timesDoneCalled);
- assertTrue(tmp.doneCalled);
- assertFalse(tmp.errorCalled);
- }
-
- controller.destroyBuffer(block);
-
- controller.close();
- }
- finally
- {
- try {controller.close();} catch (Exception ignored){}
- try {controller2.close();} catch (Exception ignored){}
- }
-
-
- }
-
- public void testAnnoyingPoller() throws Exception
- {
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- for (int i=0; i< 1000; i++)
+
+ try
{
- controller.open(FILE_NAME, 10000);
- controller.close();
+ log.info("++testDirectDataNoPage");
+ CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+ CountDownLatch latchDone2 = new CountDownLatch(numberOfLines);
+ ByteBuffer block = controller.newBuffer(size);
+ encodeBufer(block);
+
+ preAlloc(controller, numberOfLines * size);
+ preAlloc(controller2, numberOfLines * size);
+
+ ArrayList<CountDownCallback> list = new ArrayList<CountDownCallback>();
+ ArrayList<CountDownCallback> list2 = new ArrayList<CountDownCallback>();
+
+ for (int i = 0; i < numberOfLines; i++)
+ {
+ list.add(new CountDownCallback(latchDone));
+ list2.add(new CountDownCallback(latchDone2));
+ }
+
+ long valueInitial = System.currentTimeMillis();
+
+ long lastTime = System.currentTimeMillis();
+ int counter = 0;
+ Iterator<CountDownCallback> iter2 = list2.iterator();
+
+ for (CountDownCallback tmp : list)
+ {
+ CountDownCallback tmp2 = iter2.next();
+
+ controller.write(counter * size, size, block, tmp);
+ controller.write(counter * size, size, block, tmp2);
+ if (++counter % 5000 == 0)
+ {
+ log.info(5000 * 1000 / (System.currentTimeMillis() - lastTime)
+ + " rec/sec (Async)");
+ lastTime = System.currentTimeMillis();
+ }
+
+ }
+
+ long timeTotal = System.currentTimeMillis() - valueInitial;
+
+ log.info("Asynchronous time = " + timeTotal + " for " + numberOfLines
+ + " registers " + " size each line = " + size + " Records/Sec="
+ + (numberOfLines * 1000 / timeTotal) + " (Assynchronous)");
+
+ latchDone.await();
+ latchDone2.await();
+
+ timeTotal = System.currentTimeMillis() - valueInitial;
+ log.info("After completions time = " + timeTotal + " for "
+ + numberOfLines + " registers " + " size each line = " + size
+ + " Records/Sec=" + (numberOfLines * 1000 / timeTotal)
+ + " (Assynchronous)");
+
+ for (CountDownCallback callback : list)
+ {
+ assertEquals(1, callback.timesDoneCalled);
+ assertTrue(callback.doneCalled);
+ assertFalse(callback.errorCalled);
+ }
+
+ for (CountDownCallback callback : list2)
+ {
+ assertEquals(1, callback.timesDoneCalled);
+ assertTrue(callback.doneCalled);
+ assertFalse(callback.errorCalled);
+ }
+
+ controller.destroyBuffer(block);
+
+ controller.close();
+ } finally
+ {
+ try
+ {
+ controller.close();
+ } catch (Exception ignored)
+ {
+ }
+ try
+ {
+ controller2.close();
+ } catch (Exception ignored)
+ {
+ }
}
}
-
public void testAddBeyongSimultaneousLimit() throws Exception
{
- asyncData(150000,1024,100);
+ asyncData(3000, 1024, 10);
}
-
+
public void testAddAsyncData() throws Exception
{
- asyncData(500000,1024,30000);
+ asyncData(10000, 1024, 30000);
}
- public void testValidateData() throws Exception
- {
- validateData(150000,1024,20000);
- }
-
public void testInvalidReads() throws Exception
{
class LocalCallback implements AIOCallback
{
-
+
CountDownLatch latch = new CountDownLatch(1);
boolean error;
+
public void done()
{
latch.countDown();
}
-
+
public void onError(int errorCode, String errorMessage)
{
this.error = true;
latch.countDown();
}
}
-
+
AsynchronousFileImpl controller = new AsynchronousFileImpl();
try
{
-
- final int SIZE = 512;
-
- controller.open(FILE_NAME, 10);
- controller.close();
-
- controller = new AsynchronousFileImpl();
-
- controller.open(FILE_NAME, 10);
-
- controller.fill(0,1, 512, (byte)'j');
-
-
- ByteBuffer buffer = controller.newBuffer(SIZE);
-
-
- buffer.clear();
-
- for (int i=0; i<SIZE; i++)
- {
- buffer.put((byte)(i%100));
- }
-
- LocalCallback callbackLocal = new LocalCallback();
-
- controller.write(0, 512, buffer, callbackLocal);
-
- callbackLocal.latch.await();
-
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(50);
-
- callbackLocal = new LocalCallback();
-
- controller.read(0, 50, newBuffer, callbackLocal);
-
- callbackLocal.latch.await();
-
- //assertTrue(callbackLocal.error);
-
- callbackLocal = new LocalCallback();
-
- byte bytes[] = new byte[512];
-
- try
- {
- newBuffer = ByteBuffer.wrap(bytes);
-
- controller.read(0, 512, newBuffer, callbackLocal);
-
- fail("An exception was supposed to be thrown");
- }
- catch (Exception ignored)
- {
- }
-
- //newBuffer = ByteBuffer.allocateDirect(512);
- newBuffer = controller.newBuffer(512);
- callbackLocal = new LocalCallback();
- controller.read(0, 512, newBuffer,callbackLocal);
- callbackLocal.latch.await();
- assertFalse(callbackLocal.error);
-
- newBuffer.rewind();
-
- byte[] bytesRead = new byte[SIZE];
-
- newBuffer.get(bytesRead);
-
- for (int i=0; i<SIZE;i++)
- {
- assertEquals((byte)(i%100), bytesRead[i]);
- }
-
-
- controller.destroyBuffer(buffer);
- }
- finally
+
+ final int SIZE = 512;
+
+ controller.open(FILE_NAME, 10, 120);
+ controller.close();
+
+ controller = new AsynchronousFileImpl();
+
+ controller.open(FILE_NAME, 10, 120);
+
+ controller.fill(0, 1, 512, (byte) 'j');
+
+ ByteBuffer buffer = controller.newBuffer(SIZE);
+
+ buffer.clear();
+
+ for (int i = 0; i < SIZE; i++)
+ {
+ buffer.put((byte) (i % 100));
+ }
+
+ LocalCallback callbackLocal = new LocalCallback();
+
+ controller.write(0, 512, buffer, callbackLocal);
+
+ callbackLocal.latch.await();
+
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(50);
+
+ callbackLocal = new LocalCallback();
+
+ controller.read(0, 50, newBuffer, callbackLocal);
+
+ callbackLocal.latch.await();
+
+ //assertTrue(callbackLocal.error);
+
+ callbackLocal = new LocalCallback();
+
+ byte bytes[] = new byte[512];
+
+ try
+ {
+ newBuffer = ByteBuffer.wrap(bytes);
+
+ controller.read(0, 512, newBuffer, callbackLocal);
+
+ fail("An exception was supposed to be thrown");
+ } catch (Exception ignored)
+ {
+ }
+
+ //newBuffer = ByteBuffer.allocateDirect(512);
+ newBuffer = controller.newBuffer(512);
+ callbackLocal = new LocalCallback();
+ controller.read(0, 512, newBuffer, callbackLocal);
+ callbackLocal.latch.await();
+ assertFalse(callbackLocal.error);
+
+ newBuffer.rewind();
+
+ byte[] bytesRead = new byte[SIZE];
+
+ newBuffer.get(bytesRead);
+
+ for (int i = 0; i < SIZE; i++)
+ {
+ assertEquals((byte) (i % 100), bytesRead[i]);
+ }
+
+ controller.destroyBuffer(buffer);
+ } finally
{
- try { controller.close(); } catch (Throwable ignored){}
-
+ try
+ {
+ controller.close();
+ } catch (Throwable ignored)
+ {
+ }
+
}
-
+
}
-
public void testRead() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- try
- {
-
- final int NUMBER_LINES = 1000;
- final int SIZE = 1024;
-
- controller.open(FILE_NAME, 10);
-
- log.info("Filling file");
-
- controller.fill(0,1, NUMBER_LINES * SIZE, (byte)'j');
-
- ByteBuffer buffer = controller.newBuffer(SIZE);
-
- log.info("Writing file");
-
- for (int i=0; i<NUMBER_LINES; i++)
- {
- buffer.clear();
- addString ("Str value " + i + "\n", buffer);
- for (int j=buffer.position(); j<buffer.capacity()-1;j++)
- {
- buffer.put((byte)' ');
- }
- buffer.put((byte)'\n');
-
-
- CountDownLatch latch = new CountDownLatch(1);
- LocalAIO aio = new LocalAIO(latch);
- controller.write(i * SIZE, SIZE, buffer, aio);
- latch.await();
- assertFalse(aio.errorCalled);
- assertTrue(aio.doneCalled);
- }
-
-
- // If you call close you're supposed to wait events to finish before closing it
- log.info("Closing file");
- controller.close();
- log.info("Reading file");
- controller.open(FILE_NAME, 10);
-
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
-
- for (int i=0; i<NUMBER_LINES; i++)
- {
- newBuffer.clear();
- addString ("Str value " + i + "\n", newBuffer);
- for (int j=newBuffer.position(); j<newBuffer.capacity()-1;j++)
- {
- newBuffer.put((byte)' ');
- }
- newBuffer.put((byte)'\n');
-
-
- CountDownLatch latch = new CountDownLatch(1);
- LocalAIO aio = new LocalAIO(latch);
- controller.read(i * SIZE, SIZE, buffer, aio);
- latch.await();
- assertFalse(aio.errorCalled);
- assertTrue(aio.doneCalled);
-
- byte bytesRead[] = new byte[SIZE];
- byte bytesCompare[] = new byte[SIZE];
-
- newBuffer.rewind();
- newBuffer.get(bytesCompare);
- buffer.rewind();
- buffer.get(bytesRead);
-
- for (int count=0;count<SIZE;count++)
- {
- assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
- }
-
-
- //byte[] byteCompare = new byte[SIZE];
- //byte[] byteRead = new byte[SIZE];
-
- assertTrue(buffer.equals(newBuffer));
- }
-
- controller.destroyBuffer(buffer);
- }
- finally
- {
- try { controller.close(); } catch (Throwable ignored){}
-
- }
-
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+
+ final int NUMBER_LINES = 1000;
+ final int SIZE = 1024;
+
+ controller.open(FILE_NAME, 10, 120);
+
+ log.info("Filling file");
+
+ controller.fill(0, 1, NUMBER_LINES * SIZE, (byte) 'j');
+
+ ByteBuffer buffer = controller.newBuffer(SIZE);
+
+ log.info("Writing file");
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ buffer.clear();
+ addString("Str value " + i + "\n", buffer);
+ for (int j = buffer.position(); j < buffer.capacity() - 1; j++)
+ {
+ buffer.put((byte) ' ');
+ }
+ buffer.put((byte) '\n');
+
+ CountDownLatch latch = new CountDownLatch(1);
+ CountDownCallback aio = new CountDownCallback(latch);
+ controller.write(i * SIZE, SIZE, buffer, aio);
+ latch.await();
+ assertFalse(aio.errorCalled);
+ assertTrue(aio.doneCalled);
+ }
+
+ // If you call close you're supposed to wait events to finish before closing it
+ log.info("Closing file");
+ controller.close();
+ log.info("Reading file");
+ controller.open(FILE_NAME, 10, 120);
+
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ newBuffer.clear();
+ addString("Str value " + i + "\n", newBuffer);
+ for (int j = newBuffer.position(); j < newBuffer.capacity() - 1; j++)
+ {
+ newBuffer.put((byte) ' ');
+ }
+ newBuffer.put((byte) '\n');
+
+ CountDownLatch latch = new CountDownLatch(1);
+ CountDownCallback aio = new CountDownCallback(latch);
+ controller.read(i * SIZE, SIZE, buffer, aio);
+ latch.await();
+ assertFalse(aio.errorCalled);
+ assertTrue(aio.doneCalled);
+
+ byte bytesRead[] = new byte[SIZE];
+ byte bytesCompare[] = new byte[SIZE];
+
+ newBuffer.rewind();
+ newBuffer.get(bytesCompare);
+ buffer.rewind();
+ buffer.get(bytesRead);
+
+ for (int count = 0; count < SIZE; count++)
+ {
+ assertEquals("byte position " + count + " differs on line " + i,
+ bytesCompare[count], bytesRead[count]);
+ }
+
+ assertTrue(buffer.equals(newBuffer));
+ }
+
+ controller.destroyBuffer(buffer);
+ } finally
+ {
+ try
+ {
+ controller.close();
+ } catch (Throwable ignored)
+ {
+ }
+
+ }
+
}
-
-
+ /**
+ * This test will call file.close() when there are still callbacks being processed.
+ * This could cause a crash or callbacks missing and this test is validating both situations.
+ * The file is also read after being written to validate its correctness */
public void testConcurrentClose() throws Exception
{
- // The test might eventually pass if broken
- for (int i=0; i<10; i++)
- internalConcurrentClose();
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+
+ final int NUMBER_LINES = 1000;
+ CountDownLatch readLatch = new CountDownLatch(NUMBER_LINES);
+ final int SIZE = 1024;
+
+ controller.open(FILE_NAME, 10000, 120);
+
+ log.info("Filling file");
+
+ controller.fill(0, 1, NUMBER_LINES * SIZE, (byte) 'j');
+
+ log.info("Writing file");
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+
+ buffer.clear();
+ addString("Str value " + i + "\n", buffer);
+ for (int j = buffer.position(); j < buffer.capacity() - 1; j++)
+ {
+ buffer.put((byte) ' ');
+ }
+ buffer.put((byte) '\n');
+
+ CountDownCallback aio = new CountDownCallback(readLatch);
+ controller.write(i * SIZE, SIZE, buffer, aio);
+ }
+
+ long counter = readLatch.getCount();
+ // If you call close you're supposed to wait events to finish before
+ // closing it
+ controller.close();
+ log.info("Closed file with counter = " + counter);
+
+ assertEquals(0, readLatch.getCount());
+ readLatch.await();
+ log.info("Reading file");
+ controller.open(FILE_NAME, 10, 120);
+
+ ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
+
+ ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ newBuffer.clear();
+ addString("Str value " + i + "\n", newBuffer);
+ for (int j = newBuffer.position(); j < newBuffer.capacity() - 1; j++)
+ {
+ newBuffer.put((byte) ' ');
+ }
+ newBuffer.put((byte) '\n');
+
+ CountDownLatch latch = new CountDownLatch(1);
+ CountDownCallback aio = new CountDownCallback(latch);
+ controller.read(i * SIZE, SIZE, buffer, aio);
+ latch.await();
+ assertFalse(aio.errorCalled);
+ assertTrue(aio.doneCalled);
+
+ byte bytesRead[] = new byte[SIZE];
+ byte bytesCompare[] = new byte[SIZE];
+
+ newBuffer.rewind();
+ newBuffer.get(bytesCompare);
+ buffer.rewind();
+ buffer.get(bytesRead);
+
+ for (int count = 0; count < SIZE; count++)
+ {
+ assertEquals("byte position " + count + " differs on line " + i,
+ bytesCompare[count], bytesRead[count]);
+ }
+
+ assertTrue(buffer.equals(newBuffer));
+ }
+
+ } finally
+ {
+ try
+ {
+ controller.close();
+ } catch (Throwable ignored)
+ {
+ }
+
+ }
}
- public void internalConcurrentClose() throws Exception
+ private void asyncData(int numberOfLines, int size, int aioLimit)
+ throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- try
- {
-
- final int NUMBER_LINES = 1000;
- CountDownLatch readLatch = new CountDownLatch (NUMBER_LINES);
- final int SIZE = 1024;
-
- controller.open(FILE_NAME, 10000);
-
- log.info("Filling file");
-
- controller.fill(0,1, NUMBER_LINES * SIZE, (byte)'j');
-
- log.info("Writing file");
-
- for (int i=0; i<NUMBER_LINES; i++)
- {
- ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
-
- buffer.clear();
- addString ("Str value " + i + "\n", buffer);
- for (int j=buffer.position(); j<buffer.capacity()-1;j++)
- {
- buffer.put((byte)' ');
- }
- buffer.put((byte)'\n');
-
-
- LocalAIO aio = new LocalAIO(readLatch);
- controller.write(i * SIZE, SIZE, buffer, aio);
- }
-
-
- long counter = readLatch.getCount();
- // If you call close you're supposed to wait events to finish before closing it
- controller.close();
- log.info("Closed file with counter = " + counter);
- assertEquals(0, readLatch.getCount());
- readLatch.await();
- log.info("Reading file");
- controller.open(FILE_NAME, 10);
-
- ByteBuffer newBuffer = ByteBuffer.allocateDirect(SIZE);
-
- ByteBuffer buffer = ByteBuffer.allocateDirect(SIZE);
-
- for (int i=0; i<NUMBER_LINES; i++)
- {
- newBuffer.clear();
- addString ("Str value " + i + "\n", newBuffer);
- for (int j=newBuffer.position(); j<newBuffer.capacity()-1;j++)
- {
- newBuffer.put((byte)' ');
- }
- newBuffer.put((byte)'\n');
-
-
- CountDownLatch latch = new CountDownLatch(1);
- LocalAIO aio = new LocalAIO(latch);
- controller.read(i * SIZE, SIZE, buffer, aio);
- latch.await();
- assertFalse(aio.errorCalled);
- assertTrue(aio.doneCalled);
-
- byte bytesRead[] = new byte[SIZE];
- byte bytesCompare[] = new byte[SIZE];
-
- newBuffer.rewind();
- newBuffer.get(bytesCompare);
- buffer.rewind();
- buffer.get(bytesRead);
-
- for (int count=0;count<SIZE;count++)
- {
- assertEquals("byte position " + count + " differs on line " + i, bytesCompare[count], bytesRead[count]);
- }
-
-
- //byte[] byteCompare = new byte[SIZE];
- //byte[] byteRead = new byte[SIZE];
-
- assertTrue(buffer.equals(newBuffer));
- }
-
- }
- finally
- {
- try { controller.close(); } catch (Throwable ignored){}
-
- }
-
- }
-
- /**
- * This method is not used unless you uncomment testValidateData
- * The purpose of this method is to verify if the information generated by one of the write methods is correct
- * @param numberOfLines
- * @param size
- * @param aioLimit
- * @throws Exception
- */
- private void validateData(int numberOfLines, int size, int aioLimit) throws Exception
- {
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- try
- {
- controller.open(FILE_NAME, aioLimit);
-
- ByteBuffer compareBlock = ByteBuffer.allocateDirect(size);
- encodeBufer(compareBlock);
-
- ByteBuffer readBuffer = controller.newBuffer(size);
-
-
- boolean firstInvalid = false;
- for (int i=0;i<numberOfLines;i++)
- {
- if (i % 1000 == 0)
- {
- log.info("line = " + i);
- }
- CountDownLatch latch = new CountDownLatch(1);
- LocalAIO callback = new LocalAIO(latch);
- controller.read(i * size, size, readBuffer, callback);
-
- latch.await();
-
- if (!compareBuffers(compareBlock, readBuffer))
- {
- //log.info("Invalid line at " + i);
- firstInvalid=true;
- }
- else
- {
- if (firstInvalid)
- {
- for (int line=0;line<10;line++) log.info("*********************************************");
- log.warn("Valid line after an invalid line!!!");
- }
- }
-
- readBuffer.position(100);
- ByteBuffer buf1 = readBuffer.slice();
-
- //System.out.println("buf1=" + buf1);
- }
- }
- finally
- {
- controller.close();
- }
- }
-
-
- private boolean compareBuffers(ByteBuffer buffer1, ByteBuffer buffer2)
- {
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ controller.open(FILE_NAME, aioLimit, 120);
- buffer1.rewind();
- buffer2.rewind();
-
- if (buffer1.limit() != buffer2.limit())
+ try
{
- return false;
- }
-
- byte bytes1[] = new byte[buffer1.limit()];
- byte bytes2[] = new byte[buffer2.limit()];
-
- buffer1.get(bytes1);
- buffer2.get(bytes2);
-
- for (int i=0; i< bytes1.length; i++)
+ log.info("++testDirectDataNoPage");
+ System.out.flush();
+ CountDownLatch latchDone = new CountDownLatch(numberOfLines);
+
+ ByteBuffer block = controller.newBuffer(size);
+ encodeBufer(block);
+
+ preAlloc(controller, numberOfLines * size);
+
+ ArrayList<CountDownCallback> list = new ArrayList<CountDownCallback>();
+
+ for (int i = 0; i < numberOfLines; i++)
+ {
+ list.add(new CountDownCallback(latchDone));
+ }
+
+ long valueInitial = System.currentTimeMillis();
+
+ long lastTime = System.currentTimeMillis();
+ int counter = 0;
+ for (CountDownCallback tmp : list)
+ {
+ controller.write(counter * size, size, block, tmp);
+ if (++counter % 20000 == 0)
+ {
+ log.info(20000 * 1000 / (System.currentTimeMillis() - lastTime)
+ + " rec/sec (Async)");
+ lastTime = System.currentTimeMillis();
+ }
+
+ }
+
+ System.out.print("waiting...");
+
+ latchDone.await();
+
+ log.info("done");
+
+ long timeTotal = System.currentTimeMillis() - valueInitial;
+ log.info("After completions time = " + timeTotal + " for "
+ + numberOfLines + " registers " + " size each line = " + size
+ + " Records/Sec=" + (numberOfLines * 1000 / timeTotal)
+ + " (Assynchronous)");
+
+ for (CountDownCallback tmp : list)
+ {
+ assertEquals(1, tmp.timesDoneCalled);
+ assertTrue(tmp.doneCalled);
+ assertFalse(tmp.errorCalled);
+ }
+
+ controller.destroyBuffer(block);
+
+ controller.close();
+ } finally
{
- if (bytes1[i] != bytes2[i])
+ try
{
- return false;
+ controller.close();
+ } catch (Exception ignored)
+ {
}
}
- return true;
}
- private void asyncData(int numberOfLines, int size, int aioLimit) throws Exception
- {
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- controller.open(FILE_NAME, aioLimit);
-
- try
- {
- System.out.println("++testDirectDataNoPage"); System.out.flush();
- CountDownLatch latchDone = new CountDownLatch(numberOfLines);
-
- ByteBuffer block = controller.newBuffer(size);
- encodeBufer(block);
-
- preAlloc(controller, numberOfLines * size);
-
- ArrayList<LocalAIO> list = new ArrayList<LocalAIO>();
-
- for (int i=0; i<numberOfLines; i++)
- {
- list.add(new LocalAIO(latchDone));
- }
-
-
- long valueInitial = System.currentTimeMillis();
-
- System.out.println("Adding data");
-
- long lastTime = System.currentTimeMillis();
- int counter = 0;
- for (LocalAIO tmp: list)
- {
- controller.write(counter * size, size, block, tmp);
- if (++counter % 20000 == 0)
- {
- System.out.println(5000*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Async)");
- lastTime = System.currentTimeMillis();
- }
-
- }
-
- System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
-
-
- System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
- System.out.println("Flush now");
- System.out.println("Received " + LocalAIO.staticDone);
- long timeTotal = System.currentTimeMillis() - valueInitial;
-
- System.out.println("Asynchronous time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
-
- latchDone.await();
-
- timeTotal = System.currentTimeMillis() - valueInitial;
- System.out.println("After completions time = " + timeTotal + " for " + numberOfLines + " registers " + " size each line = " + size + " Records/Sec=" + (numberOfLines*1000/timeTotal) + " (Assynchronous)");
-
- for (LocalAIO tmp: list)
- {
- assertEquals(1, tmp.timesDoneCalled);
- assertTrue(tmp.doneCalled);
- assertFalse(tmp.errorCalled);
- }
-
- controller.destroyBuffer(block);
-
- controller.close();
- }
- finally
- {
- try {controller.close();} catch (Exception ignored){}
- }
-
-
- }
-
public void testDirectSynchronous() throws Exception
{
- try
- {
- System.out.println("++testDirectDataNoPage"); System.out.flush();
- final int NUMBER_LINES = 10000;
- final int SIZE = 1024;
-
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- controller.open(FILE_NAME, 2000);
-
- ByteBuffer block = controller.newBuffer(SIZE);
- encodeBufer(block);
-
- preAlloc(controller, NUMBER_LINES * SIZE);
-
- long valueInitial = System.currentTimeMillis();
-
- System.out.println("Adding data");
-
- long lastTime = System.currentTimeMillis();
- int counter = 0;
-
- for (int i=0; i<NUMBER_LINES; i++)
- {
- CountDownLatch latchDone = new CountDownLatch(1);
- LocalAIO aioBlock = new LocalAIO(latchDone);
- controller.write(i*512, 512, block, aioBlock);
- latchDone.await();
- assertTrue(aioBlock.doneCalled);
- assertFalse(aioBlock.errorCalled);
- if (++counter % 500 == 0)
- {
- System.out.println(500*1000/(System.currentTimeMillis()-lastTime) + " rec/sec (Synchronous)");
- lastTime = System.currentTimeMillis();
- }
- }
-
- System.out.println("Data added " + (System.currentTimeMillis() - valueInitial));
-
-
- System.out.println("Finished append " + (System.currentTimeMillis() - valueInitial) + " received = " + LocalAIO.staticDone);
- System.out.println("Flush now");
- System.out.println("Received " + LocalAIO.staticDone);
-
- long timeTotal = System.currentTimeMillis() - valueInitial;
- System.out.println("Flushed " + timeTotal);
- System.out.println("time = " + timeTotal + " for " + NUMBER_LINES + " registers " + " size each line = " + SIZE + " Records/Sec=" + (NUMBER_LINES*1000/timeTotal) + " Synchronous");
-
- controller.destroyBuffer(block);
- controller.close();
- }
- catch (Exception e)
- {
- System.out.println("Received " + LocalAIO.staticDone + " before it failed");
- throw e;
- }
-
+ try
+ {
+ log.info("++testDirectDataNoPage");
+ System.out.flush();
+ final int NUMBER_LINES = 3000;
+ final int SIZE = 1024;
+
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ controller.open(FILE_NAME, 2000, 120);
+
+ ByteBuffer block = ByteBuffer.allocateDirect(SIZE);
+ encodeBufer(block);
+
+ preAlloc(controller, NUMBER_LINES * SIZE);
+
+ long startTime = System.currentTimeMillis();
+
+ for (int i = 0; i < NUMBER_LINES; i++)
+ {
+ CountDownLatch latchDone = new CountDownLatch(1);
+ CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ controller.write(i * 512, 512, block, aioBlock);
+ latchDone.await();
+ assertTrue(aioBlock.doneCalled);
+ assertFalse(aioBlock.errorCalled);
+ }
+
+ long timeTotal = System.currentTimeMillis() - startTime;
+ log.info("time = " + timeTotal + " for " + NUMBER_LINES
+ + " registers " + " size each line = " + SIZE + " Records/Sec="
+ + (NUMBER_LINES * 1000 / timeTotal) + " Synchronous");
+
+ controller.close();
+ } catch (Exception e)
+ {
+ throw e;
+ }
+
}
- private void preAlloc(AsynchronousFileImpl controller, long size)
- {
- System.out.println("Pre allocating"); System.out.flush();
- long startPreAllocate = System.currentTimeMillis();
- controller.fill(0l, 1, size, (byte)0);
- long endPreAllocate = System.currentTimeMillis() - startPreAllocate;
- if (endPreAllocate != 0) System.out.println("PreAllocated the file (size = " + size + " bytes) in " + endPreAllocate + " Milliseconds, What means " + (size/endPreAllocate) + " bytes per millisecond");
- }
-
-
public void testInvalidWrite() throws Exception
{
- final AsynchronousFileImpl controller = new AsynchronousFileImpl();
- controller.open(FILE_NAME, 2000);
-
- try
- {
-
- final int SIZE=512;
-
- ByteBuffer block = controller.newBuffer(SIZE);
- encodeBufer(block);
-
- preAlloc(controller, 1000 * 512);
-
-
- CountDownLatch latchDone = new CountDownLatch(1);
-
- LocalAIO aioBlock = new LocalAIO(latchDone);
- controller.write(11, 512, block, aioBlock);
-
- latchDone.await();
-
- assertTrue (aioBlock.errorCalled);
- assertFalse(aioBlock.doneCalled);
-
- controller.destroyBuffer(block);
- }
- catch (Exception e)
- {
- System.out.println("Received " + LocalAIO.staticDone + " before it failed");
- throw e;
- }
- finally
- {
- controller.close();
- }
-
+ final AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ controller.open(FILE_NAME, 2000, 120);
+
+ try
+ {
+
+ final int SIZE = 512;
+
+ ByteBuffer block = controller.newBuffer(SIZE);
+ encodeBufer(block);
+
+ preAlloc(controller, 1000 * 512);
+
+ CountDownLatch latchDone = new CountDownLatch(1);
+
+ CountDownCallback aioBlock = new CountDownCallback(latchDone);
+ controller.write(11, 512, block, aioBlock);
+
+ latchDone.await();
+
+ assertTrue(aioBlock.errorCalled);
+ assertFalse(aioBlock.doneCalled);
+
+ controller.destroyBuffer(block);
+ } catch (Exception e)
+ {
+ throw e;
+ } finally
+ {
+ controller.close();
+ }
+
}
public void testInvalidAlloc() throws Exception
{
- AsynchronousFileImpl controller = new AsynchronousFileImpl();
- try
- {
- // You don't need to open the file to alloc it
- ByteBuffer buffer = controller.newBuffer(300);
- fail ("Exception expected");
- }
- catch (Exception ignored)
- {
- }
-
+ AsynchronousFileImpl controller = new AsynchronousFileImpl();
+ try
+ {
+ ByteBuffer buffer = controller.newBuffer(300);
+ fail("Exception expected");
+ } catch (Exception ignored)
+ {
+ }
+
}
- private static class LocalAIO implements AIOCallback
- {
-
- CountDownLatch latch;
-
- public LocalAIO(CountDownLatch latch)
- {
- this.latch = latch;
- }
-
- boolean doneCalled = false;
- boolean errorCalled = false;
- int timesDoneCalled = 0;
- static int staticDone = 0;
- public void decode(int length, ByteBuffer buffer)
- {
- // TODO Auto-generated method stub
-
- }
-
- public void done()
- {
- //System.out.println("Received Done"); System.out.flush();
- doneCalled = true;
- timesDoneCalled++;
- staticDone++;
- if (latch != null)
- {
- latch.countDown();
- }
-
- }
-
- public void onError(int errorCode, String errorMessage)
- {
- errorCalled = true;
- if (latch != null)
- {
- // even thought an error happened, we need to inform the latch, or the test won't finish
- latch.countDown();
- }
- System.out.println("Received an Error - " + errorCode + " message=" + errorMessage);
-
- }
-
- }
-
private void addString(String str, ByteBuffer buffer)
{
- CharBuffer charBuffer = CharBuffer.wrap(str);
- UTF_8_ENCODER.encode(charBuffer, buffer, true);
-
+ CharBuffer charBuffer = CharBuffer.wrap(str);
+ UTF_8_ENCODER.encode(charBuffer, buffer, true);
+
}
-
-
}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureBase.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureBase.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -1,193 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
-package org.jboss.messaging.tests.integration.core.remoting.mina.timing;
-
-import junit.framework.TestCase;
-
-import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
-import org.jboss.messaging.core.client.impl.LocationImpl;
-import org.jboss.messaging.core.client.impl.RemotingConnectionImpl;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.core.remoting.Packet;
-import org.jboss.messaging.core.remoting.PacketDispatcher;
-import org.jboss.messaging.core.remoting.PacketHandler;
-import org.jboss.messaging.core.remoting.PacketReturner;
-import org.jboss.messaging.core.remoting.impl.mina.MinaService;
-import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
-
-
-/**
- *
- * @author clebert suconic
- *
- */
-public abstract class MeasureBase extends TestCase
-{
- protected MinaService service;
- protected PacketDispatcher serverDispatcher;
-
- @Override
- public void setUp() throws Exception
- {
- super.setUp();
- startServer();
- serverDispatcher.register(new FakeHandler());
- }
-
- @Override
- public void tearDown() throws Exception
- {
- service.stop();
- }
-
-
- public void testMixingSends() throws Throwable
- {
- RemotingConnectionImpl remoting = new RemotingConnectionImpl(getLocation(), createParameters());
- remoting.start();
-
- int NUMBER_OF_MESSAGES = 300;
-
- long start = System.currentTimeMillis();
-
- for (int i=0; i<NUMBER_OF_MESSAGES; i++)
- {
- if (i%2 == 0)
- {
- remoting.sendOneWay(10, 10, new EmptyPacket(EmptyPacket.CLOSE));
- }
- else
- {
- Object ret = remoting.sendBlocking(10, 0, new EmptyPacket(EmptyPacket.CLOSE));
- assertTrue (ret instanceof EmptyPacket);
- //assertEquals(EmptyPacket.EXCEPTION, ret.getType());
- }
- }
-
- long end = System.currentTimeMillis();
-
-
- System.out.println("Messages / second = " + NUMBER_OF_MESSAGES * 1000 / (end-start));
- Thread.sleep(1000);
-
- remoting.stop();
-
- }
-
- public void testBlockSends() throws Throwable
- {
- //NIOConnector connector = createNIOConnector(new PacketDispatcherImpl(null));
- //NIOSession session = connector.connect();
-
-
-
- RemotingConnectionImpl remoting = new RemotingConnectionImpl(getLocation(), createParameters());
- remoting.start();
-
- int NUMBER_OF_MESSAGES = 100;
-
- long start = System.currentTimeMillis();
-
- for (int i=0; i<NUMBER_OF_MESSAGES; i++)
- {
- Object ret = remoting.sendBlocking(10, 10, new EmptyPacket(EmptyPacket.CLOSE));
- assertTrue (ret instanceof EmptyPacket);
- }
-
- long end = System.currentTimeMillis();
-
-
- System.out.println("Messages / second = " + NUMBER_OF_MESSAGES * 1000 / (end-start));
- Thread.sleep(1000);
-
- remoting.stop();
-
- }
-
- public void testOneWaySends() throws Throwable
- {
- //NIOConnector connector = createNIOConnector(new PacketDispatcherImpl(null));
- //NIOSession session = connector.connect();
-
-
- RemotingConnectionImpl remoting = new RemotingConnectionImpl(getLocation(), createParameters());
- remoting.start();
-
- int NUMBER_OF_MESSAGES = 30000;
-
- long start = System.currentTimeMillis();
-
- for (int i=0; i<NUMBER_OF_MESSAGES; i++)
- {
- remoting.sendOneWay(10, 10, new EmptyPacket(EmptyPacket.CLOSE));
- }
-
- remoting.sendBlocking(10, 10, new EmptyPacket(EmptyPacket.CLOSE));
-
- long end = System.currentTimeMillis();
-
-
- System.out.println("Messages / second = " + NUMBER_OF_MESSAGES * 1000 / (end-start));
-
- remoting.stop();
-
- }
-
- protected abstract LocationImpl getLocation();
-
- protected abstract ConfigurationImpl createConfiguration();
-
-
-
- protected void startServer() throws Exception
- {
- service = new MinaService(createConfiguration());
- service.start();
- serverDispatcher = service.getDispatcher();
- System.out.println("Server Dispatcher = " + serverDispatcher);
- }
-
- // Private
-
- protected ConnectionParamsImpl createParameters()
- {
- ConnectionParamsImpl param = new ConnectionParamsImpl();
- param.setTimeout(50000);
- return param;
- }
-
-
-
- // Inner Classes
-
- class FakeHandler implements PacketHandler
- {
-
- public long getID()
- {
- return 10;
- }
-
- public void handle(Packet packet, PacketReturner sender)
- {
- //System.out.println("Hello " + packet);
- try
- {
- if (packet.getResponseTargetID() >= 0)
- {
- packet.setTargetID(packet.getResponseTargetID());
- sender.send(packet);
- }
- } catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- }
-
-}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureInVMTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureInVMTest.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureInVMTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -1,33 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
-package org.jboss.messaging.tests.integration.core.remoting.mina.timing;
-
-import org.jboss.messaging.core.client.impl.LocationImpl;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.tests.unit.core.remoting.impl.ConfigurationHelper;
-
-
-/** This test was added to compare InVM calls against MINA calls */
-public class MeasureInVMTest extends MeasureBase
-{
-
- @Override
- protected LocationImpl getLocation()
- {
- return new LocationImpl(0);
-
- }
-
- protected ConfigurationImpl createConfiguration()
- {
- return ConfigurationHelper.newInVMConfig();
- }
-
-
-
-}
Deleted: trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureRemoteTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureRemoteTest.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureRemoteTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -1,34 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
-package org.jboss.messaging.tests.integration.core.remoting.mina.timing;
-
-import static org.jboss.messaging.core.remoting.TransportType.TCP;
-
-import org.jboss.messaging.core.client.impl.LocationImpl;
-import org.jboss.messaging.core.config.impl.ConfigurationImpl;
-import org.jboss.messaging.tests.integration.core.remoting.mina.TestSupport;
-import org.jboss.messaging.tests.unit.core.remoting.impl.ConfigurationHelper;
-
-public class MeasureRemoteTest extends MeasureBase
-{
-
- @Override
- protected LocationImpl getLocation()
- {
- return new LocationImpl(TCP, "localhost", TestSupport.PORT);
- }
-
- @Override
- protected ConfigurationImpl createConfiguration()
- {
- return ConfigurationHelper.newTCPConfiguration("localhost", TestSupport.PORT);
- }
-
-
-
-}
Copied: trunk/tests/src/org/jboss/messaging/tests/performance/journal/FakeJournalImplTest.java (from rev 4272, trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/FakeJournalImplTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/FakeJournalImplTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/FakeJournalImplTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.performance.journal;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
+
+/**
+ *
+ * A FakeJournalImplTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class FakeJournalImplTest extends JournalImplTestUnit
+{
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ return new FakeSequentialFileFactory();
+ }
+}
+
Copied: trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java (from rev 4272, trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/JournalImplTestUnit.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -0,0 +1,356 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.performance.journal;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.PreparedTransactionInfo;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
+
+/**
+ *
+ * A RealJournalImplTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public abstract class JournalImplTestUnit extends JournalImplTestBase
+{
+ private static final Logger log = Logger.getLogger(JournalImplTestUnit.class);
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
+ }
+
+ public void testAddUpdateDeleteManyLargeFileSize() throws Exception
+ {
+ final int numberAdds = 10000;
+
+ final int numberUpdates = 5000;
+
+ final int numberDeletes = 3000;
+
+ long[] adds = new long[numberAdds];
+
+ for (int i = 0; i < numberAdds; i++)
+ {
+ adds[i] = i;
+ }
+
+ long[] updates = new long[numberUpdates];
+
+ for (int i = 0; i < numberUpdates; i++)
+ {
+ updates[i] = i;
+ }
+
+ long[] deletes = new long[numberDeletes];
+
+ for (int i = 0; i < numberDeletes; i++)
+ {
+ deletes[i] = i;
+ }
+
+ // This would take a long time with sync=true, and still validates the file.
+ setup(10, 10 * 1024 * 1024, false);
+ createJournal();
+ startJournal();
+ load();
+ add(adds);
+ update(updates);
+ delete(deletes);
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ public void testAddUpdateDeleteManySmallFileSize() throws Exception
+ {
+ final int numberAdds = 10000;
+
+ final int numberUpdates = 5000;
+
+ final int numberDeletes = 3000;
+
+ long[] adds = new long[numberAdds];
+
+ for (int i = 0; i < numberAdds; i++)
+ {
+ adds[i] = i;
+ }
+
+ long[] updates = new long[numberUpdates];
+
+ for (int i = 0; i < numberUpdates; i++)
+ {
+ updates[i] = i;
+ }
+
+ long[] deletes = new long[numberDeletes];
+
+ for (int i = 0; i < numberDeletes; i++)
+ {
+ deletes[i] = i;
+ }
+
+ setup(10, 10 * 1024, false);
+ createJournal();
+ startJournal();
+ load();
+ add(adds);
+ update(updates);
+ delete(deletes);
+
+ log.info("Debug journal:" + debugJournal());
+ stopJournal(false);
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ public void testReclaimAndReload() throws Exception
+ {
+ setup(2, 10 * 1024 * 1024, false);
+ createJournal();
+ startJournal();
+ load();
+
+ journal.startReclaimer();
+
+ long start = System.currentTimeMillis();
+
+
+ byte[] record = generateRecord(recordLength);
+
+ int NUMBER_OF_RECORDS = 1000;
+
+ for (int count = 0; count < NUMBER_OF_RECORDS; count++)
+ {
+ journal.appendAddRecord(count, (byte)0, record);
+
+ if (count >= NUMBER_OF_RECORDS / 2)
+ {
+ journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2);
+ }
+
+ if (count % 100 == 0)
+ {
+ log.info("Done: " + count);
+ }
+ }
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * ((double)NUMBER_OF_RECORDS) / (end - start);
+
+ log.info("Rate of " + rate + " adds/removes per sec");
+
+ log.info("Reclaim status = " + debugJournal());
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>());
+
+ assertEquals(NUMBER_OF_RECORDS / 2, journal.getIDMapSize());
+
+ stopJournal();
+ }
+
+ public void testSpeedNonTransactional() throws Exception
+ {
+ for (int i=0;i<1;i++)
+ {
+ this.setUp();
+ System.gc(); Thread.sleep(500);
+ internaltestSpeedNonTransactional();
+ this.tearDown();
+ }
+ }
+
+ public void internaltestSpeedNonTransactional() throws Exception
+ {
+
+ final long numMessages = 10000;
+
+ int numFiles = (int)(((numMessages * 1024 + 512) / (10 * 1024 * 1024)) * 1.3);
+
+ if (numFiles<2) numFiles = 2;
+
+ log.info("num Files=" + numFiles);
+
+ Journal journal =
+ new JournalImpl(10 * 1024 * 1024, numFiles, true, getFileFactory(),
+ 5000, "jbm-data", "jbm", 5000, 120);
+
+ journal.start();
+
+ journal.load(new ArrayList<RecordInfo>(), null);
+
+
+ final CountDownLatch latch = new CountDownLatch((int)numMessages);
+
+
+ class LocalCallback implements IOCallback
+ {
+
+ int i=0;
+ String message = null;
+ boolean done = false;
+ CountDownLatch latch;
+
+ public LocalCallback(int i, CountDownLatch latch)
+ {
+ this.i = i;
+ this.latch = latch;
+ }
+ public void done()
+ {
+ synchronized (this)
+ {
+ if (done)
+ {
+ message = "done received in duplicate";
+ }
+ done = true;
+ this.latch.countDown();
+ }
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+ synchronized (this)
+ {
+ System.out.println("********************** Error = " + (i++));
+ message = errorMessage;
+ latch.countDown();
+ }
+ }
+
+ }
+
+
+ log.info("Adding data");
+ byte[] data = new byte[700];
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ journal.appendAddRecord(i, (byte)0, data);
+ }
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * (double)numMessages / (end - start);
+
+ boolean failed = false;
+
+ // If this fails it is probably because JournalImpl it is closing the files without waiting all the completes to arrive first
+ assertFalse(failed);
+
+
+ log.info("Rate " + rate + " records/sec");
+
+ journal.stop();
+
+ journal =
+ new JournalImpl(10 * 1024 * 1024, numFiles, true, getFileFactory(),
+ 5000, "jbm-data", "jbm", 5000, 120);
+
+ journal.start();
+ journal.load(new ArrayList<RecordInfo>(), null);
+ journal.stop();
+
+ }
+
+ public void testSpeedTransactional() throws Exception
+ {
+ Journal journal =
+ new JournalImpl(10 * 1024 * 1024, 10, true, getFileFactory(),
+ 5000, "jbm-data", "jbm", 5000, 120);
+
+ journal.start();
+
+ journal.load(new ArrayList<RecordInfo>(), null);
+
+ try
+ {
+ final int numMessages = 50050;
+
+ byte[] data = new byte[1024];
+
+ long start = System.currentTimeMillis();
+
+ int count = 0;
+ double rates[] = new double[50];
+ for (int i = 0; i < 50; i++)
+ {
+ long startTrans = System.currentTimeMillis();
+ for (int j=0; j<1000; j++)
+ {
+ journal.appendAddRecordTransactional(i, (byte)0, count++, data);
+ }
+
+ journal.appendCommitRecord(i);
+
+ long endTrans = System.currentTimeMillis();
+
+ rates[i] = 1000 * (double)1000 / (endTrans - startTrans);
+ }
+
+ long end = System.currentTimeMillis();
+
+ for (double rate: rates)
+ {
+ log.info("Transaction Rate = " + rate + " records/sec");
+
+ }
+
+ double rate = 1000 * (double)numMessages / (end - start);
+
+ log.info("Rate " + rate + " records/sec");
+ }
+ finally
+ {
+ journal.stop();
+ }
+
+ }
+
+
+}
+
+
Copied: trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java (from rev 4272, trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/RealJournalImplAIOTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplAIOTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -0,0 +1,67 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.performance.journal;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.messaging.core.asyncio.AIOCallback;
+import org.jboss.messaging.core.journal.IOCallback;
+import org.jboss.messaging.core.journal.Journal;
+import org.jboss.messaging.core.journal.RecordInfo;
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.JournalImpl;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeCallback;
+
+/**
+ *
+ * A RealJournalImplTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealJournalImplAIOTest extends JournalImplTestUnit
+{
+ private static final Logger log = Logger.getLogger(RealJournalImplAIOTest.class);
+
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(journalDir);
+
+ log.info("deleting directory " + journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new AIOSequentialFileFactory(journalDir);
+ }
+
+}
+
Copied: trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplTest.java (from rev 4272, trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/RealJournalImplTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/RealJournalImplTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -0,0 +1,58 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.performance.journal;
+
+import java.io.File;
+
+import org.jboss.messaging.core.journal.SequentialFileFactory;
+import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
+import org.jboss.messaging.core.logging.Logger;
+
+/**
+ *
+ * A RealJournalImplTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class RealJournalImplTest extends JournalImplTestUnit
+{
+ private static final Logger log = Logger.getLogger(RealJournalImplTest.class);
+
+ protected String journalDir = System.getProperty("user.home") + "/journal-test";
+
+ protected SequentialFileFactory getFileFactory() throws Exception
+ {
+ File file = new File(journalDir);
+
+ log.info("deleting directory " + journalDir);
+
+ deleteDirectory(file);
+
+ file.mkdir();
+
+ return new NIOSequentialFileFactory(journalDir);
+ }
+
+
+}
+
Copied: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java (from rev 4276, trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/timing/StorageManagerTimingTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/StorageManagerTimingTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -0,0 +1,288 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.performance.persistence;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
+import org.jboss.messaging.core.config.impl.FileConfiguration;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.message.impl.MessageImpl;
+import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
+import org.jboss.messaging.core.remoting.impl.mina.BufferWrapper;
+import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.impl.ServerMessageImpl;
+import org.jboss.messaging.tests.performance.persistence.fakes.FakePostOffice;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+public class StorageManagerTimingTest extends UnitTestCase
+{
+
+ private static final Logger log = Logger.getLogger(StorageManagerTimingTest.class);
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
+ }
+
+
+ public void testAIO() throws Exception
+ {
+ // just to do some initial loading.. ignore this rate
+ internalTestStorage(JournalType.ASYNCIO, 1000, 1, 1);
+
+ double rate = internalTestStorage(JournalType.ASYNCIO, 60000, 1, 1)[0];
+ printRates("Rate of AIO, 60000 inserts / commits on every insert", rate);
+
+ rate = internalTestStorage(JournalType.ASYNCIO, 30000, -1, 1)[0];
+ printRates("Rate of AIO, 30000 inserts / single commit at the end", rate);
+
+ rate = internalTestStorage(JournalType.ASYNCIO, 30000, 5, 1)[0];
+ printRates("Rate of AIO, 30000 inserts / commit every 5 recodds", rate);
+
+ rate = internalTestStorage(JournalType.ASYNCIO, 30000, -1, 1)[0];
+ printRates("Rate of AIO, 30000 inserts / single commit at the end (again)", rate);
+
+ }
+
+ public void testAIOMultiThread() throws Exception
+ {
+ double[] rates = internalTestStorage(JournalType.ASYNCIO, 10000, -1, 1);
+ rates = internalTestStorage(JournalType.ASYNCIO, 30000, -1, 5);
+
+ printRates("Rate of AIO, 30000 inserts / single commit at the end", rates);
+
+
+ rates = internalTestStorage(JournalType.ASYNCIO, 5000, 1, 5);
+
+ printRates("Rate of AIO, 30000 inserts / commit on every insert", rates);
+ }
+
+ public void testNIO() throws Exception
+ {
+ // just to do some initial loading.. ignore this rate
+ internalTestStorage(JournalType.NIO, 1000, 1, 1);
+ double rate = internalTestStorage(JournalType.NIO, 1000, 1, 1)[0];
+ printRates("Rate of NIO, 1000 inserts, 1000 commits", rate);
+
+ rate = internalTestStorage(JournalType.NIO, 30000, -1, 1)[0];
+ printRates("Rate of NIO, 30000 inserts / single commit at the end", rate);
+
+ rate = internalTestStorage(JournalType.NIO, 30000, 5, 1)[0];
+ printRates("Rate of NIO, 30000 inserts / commit every 5 records", rate);
+ }
+
+ public void testNIOMultiThread() throws Exception
+ {
+
+ double[] rates = internalTestStorage(JournalType.NIO, 5000, -1, 5);
+
+ printRates("Rate of NIO, 5000 inserts / single commit at the end", rates);
+
+ rates = internalTestStorage(JournalType.NIO, 5000, 1, 5);
+
+ printRates("Rate of NIO, 5000 inserts / commit on every insert", rates);
+
+
+ }
+
+ public double[] internalTestStorage(final JournalType journalType,
+ final long numberOfMessages,
+ final int transInterval,
+ final int numberOfThreads) throws Exception
+ {
+ FileConfiguration configuration = new FileConfiguration();
+
+ configuration.start();
+
+ deleteDirectory(new File(configuration.getBindingsDirectory()));
+ deleteDirectory(new File(configuration.getJournalDirectory()));
+
+ configuration.setJournalType(journalType);
+
+ final JournalStorageManager journal = new JournalStorageManager(configuration);
+ journal.start();
+
+ FakePostOffice office = new FakePostOffice();
+
+ HashMap<Long, Queue> queues = new HashMap<Long, Queue>();
+
+ journal.loadMessages(office, queues);
+
+ final byte[] bytes = new byte[900];
+
+ for (int i=0;i<bytes.length;i++)
+ {
+ bytes[i] = (byte)('a' + (i%20));
+ }
+
+
+ final BufferWrapper buffer = new BufferWrapper(1024);
+ buffer.putBytes(bytes);
+
+ final AtomicLong transactionGenerator = new AtomicLong(1);
+
+ class LocalThread extends Thread
+ {
+ int id;
+ int commits = 1;
+ Exception e;
+ long totalTime = 0;
+ public LocalThread(int id)
+ {
+ super("LocalThread:" + id);
+ this.id = id;
+ }
+
+ public void run()
+ {
+ try
+ {
+ long start = System.currentTimeMillis();
+
+ long trans = transactionGenerator.incrementAndGet();
+ boolean commitPending=false;
+ for (long i=1;i<=numberOfMessages;i++)
+ {
+
+ final SimpleString address = new SimpleString("Destination " + i);
+
+
+ ServerMessageImpl implMsg = new ServerMessageImpl(/* type */ 1, /* durable */ true, /* expiration */ 0,
+ /* timestamp */ 0, /* priority */(byte)0);
+
+ implMsg.putStringProperty(new SimpleString("Key"), new SimpleString("This String is worthless!"));
+
+ implMsg.setMessageID(i);
+ implMsg.setBody(buffer);
+
+ implMsg.setDestination(address);
+
+
+
+ journal.storeMessageTransactional(trans, implMsg);
+
+ commitPending = true;
+
+ if (transInterval>0 && i%transInterval == 0)
+ {
+ journal.commit(trans);
+ commits ++;
+ trans = transactionGenerator.incrementAndGet();
+ commitPending = false;
+ }
+ }
+
+ if (commitPending) journal.commit(trans);
+
+
+ long end = System.currentTimeMillis();
+
+ totalTime = end - start;
+ }
+ catch (Exception e)
+ {
+ log.error(e.getMessage(), e);
+ this.e = e;
+ }
+ }
+ }
+
+ try
+ {
+ LocalThread[] threads = new LocalThread[numberOfThreads];
+
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threads[i] = new LocalThread(i);
+ }
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threads[i].join();
+ }
+
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ if (threads[i].e != null)
+ {
+ throw threads[i].e;
+ }
+ }
+
+ double rates[] = new double[numberOfThreads];
+
+ for (int i=0; i<numberOfThreads; i++)
+ {
+ rates[i] = (numberOfMessages + threads[i].commits) * 1000 / threads[i].totalTime;
+ }
+
+ return rates;
+ }
+ finally
+ {
+ journal.stop();
+ }
+
+ }
+
+
+ private void printRates(String msg, double rate)
+ {
+ printRates(msg, new double[] { rate });
+ }
+ private void printRates(String msg, double[] rates)
+ {
+ double rate = 0;
+
+ log.info("*************************************************************************");
+ log.info(" " + msg + " ");
+
+ double totalRate = 0;
+ for (int i=0; i<rates.length; i++)
+ {
+ rate = rates[i];
+ totalRate += rate;
+ if (rates.length>1)
+ {
+ log.info( " Thread " + i + ": = " + rate + " inserts/sec (including commits)");
+ }
+ }
+
+ log.info( " Total rate : = " + totalRate + " inserts/sec (including commits)");
+ log.info("*************************************************************************");
+ }
+
+
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/fakes/FakeBinding.java (from rev 4272, trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/fakes/FakeBinding.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/fakes/FakeBinding.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/fakes/FakeBinding.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -0,0 +1,28 @@
+package org.jboss.messaging.tests.performance.persistence.fakes;
+
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.util.SimpleString;
+
+public class FakeBinding implements Binding
+{
+ SimpleString address;
+ Queue queue;
+
+ public FakeBinding(SimpleString address, Queue queue)
+ {
+ this.address = address;
+ this.queue = queue;
+ }
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
+
+ public Queue getQueue()
+ {
+ return queue;
+ }
+
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/performance/persistence/fakes/FakePostOffice.java (from rev 4272, trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/fakes/FakePostOffice.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/persistence/fakes/FakePostOffice.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/persistence/fakes/FakePostOffice.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -0,0 +1,109 @@
+package org.jboss.messaging.tests.performance.persistence.fakes;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.postoffice.Binding;
+import org.jboss.messaging.core.postoffice.FlowController;
+import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.server.Queue;
+import org.jboss.messaging.core.server.QueueFactory;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeQueueFactory;
+import org.jboss.messaging.util.ConcurrentHashSet;
+import org.jboss.messaging.util.SimpleString;
+
+
+
+/** Maybe this Fake should be moved to postoffice.fakes, but since this
+ * Fake only has the basic needed for StorageManagerTest, I have left it here for now */
+public class FakePostOffice implements PostOffice
+{
+
+ ConcurrentHashMap<SimpleString, Binding> bindings = new ConcurrentHashMap<SimpleString, Binding>();
+
+ QueueFactory queueFactory = new FakeQueueFactory();
+
+ ConcurrentHashSet<SimpleString> addresses = new ConcurrentHashSet<SimpleString>();
+
+ public Binding addBinding(SimpleString address, SimpleString queueName,
+ Filter filter, boolean durable, boolean temporary) throws Exception
+ {
+ Queue queue = queueFactory.createQueue(-1, queueName, filter, durable, temporary);
+ Binding binding = new FakeBinding(address, queue);
+ bindings.put(address, binding);
+ return binding;
+ }
+
+ public boolean addDestination(SimpleString address, boolean temporary)
+ throws Exception
+ {
+ return addresses.addIfAbsent(address);
+ }
+
+ public boolean containsDestination(SimpleString address)
+ {
+ return addresses.contains(address);
+ }
+
+ public Binding getBinding(SimpleString queueName) throws Exception
+ {
+ return bindings.get(queueName);
+ }
+
+ public List<Binding> getBindingsForAddress(SimpleString address)
+ throws Exception
+ {
+ return null;
+ }
+
+ public FlowController getFlowController(SimpleString address)
+ {
+ return null;
+ }
+
+ public Map<SimpleString, List<Binding>> getMappings()
+ {
+ return null;
+ }
+
+ public Set<SimpleString> listAllDestinations()
+ {
+ return null;
+ }
+
+ public Binding removeBinding(SimpleString queueName) throws Exception
+ {
+ return null;
+ }
+
+ public boolean removeDestination(SimpleString address, boolean temporary)
+ throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ public void start() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void stop() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ public List<org.jboss.messaging.core.server.MessageReference> route(
+ ServerMessage message) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureBase.java (from rev 4276, trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureBase.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureBase.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureBase.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -0,0 +1,193 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.tests.performance.remoting;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.impl.ConnectionParamsImpl;
+import org.jboss.messaging.core.client.impl.LocationImpl;
+import org.jboss.messaging.core.client.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.PacketDispatcher;
+import org.jboss.messaging.core.remoting.PacketHandler;
+import org.jboss.messaging.core.remoting.PacketReturner;
+import org.jboss.messaging.core.remoting.impl.mina.MinaService;
+import org.jboss.messaging.core.remoting.impl.wireformat.EmptyPacket;
+
+
+/**
+ *
+ * @author clebert suconic
+ *
+ */
+public abstract class MeasureBase extends TestCase
+{
+ protected MinaService service;
+ protected PacketDispatcher serverDispatcher;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ startServer();
+ serverDispatcher.register(new FakeHandler());
+ }
+
+ @Override
+ public void tearDown() throws Exception
+ {
+ service.stop();
+ }
+
+
+ public void testMixingSends() throws Throwable
+ {
+ RemotingConnectionImpl remoting = new RemotingConnectionImpl(getLocation(), createParameters());
+ remoting.start();
+
+ int NUMBER_OF_MESSAGES = 300;
+
+ long start = System.currentTimeMillis();
+
+ for (int i=0; i<NUMBER_OF_MESSAGES; i++)
+ {
+ if (i%2 == 0)
+ {
+ remoting.sendOneWay(10, 10, new EmptyPacket(EmptyPacket.CLOSE));
+ }
+ else
+ {
+ Object ret = remoting.sendBlocking(10, 0, new EmptyPacket(EmptyPacket.CLOSE));
+ assertTrue (ret instanceof EmptyPacket);
+ //assertEquals(EmptyPacket.EXCEPTION, ret.getType());
+ }
+ }
+
+ long end = System.currentTimeMillis();
+
+
+ System.out.println("Messages / second = " + NUMBER_OF_MESSAGES * 1000 / (end-start));
+ Thread.sleep(1000);
+
+ remoting.stop();
+
+ }
+
+ public void testBlockSends() throws Throwable
+ {
+ //NIOConnector connector = createNIOConnector(new PacketDispatcherImpl(null));
+ //NIOSession session = connector.connect();
+
+
+
+ RemotingConnectionImpl remoting = new RemotingConnectionImpl(getLocation(), createParameters());
+ remoting.start();
+
+ int NUMBER_OF_MESSAGES = 100;
+
+ long start = System.currentTimeMillis();
+
+ for (int i=0; i<NUMBER_OF_MESSAGES; i++)
+ {
+ Object ret = remoting.sendBlocking(10, 10, new EmptyPacket(EmptyPacket.CLOSE));
+ assertTrue (ret instanceof EmptyPacket);
+ }
+
+ long end = System.currentTimeMillis();
+
+
+ System.out.println("Messages / second = " + NUMBER_OF_MESSAGES * 1000 / (end-start));
+ Thread.sleep(1000);
+
+ remoting.stop();
+
+ }
+
+ public void testOneWaySends() throws Throwable
+ {
+ //NIOConnector connector = createNIOConnector(new PacketDispatcherImpl(null));
+ //NIOSession session = connector.connect();
+
+
+ RemotingConnectionImpl remoting = new RemotingConnectionImpl(getLocation(), createParameters());
+ remoting.start();
+
+ int NUMBER_OF_MESSAGES = 30000;
+
+ long start = System.currentTimeMillis();
+
+ for (int i=0; i<NUMBER_OF_MESSAGES; i++)
+ {
+ remoting.sendOneWay(10, 10, new EmptyPacket(EmptyPacket.CLOSE));
+ }
+
+ remoting.sendBlocking(10, 10, new EmptyPacket(EmptyPacket.CLOSE));
+
+ long end = System.currentTimeMillis();
+
+
+ System.out.println("Messages / second = " + NUMBER_OF_MESSAGES * 1000 / (end-start));
+
+ remoting.stop();
+
+ }
+
+ protected abstract LocationImpl getLocation();
+
+ protected abstract ConfigurationImpl createConfiguration();
+
+
+
+ protected void startServer() throws Exception
+ {
+ service = new MinaService(createConfiguration());
+ service.start();
+ serverDispatcher = service.getDispatcher();
+ System.out.println("Server Dispatcher = " + serverDispatcher);
+ }
+
+ // Private
+
+ protected ConnectionParamsImpl createParameters()
+ {
+ ConnectionParamsImpl param = new ConnectionParamsImpl();
+ param.setTimeout(50000);
+ return param;
+ }
+
+
+
+ // Inner Classes
+
+ class FakeHandler implements PacketHandler
+ {
+
+ public long getID()
+ {
+ return 10;
+ }
+
+ public void handle(Packet packet, PacketReturner sender)
+ {
+ //System.out.println("Hello " + packet);
+ try
+ {
+ if (packet.getResponseTargetID() >= 0)
+ {
+ packet.setTargetID(packet.getResponseTargetID());
+ sender.send(packet);
+ }
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureInVMTest.java (from rev 4276, trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureInVMTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureInVMTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureInVMTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.tests.performance.remoting;
+
+import org.jboss.messaging.core.client.impl.LocationImpl;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.tests.unit.core.remoting.impl.ConfigurationHelper;
+
+
+/** This test was added to compare InVM calls against MINA calls */
+public class MeasureInVMTest extends MeasureBase
+{
+
+ @Override
+ protected LocationImpl getLocation()
+ {
+ return new LocationImpl(0);
+
+ }
+
+ protected ConfigurationImpl createConfiguration()
+ {
+ return ConfigurationHelper.newInVMConfig();
+ }
+
+
+
+}
Copied: trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureRemoteTest.java (from rev 4276, trunk/tests/src/org/jboss/messaging/tests/integration/core/remoting/mina/timing/MeasureRemoteTest.java)
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureRemoteTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/remoting/MeasureRemoteTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.messaging.tests.performance.remoting;
+
+import static org.jboss.messaging.core.remoting.TransportType.TCP;
+
+import org.jboss.messaging.core.client.impl.LocationImpl;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.tests.integration.core.remoting.mina.TestSupport;
+import org.jboss.messaging.tests.unit.core.remoting.impl.ConfigurationHelper;
+
+public class MeasureRemoteTest extends MeasureBase
+{
+
+ @Override
+ protected LocationImpl getLocation()
+ {
+ return new LocationImpl(TCP, "localhost", TestSupport.PORT);
+ }
+
+ @Override
+ protected ConfigurationImpl createConfiguration()
+ {
+ return ConfigurationHelper.newTCPConfiguration("localhost", TestSupport.PORT);
+ }
+
+
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -131,6 +131,11 @@
assertEquals(123, configuration.getJournalMaxAIO());
}
+ public void testAIOTimeout() throws Exception
+ {
+ assertEquals(123, configuration.getJournalAIOTimeout());
+ }
+
//config is supposed to be immutable??
// public void testPropertyChangeListener() throws Exception
// {
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AIOSequentialFileFactoryTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -61,7 +61,7 @@
public void testBuffer() throws Exception
{
- SequentialFile file = factory.createSequentialFile("filtetmp.log", true, 10);
+ SequentialFile file = factory.createSequentialFile("filtetmp.log", true, 10, 120);
file.open();
ByteBuffer buff = factory.newBuffer(10);
assertEquals(512, buff.limit());
@@ -122,7 +122,7 @@
final int NUMBER_OF_RECORDS = 10000;
- SequentialFile file = factory.createSequentialFile("callbackBlock.log", true, 1000);
+ SequentialFile file = factory.createSequentialFile("callbackBlock.log", true, 1000, 120);
file.open();
file.fill(0, 512 * NUMBER_OF_RECORDS, (byte)'a');
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -139,7 +139,7 @@
public void createJournal() throws Exception
{
journal =
- new JournalImpl(fileSize, minFiles, sync, fileFactory, 1000, filePrefix, fileExtension, maxAIO);
+ new JournalImpl(fileSize, minFiles, sync, fileFactory, 1000, filePrefix, fileExtension, maxAIO, 120);
}
protected void startJournal() throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -102,7 +102,7 @@
{
try
{
- new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, true, fileFactory, 5000, filePrefix, fileExtension, 1);
+ new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, true, fileFactory, 5000, filePrefix, fileExtension, 1, 120);
fail("Should throw exception");
}
@@ -113,7 +113,7 @@
try
{
- new JournalImpl(10 * 1024, 1, true, fileFactory, 5000, filePrefix, fileExtension, 1);
+ new JournalImpl(10 * 1024, 1, true, fileFactory, 5000, filePrefix, fileExtension, 1, 120);
fail("Should throw exception");
}
@@ -124,7 +124,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, null, 5000, filePrefix, fileExtension, 1);
+ new JournalImpl(10 * 1024, 10, true, null, 5000, filePrefix, fileExtension, 1, 120);
fail("Should throw exception");
}
@@ -135,7 +135,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, fileFactory, JournalImpl.MIN_TASK_PERIOD - 1, filePrefix, fileExtension, 1);
+ new JournalImpl(10 * 1024, 10, true, fileFactory, JournalImpl.MIN_TASK_PERIOD - 1, filePrefix, fileExtension, 1, 120);
fail("Should throw exception");
}
@@ -146,7 +146,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, fileFactory, 5000, null, fileExtension, 1);
+ new JournalImpl(10 * 1024, 10, true, fileFactory, 5000, null, fileExtension, 1, 120);
fail("Should throw exception");
}
@@ -157,7 +157,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, fileFactory, 5000, filePrefix, null, 1);
+ new JournalImpl(10 * 1024, 10, true, fileFactory, 5000, filePrefix, null, 1, 120);
fail("Should throw exception");
}
@@ -168,7 +168,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, fileFactory, 5000, filePrefix, null, 0);
+ new JournalImpl(10 * 1024, 10, true, fileFactory, 5000, filePrefix, null, 0, 120);
fail("Should throw exception");
}
@@ -177,6 +177,17 @@
//Ok
}
+ try
+ {
+ new JournalImpl(10 * 1024, 10, true, fileFactory, 5000, filePrefix, fileExtension, 0, -1);
+
+ fail("Should throw exception");
+ }
+ catch (IllegalStateException e)
+ {
+ //Ok
+ }
+
}
public void testFilesImmediatelyAfterload() throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/SequentialFileFactoryTestBase.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -74,7 +74,7 @@
expectedFiles.add(fileName);
- SequentialFile sf = factory.createSequentialFile(fileName, false, 1);
+ SequentialFile sf = factory.createSequentialFile(fileName, false, 1, 120);
sf.open();
@@ -85,10 +85,10 @@
//Create a couple with a different extension - they shouldn't be picked up
- SequentialFile sf1 = factory.createSequentialFile("different.file", false, 1);
+ SequentialFile sf1 = factory.createSequentialFile("different.file", false, 1, 120);
sf1.open();
- SequentialFile sf2 = factory.createSequentialFile("different.cheese", false, 1);
+ SequentialFile sf2 = factory.createSequentialFile("different.cheese", false, 1, 120);
sf2.open();
List<String> fileNames = factory.listFiles("jbm");
@@ -119,7 +119,7 @@
public void testFill() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("fill.jbm", true, 1);
+ SequentialFile sf = factory.createSequentialFile("fill.jbm", true, 1, 120);
sf.open();
@@ -144,11 +144,11 @@
public void testDelete() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("delete-me.jbm", true, 1);
+ SequentialFile sf = factory.createSequentialFile("delete-me.jbm", true, 1, 120);
sf.open();
- SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", true, 1);
+ SequentialFile sf2 = factory.createSequentialFile("delete-me2.jbm", true, 1, 120);
sf2.open();
@@ -174,7 +174,7 @@
public void testWriteandRead() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("write.jbm", true, 1);
+ SequentialFile sf = factory.createSequentialFile("write.jbm", true, 1, 120);
sf.open();
@@ -249,7 +249,7 @@
public void testPosition() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("position.jbm", true, 1);
+ SequentialFile sf = factory.createSequentialFile("position.jbm", true, 1, 120);
sf.open();
@@ -321,7 +321,7 @@
public void testOpenClose() throws Exception
{
- SequentialFile sf = factory.createSequentialFile("openclose.jbm", true, 1);
+ SequentialFile sf = factory.createSequentialFile("openclose.jbm", true, 1, 120);
sf.open();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/fakes/FakeSequentialFileFactory.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -45,7 +45,7 @@
private Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<String, FakeSequentialFile>();
- public SequentialFile createSequentialFile(final String fileName, final boolean sync, final int maxAIO) throws Exception
+ public SequentialFile createSequentialFile(final String fileName, final boolean sync, final int maxAIO, final int timeout) throws Exception
{
FakeSequentialFile sf = fileMap.get(fileName);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/JournalImplTestUnit.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/JournalImplTestUnit.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -22,14 +22,10 @@
package org.jboss.messaging.tests.unit.core.journal.impl.timing;
import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.journal.IOCallback;
-import org.jboss.messaging.core.journal.Journal;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
import org.jboss.messaging.core.journal.RecordInfo;
-import org.jboss.messaging.core.journal.impl.JournalImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.tests.unit.core.journal.impl.JournalImplTestBase;
@@ -80,8 +76,7 @@
deletes[i] = i;
}
- // This would take a long time with sync=true, and still validates the file.
- setup(10, 10 * 1024 * 1024, false);
+ setup(10, 10 * 1024 * 1024, true);
createJournal();
startJournal();
load();
@@ -124,7 +119,7 @@
deletes[i] = i;
}
- setup(10, 10 * 1024, false);
+ setup(10, 10 * 1024, true);
createJournal();
startJournal();
load();
@@ -132,7 +127,6 @@
update(updates);
delete(deletes);
- log.info("Debug journal:" + debugJournal());
stopJournal(false);
createJournal();
startJournal();
@@ -189,168 +183,7 @@
stopJournal();
}
- public void testSpeedNonTransactional() throws Exception
- {
- for (int i=0;i<1;i++)
- {
- this.setUp();
- System.gc(); Thread.sleep(500);
- internaltestSpeedNonTransactional();
- this.tearDown();
- }
- }
- public void internaltestSpeedNonTransactional() throws Exception
- {
-
- final long numMessages = 10000;
-
- int numFiles = (int)(((numMessages * 1024 + 512) / (10 * 1024 * 1024)) * 1.3);
-
- if (numFiles<2) numFiles = 2;
-
- log.info("num Files=" + numFiles);
-
- Journal journal =
- new JournalImpl(10 * 1024 * 1024, numFiles, true, getFileFactory(),
- 5000, "jbm-data", "jbm", 5000);
-
- journal.start();
-
- journal.load(new ArrayList<RecordInfo>(), null);
-
-
- final CountDownLatch latch = new CountDownLatch((int)numMessages);
-
-
- class LocalCallback implements IOCallback
- {
-
- int i=0;
- String message = null;
- boolean done = false;
- CountDownLatch latch;
-
- public LocalCallback(int i, CountDownLatch latch)
- {
- this.i = i;
- this.latch = latch;
- }
- public void done()
- {
- synchronized (this)
- {
- if (done)
- {
- message = "done received in duplicate";
- }
- done = true;
- this.latch.countDown();
- }
- }
-
- public void onError(int errorCode, String errorMessage)
- {
- synchronized (this)
- {
- System.out.println("********************** Error = " + (i++));
- message = errorMessage;
- latch.countDown();
- }
- }
-
- }
-
-
- log.info("Adding data");
- byte[] data = new byte[700];
-
- long start = System.currentTimeMillis();
-
- for (int i = 0; i < numMessages; i++)
- {
- journal.appendAddRecord(i, (byte)0, data);
- }
-
- long end = System.currentTimeMillis();
-
- double rate = 1000 * (double)numMessages / (end - start);
-
- boolean failed = false;
-
- // If this fails it is probably because JournalImpl it is closing the files without waiting all the completes to arrive first
- assertFalse(failed);
-
-
- log.info("Rate " + rate + " records/sec");
-
- journal.stop();
-
- journal =
- new JournalImpl(10 * 1024 * 1024, numFiles, true, getFileFactory(),
- 5000, "jbm-data", "jbm", 5000);
-
- journal.start();
- journal.load(new ArrayList<RecordInfo>(), null);
- journal.stop();
-
- }
-
- public void testSpeedTransactional() throws Exception
- {
- Journal journal =
- new JournalImpl(10 * 1024 * 1024, 10, true, getFileFactory(),
- 5000, "jbm-data", "jbm", 5000);
-
- journal.start();
-
- journal.load(new ArrayList<RecordInfo>(), null);
-
- try
- {
- final int numMessages = 50050;
-
- byte[] data = new byte[1024];
-
- long start = System.currentTimeMillis();
-
- int count = 0;
- double rates[] = new double[50];
- for (int i = 0; i < 50; i++)
- {
- long startTrans = System.currentTimeMillis();
- for (int j=0; j<1000; j++)
- {
- journal.appendAddRecordTransactional(i, (byte)0, count++, data);
- }
-
- journal.appendCommitRecord(i);
-
- long endTrans = System.currentTimeMillis();
-
- rates[i] = 1000 * (double)1000 / (endTrans - startTrans);
- }
-
- long end = System.currentTimeMillis();
-
- for (double rate: rates)
- {
- log.info("Transaction Rate = " + rate + " records/sec");
-
- }
-
- double rate = 1000 * (double)numMessages / (end - start);
-
- log.info("Rate " + rate + " records/sec");
- }
- finally
- {
- journal.stop();
- }
-
- }
-
-
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/RealJournalImplAIOTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/RealJournalImplAIOTest.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/timing/RealJournalImplAIOTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -22,20 +22,10 @@
package org.jboss.messaging.tests.unit.core.journal.impl.timing;
import java.io.File;
-import java.util.ArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.jboss.messaging.core.asyncio.AIOCallback;
-import org.jboss.messaging.core.journal.IOCallback;
-import org.jboss.messaging.core.journal.Journal;
-import org.jboss.messaging.core.journal.RecordInfo;
import org.jboss.messaging.core.journal.SequentialFileFactory;
import org.jboss.messaging.core.journal.impl.AIOSequentialFileFactory;
-import org.jboss.messaging.core.journal.impl.JournalImpl;
-import org.jboss.messaging.core.journal.impl.NIOSequentialFileFactory;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.tests.unit.core.journal.impl.fakes.FakeCallback;
/**
*
Deleted: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/fakes/FakeBinding.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/fakes/FakeBinding.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/fakes/FakeBinding.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -1,28 +0,0 @@
-package org.jboss.messaging.tests.unit.core.persistence.fakes;
-
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.util.SimpleString;
-
-public class FakeBinding implements Binding
-{
- SimpleString address;
- Queue queue;
-
- public FakeBinding(SimpleString address, Queue queue)
- {
- this.address = address;
- this.queue = queue;
- }
-
- public SimpleString getAddress()
- {
- return address;
- }
-
- public Queue getQueue()
- {
- return queue;
- }
-
-}
Deleted: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/fakes/FakePostOffice.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/fakes/FakePostOffice.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -1,109 +0,0 @@
-package org.jboss.messaging.tests.unit.core.persistence.fakes;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.jboss.messaging.core.filter.Filter;
-import org.jboss.messaging.core.postoffice.Binding;
-import org.jboss.messaging.core.postoffice.FlowController;
-import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.QueueFactory;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeQueueFactory;
-import org.jboss.messaging.util.ConcurrentHashSet;
-import org.jboss.messaging.util.SimpleString;
-
-
-
-/** Maybe this Fake should be moved to postoffice.fakes, but since this
- * Fake only has the basic needed for StorageManagerTest, I have left it here for now */
-public class FakePostOffice implements PostOffice
-{
-
- ConcurrentHashMap<SimpleString, Binding> bindings = new ConcurrentHashMap<SimpleString, Binding>();
-
- QueueFactory queueFactory = new FakeQueueFactory();
-
- ConcurrentHashSet<SimpleString> addresses = new ConcurrentHashSet<SimpleString>();
-
- public Binding addBinding(SimpleString address, SimpleString queueName,
- Filter filter, boolean durable, boolean temporary) throws Exception
- {
- Queue queue = queueFactory.createQueue(-1, queueName, filter, durable, temporary);
- Binding binding = new FakeBinding(address, queue);
- bindings.put(address, binding);
- return binding;
- }
-
- public boolean addDestination(SimpleString address, boolean temporary)
- throws Exception
- {
- return addresses.addIfAbsent(address);
- }
-
- public boolean containsDestination(SimpleString address)
- {
- return addresses.contains(address);
- }
-
- public Binding getBinding(SimpleString queueName) throws Exception
- {
- return bindings.get(queueName);
- }
-
- public List<Binding> getBindingsForAddress(SimpleString address)
- throws Exception
- {
- return null;
- }
-
- public FlowController getFlowController(SimpleString address)
- {
- return null;
- }
-
- public Map<SimpleString, List<Binding>> getMappings()
- {
- return null;
- }
-
- public Set<SimpleString> listAllDestinations()
- {
- return null;
- }
-
- public Binding removeBinding(SimpleString queueName) throws Exception
- {
- return null;
- }
-
- public boolean removeDestination(SimpleString address, boolean temporary)
- throws Exception
- {
- // TODO Auto-generated method stub
- return false;
- }
-
- public void start() throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- public void stop() throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- public List<org.jboss.messaging.core.server.MessageReference> route(
- ServerMessage message) throws Exception
- {
- // TODO Auto-generated method stub
- return null;
- }
-
-}
Deleted: trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/timing/StorageManagerTimingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/timing/StorageManagerTimingTest.java 2008-05-21 19:37:04 UTC (rev 4276)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/persistence/impl/timing/StorageManagerTimingTest.java 2008-05-22 03:07:49 UTC (rev 4277)
@@ -1,288 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.messaging.tests.unit.core.persistence.impl.timing;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
-import org.jboss.messaging.core.config.impl.FileConfiguration;
-import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.impl.MessageImpl;
-import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
-import org.jboss.messaging.core.remoting.impl.mina.BufferWrapper;
-import org.jboss.messaging.core.server.JournalType;
-import org.jboss.messaging.core.server.Queue;
-import org.jboss.messaging.core.server.impl.ServerMessageImpl;
-import org.jboss.messaging.tests.unit.core.persistence.fakes.FakePostOffice;
-import org.jboss.messaging.tests.util.UnitTestCase;
-import org.jboss.messaging.util.SimpleString;
-
-public class StorageManagerTimingTest extends UnitTestCase
-{
-
- private static final Logger log = Logger.getLogger(StorageManagerTimingTest.class);
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
- assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
- }
-
-
- public void testAIO() throws Exception
- {
- // just to do some initial loading.. ignore this rate
- internalTestStorage(JournalType.ASYNCIO, 1000, 1, 1);
-
- double rate = internalTestStorage(JournalType.ASYNCIO, 60000, 1, 1)[0];
- printRates("Rate of AIO, 60000 inserts / commits on every insert", rate);
-
- rate = internalTestStorage(JournalType.ASYNCIO, 30000, -1, 1)[0];
- printRates("Rate of AIO, 30000 inserts / single commit at the end", rate);
-
- rate = internalTestStorage(JournalType.ASYNCIO, 30000, 5, 1)[0];
- printRates("Rate of AIO, 30000 inserts / commit every 5 recodds", rate);
-
- rate = internalTestStorage(JournalType.ASYNCIO, 30000, -1, 1)[0];
- printRates("Rate of AIO, 30000 inserts / single commit at the end (again)", rate);
-
- }
-
- public void testAIOMultiThread() throws Exception
- {
- double[] rates = internalTestStorage(JournalType.ASYNCIO, 10000, -1, 1);
- rates = internalTestStorage(JournalType.ASYNCIO, 30000, -1, 5);
-
- printRates("Rate of AIO, 30000 inserts / single commit at the end", rates);
-
-
- rates = internalTestStorage(JournalType.ASYNCIO, 5000, 1, 5);
-
- printRates("Rate of AIO, 30000 inserts / commit on every insert", rates);
- }
-
- public void testNIO() throws Exception
- {
- // just to do some initial loading.. ignore this rate
- internalTestStorage(JournalType.NIO, 1000, 1, 1);
- double rate = internalTestStorage(JournalType.NIO, 1000, 1, 1)[0];
- printRates("Rate of NIO, 1000 inserts, 1000 commits", rate);
-
- rate = internalTestStorage(JournalType.NIO, 30000, -1, 1)[0];
- printRates("Rate of NIO, 30000 inserts / single commit at the end", rate);
-
- rate = internalTestStorage(JournalType.NIO, 30000, 5, 1)[0];
- printRates("Rate of NIO, 30000 inserts / commit every 5 records", rate);
- }
-
- public void testNIOMultiThread() throws Exception
- {
-
- double[] rates = internalTestStorage(JournalType.NIO, 5000, -1, 5);
-
- printRates("Rate of NIO, 5000 inserts / single commit at the end", rates);
-
- rates = internalTestStorage(JournalType.NIO, 5000, 1, 5);
-
- printRates("Rate of NIO, 5000 inserts / commit on every insert", rates);
-
-
- }
-
- public double[] internalTestStorage(final JournalType journalType,
- final long numberOfMessages,
- final int transInterval,
- final int numberOfThreads) throws Exception
- {
- FileConfiguration configuration = new FileConfiguration();
-
- configuration.start();
-
- deleteDirectory(new File(configuration.getBindingsDirectory()));
- deleteDirectory(new File(configuration.getJournalDirectory()));
-
- configuration.setJournalType(journalType);
-
- final JournalStorageManager journal = new JournalStorageManager(configuration);
- journal.start();
-
- FakePostOffice office = new FakePostOffice();
-
- HashMap<Long, Queue> queues = new HashMap<Long, Queue>();
-
- journal.loadMessages(office, queues);
-
- final byte[] bytes = new byte[900];
-
- for (int i=0;i<bytes.length;i++)
- {
- bytes[i] = (byte)('a' + (i%20));
- }
-
-
- final BufferWrapper buffer = new BufferWrapper(1024);
- buffer.putBytes(bytes);
-
- final AtomicLong transactionGenerator = new AtomicLong(1);
-
- class LocalThread extends Thread
- {
- int id;
- int commits = 1;
- Exception e;
- long totalTime = 0;
- public LocalThread(int id)
- {
- super("LocalThread:" + id);
- this.id = id;
- }
-
- public void run()
- {
- try
- {
- long start = System.currentTimeMillis();
-
- long trans = transactionGenerator.incrementAndGet();
- boolean commitPending=false;
- for (long i=1;i<=numberOfMessages;i++)
- {
-
- final SimpleString address = new SimpleString("Destination " + i);
-
-
- ServerMessageImpl implMsg = new ServerMessageImpl(/* type */ 1, /* durable */ true, /* expiration */ 0,
- /* timestamp */ 0, /* priority */(byte)0);
-
- implMsg.putStringProperty(new SimpleString("Key"), new SimpleString("This String is worthless!"));
-
- implMsg.setMessageID(i);
- implMsg.setBody(buffer);
-
- implMsg.setDestination(address);
-
-
-
- journal.storeMessageTransactional(trans, implMsg);
-
- commitPending = true;
-
- if (transInterval>0 && i%transInterval == 0)
- {
- journal.commit(trans);
- commits ++;
- trans = transactionGenerator.incrementAndGet();
- commitPending = false;
- }
- }
-
- if (commitPending) journal.commit(trans);
-
-
- long end = System.currentTimeMillis();
-
- totalTime = end - start;
- }
- catch (Exception e)
- {
- log.error(e.getMessage(), e);
- this.e = e;
- }
- }
- }
-
- try
- {
- LocalThread[] threads = new LocalThread[numberOfThreads];
-
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- threads[i] = new LocalThread(i);
- }
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- threads[i].start();
- }
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- threads[i].join();
- }
-
- for (int i = 0; i < numberOfThreads; i++)
- {
- if (threads[i].e != null)
- {
- throw threads[i].e;
- }
- }
-
- double rates[] = new double[numberOfThreads];
-
- for (int i=0; i<numberOfThreads; i++)
- {
- rates[i] = (numberOfMessages + threads[i].commits) * 1000 / threads[i].totalTime;
- }
-
- return rates;
- }
- finally
- {
- journal.stop();
- }
-
- }
-
-
- private void printRates(String msg, double rate)
- {
- printRates(msg, new double[] { rate });
- }
- private void printRates(String msg, double[] rates)
- {
- double rate = 0;
-
- log.info("*************************************************************************");
- log.info(" " + msg + " ");
-
- double totalRate = 0;
- for (int i=0; i<rates.length; i++)
- {
- rate = rates[i];
- totalRate += rate;
- if (rates.length>1)
- {
- log.info( " Thread " + i + ": = " + rate + " inserts/sec (including commits)");
- }
- }
-
- log.info( " Total rate : = " + totalRate + " inserts/sec (including commits)");
- log.info("*************************************************************************");
- }
-
-
-}
More information about the jboss-cvs-commits
mailing list