[jboss-cvs] JBoss Messaging SVN: r7181 - in trunk: examples/core/perf/server0 and 23 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Jun 3 10:06:16 EDT 2009
Author: timfox
Date: 2009-06-03 10:06:16 -0400 (Wed, 03 Jun 2009)
New Revision: 7181
Modified:
trunk/examples/core/perf/perf.properties
trunk/examples/core/perf/server0/jbm-configuration.xml
trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java
trunk/src/config/common/schema/jbm-configuration.xsd
trunk/src/config/common/version.properties
trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/journal/Journal.java
trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/remoting/SynchronousCloseTest.java
trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java
trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java
Log:
more fixes to AIO
Modified: trunk/examples/core/perf/perf.properties
===================================================================
--- trunk/examples/core/perf/perf.properties 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/examples/core/perf/perf.properties 2009-06-03 14:06:16 UTC (rev 7181)
@@ -1,10 +1,10 @@
-num-messages=500000
-num-warmup-messages=50000
+num-messages=100000
+num-warmup-messages=0
message-size=1000
-durable=false
+durable=true
transacted=false
batch-size=1000
-drain-queue=true
+drain-queue=false
queue-name=perfQueue
throttle-rate=-1
address=perfAddress
@@ -14,6 +14,6 @@
tcp-buffer=1048576
tcp-no-delay=false
send-window=1048576
-pre-ack=true
+pre-ack=false
block-ack=true
block-persistent=true
Modified: trunk/examples/core/perf/server0/jbm-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/jbm-configuration.xml 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/examples/core/perf/server0/jbm-configuration.xml 2009-06-03 14:06:16 UTC (rev 7181)
@@ -15,7 +15,13 @@
<security-enabled>false</security-enabled>
- <persistence-enabled>false</persistence-enabled>
+ <persistence-enabled>true</persistence-enabled>
+
+ <journal-sync-non-transactional>true</journal-sync-non-transactional>
+ <journal-type>ASYNCIO</journal-type>
+ <journal-aio-buffer-timeout>1</journal-aio-buffer-timeout>
+ <journal-min-files>10</journal-min-files>
+
<queues>
<queue name="perfQueue">
Modified: trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java
===================================================================
--- trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/examples/core/perf/src/org/jboss/core/example/PerfBase.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -384,6 +384,8 @@
double duration = (1.0 * System.currentTimeMillis() - start) / 1000;
log.info(String.format("sent %6d messages in %2.2fs", i, duration));
}
+
+ // log.info("sent message " + i);
if (tbl != null)
{
Modified: trunk/src/config/common/schema/jbm-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/jbm-configuration.xsd 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/config/common/schema/jbm-configuration.xsd 2009-06-03 14:06:16 UTC (rev 7181)
@@ -194,10 +194,7 @@
</xsd:element>
<xsd:element name="journal-aio-buffer-size"
type="xsd:long" maxOccurs="1" minOccurs="0">
- </xsd:element>
- <xsd:element name="journal-aio-flush-on-sync"
- type="xsd:boolean" maxOccurs="1" minOccurs="0">
- </xsd:element>
+ </xsd:element>
<xsd:element name="journal-sync-transactional"
type="xsd:boolean" maxOccurs="1" minOccurs="0">
</xsd:element>
Modified: trunk/src/config/common/version.properties
===================================================================
--- trunk/src/config/common/version.properties 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/config/common/version.properties 2009-06-03 14:06:16 UTC (rev 7181)
@@ -1,7 +1,7 @@
-messaging.version.versionName=Stilton
+messaging.version.versionName=tadpole
messaging.version.majorVersion=2
messaging.version.minorVersion=0
messaging.version.microVersion=0
-messaging.version.incrementingVersion=101
+messaging.version.incrementingVersion=102
messaging.version.versionSuffix=BETA1-SNAPSHOT
messaging.version.versionTag=beta1
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/asyncio/impl/TimedBuffer.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -25,17 +25,13 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.jboss.messaging.core.asyncio.AIOCallback;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.utils.JBMThreadFactory;
+import org.jboss.messaging.utils.TokenBucketLimiter;
+import org.jboss.messaging.utils.TokenBucketLimiterImpl;
/**
* A TimedBuffer
@@ -54,10 +50,8 @@
private final TimedBufferObserver bufferObserver;
- private final CheckTimer timerRunnable = new CheckTimer();
+ private CheckTimer timerRunnable = new CheckTimer();
- private volatile ScheduledFuture<?> futureTimerRunnable;
-
private final long timeout;
private final int bufferSize;
@@ -68,14 +62,16 @@
private final Lock lock = new ReentrantReadWriteLock().writeLock();
- private final ScheduledExecutorService schedule = ScheduledSingleton.getScheduledService();
-
// used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
private volatile long timeLastAdd = 0;
// used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
private volatile long timeLastSync = 0;
+ private Thread timerThread;
+
+ private boolean started;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -86,12 +82,55 @@
{
bufferSize = size;
this.bufferObserver = bufferObserver;
- this.timeout = timeout;
+ this.timeout = timeout;
currentBuffer = ByteBuffer.wrap(new byte[bufferSize]);
currentBuffer.limit(0);
- callbacks = new ArrayList<AIOCallback>();
+ callbacks = new ArrayList<AIOCallback>();
}
+
+ public synchronized void start()
+ {
+ if (started)
+ {
+ return;
+ }
+
+ timerRunnable = new CheckTimer();
+
+ timerThread = new Thread(timerRunnable, "jbm-aio-timer");
+ timerThread.start();
+
+ started = true;
+
+ log.info("started timed buffer");
+ }
+
+ public synchronized void stop()
+ {
+ if (!started)
+ {
+ return;
+ }
+
+ timerRunnable.close();
+
+ while (timerThread.isAlive())
+ {
+ try
+ {
+ timerThread.join();
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ started = false;
+
+ log.info("stopped timedbuffer");
+ }
+
public void lock()
{
lock.lock();
@@ -136,31 +175,26 @@
{
return true;
}
-
}
public synchronized void addBytes(final ByteBuffer bytes, final boolean sync, final AIOCallback callback)
{
- timeLastAdd = System.currentTimeMillis();
-
-
+ long now = System.nanoTime();
+
+ timeLastAdd = now;
+
if (sync)
{
// We should flush on the next timeout, no matter what other activity happens on the buffer
if (timeLastSync == 0)
{
- timeLastSync = System.currentTimeMillis();
+ timeLastSync = now;
}
}
currentBuffer.put(bytes);
callbacks.add(callback);
- if (futureTimerRunnable == null)
- {
- futureTimerRunnable = schedule.scheduleAtFixedRate(timerRunnable, timeout, timeout, TimeUnit.MILLISECONDS);
- }
-
if (currentBuffer.position() == currentBuffer.capacity())
{
flush();
@@ -181,18 +215,11 @@
callbacks = new ArrayList<AIOCallback>();
- }
+ timeLastAdd = 0;
+ timeLastSync = 0;
- if (futureTimerRunnable != null)
- {
- futureTimerRunnable.cancel(false);
- futureTimerRunnable = null;
+ currentBuffer.limit(0);
}
-
- timeLastAdd = 0;
- timeLastSync = 0;
-
- currentBuffer.limit(0);
}
// Package protected ---------------------------------------------
@@ -203,15 +230,16 @@
private void checkTimer()
{
- final long now = System.currentTimeMillis();
+ final long now = System.nanoTime();
// if inactive for more than the timeout
// of if a sync happened at more than the the timeout ago
- if (now - timeLastAdd >= timeout || timeLastSync != 0 && now - timeLastSync >= timeout)
+ if (timeLastAdd != 0 && now - timeLastAdd >= timeout || timeLastSync != 0 && now - timeLastSync >= timeout)
{
lock.lock();
try
{
+ // log.info("** flushing because of timer");
flush();
}
finally
@@ -223,29 +251,36 @@
// Inner classes -------------------------------------------------
- class CheckTimer implements Runnable
+ private class CheckTimer implements Runnable
{
- public void run()
- {
- checkTimer();
- }
- }
+ private volatile boolean closed = false;
- // TODO: is there a better place to get this schedule service from?
- static class ScheduledSingleton
- {
- private static ScheduledExecutorService scheduleService;
+ private TokenBucketLimiter limiter = new TokenBucketLimiterImpl(4000, false);
- private static synchronized ScheduledExecutorService getScheduledService()
+ public void run()
{
- if (scheduleService == null)
+ while (!closed)
{
- ThreadFactory factory = new JBMThreadFactory("JBM-buffer-scheduled-control", true);
+ // log.info(System.identityHashCode(this) + " firing");
+ checkTimer();
- scheduleService = Executors.newSingleThreadScheduledExecutor(factory);
+ // try
+ // {
+ // Thread.sleep(1);
+ // }
+ // catch (InterruptedException ignore)
+ // {
+ // }
+
+ // limiter.limit();
+
+ Thread.yield();
}
+ }
- return scheduleService;
+ public void close()
+ {
+ closed = true;
}
}
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -200,14 +200,10 @@
int getAIOBufferSize();
- void setAIOBufferTimeout(int timeout);
+ void setAIOBufferTimeout(long timeout);
- int getAIOBufferTimeout();
+ long getAIOBufferTimeout();
- void setAIOFlushOnSync(boolean flush);
-
- boolean isAIOFlushOnSync();
-
boolean isCreateBindingsDir();
void setCreateBindingsDir(boolean create);
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -97,10 +97,8 @@
public static final int DEFAULT_JOURNAL_MAX_AIO = 500;
- public static final boolean DEFAULT_JOURNAL_AIO_FLUSH_SYNC = false;
+ public static final long DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT = 500;
- public static final int DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT = 1;
-
public static final int DEFAULT_JOURNAL_AIO_BUFFER_SIZE = 128 * 1024;
public static final boolean DEFAULT_WILDCARD_ROUTING_ENABLED = true;
@@ -238,11 +236,9 @@
protected int journalMinFiles = DEFAULT_JOURNAL_MIN_FILES;
protected int journalMaxAIO = DEFAULT_JOURNAL_MAX_AIO;
+
+ protected long journalAIOBufferTimeout = DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT;
- protected boolean journalAIOFlushSync = DEFAULT_JOURNAL_AIO_FLUSH_SYNC;
-
- protected int journalAIOBufferTimeout = DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT;
-
protected int journalAIOBufferSize = DEFAULT_JOURNAL_AIO_BUFFER_SIZE;
protected boolean wildcardRoutingEnabled = DEFAULT_WILDCARD_ROUTING_ENABLED;
@@ -708,27 +704,16 @@
jmxManagementEnabled = enabled;
}
-
- public void setAIOBufferTimeout(int timeout)
+ public void setAIOBufferTimeout(long timeout)
{
this.journalAIOBufferTimeout = timeout;
}
- public int getAIOBufferTimeout()
+ public long getAIOBufferTimeout()
{
return journalAIOBufferTimeout;
}
- public void setAIOFlushOnSync(boolean flush)
- {
- journalAIOFlushSync = flush;
- }
-
- public boolean isAIOFlushOnSync()
- {
- return journalAIOFlushSync;
- }
-
public int getAIOBufferSize()
{
return journalAIOBufferSize;
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -297,11 +297,9 @@
journalSyncNonTransactional = getBoolean(e, "journal-sync-non-transactional", journalSyncNonTransactional);
journalFileSize = getInteger(e, "journal-file-size", journalFileSize);
+
+ journalAIOBufferTimeout = getLong(e, "journal-aio-buffer-timeout", DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT);
- journalAIOFlushSync = getBoolean(e, "journal-aio-flush-on-sync", DEFAULT_JOURNAL_AIO_FLUSH_SYNC);
-
- journalAIOBufferTimeout = getInteger(e, "journal-aio-buffer-timeout", DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT);
-
journalAIOBufferSize = getInteger(e, "journal-aio-buffer-size", DEFAULT_JOURNAL_AIO_BUFFER_SIZE);
journalMinFiles = getInteger(e, "journal-min-files", journalMinFiles);
Modified: trunk/src/main/org/jboss/messaging/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/Journal.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -38,35 +38,33 @@
{
// Non transactional operations
- void appendAddRecord(long id, byte recordType, byte[] record) throws Exception;
+ void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
- void appendAddRecord(long id, byte recordType, EncodingSupport record) throws Exception;
-
void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
+
+ void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception;
- void appendUpdateRecord(long id, byte recordType, byte[] record) throws Exception;
+ void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
- void appendUpdateRecord(long id, byte recordType, EncodingSupport record) throws Exception;
+ void appendDeleteRecord(long id, boolean sync) throws Exception;
- void appendDeleteRecord(long id) throws Exception;
-
// Transactional operations
- void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
+ void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record, boolean sync) throws Exception;
- void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+ void appendAddRecordTransactional(long txID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
- void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception;
+ void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record, boolean sync) throws Exception;
- void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record) throws Exception;
+ void appendUpdateRecordTransactional(long txID, long id, byte recordType, EncodingSupport record, boolean sync) throws Exception;
- void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception;
+ void appendDeleteRecordTransactional(long txID, long id, byte[] record, boolean sync) throws Exception;
- void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception;
+ void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record, boolean sync) throws Exception;
- void appendDeleteRecordTransactional(long txID, long id) throws Exception;
+ void appendDeleteRecordTransactional(long txID, long id, boolean sync) throws Exception;
- void appendCommitRecord(long txID) throws Exception;
+ void appendCommitRecord(long txID, boolean sync) throws Exception;
/**
*
@@ -79,9 +77,9 @@
* @param transactionData - extra user data for the prepare
* @throws Exception
*/
- void appendPrepareRecord(long txID, EncodingSupport transactionData) throws Exception;
+ void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception;
- void appendRollbackRecord(long txID) throws Exception;
+ void appendRollbackRecord(long txID, boolean sync) throws Exception;
// Load
Modified: trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/SequentialFile.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -80,8 +80,6 @@
long size() throws Exception;
- void flush();
-
void renameTo(String newFileName) throws Exception;
void lockBuffer();
Modified: trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/TestableJournal.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -50,9 +50,9 @@
int getMinFiles();
- boolean isSyncTransactional();
+ // boolean isSyncTransactional();
- boolean isSyncNonTransactional();
+ // boolean isSyncNonTransactional();
String getFilePrefix();
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFile.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -35,7 +35,6 @@
import org.jboss.messaging.core.asyncio.impl.AsynchronousFileImpl;
import org.jboss.messaging.core.asyncio.impl.TimedBuffer;
import org.jboss.messaging.core.asyncio.impl.TimedBufferObserver;
-import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.journal.BufferCallback;
import org.jboss.messaging.core.journal.IOCallback;
import org.jboss.messaging.core.journal.SequentialFile;
@@ -71,7 +70,7 @@
private final TimedBuffer timedBuffer;
- private BufferCallback bufferCallback;
+ private final BufferCallback bufferCallback;
private boolean buffering = true;
@@ -86,7 +85,7 @@
public AIOSequentialFile(final SequentialFileFactory factory,
final int bufferSize,
- final int bufferTimeoutMilliseconds,
+ final long bufferTimeoutMilliseconds,
final String journalDir,
final String fileName,
final int maxIO,
@@ -130,11 +129,6 @@
return timedBuffer.checkSize(size);
}
- public void flush()
- {
- timedBuffer.flush();
- }
-
public void lockBuffer()
{
timedBuffer.lock();
@@ -151,7 +145,8 @@
opened = false;
timedBuffer.flush();
-
+ timedBuffer.stop();
+
final CountDownLatch donelatch = new CountDownLatch(1);
executor.execute(new Runnable()
@@ -238,8 +233,8 @@
}
public void open() throws Exception
- {
- open(maxIO);
+ {
+ open(maxIO);
}
/* (non-Javadoc)
@@ -253,13 +248,13 @@
public synchronized void open(final int currentMaxIO) throws Exception
{
+ timedBuffer.start();
opened = true;
aioFile = newFile();
aioFile.open(journalDir + "/" + fileName, currentMaxIO);
position.set(0);
aioFile.setBufferCallback(bufferCallback);
this.fileSize = aioFile.size();
-
}
public void setBufferCallback(final BufferCallback callback)
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/AIOSequentialFileFactory.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -42,15 +42,11 @@
*
*/
public class AIOSequentialFileFactory extends AbstractSequentialFactory
-{
-
-
-
+{
private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
private static final boolean trace = log.isTraceEnabled();
-
-
+
private final ReuseBuffersController buffersControl = new ReuseBuffersController();
// This method exists just to make debug easier.
@@ -68,18 +64,17 @@
private final Executor pollerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("JBM-AIO-poller-pool" + System.identityHashCode(this), true));
-
- final int bufferSize;
+ private final int bufferSize;
- final int bufferTimeout;
+ private final long bufferTimeout;
public AIOSequentialFileFactory(final String journalDir)
{
this(journalDir, ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE, ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT);
}
- public AIOSequentialFileFactory(final String journalDir, int bufferSize, int bufferTimeout)
+ public AIOSequentialFileFactory(final String journalDir, int bufferSize, long bufferTimeout)
{
super(journalDir);
this.bufferSize = bufferSize;
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -171,13 +171,6 @@
private final int minFiles;
- private final boolean syncTransactional;
-
- private final boolean syncNonTransactional;
-
- // used on AIO
- private final boolean flushOnSync;
-
private final SequentialFileFactory fileFactory;
public final String filePrefix;
@@ -210,9 +203,6 @@
public JournalImpl(final int fileSize,
final int minFiles,
- final boolean syncTransactional,
- final boolean syncNonTransactional,
- final boolean flushOnSync,
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
@@ -253,12 +243,6 @@
this.minFiles = minFiles;
- this.syncTransactional = syncTransactional;
-
- this.syncNonTransactional = syncNonTransactional;
-
- this.flushOnSync = flushOnSync;
-
this.fileFactory = fileFactory;
this.filePrefix = filePrefix;
@@ -271,16 +255,11 @@
// Journal implementation
// ----------------------------------------------------------------
- public void appendAddRecord(final long id, final byte recordType, final byte[] record) throws Exception
+ public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
{
- appendAddRecord(id, recordType, new ByteArrayEncoding(record), syncNonTransactional);
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
- public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
- {
- appendAddRecord(id, recordType, record, syncNonTransactional);
- }
-
public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
{
if (state != STATE_LOADED)
@@ -322,12 +301,12 @@
}
}
- public void appendUpdateRecord(final long id, final byte recordType, final byte[] record) throws Exception
+ public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
{
- appendUpdateRecord(id, recordType, new ByteArrayEncoding(record));
+ appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
- public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record) throws Exception
+ public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
{
if (state != STATE_LOADED)
{
@@ -354,12 +333,12 @@
bb.writeInt(size);
- IOCallback callback = getSyncCallback(syncNonTransactional);
+ IOCallback callback = getSyncCallback(sync);
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), syncNonTransactional, callback);
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), sync, callback);
posFiles.addUpdateFile(usedFile);
}
@@ -374,7 +353,7 @@
}
}
- public void appendDeleteRecord(final long id) throws Exception
+ public void appendDeleteRecord(final long id, final boolean sync) throws Exception
{
if (state != STATE_LOADED)
{
@@ -397,12 +376,12 @@
bb.putLong(id);
bb.putInt(size);
- IOCallback callback = getSyncCallback(syncNonTransactional);
+ IOCallback callback = getSyncCallback(sync);
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb, syncNonTransactional, callback);
+ JournalFile usedFile = appendRecord(bb, sync, callback);
posFiles.addDelete(usedFile);
}
@@ -417,16 +396,18 @@
}
}
- public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
+ public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record,
+ final boolean sync) throws Exception
{
- appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+ appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record), sync);
}
public void appendAddRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception
+ final EncodingSupport record,
+ final boolean sync) throws Exception
{
if (state != STATE_LOADED)
{
@@ -451,7 +432,7 @@
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
JournalTransaction tx = getTransactionInfo(txID);
@@ -466,15 +447,17 @@
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
- final byte[] record) throws Exception
+ final byte[] record,
+ final boolean sync) throws Exception
{
- appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+ appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record), sync);
}
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
- final EncodingSupport record) throws Exception
+ final EncodingSupport record,
+ final boolean sync) throws Exception
{
if (state != STATE_LOADED)
{
@@ -497,7 +480,7 @@
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
JournalTransaction tx = getTransactionInfo(txID);
@@ -509,12 +492,14 @@
}
}
- public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record,
+ final boolean sync) throws Exception
{
- appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
+ appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record), sync);
}
- public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record,
+ final boolean sync) throws Exception
{
if (state != STATE_LOADED)
{
@@ -539,7 +524,7 @@
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
JournalTransaction tx = getTransactionInfo(txID);
@@ -551,7 +536,8 @@
}
}
- public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
+ public void appendDeleteRecordTransactional(final long txID, final long id,
+ final boolean sync) throws Exception
{
if (state != STATE_LOADED)
{
@@ -572,7 +558,7 @@
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID));
+ JournalFile usedFile = appendRecord(bb.toByteBuffer(), false, getTransactionCallback(txID, sync));
JournalTransaction tx = getTransactionInfo(txID);
@@ -597,7 +583,7 @@
* @param transactionData - extra user data for the prepare
* @throws Exception
*/
- public void appendPrepareRecord(final long txID, final EncodingSupport transactionData) throws Exception
+ public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
{
if (state != STATE_LOADED)
{
@@ -608,12 +594,12 @@
ByteBuffer bb = writeTransaction(PREPARE_RECORD, txID, tx, transactionData);
- IOCallback callback = getTransactionCallback(txID);
+ IOCallback callback = getTransactionCallback(txID, sync);
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+ JournalFile usedFile = appendRecord(bb, sync, callback);
tx.prepare(usedFile);
}
@@ -646,7 +632,7 @@
*
* @see JournalImpl#writeTransaction(byte, long, org.jboss.messaging.core.journal.impl.JournalImpl.JournalTransaction, EncodingSupport)
*/
- public void appendCommitRecord(final long txID) throws Exception
+ public void appendCommitRecord(final long txID, final boolean sync) throws Exception
{
if (state != STATE_LOADED)
{
@@ -662,12 +648,12 @@
ByteBuffer bb = writeTransaction(COMMIT_RECORD, txID, tx, null);
- IOCallback callback = getTransactionCallback(txID);
+ IOCallback callback = getTransactionCallback(txID, sync);
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+ JournalFile usedFile = appendRecord(bb, sync, callback);
transactionCallbacks.remove(txID);
@@ -686,7 +672,7 @@
}
- public void appendRollbackRecord(final long txID) throws Exception
+ public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
{
if (state != STATE_LOADED)
{
@@ -709,12 +695,12 @@
bb.putLong(txID);
bb.putInt(size);
- IOCallback callback = getTransactionCallback(txID);
+ IOCallback callback = getTransactionCallback(txID, sync);
lock.lock();
try
{
- JournalFile usedFile = appendRecord(bb, syncTransactional, callback);
+ JournalFile usedFile = appendRecord(bb, sync, callback);
transactionCallbacks.remove(txID);
@@ -1476,12 +1462,7 @@
/** Method for use on testcases.
* It will call waitComplete on every transaction, so any assertions on the file system will be correct after this */
public void debugWait() throws Exception
- {
- if (currentFile != null)
- {
- currentFile.getFile().flush();
- }
-
+ {
for (TransactionCallback callback : transactionCallbacks.values())
{
callback.waitCompletion();
@@ -1572,16 +1553,6 @@
return minFiles;
}
- public boolean isSyncTransactional()
- {
- return syncTransactional;
- }
-
- public boolean isSyncNonTransactional()
- {
- return syncNonTransactional;
- }
-
public String getFilePrefix()
{
return filePrefix;
@@ -2029,12 +2000,6 @@
if (callback != null)
{
currentFile.getFile().write(bb, sync, callback);
-
- // This is defaulted to false. The user is telling us to not wait the buffer timeout when a commit or sync is called
- if (flushOnSync && sync)
- {
- currentFile.getFile().flush();
- }
}
else
{
@@ -2268,11 +2233,9 @@
}
}
-
-
- private IOCallback getTransactionCallback(final long transactionId) throws MessagingException
+ private IOCallback getTransactionCallback(final long transactionId, final boolean sync) throws MessagingException
{
- if (fileFactory.isSupportsCallbacks() && syncTransactional)
+ if (sync && fileFactory.isSupportsCallbacks())
{
TransactionCallback callback = transactionCallbacks.get(transactionId);
Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -92,10 +92,8 @@
int getAIOBufferSize();
- int getAIOBufferTimeout();
+ long getAIOBufferTimeout();
- boolean isAIOFlushOnSync();
-
public long getPagingMaxGlobalSizeBytes();
public String getPagingDirectory();
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -22,6 +22,25 @@
package org.jboss.messaging.core.management.impl;
+import java.text.DateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.NotificationEmitter;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.transaction.xa.Xid;
+
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.exception.MessagingException;
@@ -40,24 +59,6 @@
import org.jboss.messaging.core.transaction.impl.XidImpl;
import org.jboss.messaging.utils.SimpleString;
-import javax.management.ListenerNotFoundException;
-import javax.management.MBeanNotificationInfo;
-import javax.management.NotificationBroadcasterSupport;
-import javax.management.NotificationEmitter;
-import javax.management.NotificationFilter;
-import javax.management.NotificationListener;
-import javax.transaction.xa.Xid;
-import java.text.DateFormat;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
/**
* @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*
@@ -156,23 +157,16 @@
return configuration.getInterceptorClassNames().toArray(new String[configuration.getInterceptorClassNames().size()]);
}
-
-
public int getAIOBufferSize()
{
return configuration.getAIOBufferSize();
}
- public int getAIOBufferTimeout()
+ public long getAIOBufferTimeout()
{
return configuration.getAIOBufferTimeout();
}
- public boolean isAIOFlushOnSync()
- {
- return configuration.isAIOFlushOnSync();
- }
-
public String getJournalDirectory()
{
return configuration.getJournalDirectory();
Modified: trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/management/jmx/impl/ReplicationAwareMessagingServerControlWrapper.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -96,16 +96,11 @@
return localControl.getAIOBufferSize();
}
- public int getAIOBufferTimeout()
+ public long getAIOBufferTimeout()
{
return localControl.getAIOBufferTimeout();
}
- public boolean isAIOFlushOnSync()
- {
- return localControl.isAIOFlushOnSync();
- }
-
public String getJournalDirectory()
{
return localControl.getJournalDirectory();
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -138,6 +138,10 @@
private final Executor executor;
+ private final boolean syncTransactional;
+
+ private final boolean syncNonTransactional;
+
public JournalStorageManager(final Configuration config, final Executor executor)
{
this.executor = executor;
@@ -158,7 +162,7 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
- bindingsJournal = new JournalImpl(1024 * 1024, 2, true, true, false, bindingsFF, "jbm-bindings", "bindings", 1);
+ bindingsJournal = new JournalImpl(1024 * 1024, 2, bindingsFF, "jbm-bindings", "bindings", 1);
String journalDir = config.getJournalDirectory();
@@ -167,6 +171,10 @@
throw new NullPointerException("journal-dir is null");
}
+ syncNonTransactional = config.isJournalSyncNonTransactional();
+
+ syncTransactional = config.isJournalSyncTransactional();
+
checkAndCreateDir(journalDir, config.isCreateJournalDir());
SequentialFileFactory journalFF = null;
@@ -181,7 +189,9 @@
}
else
{
- journalFF = new AIOSequentialFileFactory(journalDir, config.getAIOBufferSize(), config.getAIOBufferTimeout());
+ journalFF = new AIOSequentialFileFactory(journalDir,
+ config.getAIOBufferSize(),
+ config.getAIOBufferTimeout());
log.info("AIO loaded successfully");
}
}
@@ -196,10 +206,7 @@
}
messageJournal = new JournalImpl(config.getJournalFileSize(),
- config.getJournalMinFiles(),
- config.isJournalSyncTransactional(),
- config.isJournalSyncNonTransactional(),
- config.isAIOFlushOnSync(),
+ config.getJournalMinFiles(),
journalFF,
"jbm-data",
"jbm",
@@ -254,37 +261,40 @@
// Non transactional operations
public void storeMessage(final ServerMessage message) throws Exception
- {
+ {
if (message.getMessageID() <= 0)
{
throw new MessagingException(MessagingException.ILLEGAL_STATE, "MessageId was not assigned to Message");
}
+ // Note that we don't sync, the add reference that comes immediately after will sync
+
if (message instanceof LargeServerMessage)
{
messageJournal.appendAddRecord(message.getMessageID(),
ADD_LARGE_MESSAGE,
- new LargeMessageEncoding((LargeServerMessage)message));
+ new LargeMessageEncoding((LargeServerMessage)message),
+ false);
}
else
{
- messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message);
+ messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message, false);
}
}
public void storeReference(final long queueID, final long messageID) throws Exception
{
- messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID));
+ messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID), syncNonTransactional);
}
public void storeAcknowledge(final long queueID, final long messageID) throws Exception
{
- messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID));
+ messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional);
}
public void deleteMessage(final long messageID) throws Exception
{
- messageJournal.appendDeleteRecord(messageID);
+ messageJournal.appendDeleteRecord(messageID, syncNonTransactional);
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
@@ -292,26 +302,29 @@
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(),
ref.getQueue().getPersistenceID());
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), SET_SCHEDULED_DELIVERY_TIME, encoding);
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+ SET_SCHEDULED_DELIVERY_TIME,
+ encoding,
+ syncNonTransactional);
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding);
+ messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional);
}
public void updateDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendUpdateRecord(recordID, DUPLICATE_ID, encoding);
+ messageJournal.appendUpdateRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional);
}
public void deleteDuplicateID(long recordID) throws Exception
{
- messageJournal.appendDeleteRecord(recordID);
+ messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
}
// Transactional operations
@@ -328,11 +341,16 @@
messageJournal.appendAddRecordTransactional(txID,
message.getMessageID(),
ADD_LARGE_MESSAGE,
- new LargeMessageEncoding(((LargeServerMessage)message)));
+ new LargeMessageEncoding(((LargeServerMessage)message)),
+ syncTransactional);
}
else
{
- messageJournal.appendAddRecordTransactional(txID, message.getMessageID(), ADD_MESSAGE, message);
+ messageJournal.appendAddRecordTransactional(txID,
+ message.getMessageID(),
+ ADD_MESSAGE,
+ message,
+ syncTransactional);
}
}
@@ -343,7 +361,7 @@
{
// Instead of updating the record, we delete the old one as that is
// better for reclaiming
- messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID());
+ messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID(), syncTransactional);
}
pageTransaction.setRecordID(generateUniqueID());
@@ -351,22 +369,31 @@
messageJournal.appendAddRecordTransactional(txID,
pageTransaction.getRecordID(),
PAGE_TRANSACTION,
- pageTransaction);
+ pageTransaction,
+ syncTransactional);
}
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
- messageJournal.appendUpdateRecordTransactional(txID, messageID, ADD_REF, new RefEncoding(queueID));
+ messageJournal.appendUpdateRecordTransactional(txID,
+ messageID,
+ ADD_REF,
+ new RefEncoding(queueID),
+ syncTransactional);
}
public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
- messageJournal.appendUpdateRecordTransactional(txID, messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID));
+ messageJournal.appendUpdateRecordTransactional(txID,
+ messageID,
+ ACKNOWLEDGE_REF,
+ new RefEncoding(queueID),
+ syncTransactional);
}
public void deletePageTransactional(final long txID, final long recordID) throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ messageJournal.appendDeleteRecordTransactional(txID, recordID, syncTransactional);
}
public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
@@ -377,27 +404,28 @@
messageJournal.appendUpdateRecordTransactional(txID,
ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
- encoding);
+ encoding,
+ syncTransactional);
}
public void deleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
+ messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID), syncTransactional);
}
public void prepare(final long txID, final Xid xid) throws Exception
{
- messageJournal.appendPrepareRecord(txID, new XidEncoding(xid));
+ messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional);
}
public void commit(final long txID) throws Exception
{
- messageJournal.appendCommitRecord(txID);
+ messageJournal.appendCommitRecord(txID, syncTransactional);
}
public void rollback(final long txID) throws Exception
{
- messageJournal.appendRollbackRecord(txID);
+ messageJournal.appendRollbackRecord(txID, syncTransactional);
}
public void storeDuplicateIDTransactional(final long txID,
@@ -407,7 +435,7 @@
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendAddRecordTransactional(txID, recordID, DUPLICATE_ID, encoding);
+ messageJournal.appendAddRecordTransactional(txID, recordID, DUPLICATE_ID, encoding, syncTransactional);
}
public void updateDuplicateIDTransactional(final long txID,
@@ -417,12 +445,12 @@
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendUpdateRecordTransactional(txID, recordID, DUPLICATE_ID, encoding);
+ messageJournal.appendUpdateRecordTransactional(txID, recordID, DUPLICATE_ID, encoding, syncTransactional);
}
public void deleteDuplicateIDTransactional(long txID, long recordID) throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ messageJournal.appendDeleteRecordTransactional(txID, recordID, syncTransactional);
}
// Other operations
@@ -432,7 +460,10 @@
DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getPersistenceID(),
ref.getDeliveryCount());
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), UPDATE_DELIVERY_COUNT, updateInfo);
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+ UPDATE_DELIVERY_COUNT,
+ updateInfo,
+ syncNonTransactional);
}
private static final class AddMessageRecord
@@ -873,12 +904,12 @@
queue.setPersistenceID(id);
- bindingsJournal.appendAddRecord(id, QUEUE_BINDING_RECORD, bindingEncoding);
+ bindingsJournal.appendAddRecord(id, QUEUE_BINDING_RECORD, bindingEncoding, true);
}
public void deleteQueueBinding(final long queueBindingID) throws Exception
{
- bindingsJournal.appendDeleteRecord(queueBindingID);
+ bindingsJournal.appendDeleteRecord(queueBindingID, true);
}
public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
@@ -1077,11 +1108,11 @@
private volatile long nextID;
public BatchingIDGenerator(final long start, final long checkpointSize)
- {
+ {
this.counter = new AtomicLong(start);
this.checkpointSize = checkpointSize;
-
+
nextID = start + checkpointSize;
}
@@ -1109,9 +1140,9 @@
}
private synchronized void saveCheckPoint(final long id)
- {
+ {
if (id >= nextID)
- {
+ {
storeID(id);
nextID += checkpointSize;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/journal/RealAIOJournalImplTest.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -73,22 +73,7 @@
return new AIOSequentialFileFactory(getTestDir());
}
- public void testAddSync() throws Exception
- {
- setup(10, 10 * 1024, true);
- this.flushOnSync = false;
- createJournal();
- startJournal();
- load();
- for (int i = 0; i < 1000; i++)
- {
- journal.appendAddRecord(i, (byte)1, new byte[] { 10, 12 });
- }
-
- stopJournal();
- }
-
@Override
protected int getAlignment()
{
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlTest.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -99,8 +99,7 @@
assertEquals(conf.getJournalMinFiles(), serverControl.getJournalMinFiles());
assertEquals(conf.getJournalMaxAIO(), serverControl.getJournalMaxAIO());
assertEquals(conf.getAIOBufferSize(), serverControl.getAIOBufferSize());
- assertEquals(conf.getAIOBufferTimeout(), serverControl.getAIOBufferTimeout());
- assertEquals(conf.isAIOFlushOnSync(), serverControl.isAIOFlushOnSync());
+ assertEquals(conf.getAIOBufferTimeout(), serverControl.getAIOBufferTimeout());
assertEquals(conf.isCreateBindingsDir(), serverControl.isCreateBindingsDir());
assertEquals(conf.isCreateJournalDir(), serverControl.isCreateJournalDir());
assertEquals(conf.getPagingDirectory(), serverControl.getPagingDirectory());
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/management/MessagingServerControlUsingCoreTest.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -422,9 +422,9 @@
return (Integer)proxy.retrieveAttributeValue("AIOBufferSize");
}
- public int getAIOBufferTimeout()
+ public long getAIOBufferTimeout()
{
- return (Integer)proxy.retrieveAttributeValue("AIOBufferTimeout");
+ return (Long)proxy.retrieveAttributeValue("AIOBufferTimeout");
}
public boolean isAIOFlushOnSync()
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/SynchronousCloseTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/SynchronousCloseTest.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/SynchronousCloseTest.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -111,19 +111,11 @@
ClientSessionFactory sf = createSessionFactory();
- for (int i = 0; i < 100; i++)
+ for (int i = 0; i < 2000; i++)
{
ClientSession session = sf.createSession(false, true, true);
- assertEquals(1, server.getMessagingServerControl().listRemoteAddresses().length);
-
- log.info("closing session");
session.close();
- log.info("closed session");
-
- // Thread.sleep(10000);
-
- assertEquals(0, server.getMessagingServerControl().listRemoteAddresses().length);
}
sf.close();
Modified: trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/performance/journal/JournalImplTestUnit.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -18,7 +18,7 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.tests.performance.journal;
@@ -44,43 +44,43 @@
public abstract class JournalImplTestUnit extends JournalImplTestBase
{
private static final Logger log = Logger.getLogger(JournalImplTestUnit.class);
-
+
protected void tearDown() throws Exception
{
super.tearDown();
-
+
assertEquals(0, AsynchronousFileImpl.getTotalMaxIO());
}
-
+
public void testAddUpdateDeleteManyLargeFileSize() throws Exception
{
final int numberAdds = 1000;
-
+
final int numberUpdates = 500;
-
+
final int numberDeletes = 300;
-
+
long[] adds = new long[numberAdds];
-
+
for (int i = 0; i < numberAdds; i++)
{
adds[i] = i;
}
-
+
long[] updates = new long[numberUpdates];
-
+
for (int i = 0; i < numberUpdates; i++)
{
updates[i] = i;
}
-
+
long[] deletes = new long[numberDeletes];
-
+
for (int i = 0; i < numberDeletes; i++)
{
deletes[i] = i;
}
-
+
setup(10, 10 * 1024 * 1024, true);
createJournal();
startJournal();
@@ -92,38 +92,38 @@
createJournal();
startJournal();
loadAndCheck();
-
+
}
-
+
public void testAddUpdateDeleteManySmallFileSize() throws Exception
{
final int numberAdds = 1000;
-
+
final int numberUpdates = 500;
-
+
final int numberDeletes = 300;
-
+
long[] adds = new long[numberAdds];
-
+
for (int i = 0; i < numberAdds; i++)
{
adds[i] = i;
}
-
+
long[] updates = new long[numberUpdates];
-
+
for (int i = 0; i < numberUpdates; i++)
{
updates[i] = i;
}
-
+
long[] deletes = new long[numberDeletes];
-
+
for (int i = 0; i < numberDeletes; i++)
{
deletes[i] = i;
}
-
+
setup(10, 10 * 1024, true);
createJournal();
startJournal();
@@ -137,112 +137,110 @@
createJournal();
startJournal();
loadAndCheck();
-
+
}
-
+
public void testReclaimAndReload() throws Exception
{
setup(2, 10 * 1024 * 1024, false);
createJournal();
startJournal();
load();
-
+
long start = System.currentTimeMillis();
-
-
+
byte[] record = generateRecord(recordLength);
-
+
int NUMBER_OF_RECORDS = 1000;
for (int count = 0; count < NUMBER_OF_RECORDS; count++)
{
- journal.appendAddRecord(count, (byte)0, record);
-
+ journal.appendAddRecord(count, (byte)0, record, true);
+
if (count >= NUMBER_OF_RECORDS / 2)
{
- journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2);
+ journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2, true);
}
-
+
if (count % 100 == 0)
{
log.debug("Done: " + count);
}
}
-
+
long end = System.currentTimeMillis();
-
+
double rate = 1000 * ((double)NUMBER_OF_RECORDS) / (end - start);
-
+
log.info("Rate of " + rate + " adds/removes per sec");
-
+
log.debug("Reclaim status = " + debugJournal());
-
+
stopJournal();
createJournal();
startJournal();
journal.load(new ArrayList<RecordInfo>(), new ArrayList<PreparedTransactionInfo>());
-
+
assertEquals(NUMBER_OF_RECORDS / 2, journal.getIDMapSize());
-
+
stopJournal();
}
-
+
public void testSpeedNonTransactional() throws Exception
{
- for (int i=0;i<1;i++)
+ for (int i = 0; i < 1; i++)
{
this.setUp();
- System.gc(); Thread.sleep(500);
+ System.gc();
+ Thread.sleep(500);
internaltestSpeedNonTransactional();
this.tearDown();
}
}
-
+
public void testSpeedTransactional() throws Exception
{
- Journal journal =
- new JournalImpl(10 * 1024 * 1024, 10, true, true, false, getFileFactory(),
- "jbm-data", "jbm", 5000);
-
+ Journal journal = new JournalImpl(10 * 1024 * 1024, 10, getFileFactory(), "jbm-data", "jbm", 5000);
+
journal.start();
-
+
journal.load(new ArrayList<RecordInfo>(), null);
-
+
try
{
final int numMessages = 50050;
-
+
SimpleEncoding data = new SimpleEncoding(1024, (byte)'j');
-
+
long start = System.currentTimeMillis();
-
+
int count = 0;
double rates[] = new double[50];
for (int i = 0; i < 50; i++)
{
long startTrans = System.currentTimeMillis();
- for (int j=0; j<1000; j++)
+ for (int j = 0; j < 1000; j++)
{
- journal.appendAddRecordTransactional(i, count++, (byte)0, data);
+ journal.appendAddRecordTransactional(i, count++, (byte)0, data, true);
}
-
- journal.appendCommitRecord(i);
-
+
+ journal.appendCommitRecord(i, true);
+
long endTrans = System.currentTimeMillis();
-
+
rates[i] = 1000 * (double)1000 / (endTrans - startTrans);
}
-
+
long end = System.currentTimeMillis();
-
- for (double rate: rates)
+
+ for (double rate : rates)
{
log.info("Transaction Rate = " + rate + " records/sec");
-
+
}
-
+
double rate = 1000 * (double)numMessages / (end - start);
-
+
log.info("Rate " + rate + " records/sec");
}
finally
@@ -251,54 +249,48 @@
}
}
-
+
private void internaltestSpeedNonTransactional() throws Exception
- {
+ {
final long numMessages = 10000;
-
- int numFiles = (int)(((numMessages * 1024 + 512) / (10 * 1024 * 1024)) * 1.3);
-
- if (numFiles<2) numFiles = 2;
-
+
+ int numFiles = (int)(((numMessages * 1024 + 512) / (10 * 1024 * 1024)) * 1.3);
+
+ if (numFiles < 2)
+ numFiles = 2;
+
log.debug("num Files=" + numFiles);
- Journal journal =
- new JournalImpl(10 * 1024 * 1024, numFiles, true, true, false, getFileFactory(),
- "jbm-data", "jbm", 5000);
-
+ Journal journal = new JournalImpl(10 * 1024 * 1024, numFiles, getFileFactory(), "jbm-data", "jbm", 5000);
+
journal.start();
-
+
journal.load(new ArrayList<RecordInfo>(), null);
-
+
log.debug("Adding data");
SimpleEncoding data = new SimpleEncoding(700, (byte)'j');
-
+
long start = System.currentTimeMillis();
-
+
for (int i = 0; i < numMessages; i++)
{
- journal.appendAddRecord(i, (byte)0, data);
+ journal.appendAddRecord(i, (byte)0, data, true);
}
-
+
long end = System.currentTimeMillis();
-
+
double rate = 1000 * (double)numMessages / (end - start);
-
+
log.info("Rate " + rate + " records/sec");
journal.stop();
-
- journal =
- new JournalImpl(10 * 1024 * 1024, numFiles, true, true, false, getFileFactory(),
- "jbm-data", "jbm", 5000);
-
+
+ journal = new JournalImpl(10 * 1024 * 1024, numFiles, getFileFactory(), "jbm-data", "jbm", 5000);
+
journal.start();
journal.load(new ArrayList<RecordInfo>(), null);
journal.stop();
-
+
}
-
-
-}
-
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/AddAndRemoveStressTest.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -24,7 +24,6 @@
import java.io.File;
import java.util.ArrayList;
-import java.util.concurrent.Executors;
import org.jboss.messaging.core.journal.LoadManager;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
@@ -82,7 +81,7 @@
{
SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDir());
- JournalImpl impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, true, factory, "jbm", "jbm", 1000);
+ JournalImpl impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, factory, "jbm", "jbm", 1000);
impl.start();
@@ -94,13 +93,13 @@
{
System.out.println("Append " + i);
}
- impl.appendAddRecord(i, (byte)0, new SimpleEncoding(1024, (byte)'f'));
+ impl.appendAddRecord(i, (byte)0, new SimpleEncoding(1024, (byte)'f'), false);
}
impl.stop();
factory = new AIOSequentialFileFactory(getTestDir());
- impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, true, factory, "jbm", "jbm", 1000);
+ impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, factory, "jbm", "jbm", 1000);
impl.start();
@@ -113,13 +112,13 @@
System.out.println("Delete " + i);
}
- impl.appendDeleteRecord(i);
+ impl.appendDeleteRecord(i, false);
}
impl.stop();
factory = new AIOSequentialFileFactory(getTestDir());
- impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, true, factory, "jbm", "jbm", 1000);
+ impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, factory, "jbm", "jbm", 1000);
impl.start();
@@ -146,7 +145,7 @@
{
SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDir());
- JournalImpl impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, true, factory, "jbm", "jbm", 1000);
+ JournalImpl impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, factory, "jbm", "jbm", 1000);
impl.start();
@@ -158,14 +157,14 @@
{
System.out.println("Append " + i);
}
- impl.appendAddRecord(i, (byte)21, new SimpleEncoding(40, (byte)'f'));
- impl.appendUpdateRecord(i, (byte)22, new SimpleEncoding(40, (byte)'g'));
+ impl.appendAddRecord(i, (byte)21, new SimpleEncoding(40, (byte)'f'), false);
+ impl.appendUpdateRecord(i, (byte)22, new SimpleEncoding(40, (byte)'g'), false);
}
impl.stop();
factory = new AIOSequentialFileFactory(getTestDir());
- impl = new JournalImpl(10 * 1024 * 1024, 10, true, false, true, factory, "jbm", "jbm", 1000);
+ impl = new JournalImpl(10 * 1024 * 1024, 10, factory, "jbm", "jbm", 1000);
impl.start();
@@ -178,13 +177,13 @@
System.out.println("Delete " + i);
}
- impl.appendDeleteRecord(i);
+ impl.appendDeleteRecord(i, false);
}
impl.stop();
factory = new AIOSequentialFileFactory(getTestDir());
- impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, true, false, true, factory, "jbm", "jbm", 1000);
+ impl = new JournalImpl(10 * 1024 * 1024, NUMBER_OF_FILES_ON_JOURNAL, factory, "jbm", "jbm", 1000);
impl.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/stress/journal/remote/RemoteJournalAppender.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -142,8 +142,7 @@
public static JournalImpl createJournal(String journalType, String journalDir)
{
- JournalImpl journal = new JournalImpl(10485760, 2, true,
- false, true, getFactory(journalType, journalDir), "journaltst", "tst", 5000);
+ JournalImpl journal = new JournalImpl(10485760, 2, getFactory(journalType, journalDir), "journaltst", "tst", 5000);
return journal;
}
@@ -212,25 +211,25 @@
if (transactionSize != 0)
{
- journal.appendAddRecordTransactional(transactionId, id, (byte)99, buffer.array());
+ journal.appendAddRecordTransactional(transactionId, id, (byte)99, buffer.array(), false);
if (++transactionCounter == transactionSize)
{
System.out.println("Commit transaction " + transactionId);
- journal.appendCommitRecord(transactionId);
+ journal.appendCommitRecord(transactionId, false);
transactionCounter = 0;
transactionId = nextID.incrementAndGet();
}
}
else
{
- journal.appendAddRecord(id, (byte)99, buffer.array());
+ journal.appendAddRecord(id, (byte)99, buffer.array(), false);
}
}
if (transactionCounter != 0)
{
- journal.appendCommitRecord(transactionId);
+ journal.appendCommitRecord(transactionId, false);
}
if (transactionSize == 0)
Modified: trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/timing/core/journal/impl/JournalImplTestUnit.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -152,11 +152,11 @@
for (int count = 0; count < NUMBER_OF_RECORDS; count++)
{
- journal.appendAddRecord(count, (byte)0, record);
+ journal.appendAddRecord(count, (byte)0, record, false);
if (count >= NUMBER_OF_RECORDS / 2)
{
- journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2);
+ journal.appendDeleteRecord(count - NUMBER_OF_RECORDS / 2, false);
}
if (count % 100 == 0)
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -110,8 +110,6 @@
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_AIO, conf.getJournalMaxAIO());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_AIO_FLUSH_SYNC, conf.isAIOFlushOnSync());
-
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_TIMEOUT, conf.getAIOBufferTimeout());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_AIO_BUFFER_SIZE, conf.getAIOBufferSize());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -79,8 +79,7 @@
assertEquals("somedir2", conf.getJournalDirectory());
assertEquals(false, conf.isCreateJournalDir());
assertEquals(JournalType.NIO, conf.getJournalType());
- assertEquals(10000, conf.getAIOBufferSize());
- assertEquals(true, conf.isAIOFlushOnSync());
+ assertEquals(10000, conf.getAIOBufferSize());
assertEquals(1000, conf.getAIOBufferTimeout());
assertEquals(false, conf.isJournalSyncTransactional());
assertEquals(true, conf.isJournalSyncNonTransactional());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -532,9 +532,8 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.management.MessagingServerControlMBean#getAIOBufferTimeout()
*/
- public int getAIOBufferTimeout()
+ public long getAIOBufferTimeout()
{
-
return 0;
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -146,7 +146,7 @@
try
{
- journalImpl = new JournalImpl(2000, 2, true, true, false, factory, "tt", "tt", 1000);
+ journalImpl = new JournalImpl(2000, 2, factory, "tt", "tt", 1000);
fail("Supposed to throw an exception");
}
catch (Exception ignored)
@@ -161,7 +161,7 @@
setupJournal(JOURNAL_SIZE, 10);
- journalImpl.appendAddRecord(13, (byte)14, new SimpleEncoding(1, (byte)15));
+ journalImpl.appendAddRecord(13, (byte)14, new SimpleEncoding(1, (byte)15), false);
journalImpl.forceMoveNextFile();
@@ -198,13 +198,13 @@
{
bytes[j] = (byte)i;
}
- journalImpl.appendAddRecord(i * 100l, (byte)i, bytes);
+ journalImpl.appendAddRecord(i * 100l, (byte)i, bytes, false);
}
for (int i = 25; i < 50; i++)
{
EncodingSupport support = new SimpleEncoding(5, (byte)i);
- journalImpl.appendAddRecord(i * 100l, (byte)i, support);
+ journalImpl.appendAddRecord(i * 100l, (byte)i, support, false);
}
setupJournal(JOURNAL_SIZE, 1024);
@@ -233,7 +233,7 @@
bytes[j] = (byte)'x';
}
- journalImpl.appendUpdateRecord(i * 100l, (byte)i, bytes);
+ journalImpl.appendUpdateRecord(i * 100l, (byte)i, bytes, false);
}
setupJournal(JOURNAL_SIZE, 1024);
@@ -291,7 +291,7 @@
for (int i = 0; i < 50; i++)
{
- journalImpl.appendAddRecord(i, (byte)1, new SimpleEncoding(1, (byte)'x'));
+ journalImpl.appendAddRecord(i, (byte)1, new SimpleEncoding(1, (byte)'x'), false);
}
journalImpl.forceMoveNextFile();
@@ -304,7 +304,7 @@
for (int i = 10; i < 50; i++)
{
- journalImpl.appendDeleteRecord(i);
+ journalImpl.appendDeleteRecord(i, false);
}
journalImpl.debugWait();
@@ -337,7 +337,7 @@
for (int i = 0; i < 50; i++)
{
- journalImpl.appendAddRecord(i, (byte)1, new SimpleEncoding(1, (byte)'x'));
+ journalImpl.appendAddRecord(i, (byte)1, new SimpleEncoding(1, (byte)'x'), false);
}
// as the request to a new file is asynchronous, we need to make sure the
@@ -348,12 +348,12 @@
for (int i = 0; i < 50; i++)
{
- journalImpl.appendDeleteRecord(i);
+ journalImpl.appendDeleteRecord(i, false);
}
journalImpl.forceMoveNextFile();
- journalImpl.appendAddRecord(1000, (byte)1, new SimpleEncoding(1, (byte)'x'));
+ journalImpl.appendAddRecord(1000, (byte)1, new SimpleEncoding(1, (byte)'x'), false);
journalImpl.debugWait();
@@ -390,7 +390,7 @@
assertEquals(0, records.size());
assertEquals(0, transactions.size());
- journalImpl.appendAddRecordTransactional(1, 1, (byte)1, new SimpleEncoding(1, (byte)1));
+ journalImpl.appendAddRecordTransactional(1, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
setupJournal(JOURNAL_SIZE, 100);
@@ -399,7 +399,7 @@
try
{
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
// This was supposed to throw an exception, as the transaction was
// forgotten (interrupted by a reload).
fail("Supposed to throw exception");
@@ -429,7 +429,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(77l, 1, (byte)1, new SimpleEncoding(1, (byte)1));
+ journalImpl.appendAddRecordTransactional(77l, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
journalImpl.forceMoveNextFile();
}
@@ -437,7 +437,7 @@
assertEquals(12, factory.listFiles("tt").size());
- journalImpl.appendAddRecordTransactional(78l, 1, (byte)1, new SimpleEncoding(1, (byte)1));
+ journalImpl.appendAddRecordTransactional(78l, 1, (byte)1, new SimpleEncoding(1, (byte)1), false);
assertEquals(12, factory.listFiles("tt").size());
@@ -448,7 +448,7 @@
try
{
- journalImpl.appendCommitRecord(77l);
+ journalImpl.appendCommitRecord(77l, false);
// This was supposed to throw an exception, as the transaction was
// forgotten (interrupted by a reload).
fail("Supposed to throw exception");
@@ -478,11 +478,11 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(1, (byte)1));
+ journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(1, (byte)1), false);
journalImpl.forceMoveNextFile();
}
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
journalImpl.debugWait();
@@ -501,17 +501,17 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendDeleteRecordTransactional(2l, i);
+ journalImpl.appendDeleteRecordTransactional(2l, i, false);
journalImpl.forceMoveNextFile();
}
- journalImpl.appendCommitRecord(2l);
+ journalImpl.appendCommitRecord(2l, false);
- journalImpl.appendAddRecord(100, (byte)1, new SimpleEncoding(5, (byte)1));
+ journalImpl.appendAddRecord(100, (byte)1, new SimpleEncoding(5, (byte)1), false);
journalImpl.forceMoveNextFile();
- journalImpl.appendAddRecord(101, (byte)1, new SimpleEncoding(5, (byte)1));
+ journalImpl.appendAddRecord(101, (byte)1, new SimpleEncoding(5, (byte)1), false);
journalImpl.checkAndReclaimFiles();
@@ -536,9 +536,9 @@
journalImpl.appendAddRecordTransactional(1l,
2l,
(byte)3,
- new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4));
+ new SimpleEncoding(1900 - JournalImpl.SIZE_ADD_RECORD_TX, (byte)4), false);
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
journalImpl.debugWait();
@@ -561,13 +561,13 @@
for (int i = 0; i < 20; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
journalImpl.forceMoveNextFile();
}
journalImpl.forceMoveNextFile();
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
SequentialFile file = factory.createSequentialFile("tt-1.tt", 1);
@@ -626,16 +626,16 @@
for (int i = 0; i < 20; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
- journalImpl.appendAddRecordTransactional(2l, i + 20l, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
+ journalImpl.appendAddRecordTransactional(2l, i + 20l, (byte)0, new SimpleEncoding(1, (byte)15), false);
journalImpl.forceMoveNextFile();
}
journalImpl.forceMoveNextFile();
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
- journalImpl.appendCommitRecord(2l);
+ journalImpl.appendCommitRecord(2l, false);
SequentialFile file = factory.createSequentialFile("tt-1.tt", 1);
@@ -695,7 +695,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecord(i, (byte)0, new SimpleEncoding(1, (byte)0));
+ journalImpl.appendAddRecord(i, (byte)0, new SimpleEncoding(1, (byte)0), false);
journalImpl.forceMoveNextFile();
}
@@ -707,7 +707,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendDeleteRecord(i);
+ journalImpl.appendDeleteRecord(i, false);
}
journalImpl.forceMoveNextFile();
@@ -734,19 +734,19 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
journalImpl.forceMoveNextFile();
}
for (int i = 10; i < 20; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
journalImpl.forceMoveNextFile();
}
journalImpl.forceMoveNextFile();
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
SequentialFile file = factory.createSequentialFile("tt-1.tt", 1);
@@ -796,22 +796,22 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
}
journalImpl.forceMoveNextFile();
SimpleEncoding xidEncoding = new SimpleEncoding(10, (byte)'a');
- journalImpl.appendPrepareRecord(1l, xidEncoding);
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendPrepareRecord(1l, xidEncoding, false);
+ journalImpl.appendCommitRecord(1l, false);
for (int i = 0; i < 10; i++)
{
- journalImpl.appendDeleteRecordTransactional(2l, i);
+ journalImpl.appendDeleteRecordTransactional(2l, i, false);
}
- journalImpl.appendCommitRecord(2l);
- journalImpl.appendAddRecord(100l, (byte)0, new SimpleEncoding(1, (byte)10)); // Add
+ journalImpl.appendCommitRecord(2l, false);
+ journalImpl.appendAddRecord(100l, (byte)0, new SimpleEncoding(1, (byte)10), false); // Add
// anything
// to
// keep
@@ -841,10 +841,10 @@
{
journalImpl.forceMoveNextFile();
}
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)15), false);
}
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
for (int i = 0; i < 10; i++)
{
@@ -852,10 +852,10 @@
{
journalImpl.forceMoveNextFile();
}
- journalImpl.appendDeleteRecordTransactional(2l, i);
+ journalImpl.appendDeleteRecordTransactional(2l, i, false);
}
- journalImpl.appendCommitRecord(2l);
+ journalImpl.appendCommitRecord(2l, false);
journalImpl.forceMoveNextFile();
journalImpl.checkAndReclaimFiles();
@@ -876,11 +876,11 @@
SimpleEncoding xid = new SimpleEncoding(10, (byte)1);
- journalImpl.appendAddRecord(10l, (byte)0, new SimpleEncoding(10, (byte)0));
+ journalImpl.appendAddRecord(10l, (byte)0, new SimpleEncoding(10, (byte)0), false);
- journalImpl.appendDeleteRecordTransactional(1l, 10l, new SimpleEncoding(100, (byte)'j'));
+ journalImpl.appendDeleteRecordTransactional(1l, 10l, new SimpleEncoding(100, (byte)'j'), false);
- journalImpl.appendPrepareRecord(1, xid);
+ journalImpl.appendPrepareRecord(1, xid, false);
journalImpl.debugWait();
@@ -907,7 +907,7 @@
assertEquals((byte)1, transactions.get(0).extraData[i]);
}
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
journalImpl.debugWait();
@@ -929,7 +929,7 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1));
+ journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
journalImpl.forceMoveNextFile();
}
@@ -937,7 +937,7 @@
SimpleEncoding xid1 = new SimpleEncoding(10, (byte)1);
- journalImpl.appendPrepareRecord(1l, xid1);
+ journalImpl.appendPrepareRecord(1l, xid1, false);
assertEquals(12, factory.listFiles("tt").size());
@@ -958,7 +958,7 @@
assertEquals(12, factory.listFiles("tt").size());
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
setupJournal(JOURNAL_SIZE, 1024);
@@ -968,12 +968,12 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendDeleteRecordTransactional(2l, i);
+ journalImpl.appendDeleteRecordTransactional(2l, i, false);
}
SimpleEncoding xid2 = new SimpleEncoding(15, (byte)2);
- journalImpl.appendPrepareRecord(2l, xid2);
+ journalImpl.appendPrepareRecord(2l, xid2, false);
setupJournal(JOURNAL_SIZE, 1);
@@ -990,7 +990,7 @@
assertEquals(12, factory.listFiles("tt").size());
- journalImpl.appendCommitRecord(2l);
+ journalImpl.appendCommitRecord(2l, false);
setupJournal(JOURNAL_SIZE, 1);
@@ -1018,11 +1018,11 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1));
+ journalImpl.appendAddRecordTransactional(1, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
journalImpl.forceMoveNextFile();
}
- journalImpl.appendPrepareRecord(1l, new SimpleEncoding(13, (byte)0));
+ journalImpl.appendPrepareRecord(1l, new SimpleEncoding(13, (byte)0), false);
setupJournal(JOURNAL_SIZE, 100);
assertEquals(0, records.size());
@@ -1067,11 +1067,11 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
journalImpl.forceMoveNextFile();
}
- journalImpl.appendRollbackRecord(1l);
+ journalImpl.appendRollbackRecord(1l, false);
journalImpl.forceMoveNextFile();
@@ -1096,10 +1096,10 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
}
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
setupJournal(JOURNAL_SIZE, 100);
@@ -1119,10 +1119,10 @@
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)0, new SimpleEncoding(1, (byte)0), false);
}
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
setupJournal(JOURNAL_SIZE, 100);
@@ -1139,11 +1139,11 @@
setupJournal(JOURNAL_SIZE, 1);
- journalImpl.appendPrepareRecord(2l, new SimpleEncoding(10, (byte)'j'));
+ journalImpl.appendPrepareRecord(2l, new SimpleEncoding(10, (byte)'j'), false);
journalImpl.forceMoveNextFile();
- journalImpl.appendAddRecord(1l, (byte)0, new SimpleEncoding(10, (byte)'k'));
+ journalImpl.appendAddRecord(1l, (byte)0, new SimpleEncoding(10, (byte)'k'), false);
setupJournal(JOURNAL_SIZE, 1);
@@ -1159,9 +1159,9 @@
assertEquals(1, transactions.size());
- journalImpl.appendCommitRecord(2l);
+ journalImpl.appendCommitRecord(2l, false);
- journalImpl.appendDeleteRecord(1l);
+ journalImpl.appendDeleteRecord(1l, false);
journalImpl.forceMoveNextFile();
@@ -1203,8 +1203,8 @@
latchStart.await();
for (int i = 0; i < NUMBER_OF_ELEMENTS; i++)
{
- journalImpl.appendAddRecordTransactional(i, i, (byte)1, new SimpleEncoding(50, (byte)1));
- journalImpl.appendCommitRecord(i);
+ journalImpl.appendAddRecordTransactional(i, i, (byte)1, new SimpleEncoding(50, (byte)1), false);
+ journalImpl.appendCommitRecord(i, false);
queueDelete.offer(i);
}
finishedOK.incrementAndGet();
@@ -1232,7 +1232,7 @@
{
break;
}
- journalImpl.appendDeleteRecord(toDelete);
+ journalImpl.appendDeleteRecord(toDelete, false);
}
finishedOK.incrementAndGet();
}
@@ -1268,20 +1268,20 @@
{
SequentialFileFactory factory = new FakeSequentialFileFactory(512, false);
- JournalImpl impl = new JournalImpl(512 + 512 * 3, 20, true, false, false, factory, "jbm", "jbm", 1000);
+ JournalImpl impl = new JournalImpl(512 + 512 * 3, 20, factory, "jbm", "jbm", 1000);
impl.start();
impl.load(dummyLoader);
- impl.appendAddRecord(1l, (byte)0, new SimpleEncoding(100, (byte)'a'));
- impl.appendAddRecord(2l, (byte)0, new SimpleEncoding(100, (byte)'b'));
- impl.appendAddRecord(3l, (byte)0, new SimpleEncoding(100, (byte)'b'));
- impl.appendAddRecord(4l, (byte)0, new SimpleEncoding(100, (byte)'b'));
+ impl.appendAddRecord(1l, (byte)0, new SimpleEncoding(100, (byte)'a'), false);
+ impl.appendAddRecord(2l, (byte)0, new SimpleEncoding(100, (byte)'b'), false);
+ impl.appendAddRecord(3l, (byte)0, new SimpleEncoding(100, (byte)'b'), false);
+ impl.appendAddRecord(4l, (byte)0, new SimpleEncoding(100, (byte)'b'), false);
impl.stop();
- impl = new JournalImpl(512 + 1024 + 512, 20, true, false, false, factory, "jbm", "jbm", 1000);
+ impl = new JournalImpl(512 + 1024 + 512, 20, factory, "jbm", "jbm", 1000);
impl.start();
impl.load(dummyLoader);
@@ -1289,14 +1289,14 @@
// specific bug caught during development
impl.forceMoveNextFile();
- impl.appendDeleteRecord(1l);
- impl.appendDeleteRecord(2l);
- impl.appendDeleteRecord(3l);
- impl.appendDeleteRecord(4l);
+ impl.appendDeleteRecord(1l, false);
+ impl.appendDeleteRecord(2l, false);
+ impl.appendDeleteRecord(3l, false);
+ impl.appendDeleteRecord(4l, false);
impl.stop();
- impl = new JournalImpl(512 + 1024 + 512, 20, true, false, false, factory, "jbm", "jbm", 1000);
+ impl = new JournalImpl(512 + 1024 + 512, 20, factory, "jbm", "jbm", 1000);
impl.start();
ArrayList<RecordInfo> info = new ArrayList<RecordInfo>();
@@ -1364,7 +1364,7 @@
journalImpl.stop();
}
- journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true, true, false, factory, "tt", "tt", 1000);
+ journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, factory, "tt", "tt", 1000);
journalImpl.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalAsyncTest.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -85,12 +85,12 @@
{
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), false);
}
latch.countDown();
factory.setHoldCallbacks(false, null);
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
}
catch (Exception e)
{
@@ -147,10 +147,10 @@
{
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), false);
}
- journalImpl.appendRollbackRecord(1l);
+ journalImpl.appendRollbackRecord(1l, false);
}
catch (Exception e)
{
@@ -211,10 +211,10 @@
{
for (int i = 0; i < 10; i++)
{
- journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0));
+ journalImpl.appendAddRecordTransactional(1l, i, (byte)1, new SimpleEncoding(1, (byte)0), false);
}
- journalImpl.appendCommitRecord(1l);
+ journalImpl.appendCommitRecord(1l, false);
}
catch (Exception e)
{
@@ -248,7 +248,7 @@
try
{
- journalImpl.appendRollbackRecord(1l);
+ journalImpl.appendRollbackRecord(1l, false);
fail("Supposed to throw an exception");
}
catch (Exception e)
@@ -268,7 +268,7 @@
factory.setHoldCallbacks(true, null);
factory.setGenerateErrors(true);
- journalImpl.appendAddRecordTransactional(1l, 1, (byte)1, new SimpleEncoding(1, (byte)0));
+ journalImpl.appendAddRecordTransactional(1l, 1, (byte)1, new SimpleEncoding(1, (byte)0), false);
factory.flushAllCallbacks();
@@ -277,7 +277,7 @@
try
{
- journalImpl.appendAddRecordTransactional(1l, 2, (byte)1, new SimpleEncoding(1, (byte)0));
+ journalImpl.appendAddRecordTransactional(1l, 2, (byte)1, new SimpleEncoding(1, (byte)0), false);
fail("Exception expected"); // An exception already happened in one
// of the elements on this transaction.
// We can't accept any more elements on
@@ -298,7 +298,7 @@
try
{
- journalImpl.appendAddRecord(1l, (byte)0, new SimpleEncoding(1, (byte)0));
+ journalImpl.appendAddRecord(1l, (byte)0, new SimpleEncoding(1, (byte)0), false);
fail("Exception expected");
}
catch (Exception ignored)
@@ -357,7 +357,7 @@
journalImpl.stop();
}
- journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, true, true, false, factory, "tt", "tt", 1000);
+ journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, factory, "tt", "tt", 1000);
journalImpl.start();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestBase.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -22,7 +22,6 @@
package org.jboss.messaging.tests.unit.core.journal.impl;
-import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -69,8 +68,6 @@
protected boolean sync;
- protected boolean flushOnSync;
-
protected String filePrefix = "jbm";
protected String fileExtension = "jbm";
@@ -142,13 +139,12 @@
minFiles = minFreeFiles;
this.fileSize = fileSize;
this.sync = sync;
- this.flushOnSync = sync;
maxAIO = 50;
}
public void createJournal() throws Exception
{
- journal = new JournalImpl(fileSize, minFiles, sync, sync, flushOnSync, fileFactory, filePrefix, fileExtension, maxAIO);
+ journal = new JournalImpl(fileSize, minFiles, fileFactory, filePrefix, fileExtension, maxAIO);
journal.setAutoReclaim(false);
}
@@ -230,7 +226,7 @@
{
byte[] record = generateRecord(size);
- journal.appendAddRecord(element, (byte)0, record);
+ journal.appendAddRecord(element, (byte)0, record, sync);
records.add(new RecordInfo(element, (byte)0, record, false));
}
@@ -244,7 +240,7 @@
{
byte[] updateRecord = generateRecord(recordLength);
- journal.appendUpdateRecord(element, (byte)0, updateRecord);
+ journal.appendUpdateRecord(element, (byte)0, updateRecord, sync);
records.add(new RecordInfo(element, (byte)0, updateRecord, true));
}
@@ -256,7 +252,7 @@
{
for (long element : arguments)
{
- journal.appendDeleteRecord(element);
+ journal.appendDeleteRecord(element, sync);
removeRecordsForID(element);
}
@@ -274,7 +270,7 @@
// SIZE_BYTE
byte[] record = generateRecord(recordLength - JournalImpl.SIZE_ADD_RECORD_TX);
- journal.appendAddRecordTransactional(txID, element, (byte)0, record);
+ journal.appendAddRecordTransactional(txID, element, (byte)0, record, sync);
tx.records.add(new RecordInfo(element, (byte)0, record, false));
@@ -291,7 +287,7 @@
{
byte[] updateRecord = generateRecord(recordLength - JournalImpl.SIZE_UPDATE_RECORD_TX);
- journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord);
+ journal.appendUpdateRecordTransactional(txID, element, (byte)0, updateRecord, sync);
tx.records.add(new RecordInfo(element, (byte)0, updateRecord, true));
}
@@ -304,7 +300,7 @@
for (long element : arguments)
{
- journal.appendDeleteRecordTransactional(txID, element);
+ journal.appendDeleteRecordTransactional(txID, element, sync);
tx.deletes.add(new RecordInfo(element, (byte)0, null, true));
}
@@ -325,7 +321,7 @@
{
throw new IllegalStateException("Transaction is already prepared");
}
- journal.appendPrepareRecord(txID, xid);
+ journal.appendPrepareRecord(txID, xid, sync);
tx.prepared = true;
@@ -341,7 +337,7 @@
throw new IllegalStateException("Cannot find tx " + txID);
}
- journal.appendCommitRecord(txID);
+ journal.appendCommitRecord(txID, sync);
commitTx(txID);
@@ -357,7 +353,7 @@
throw new IllegalStateException("Cannot find tx " + txID);
}
- journal.appendRollbackRecord(txID);
+ journal.appendRollbackRecord(txID, sync);
journal.debugWait();
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/journal/impl/JournalImplTestUnit.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -120,7 +120,7 @@
for (int i = 0; i < 100; i++)
{
System.out.println("i = " + i);
- journal.appendAddRecord(1, (byte)1, new SimpleEncoding(2, (byte)'a'));
+ journal.appendAddRecord(1, (byte)1, new SimpleEncoding(2, (byte)'a'), false);
}
stopJournal();
}
@@ -129,7 +129,7 @@
{
try
{
- new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, true, true, false, fileFactory, filePrefix, fileExtension, 1);
+ new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, fileFactory, filePrefix, fileExtension, 1);
fail("Should throw exception");
}
@@ -140,7 +140,7 @@
try
{
- new JournalImpl(10 * 1024, 1, true, true, false, fileFactory, filePrefix, fileExtension, 1);
+ new JournalImpl(10 * 1024, 1, fileFactory, filePrefix, fileExtension, 1);
fail("Should throw exception");
}
@@ -151,7 +151,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, false, null, filePrefix, fileExtension, 1);
+ new JournalImpl(10 * 1024, 10, null, filePrefix, fileExtension, 1);
fail("Should throw exception");
}
@@ -162,7 +162,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, false, fileFactory, null, fileExtension, 1);
+ new JournalImpl(10 * 1024, 10, fileFactory, null, fileExtension, 1);
fail("Should throw exception");
}
@@ -173,7 +173,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, false, fileFactory, filePrefix, null, 1);
+ new JournalImpl(10 * 1024, 10, fileFactory, filePrefix, null, 1);
fail("Should throw exception");
}
@@ -184,7 +184,7 @@
try
{
- new JournalImpl(10 * 1024, 10, true, true, false, fileFactory, filePrefix, null, 0);
+ new JournalImpl(10 * 1024, 10, fileFactory, filePrefix, null, 0);
fail("Should throw exception");
}
@@ -2260,7 +2260,7 @@
{
byte[] record = generateRecord(10 + (int)(1500 * Math.random()));
- journal.appendAddRecord(i, (byte)0, record);
+ journal.appendAddRecord(i, (byte)0, record, false);
records.add(new RecordInfo(i, (byte)0, record, false));
}
@@ -2269,14 +2269,14 @@
{
byte[] record = generateRecord(10 + (int)(1024 * Math.random()));
- journal.appendUpdateRecord(i, (byte)0, record);
+ journal.appendUpdateRecord(i, (byte)0, record, false);
records.add(new RecordInfo(i, (byte)0, record, true));
}
for (int i = 0; i < 100; i++)
{
- journal.appendDeleteRecord(i);
+ journal.appendDeleteRecord(i, false);
removeRecordsForID(i);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/util/JournalExample.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -1,4 +1,5 @@
package org.jboss.messaging.tests.util;
+
import java.util.ArrayList;
import org.jboss.messaging.core.journal.PreparedTransactionInfo;
@@ -61,25 +62,21 @@
// Inner classes -------------------------------------------------
-
public static void main(String arg[])
{
TimeAndCounterIDGenerator idgenerator = new TimeAndCounterIDGenerator();
try
{
SequentialFileFactory fileFactory = new AIOSequentialFileFactory("/tmp"); // any dir you want
- //SequentialFileFactory fileFactory = new NIOSequentialFileFactory("/tmp"); // any dir you want
- JournalImpl journalExample = new JournalImpl(
- 10 * 1024 * 1024, // 10M.. we believe that's the usual cilinder bufferSize.. not an exact science here
+ // SequentialFileFactory fileFactory = new NIOSequentialFileFactory("/tmp"); // any dir you want
+ JournalImpl journalExample = new JournalImpl(10 * 1024 * 1024, // 10M.. we believe that's the usual cilinder
+ // bufferSize.. not an exact science here
2, // number of files pre-allocated
- true, // sync on commit
- false, // no sync on non transactional
- false, // if aio, flush on sync
fileFactory, // AIO or NIO
"exjournal", // file name
"dat", // extension
- 10000); // it's like a semaphore for callback on the AIO layer
-
+ 10000); // it's like a semaphore for callback on the AIO layer
+
ArrayList<RecordInfo> committedRecords = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
journalExample.start();
@@ -87,31 +84,77 @@
journalExample.load(committedRecords, preparedTransactions);
System.out.println("Loaded Record List:");
-
- for (RecordInfo record: committedRecords)
+
+ for (RecordInfo record : committedRecords)
{
- System.out.println("Record id = " + record.id + " userType = " + record.userRecordType + " with " + record.data.length + " bytes is stored on the journal");
+ System.out.println("Record id = " + record.id +
+ " userType = " +
+ record.userRecordType +
+ " with " +
+ record.data.length +
+ " bytes is stored on the journal");
}
System.out.println("Adding Records:");
-
- for (int i = 0 ; i < 10; i++)
+
+ for (int i = 0; i < 10; i++)
{
- journalExample.appendAddRecord(idgenerator.generateID(), (byte)1, new byte[] { 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2} );
+ journalExample.appendAddRecord(idgenerator.generateID(), (byte)1, new byte[] { 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2 }, false);
}
-
+
long tx = idgenerator.generateID(); // some id generation system
-
- for (int i = 0 ; i < 100; i++)
+
+ for (int i = 0; i < 100; i++)
{
- journalExample.appendAddRecordTransactional(tx, idgenerator.generateID(), (byte)2, new byte[] { 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 0, 1, 2, 5});
+ journalExample.appendAddRecordTransactional(tx, idgenerator.generateID(), (byte)2, new byte[] { 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2,
+ 0,
+ 1,
+ 2,
+ 5 }, true);
}
-
+
// After this is complete, you're sure the records are there
- journalExample.appendCommitRecord(tx);
+ journalExample.appendCommitRecord(tx, true);
System.out.println("Done!");
-
+
journalExample.stop();
}
catch (Exception e)
@@ -119,5 +162,5 @@
e.printStackTrace();
}
}
-
+
}
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java 2009-06-03 13:22:52 UTC (rev 7180)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ListJournal.java 2009-06-03 14:06:16 UTC (rev 7181)
@@ -20,7 +20,6 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.tests.util;
import java.util.ArrayList;
@@ -61,27 +60,21 @@
try
{
FileConfiguration fileConf = new FileConfiguration();
-
- //fileConf.setConfigurationUrl(arg[0]);
-
+
+ // fileConf.setConfigurationUrl(arg[0]);
+
fileConf.start();
-
-
-
+
JournalImpl journal = new JournalImpl(fileConf.getJournalFileSize(),
- fileConf.getJournalMinFiles(),
- true,
- true,
- false,
- new NIOSequentialFileFactory(fileConf.getJournalDirectory()),
- "jbm-data",
- "jbm",
- fileConf.getJournalMaxAIO());
-
-
+ fileConf.getJournalMinFiles(),
+ new NIOSequentialFileFactory(fileConf.getJournalDirectory()),
+ "jbm-data",
+ "jbm",
+ fileConf.getJournalMaxAIO());
+
ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
-
+
journal.start();
journal.load(records, prepared);
@@ -89,23 +82,20 @@
{
System.out.println("There are " + prepared.size() + " prepared transactions on the journal");
}
-
-
+
System.out.println("Total of " + records.size() + " committed records");
- for (RecordInfo record: records)
+ for (RecordInfo record : records)
{
System.out.println("user record: " + record);
}
-
+
journal.checkAndReclaimFiles();
-
+
System.out.println("Data = " + journal.debug());
-
+
journal.stop();
-
-
-
+
}
catch (Exception e)
{
@@ -113,7 +103,7 @@
}
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list