Author: timfox
Date: 2009-12-01 15:18:47 -0500 (Tue, 01 Dec 2009)
New Revision: 8483
Added:
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
Removed:
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
Modified:
trunk/build-hornetq.xml
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/hornetq/core/config/Configuration.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/core/journal/IOCompletion.java
trunk/src/main/org/hornetq/core/journal/Journal.java
trunk/src/main/org/hornetq/core/journal/SequentialFile.java
trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/utils/TokenBucketLimiterImpl.java
trunk/tests/config/ConfigurationTest-full-config.xml
trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
trunk/tests/src/org/hornetq/tests/opt/SendTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
trunk/tests/src/org/hornetq/tests/util/ListJournal.java
Log:
Mainly tuning of journal/timed buffer and various other small changes
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/build-hornetq.xml 2009-12-01 20:18:47 UTC (rev 8483)
@@ -1226,7 +1226,7 @@
<fileset dir="${test.classes.dir}">
<!-- exlcuded because of https://jira.jboss.org/jira/browse/HORNETQ-65 -->
<exclude name="**/cluster/failover/*StaticClusterWithBackupFailoverTest.class" />
- <exclude name="**/cluster/failover/*WithBackupFailoverTest.class" />
+ <exclude name="**/cluster/failover/*WithBackupFailoverTest.class" />
<include name="${tests.param}"/>
</fileset>
</batchtest>
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-12-01 20:18:47 UTC (rev 8483)
@@ -151,9 +151,7 @@
<xsd:element maxOccurs="1" minOccurs="0" name="journal-buffer-timeout" type="xsd:long">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-buffer-size" type="xsd:long">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="journal-flush-on-sync" type="xsd:boolean">
- </xsd:element>
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-sync-transactional" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-sync-non-transactional" type="xsd:boolean">
Modified: trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -290,12 +290,12 @@
if (writeExecutor != null)
{
+ maxIOSemaphore.acquireUninterruptibly();
+
writeExecutor.execute(new Runnable()
{
public void run()
{
- maxIOSemaphore.acquireUninterruptibly();
-
long sequence = nextWritingSequence.getAndIncrement();
try
@@ -445,7 +445,7 @@
private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
{
maxIOSemaphore.release();
-
+
pendingWrites.down();
callbackLock.lock();
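The change above moves maxIOSemaphore.acquireUninterruptibly() out of the submitted Runnable and onto the calling thread, so a caller now blocks before a write is queued whenever all maxIO permits are in use; the permit is still released in callbackDone(). A minimal sketch of that backpressure pattern (class and method names are illustrative, not the HornetQ API):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Illustrative only: bound the number of queued writes with a semaphore acquired
// on the caller's thread, as the AsynchronousFileImpl change above now does.
public class BoundedWriteQueue
{
   private final Semaphore maxIOSemaphore;

   private final ExecutorService writeExecutor = Executors.newSingleThreadExecutor();

   public BoundedWriteQueue(final int maxIO)
   {
      maxIOSemaphore = new Semaphore(maxIO);
   }

   public void submitWrite(final Runnable write)
   {
      // Acquire before enqueueing: callers block once maxIO writes are in flight.
      maxIOSemaphore.acquireUninterruptibly();

      writeExecutor.execute(new Runnable()
      {
         public void run()
         {
            try
            {
               write.run();
            }
            finally
            {
               // In the real code the permit is released in callbackDone(); here the
               // completed write itself stands in for the asynchronous completion.
               maxIOSemaphore.release();
            }
         }
      });
   }

   public void stop()
   {
      writeExecutor.shutdown();
   }
}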
Modified: trunk/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/Configuration.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/config/Configuration.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -224,22 +224,34 @@
void setJournalMinFiles(int files);
- int getJournalMaxAIO();
+ //AIO and NIO need different values for these params
+
+ int getJournalMaxIO_AIO();
- void setJournalMaxAIO(int maxAIO);
+ void setJournalMaxIO_AIO(int journalMaxIO);
- void setJournalBufferSize(int size);
+ int getJournalBufferTimeout_AIO();
+
+ void setJournalBufferTimeout_AIO(int journalBufferTimeout);
+
+ int getJournalBufferSize_AIO();
+
+ void setJournalBufferSize_AIO(int journalBufferSize);
- int getJournalBufferSize();
- void setJournalBufferTimeout(int timeout);
+ int getJournalMaxIO_NIO();
+
+ void setJournalMaxIO_NIO(int journalMaxIO);
+
+ int getJournalBufferTimeout_NIO();
+
+ void setJournalBufferTimeout_NIO(int journalBufferTimeout);
+
+ int getJournalBufferSize_NIO();
+
+ void setJournalBufferSize_NIO(int journalBufferSize);
- int getJournalBufferTimeout();
-
- void setJournalFlushOnSync(boolean flush);
-
- boolean isJournalFlushOnSync();
-
+
boolean isCreateBindingsDir();
void setCreateBindingsDir(boolean create);
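The interface now exposes separate journal tuning knobs per journal type (AIO vs NIO) and drops journal-flush-on-sync. A hedged usage sketch against the accessors above; the values mirror the new defaults in ConfigurationImpl, and constructing a ConfigurationImpl directly is only for illustration:

import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;

// Illustrative only: picking per-journal-type buffer settings via the new accessors.
public class JournalTuningExample
{
   public static void main(final String[] args)
   {
      Configuration config = new ConfigurationImpl();

      // AIO journal: large in-flight window, timeout apparently in nanoseconds
      // (see the 1000000000d / 2000 default in ConfigurationImpl).
      config.setJournalMaxIO_AIO(500);
      config.setJournalBufferTimeout_AIO(500000);
      config.setJournalBufferSize_AIO(490 * 1024);

      // NIO journal: effectively one outstanding write, slower flush cadence.
      config.setJournalMaxIO_NIO(1);
      config.setJournalBufferTimeout_NIO(3333333);
      config.setJournalBufferSize_NIO(490 * 1024);

      System.out.println("AIO buffer size = " + config.getJournalBufferSize_AIO());
   }
}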
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -100,14 +100,22 @@
public static final int DEFAULT_JOURNAL_MIN_FILES = 2;
- public static final int DEFAULT_JOURNAL_MAX_AIO = 500;
+ // AIO and NIO need to have different defaults for some values
- public static final boolean DEFAULT_JOURNAL_FLUSH_SYNC = false;
+ public static final int DEFAULT_JOURNAL_MAX_IO_AIO = 500;
- public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT = 20000;
+ public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = (int)(1000000000d / 2000);
- public static final int DEFAULT_JOURNAL_BUFFER_SIZE = 128 * 1024;
+ public static final int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = 490 * 1024;
+ public static final int DEFAULT_JOURNAL_MAX_IO_NIO = 1;
+
+ public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO = (int)(1000000000d / 300);
+
+ public static final int DEFAULT_JOURNAL_BUFFER_SIZE_NIO = 490 * 1024;
+
+
+
public static final boolean DEFAULT_JOURNAL_LOG_WRITE_RATE = false;
public static final int DEFAULT_JOURNAL_PERF_BLAST_PAGES = -1;
@@ -266,14 +274,23 @@
protected int journalMinFiles = DEFAULT_JOURNAL_MIN_FILES;
- protected int journalMaxAIO = DEFAULT_JOURNAL_MAX_AIO;
+
+ //AIO and NIO need different values for these attributes
+
+ protected int journalMaxIO_AIO = DEFAULT_JOURNAL_MAX_IO_AIO;
- protected boolean journalFlushSync = DEFAULT_JOURNAL_FLUSH_SYNC;
+ protected int journalBufferTimeout_AIO = DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO;
- protected int journalBufferTimeout = DEFAULT_JOURNAL_BUFFER_TIMEOUT;
+ protected int journalBufferSize_AIO = DEFAULT_JOURNAL_BUFFER_SIZE_AIO;
+
+ protected int journalMaxIO_NIO = DEFAULT_JOURNAL_MAX_IO_NIO;
- protected int journalBufferSize = DEFAULT_JOURNAL_BUFFER_SIZE;
+ protected int journalBufferTimeout_NIO = DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO;
+ protected int journalBufferSize_NIO = DEFAULT_JOURNAL_BUFFER_SIZE_NIO;
+
+
+
protected boolean logJournalWriteRate = DEFAULT_JOURNAL_LOG_WRITE_RATE;
protected int journalPerfBlastPages = DEFAULT_JOURNAL_PERF_BLAST_PAGES;
@@ -665,16 +682,6 @@
journalFileSize = size;
}
- public int getJournalMaxAIO()
- {
- return journalMaxAIO;
- }
-
- public void setJournalMaxAIO(final int maxAIO)
- {
- journalMaxAIO = maxAIO;
- }
-
public int getJournalMinFiles()
{
return journalMinFiles;
@@ -815,36 +822,6 @@
jmxDomain = domain;
}
- public void setJournalBufferTimeout(int timeout)
- {
- this.journalBufferTimeout = timeout;
- }
-
- public int getJournalBufferTimeout()
- {
- return journalBufferTimeout;
- }
-
- public void setJournalFlushOnSync(boolean flush)
- {
- journalFlushSync = flush;
- }
-
- public boolean isJournalFlushOnSync()
- {
- return journalFlushSync;
- }
-
- public int getJournalBufferSize()
- {
- return journalBufferSize;
- }
-
- public void setJournalBufferSize(int size)
- {
- this.journalBufferSize = size;
- }
-
public String getLargeMessagesDirectory()
{
return largeMessagesDirectory;
@@ -930,6 +907,131 @@
this.managementRequestTimeout = managementRequestTimeout;
}
+ public int getJournalCompactMinFiles()
+ {
+ return journalCompactMinFiles;
+ }
+
+ public int getJournalCompactPercentage()
+ {
+ return journalCompactPercentage;
+ }
+
+ public void setJournalCompactMinFiles(int minFiles)
+ {
+ this.journalCompactMinFiles = minFiles;
+ }
+
+ public void setJournalCompactPercentage(int percentage)
+ {
+ this.journalCompactPercentage = percentage;
+ }
+
+ public long getServerDumpInterval()
+ {
+ return serverDumpInterval;
+ }
+
+ public void setServerDumpInterval(long intervalInMilliseconds)
+ {
+ this.serverDumpInterval = intervalInMilliseconds;
+ }
+
+ public int getMemoryWarningThreshold()
+ {
+ return memoryWarningThreshold;
+ }
+
+ public void setMemoryWarningThreshold(int memoryWarningThreshold)
+ {
+ this.memoryWarningThreshold = memoryWarningThreshold;
+ }
+
+ public long getMemoryMeasureInterval()
+ {
+ return memoryMeasureInterval;
+ }
+
+ public void setMemoryMeasureInterval(long memoryMeasureInterval)
+ {
+ this.memoryMeasureInterval = memoryMeasureInterval;
+ }
+
+ public String getLogDelegateFactoryClassName()
+ {
+ return logDelegateFactoryClassName;
+ }
+
+ public void setLogDelegateFactoryClassName(String className)
+ {
+ this.logDelegateFactoryClassName = className;
+ }
+
+
+
+ public int getJournalMaxIO_AIO()
+ {
+ return journalMaxIO_AIO;
+ }
+
+ public void setJournalMaxIO_AIO(int journalMaxIO)
+ {
+ this.journalMaxIO_AIO = journalMaxIO;
+ }
+
+ public int getJournalBufferTimeout_AIO()
+ {
+ return journalBufferTimeout_AIO;
+ }
+
+ public void setJournalBufferTimeout_AIO(int journalBufferTimeout)
+ {
+ this.journalBufferTimeout_AIO = journalBufferTimeout;
+ }
+
+ public int getJournalBufferSize_AIO()
+ {
+ return journalBufferSize_AIO;
+ }
+
+ public void setJournalBufferSize_AIO(int journalBufferSize)
+ {
+ this.journalBufferSize_AIO = journalBufferSize;
+ }
+
+
+ public int getJournalMaxIO_NIO()
+ {
+ return journalMaxIO_NIO;
+ }
+
+ public void setJournalMaxIO_NIO(int journalMaxIO)
+ {
+ this.journalMaxIO_NIO = journalMaxIO;
+ }
+
+ public int getJournalBufferTimeout_NIO()
+ {
+ return journalBufferTimeout_NIO;
+ }
+
+ public void setJournalBufferTimeout_NIO(int journalBufferTimeout)
+ {
+ this.journalBufferTimeout_NIO = journalBufferTimeout;
+ }
+
+ public int getJournalBufferSize_NIO()
+ {
+ return journalBufferSize_NIO;
+ }
+
+ public void setJournalBufferSize_NIO(int journalBufferSize)
+ {
+ this.journalBufferSize_NIO = journalBufferSize;
+ }
+
+
+
@Override
public boolean equals(Object obj)
{
@@ -965,7 +1067,7 @@
{
return false;
}
-
+
if (clustered != other.clustered)
return false;
if (connectionTTLOverride != other.connectionTTLOverride)
@@ -983,12 +1085,18 @@
return false;
if (jmxManagementEnabled != other.jmxManagementEnabled)
return false;
- if (journalBufferSize != other.journalBufferSize)
+ if (this.journalBufferSize_AIO != other.journalBufferSize_AIO)
return false;
- if (journalBufferTimeout != other.journalBufferTimeout)
+ if (journalBufferTimeout_AIO != other.journalBufferTimeout_AIO)
return false;
- if (journalFlushSync != other.journalFlushSync)
+ if (journalMaxIO_AIO != other.journalMaxIO_AIO)
+ return false;
+ if (this.journalBufferSize_NIO != other.journalBufferSize_NIO)
return false;
+ if (journalBufferTimeout_NIO != other.journalBufferTimeout_NIO)
+ return false;
+ if (journalMaxIO_NIO != other.journalMaxIO_NIO)
+ return false;
if (journalCompactMinFiles != other.journalCompactMinFiles)
return false;
if (journalCompactPercentage != other.journalCompactPercentage)
@@ -1002,8 +1110,7 @@
return false;
if (journalFileSize != other.journalFileSize)
return false;
- if (journalMaxAIO != other.journalMaxAIO)
- return false;
+
if (journalMinFiles != other.journalMinFiles)
return false;
if (journalPerfBlastPages != other.journalPerfBlastPages)
@@ -1100,64 +1207,4 @@
return true;
}
- public int getJournalCompactMinFiles()
- {
- return journalCompactMinFiles;
- }
-
- public int getJournalCompactPercentage()
- {
- return journalCompactPercentage;
- }
-
- public void setJournalCompactMinFiles(int minFiles)
- {
- this.journalCompactMinFiles = minFiles;
- }
-
- public void setJournalCompactPercentage(int percentage)
- {
- this.journalCompactPercentage = percentage;
- }
-
- public long getServerDumpInterval()
- {
- return serverDumpInterval;
- }
-
- public void setServerDumpInterval(long intervalInMilliseconds)
- {
- this.serverDumpInterval = intervalInMilliseconds;
- }
-
- public int getMemoryWarningThreshold()
- {
- return memoryWarningThreshold;
- }
-
- public void setMemoryWarningThreshold(int memoryWarningThreshold)
- {
- this.memoryWarningThreshold = memoryWarningThreshold;
- }
-
- public long getMemoryMeasureInterval()
- {
- return memoryMeasureInterval;
- }
-
- public void setMemoryMeasureInterval(long memoryMeasureInterval)
- {
- this.memoryMeasureInterval = memoryMeasureInterval;
- }
-
- public String getLogDelegateFactoryClassName()
- {
- return logDelegateFactoryClassName;
- }
-
- public void setLogDelegateFactoryClassName(String className)
- {
- this.logDelegateFactoryClassName = className;
- }
-
}
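The new buffer-timeout defaults read as nanoseconds per flush derived from a target flush rate: (int)(1000000000d / 2000) is 500000 ns (2,000 flushes/second) for AIO and (int)(1000000000d / 300) is 3333333 ns (roughly 300 flushes/second) for NIO. A tiny sketch of that conversion (the helper class is illustrative, not part of ConfigurationImpl):

// Illustrative helper: convert a target flushes-per-second rate into the
// nanosecond timeout format used by the new DEFAULT_JOURNAL_BUFFER_TIMEOUT_* constants.
public class FlushRate
{
   public static int nanosPerFlush(final double flushesPerSecond)
   {
      return (int)(1000000000d / flushesPerSecond);
   }

   public static void main(final String[] args)
   {
      System.out.println("AIO: " + nanosPerFlush(2000)); // 500000 ns
      System.out.println("NIO: " + nanosPerFlush(300));  // 3333333 ns
   }
}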
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -73,9 +73,9 @@
private static final String DEFAULT_CONFIGURATION_URL = "hornetq-configuration.xml";
private static final String CONFIGURATION_SCHEMA_URL = "schema/hornetq-configuration.xsd";
-
- //For a bridge confirmations must be activated or send acknowledgements won't return
-
+
+ // For a bridge confirmations must be activated or send acknowledgements won't return
+
public static final int DEFAULT_CONFIRMATION_WINDOW_SIZE = 1024 * 1024;
// Static --------------------------------------------------------------------------
@@ -335,11 +335,36 @@
journalFileSize = getInteger(e, "journal-file-size", journalFileSize, GT_ZERO);
- journalFlushSync = getBoolean(e, "journal-flush-on-sync", DEFAULT_JOURNAL_FLUSH_SYNC);
+ int journalBufferTimeout = getInteger(e,
+ "journal-buffer-timeout",
+ journalType == JournalType.ASYNCIO ? DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO
+ : DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
+ GT_ZERO);
- journalBufferTimeout = getInteger(e, "journal-buffer-timeout", DEFAULT_JOURNAL_BUFFER_TIMEOUT, GT_ZERO);
+ int journalBufferSize = getInteger(e,
+ "journal-buffer-size",
+ journalType == JournalType.ASYNCIO ? DEFAULT_JOURNAL_BUFFER_SIZE_AIO
+ : DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
+ GT_ZERO);
- journalBufferSize = getInteger(e, "journal-buffer-size", DEFAULT_JOURNAL_BUFFER_SIZE, GT_ZERO);
+ int journalMaxIO = getInteger(e,
+ "journal-max-aio",
+ journalType == JournalType.ASYNCIO ? DEFAULT_JOURNAL_MAX_IO_AIO
+ : DEFAULT_JOURNAL_MAX_IO_NIO,
+ GT_ZERO);
+
+ if (journalType == JournalType.ASYNCIO)
+ {
+ journalBufferTimeout_AIO = journalBufferTimeout;
+ journalBufferSize_AIO = journalBufferSize;
+ journalMaxIO_AIO = journalMaxIO;
+ }
+ else
+ {
+ journalBufferTimeout_NIO = journalBufferTimeout;
+ journalBufferSize_NIO = journalBufferSize;
+ journalMaxIO_NIO = journalMaxIO;
+ }
journalMinFiles = getInteger(e, "journal-min-files", journalMinFiles, GT_ZERO);
@@ -347,8 +372,6 @@
journalCompactPercentage = getInteger(e, "journal-compact-percentage", journalCompactPercentage, PERCENTAGE);
- journalMaxAIO = getInteger(e, "journal-max-aio", journalMaxAIO, GT_ZERO);
-
logJournalWriteRate = getBoolean(e, "log-journal-write-rate", DEFAULT_JOURNAL_LOG_WRITE_RATE);
journalPerfBlastPages = getInteger(e, "perf-blast-pages", DEFAULT_JOURNAL_PERF_BLAST_PAGES, MINUS_ONE_OR_GT_ZERO);
@@ -367,12 +390,12 @@
GT_ZERO);
serverDumpInterval = getLong(e, "server-dump-interval", serverDumpInterval, MINUS_ONE_OR_GT_ZERO); // in
- // milliseconds
+ // milliseconds
memoryWarningThreshold = getInteger(e, "memory-warning-threshold", memoryWarningThreshold, PERCENTAGE);
memoryMeasureInterval = getLong(e, "memory-measure-interval", memoryMeasureInterval, MINUS_ONE_OR_GT_ZERO); // in
- // milliseconds
+ // milliseconds
backupWindowSize = getInteger(e, "backup-window-size", DEFAULT_BACKUP_WINDOW_SIZE, MINUS_ONE_OR_GT_ZERO);
@@ -523,7 +546,7 @@
long retryInterval = getLong(e, "retry-interval", DEFAULT_CLUSTER_RETRY_INTERVAL, GT_ZERO);
int confirmationWindowSize = getInteger(e, "confirmation-window-size", DEFAULT_CONFIRMATION_WINDOW_SIZE, GT_ZERO);
-
+
String discoveryGroupName = null;
List<Pair<String, String>> connectorPairs = new ArrayList<Pair<String, String>>();
@@ -609,10 +632,13 @@
String transformerClassName = getString(brNode, "transformer-class-name", null, NO_CHECK);
long retryInterval = getLong(brNode, "retry-interval", DEFAULT_RETRY_INTERVAL, GT_ZERO);
-
- //Default bridge conf
- int confirmationWindowSize = getInteger(brNode, "confirmation-window-size", DEFAULT_CONFIRMATION_WINDOW_SIZE, GT_ZERO);
-
+
+ // Default bridge conf
+ int confirmationWindowSize = getInteger(brNode,
+ "confirmation-window-size",
+ DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ GT_ZERO);
+
double retryIntervalMultiplier = getDouble(brNode,
"retry-interval-multiplier",
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
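In the parsing code above, a single journal-buffer-timeout / journal-buffer-size / journal-max-aio value is read once with a journal-type-dependent default and then stored into either the *_AIO or *_NIO field. A standalone sketch of that routing, shown for the buffer-timeout parameter only (enum and field names are stand-ins, not the FileConfiguration internals):

// Illustrative stand-in for the routing done in FileConfiguration:
// one configured value, two per-journal-type destinations.
public class JournalParamRouter
{
   enum JournalType { ASYNCIO, NIO }

   static final int DEFAULT_TIMEOUT_AIO = (int)(1000000000d / 2000);
   static final int DEFAULT_TIMEOUT_NIO = (int)(1000000000d / 300);

   int journalBufferTimeout_AIO = DEFAULT_TIMEOUT_AIO;
   int journalBufferTimeout_NIO = DEFAULT_TIMEOUT_NIO;

   void applyConfiguredTimeout(final JournalType journalType, final Integer configuredOrNull)
   {
      // Fall back to the default matching the journal type, as the diff does via getInteger(...).
      int timeout = configuredOrNull != null ? configuredOrNull
                                             : (journalType == JournalType.ASYNCIO ? DEFAULT_TIMEOUT_AIO
                                                                                   : DEFAULT_TIMEOUT_NIO);

      if (journalType == JournalType.ASYNCIO)
      {
         journalBufferTimeout_AIO = timeout;
      }
      else
      {
         journalBufferTimeout_NIO = timeout;
      }
   }
}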
Modified: trunk/src/main/org/hornetq/core/journal/IOCompletion.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/IOCompletion.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/IOCompletion.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -22,5 +22,5 @@
*/
public interface IOCompletion extends IOAsyncTask
{
- void lineUp();
+ void storeLineUp();
}
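lineUp() is renamed to storeLineUp(); in JournalImpl below it is called just before a record is handed to the file, with done()/onError() arriving later from the IO layer. One plausible way to implement such a completion with a latch is sketched here (an illustration only, not SimpleWaitIOCallback):

import java.util.concurrent.CountDownLatch;

import org.hornetq.core.journal.IOCompletion;

// Illustrative waiter: storeLineUp() records that a store has been queued,
// done()/onError() release anyone blocked in waitCompletion().
public class LatchIOCompletion implements IOCompletion
{
   private final CountDownLatch latch = new CountDownLatch(1);

   private volatile boolean linedUp = false;

   public void storeLineUp()
   {
      linedUp = true;
   }

   public void done()
   {
      latch.countDown();
   }

   public void onError(final int errorCode, final String errorMessage)
   {
      latch.countDown();
   }

   public void waitCompletion() throws InterruptedException
   {
      latch.await();
   }

   public boolean wasLinedUp()
   {
      return linedUp;
   }
}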
Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -112,5 +112,6 @@
void perfBlast(int pages) throws Exception;
+ void runDirectJournalBlast() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -88,10 +88,6 @@
void renameTo(String newFileName) throws Exception;
- void disableAutoFlush();
-
- void enableAutoFlush();
-
SequentialFile copy();
void setTimedBuffer(TimedBuffer buffer);
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -36,9 +36,6 @@
void releaseBuffer(ByteBuffer buffer);
- /** The factory may need to do some initialization before the file is activated.
- * this was added as a hook for AIO to initialize the Observer on TimedBuffer.
- * It could be eventually done the same on NIO if we implement TimedBuffer on NIO */
void activateBuffer(SequentialFile file);
void deactivateBuffer();
@@ -61,7 +58,5 @@
*/
void createDirs() throws Exception;
- void flush();
-
-
+ void flush();
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -46,7 +46,6 @@
/** The pool for Thread pollers */
private final Executor pollerExecutor;
-
public AIOSequentialFile(final SequentialFileFactory factory,
final int bufferSize,
@@ -87,18 +86,27 @@
public SequentialFile copy()
{
- return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(), maxIO, bufferCallback, writerExecutor, pollerExecutor);
+ return new AIOSequentialFile(factory,
+ -1,
+ -1,
+ getFile().getParent(),
+ getFileName(),
+ maxIO,
+ bufferCallback,
+ writerExecutor,
+ pollerExecutor);
}
+ @Override
public synchronized void close() throws Exception
{
if (!opened)
{
return;
}
-
+
super.close();
-
+
opened = false;
timedBuffer = null;
@@ -106,7 +114,7 @@
aioFile.close();
aioFile = null;
- this.notifyAll();
+ notifyAll();
}
/* (non-Javadoc)
@@ -165,7 +173,7 @@
aioFile.fill(filePosition, blocks, blockSize, fillCharacter);
- this.fileSize = aioFile.size();
+ fileSize = aioFile.size();
}
public void open() throws Exception
@@ -176,16 +184,16 @@
public synchronized void open(final int maxIO) throws Exception
{
opened = true;
-
+
aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor);
-
+
aioFile.open(getFile().getAbsolutePath(), maxIO);
-
+
position.set(0);
-
+
aioFile.setBufferCallback(bufferCallback);
-
- this.fileSize = aioFile.size();
+
+ fileSize = aioFile.size();
}
public void setBufferCallback(final BufferCallback callback)
@@ -216,7 +224,6 @@
return bytesRead;
}
-
public void sync() throws Exception
{
@@ -244,18 +251,14 @@
// Public methods
//
-----------------------------------------------------------------------------------------------------
- // Protected methods
- //
-----------------------------------------------------------------------------------------------------
-
-
public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
{
if (sync)
{
SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
-
+
writeDirect(bytes, true, completion);
-
+
completion.waitCompletion();
}
else
@@ -264,12 +267,11 @@
}
}
-
/**
*
* @param sync Not used on AIO
* */
- public void writeDirect(final ByteBuffer bytes, final boolean sync, IOAsyncTask callback)
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
@@ -278,7 +280,20 @@
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
+ // Protected methods
+ //
-----------------------------------------------------------------------------------------------------
+ @Override
+ protected ByteBuffer newBuffer(int size, int limit)
+ {
+ size = factory.calculateBlockSize(size);
+ limit = factory.calculateBlockSize(limit);
+
+ ByteBuffer buffer = factory.newBuffer(size);
+ buffer.limit(limit);
+ return buffer;
+ }
+
// Private methods
//
-----------------------------------------------------------------------------------------------------
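The new protected newBuffer(size, limit) override in AIOSequentialFile rounds both values up with factory.calculateBlockSize(...) so direct-IO buffers stay aligned to the device block size, then caps the writable region with buffer.limit(limit). A small standalone sketch of that rounding (the 512-byte block size is an assumed example value, not taken from this diff):

import java.nio.ByteBuffer;

// Illustrative block alignment: round requested sizes up to a multiple of the block size.
public class BlockAlignedBuffers
{
   private static final int BLOCK_SIZE = 512; // assumed value for the example

   static int calculateBlockSize(final int size)
   {
      return ((size + BLOCK_SIZE - 1) / BLOCK_SIZE) * BLOCK_SIZE;
   }

   static ByteBuffer newBuffer(int size, int limit)
   {
      size = calculateBlockSize(size);
      limit = calculateBlockSize(limit);

      ByteBuffer buffer = ByteBuffer.allocateDirect(size);
      buffer.limit(limit); // only 'limit' bytes are meant to be written
      return buffer;
   }

   public static void main(final String[] args)
   {
      ByteBuffer buffer = newBuffer(4096, 700);
      System.out.println(buffer.capacity() + " / " + buffer.limit()); // 4096 / 1024
   }
}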
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -33,7 +33,7 @@
* @author clebert.suconic(a)jboss.com
*
*/
-public class AIOSequentialFileFactory extends AbstractSequentialFactory
+public class AIOSequentialFileFactory extends AbstractSequentialFileFactory
{
private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
@@ -55,19 +55,17 @@
public AIOSequentialFileFactory(final String journalDir)
{
this(journalDir,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
false);
}
public AIOSequentialFileFactory(final String journalDir,
final int bufferSize,
- final long bufferTimeout,
- final boolean flushOnSync,
+ final int bufferTimeout,
final boolean logRates)
{
- super(journalDir, true, bufferSize, bufferTimeout, flushOnSync, logRates);
+ super(journalDir, true, bufferSize, bufferTimeout, logRates);
}
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
@@ -154,18 +152,21 @@
{
buffersControl.stop();
- pollerExecutor.shutdown();
+ if (pollerExecutor != null)
+ {
+ pollerExecutor.shutdown();
- try
- {
- if (!pollerExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ try
{
- log.warn("Timed out on AIO poller shutdown", new Exception("Timed out on AIO writer shutdown"));
+ if (!pollerExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ {
+ log.warn("Timed out on AIO poller shutdown", new Exception("Timed out on AIO writer shutdown"));
+ }
}
+ catch (InterruptedException e)
+ {
+ }
}
- catch (InterruptedException e)
- {
- }
super.stop();
}
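stop() now checks pollerExecutor for null (the factory may be stopped without ever having been started) before the usual shutdown()/awaitTermination() sequence. A generic sketch of that null-guarded shutdown pattern (helper class and logging are illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative shutdown helper mirroring the null-guarded pattern in stop().
public class ExecutorShutdown
{
   private static final int EXECUTOR_TIMEOUT = 60; // seconds, as in the factory code above

   public static void shutdownQuietly(final ExecutorService executor, final String name)
   {
      if (executor == null)
      {
         return; // never started, nothing to do
      }

      executor.shutdown();

      try
      {
         if (!executor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
         {
            System.err.println("Timed out waiting for " + name + " to shut down");
         }
      }
      catch (InterruptedException e)
      {
         // preserve interrupt status for callers
         Thread.currentThread().interrupt();
      }
   }
}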
Deleted: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -1,200 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.journal.impl;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.HornetQThreadFactory;
-
-/**
- *
- * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO
SequentialFactories
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
- *
- */
-public abstract class AbstractSequentialFactory implements SequentialFileFactory
-{
-
- // Timeout used to wait executors to shutdown
- protected static final int EXECUTOR_TIMEOUT = 60;
-
- private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
-
- protected final String journalDir;
-
- protected final TimedBuffer timedBuffer;
-
- protected final int bufferSize;
-
- protected final long bufferTimeout;
-
- /**
- * Asynchronous writes need to be done at another executor.
- * This needs to be done at NIO, or else we would have the callers thread blocking for
the return.
- * At AIO this is necessary as context switches on writes would fire flushes at the
kernel.
- * */
- protected ExecutorService writeExecutor;
-
- public AbstractSequentialFactory(final String journalDir,
- final boolean buffered,
- final int bufferSize,
- final long bufferTimeout,
- final boolean flushOnSync,
- final boolean logRates)
- {
- this.journalDir = journalDir;
-
- if (buffered)
- {
- timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync,
logRates);
- }
- else
- {
- timedBuffer = null;
- }
- this.bufferSize = bufferSize;
- this.bufferTimeout = bufferTimeout;
- }
-
- public void stop()
- {
- if (timedBuffer != null)
- {
- timedBuffer.stop();
- }
-
- if (isSupportsCallbacks())
- {
- writeExecutor.shutdown();
-
- try
- {
- if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
- {
- log.warn("Timed out on AIO writer shutdown", new
Exception("Timed out on AIO writer shutdown"));
- }
- }
- catch (InterruptedException e)
- {
- }
- }
-
-
- }
-
- public void start()
- {
- if (timedBuffer != null)
- {
- timedBuffer.start();
- }
-
- if (isSupportsCallbacks())
- {
- writeExecutor = Executors.newSingleThreadExecutor(new
HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" +
System.identityHashCode(this),
-
true));
- }
-
-
- }
-
- /* (non-Javadoc)
- * @see
org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
- */
- public void activateBuffer(final SequentialFile file)
- {
- if (timedBuffer != null)
- {
- timedBuffer.disableAutoFlush();
- try
- {
- file.setTimedBuffer(timedBuffer);
- }
- finally
- {
- file.enableAutoFlush();
- }
- }
- }
-
- public void flush()
- {
- if (timedBuffer != null)
- {
- timedBuffer.flush();
- }
- }
-
- public void deactivateBuffer()
- {
- if (timedBuffer != null)
- {
- timedBuffer.flush();
- timedBuffer.setObserver(null);
- }
- }
-
- public void releaseBuffer(ByteBuffer buffer)
- {
- }
-
- /**
- * Create the directory if it doesn't exist yet
- */
- public void createDirs() throws Exception
- {
- File file = new File(journalDir);
- boolean ok = file.mkdirs();
- if (!ok)
- {
- throw new IOException("Failed to create directory " + journalDir);
- }
- }
-
- public List<String> listFiles(final String extension) throws Exception
- {
- File dir = new File(journalDir);
-
- FilenameFilter fnf = new FilenameFilter()
- {
- public boolean accept(final File file, final String name)
- {
- return name.endsWith("." + extension);
- }
- };
-
- String[] fileNames = dir.list(fnf);
-
- if (fileNames == null)
- {
- throw new IOException("Failed to list: " + journalDir);
- }
-
- return Arrays.asList(fileNames);
- }
-
-}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -21,9 +21,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.buffers.HornetQBuffers;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -162,22 +162,6 @@
}
}
- public final void disableAutoFlush()
- {
- if (timedBuffer != null)
- {
- timedBuffer.disableAutoFlush();
- }
- }
-
- public final void enableAutoFlush()
- {
- if (timedBuffer != null)
- {
- timedBuffer.enableAutoFlush();
- }
- }
-
public void setTimedBuffer(final TimedBuffer buffer)
{
if (timedBuffer != null)
@@ -315,7 +299,17 @@
}
}
}
+
+ protected ByteBuffer newBuffer(int size, int limit)
+ {
+ size = factory.calculateBlockSize(size);
+ limit = factory.calculateBlockSize(limit);
+ ByteBuffer buffer = factory.newBuffer(size);
+ buffer.limit(limit);
+ return buffer;
+ }
+
protected class LocalBufferObserver implements TimedBufferObserver
{
public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOAsyncTask> callbacks)
@@ -334,12 +328,7 @@
public ByteBuffer newBuffer(int size, int limit)
{
- size = factory.calculateBlockSize(size);
- limit = factory.calculateBlockSize(limit);
-
- ByteBuffer buffer = factory.newBuffer(size);
- buffer.limit(limit);
- return buffer;
+ return AbstractSequentialFile.this.newBuffer(size, limit);
}
public int getRemainingBytes()
Copied: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java (from rev 8467, trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java)
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
+
+/**
+ *
+ * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ */
+public abstract class AbstractSequentialFileFactory implements SequentialFileFactory
+{
+
+ // Timeout used to wait executors to shutdown
+ protected static final int EXECUTOR_TIMEOUT = 60;
+
+ private static final Logger log = Logger.getLogger(AbstractSequentialFileFactory.class);
+
+ protected final String journalDir;
+
+ protected final TimedBuffer timedBuffer;
+
+ protected final int bufferSize;
+
+ protected final long bufferTimeout;
+
+ /**
+ * Asynchronous writes need to be done at another executor.
+ * This needs to be done at NIO, or else we would have the callers thread blocking for the return.
+ * At AIO this is necessary as context switches on writes would fire flushes at the kernel.
+ * */
+ protected ExecutorService writeExecutor;
+
+ public AbstractSequentialFileFactory(final String journalDir,
+ final boolean buffered,
+ final int bufferSize,
+ final int bufferTimeout,
+ final boolean logRates)
+ {
+ this.journalDir = journalDir;
+
+ if (buffered)
+ {
+ timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates);
+ }
+ else
+ {
+ timedBuffer = null;
+ }
+ this.bufferSize = bufferSize;
+ this.bufferTimeout = bufferTimeout;
+ }
+
+ public void stop()
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.stop();
+ }
+
+ if (isSupportsCallbacks() && writeExecutor != null)
+ {
+ writeExecutor.shutdown();
+
+ try
+ {
+ if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ {
+ log.warn("Timed out on AIO writer shutdown", new Exception("Timed out on AIO writer shutdown"));
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+
+ public void start()
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.start();
+ }
+
+ if (isSupportsCallbacks())
+ {
+ writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
+ true));
+ }
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
+ */
+ public void activateBuffer(final SequentialFile file)
+ {
+ if (timedBuffer != null)
+ {
+ file.setTimedBuffer(timedBuffer);
+ }
+ }
+
+ public void flush()
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.flush();
+ }
+ }
+
+ public void deactivateBuffer()
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.flush();
+ timedBuffer.setObserver(null);
+ }
+ }
+
+ public void releaseBuffer(final ByteBuffer buffer)
+ {
+ }
+
+ /**
+ * Create the directory if it doesn't exist yet
+ */
+ public void createDirs() throws Exception
+ {
+ File file = new File(journalDir);
+ boolean ok = file.mkdirs();
+ if (!ok)
+ {
+ throw new IOException("Failed to create directory " + journalDir);
+ }
+ }
+
+ public List<String> listFiles(final String extension) throws Exception
+ {
+ File dir = new File(journalDir);
+
+ FilenameFilter fnf = new FilenameFilter()
+ {
+ public boolean accept(final File file, final String name)
+ {
+ return name.endsWith("." + extension);
+ }
+ };
+
+ String[] fileNames = dir.list(fnf);
+
+ if (fileNames == null)
+ {
+ throw new IOException("Failed to list: " + journalDir);
+ }
+
+ return Arrays.asList(fileNames);
+ }
+}
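The copied factory builds one shared TimedBuffer(bufferSize, bufferTimeout, logRates) and simply hands it to whichever file is activated, without the disableAutoFlush()/enableAutoFlush() bracketing the deleted class used. The core idea being tuned in this commit, batching journal writes in memory and flushing on size or timeout, is sketched below in a deliberately simplified form (this is not org.hornetq.core.journal.impl.TimedBuffer, and it assumes individual writes fit in the buffer):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Minimal illustration of a timed buffer: writes accumulate in memory and are
// flushed either when the buffer fills up or when the timeout elapses.
public class TinyTimedBuffer
{
   private final byte[] buffer;

   private int position;

   private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

   public TinyTimedBuffer(final int bufferSize, final long timeoutNanos)
   {
      buffer = new byte[bufferSize];

      // Flush on a fixed cadence, mirroring journal-buffer-timeout (nanoseconds).
      timer.scheduleAtFixedRate(new Runnable()
      {
         public void run()
         {
            flush();
         }
      }, timeoutNanos, timeoutNanos, TimeUnit.NANOSECONDS);
   }

   public synchronized void addBytes(final byte[] bytes)
   {
      if (position + bytes.length > buffer.length)
      {
         flush(); // buffer full: write out what we have before accepting more
      }

      System.arraycopy(bytes, 0, buffer, position, bytes.length);
      position += bytes.length;
   }

   public synchronized void flush()
   {
      if (position > 0)
      {
         // A real implementation hands the bytes to the active sequential file here.
         System.out.println("flushing " + position + " bytes");
         position = 0;
      }
   }

   public void stop()
   {
      flush();
      timer.shutdown();
   }
}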
Modified: trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -50,7 +50,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.IOCompletion#linedUp()
*/
- public void lineUp()
+ public void storeLineUp()
{
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -103,7 +103,6 @@
private static final void trace(final String message)
{
log.trace(message);
- //System.out.println("JournalImpl::" + message);
}
// The sizes of primitive types
@@ -166,7 +165,6 @@
private final AtomicInteger nextFileID = new AtomicInteger(0);
- // used for Asynchronous IO only (ignored on NIO).
private final int maxAIO;
private final int fileSize;
@@ -219,9 +217,70 @@
private volatile int state;
private final Reclaimer reclaimer = new Reclaimer();
-
+
// Constructors --------------------------------------------------
+ public void runDirectJournalBlast() throws Exception
+ {
+ final int numIts = 100000000;
+
+ log.info("*** running direct journal blast: " + numIts);
+
+ final CountDownLatch latch = new CountDownLatch(numIts * 2);
+
+ class MyIOAsyncTask implements IOCompletion
+ {
+ public void done()
+ {
+ latch.countDown();
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+
+ }
+
+ public void storeLineUp()
+ {
+ }
+ }
+
+ final MyIOAsyncTask task = new MyIOAsyncTask();
+
+ final int recordSize = 1024;
+
+ final byte[] bytes = new byte[recordSize];
+
+ class MyRecord implements EncodingSupport
+ {
+
+ public void decode(HornetQBuffer buffer)
+ {
+ }
+
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeBytes(bytes);
+ }
+
+ public int getEncodeSize()
+ {
+ return recordSize;
+ }
+
+ }
+
+ MyRecord record = new MyRecord();
+
+ for (int i = 0; i < numIts; i++)
+ {
+ appendAddRecord(i, (byte)1, record, true, task);
+ appendDeleteRecord(i, true, task);
+ }
+
+ latch.await();
+ }
+
public JournalImpl(final int fileSize,
final int minFiles,
final int compactMinFiles,
@@ -229,7 +288,7 @@
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
- final int maxAIO)
+ final int maxIO)
{
if (fileFactory == null)
{
@@ -257,7 +316,7 @@
{
throw new NullPointerException("fileExtension is null");
}
- if (maxAIO <= 0)
+ if (maxIO <= 0)
{
throw new IllegalStateException("maxAIO should aways be a positive number");
}
@@ -274,7 +333,9 @@
else
{
this.compactPercentage = (float)compactPercentage / 100f;
- }
+ }
+
+ log.info("creating journal with max io " + maxIO);
this.compactMinFiles = compactMinFiles;
@@ -288,7 +349,7 @@
this.fileExtension = fileExtension;
- this.maxAIO = maxAIO;
+ this.maxAIO = maxIO;
}
public Map<Long, JournalRecord> getRecords()
@@ -533,13 +594,13 @@
switch (recordType)
{
case ADD_RECORD:
- {
+ {
reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record,
false));
break;
}
case UPDATE_RECORD:
- {
+ {
reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType,
record, true));
break;
}
@@ -641,25 +702,33 @@
{
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
-
- public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync, final IOCompletion callback) throws Exception
+
+ public void appendAddRecord(final long id,
+ final byte recordType,
+ final byte[] record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception
{
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
}
-
+
public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
{
SyncIOCompletion callback = getSyncCallback(sync);
-
+
appendAddRecord(id, recordType, record, sync, callback);
-
+
if (callback != null)
{
callback.waitCompletion();
}
}
- public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+ public void appendAddRecord(final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception
{
if (LOAD_TRACE)
{
@@ -669,16 +738,16 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
-
+
compactingLock.readLock().lock();
try
- {
+ {
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType,
record);
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
lockAppend.lock();
@@ -704,7 +773,11 @@
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
- public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync, final IOCompletion callback) throws Exception
+ public void appendUpdateRecord(final long id,
+ final byte recordType,
+ final byte[] record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception
{
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
}
@@ -712,16 +785,20 @@
public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
{
SyncIOCompletion callback = getSyncCallback(sync);
-
+
appendUpdateRecord(id, recordType, record, sync, callback);
-
+
if (callback != null)
{
callback.waitCompletion();
}
}
-
- public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+
+ public void appendUpdateRecord(final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception
{
if (LOAD_TRACE)
{
@@ -731,7 +808,7 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
-
+
compactingLock.readLock().lock();
try
@@ -747,12 +824,12 @@
}
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType,
record);
-
+
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
-
+
lockAppend.lock();
try
{
@@ -780,19 +857,18 @@
}
}
-
public void appendDeleteRecord(final long id, final boolean sync) throws Exception
{
SyncIOCompletion callback = getSyncCallback(sync);
-
+
appendDeleteRecord(id, sync, callback);
-
+
if (callback != null)
{
callback.waitCompletion();
}
}
-
+
public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception
{
if (LOAD_TRACE)
@@ -803,7 +879,7 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
-
+
compactingLock.readLock().lock();
try
@@ -818,12 +894,12 @@
throw new IllegalStateException("Cannot find add info " + id);
}
}
-
+
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
lockAppend.lock();
@@ -997,8 +1073,7 @@
{
appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
-
-
+
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion completion) throws Exception
{
appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync, completion);
@@ -1015,9 +1090,9 @@
public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
{
SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
+
appendPrepareRecord(txID, transactionData, sync, syncCompletion);
-
+
if (syncCompletion != null)
{
syncCompletion.waitCompletion();
@@ -1037,7 +1112,10 @@
* @param transactionData - extra user data for the prepare
* @throws Exception
*/
- public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync, IOCompletion callback) throws Exception
+ public void appendPrepareRecord(final long txID,
+ final EncodingSupport transactionData,
+ final boolean sync,
+ IOCompletion callback) throws Exception
{
if (LOAD_TRACE)
{
@@ -1060,7 +1138,7 @@
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
lockAppend.lock();
@@ -1081,22 +1159,19 @@
compactingLock.readLock().unlock();
}
}
-
-
-
+
public void appendCommitRecord(final long txID, final boolean sync) throws Exception
{
SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
+
appendCommitRecord(txID, sync, syncCompletion);
-
+
if (syncCompletion != null)
{
syncCompletion.waitCompletion();
}
}
-
/**
* <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
* <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
@@ -1114,7 +1189,6 @@
*
* @see JournalImpl#writeTransaction(byte, long, org.hornetq.core.journal.impl.JournalImpl.JournalTransaction, EncodingSupport)
*/
-
public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
{
@@ -1130,7 +1204,6 @@
try
{
-
if (tx == null)
{
throw new IllegalStateException("Cannot find tx with id " + txID);
@@ -1140,7 +1213,7 @@
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
lockAppend.lock();
@@ -1162,20 +1235,19 @@
}
}
-
public void appendRollbackRecord(final long txID, final boolean sync) throws
Exception
{
SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
+
appendRollbackRecord(txID, sync, syncCompletion);
-
+
if (syncCompletion != null)
{
syncCompletion.waitCompletion();
}
}
-
+
public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
{
if (state != STATE_LOADED)
@@ -1195,12 +1267,12 @@
{
throw new IllegalStateException("Cannot find tx with id " + txID);
}
-
+
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
lockAppend.lock();
@@ -1276,7 +1348,7 @@
}
public void addRecord(final RecordInfo info)
- {
+ {
records.add(info);
}
@@ -1288,7 +1360,7 @@
public void deleteRecord(final long id)
{
recordsToDelete.add(id);
-
+
// Clean up when the list is too large, or it won't be possible to load large sets of files
// Done as part of JBMESSAGING-1678
if (recordsToDelete.size() == DELETE_FLUSH)
@@ -1324,7 +1396,7 @@
committedRecords.add(record);
}
}
-
+
return info;
}
@@ -2695,7 +2767,6 @@
}
/**
- * Note: You should aways guarantee locking the semaphore lock.
*
* @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
* */
@@ -2705,109 +2776,91 @@
final JournalTransaction tx,
final IOAsyncTask parameterCallback) throws Exception
{
- try
+ if (state != STATE_LOADED)
{
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("The journal is not loaded " + state);
- }
-
- final IOAsyncTask callback;
+ throw new IllegalStateException("The journal is not loaded " + state);
+ }
- int size = encoder.getEncodeSize();
+ final IOAsyncTask callback;
- // We take into account the fileID used on the Header
- if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
- {
- throw new IllegalArgumentException("Record is too large to store " + size);
- }
+ int size = encoder.getEncodeSize();
- // Disable auto flush on the timer. The Timer should'nt flush anything
- currentFile.getFile().disableAutoFlush();
+ // We take into account the fileID used on the Header
+ if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
+ {
+ throw new IllegalArgumentException("Record is too large to store " + size);
+ }
+ if (!currentFile.getFile().fits(size))
+ {
+ moveNextFile(false);
+
+ // The same check needs to be done at the new file also
if (!currentFile.getFile().fits(size))
{
- currentFile.getFile().enableAutoFlush();
-
- moveNextFile(false);
-
- currentFile.getFile().disableAutoFlush();
-
- // The same check needs to be done at the new file also
- if (!currentFile.getFile().fits(size))
- {
- // Sanity check, this should never happen
- throw new IllegalStateException("Invalid logic on buffer allocation");
+ // Sanity check, this should never happen
+ throw new IllegalStateException("Invalid logic on buffer allocation");
allocation");
}
+ }
- if (currentFile == null)
- {
- throw new NullPointerException("Current file = null");
- }
+ if (currentFile == null)
+ {
+ throw new NullPointerException("Current file = null");
+ }
- if (tx != null)
+ if (tx != null)
+ {
+ // The callback of a transaction has to be taken inside the lock,
+ // when we guarantee the currentFile will not be changed,
+ // since we individualize the callback per file
+ if (fileFactory.isSupportsCallbacks())
{
- // The callback of a transaction has to be taken inside the lock,
- // when we guarantee the currentFile will not be changed,
- // since we individualize the callback per file
- if (fileFactory.isSupportsCallbacks())
+ // Set the delegated callback as a parameter
+ TransactionCallback txcallback = tx.getCallback(currentFile);
+ if (parameterCallback != null)
{
- // Set the delegated callback as a parameter
- TransactionCallback txcallback = tx.getCallback(currentFile);
- if (parameterCallback != null)
- {
- txcallback.setDelegateCompletion(parameterCallback);
- }
- callback = txcallback;
+ txcallback.setDelegateCompletion(parameterCallback);
}
- else
- {
- callback = null;
- }
-
- if (sync)
- {
- // In an edge case the transaction could still have pending data from previous files.
- // This shouldn't cause any blocking issues, as this is here to guarantee we cover all possibilities
- // on guaranteeing the data is on the disk
- tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
- }
-
- // We need to add the number of records on currentFile if prepare or commit
- if (completeTransaction)
- {
- // Filling the number of pendingTransactions at the current file
- tx.fillNumberOfRecords(currentFile, encoder);
- }
+ callback = txcallback;
}
else
{
- callback = parameterCallback;
+ callback = null;
}
- // Adding fileID
- encoder.setFileID(currentFile.getFileID());
-
- if (callback != null)
+ if (sync)
{
- currentFile.getFile().write(encoder, sync, callback);
+ // In an edge case the transaction could still have pending data from previous files.
+ // This shouldn't cause any blocking issues, as this is here to guarantee we cover all possibilities
+ // on guaranteeing the data is on the disk
+ tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
}
- else
+
+ // We need to add the number of records on currentFile if prepare or commit
+ if (completeTransaction)
{
- currentFile.getFile().write(encoder, sync);
+ // Filling the number of pendingTransactions at the current file
+ tx.fillNumberOfRecords(currentFile, encoder);
}
+ }
+ else
+ {
+ callback = parameterCallback;
+ }
- return currentFile;
+ // Adding fileID
+ encoder.setFileID(currentFile.getFileID());
+
+ if (callback != null)
+ {
+ currentFile.getFile().write(encoder, sync, callback);
}
- finally
+ else
{
- if (currentFile != null)
- {
- currentFile.getFile().enableAutoFlush();
- }
+ currentFile.getFile().write(encoder, sync);
}
+ return currentFile;
}
/** Get the ID part of the name */
@@ -3352,7 +3405,7 @@
return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
}
}
-
+
private class PerfBlast extends Thread
{
private final int pages;
@@ -3371,7 +3424,7 @@
lockAppend.lock();
final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 *
1024]);
-
+
JournalInternalRecord blastRecord = new JournalInternalRecord()
{
@@ -3386,7 +3439,6 @@
byteEncoder.encode(buffer);
}
};
-
for (int i = 0; i < pages; i++)
{
@@ -3401,11 +3453,5 @@
}
}
}
-
-
-
-
-
-
}
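The new runDirectJournalBlast() sizes its CountDownLatch at numIts * 2 because every iteration appends one add record and one delete record, and the shared IOCompletion counts the latch down once per completed operation. A cut-down sketch of that wait-for-2N-completions pattern, with the journal replaced by a stand-in executor:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stand-in for the blast loop: two asynchronous operations per iteration,
// one latch sized accordingly, await() returns once every callback has fired.
public class BlastLatchExample
{
   public static void main(final String[] args) throws Exception
   {
      final int numIts = 1000;

      final CountDownLatch latch = new CountDownLatch(numIts * 2);

      ExecutorService fakeJournal = Executors.newSingleThreadExecutor();

      Runnable completion = new Runnable()
      {
         public void run()
         {
            latch.countDown(); // plays the role of IOCompletion.done()
         }
      };

      for (int i = 0; i < numIts; i++)
      {
         fakeJournal.execute(completion); // "appendAddRecord"
         fakeJournal.execute(completion); // "appendDeleteRecord"
      }

      latch.await();
      fakeJournal.shutdown();

      System.out.println("all " + numIts * 2 + " operations completed");
   }
}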
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -46,21 +46,28 @@
/** The write semaphore here is only used when writing asynchronously */
private Semaphore maxIOSemaphore;
-
+
private final int defaultMaxIO;
-
+
private int maxIO;
- public NIOSequentialFile(final SequentialFileFactory factory, final String directory, final String fileName, final int maxIO, final Executor writerExecutor)
+ public NIOSequentialFile(final SequentialFileFactory factory,
+ final String directory,
+ final String fileName,
+ final int maxIO,
+ final Executor writerExecutor)
{
super(directory, new File(directory + "/" + fileName), factory,
writerExecutor);
- this.defaultMaxIO = maxIO;
+ defaultMaxIO = maxIO;
}
- public NIOSequentialFile(final SequentialFileFactory factory, final File file, final int maxIO, final Executor writerExecutor)
+ public NIOSequentialFile(final SequentialFileFactory factory,
+ final File file,
+ final int maxIO,
+ final Executor writerExecutor)
{
super(file.getParent(), new File(file.getPath()), factory, writerExecutor);
- this.defaultMaxIO = maxIO;
+ defaultMaxIO = maxIO;
}
public int getAlignment()
@@ -82,7 +89,7 @@
    * Some operations while initializing files on the journal may require a different maxIO */
public synchronized void open() throws Exception
{
- open(this.defaultMaxIO);
+ open(defaultMaxIO);
}
public void open(final int maxIO) throws Exception
@@ -92,10 +99,10 @@
channel = rfile.getChannel();
fileSize = channel.size();
-
+
if (writerExecutor != null)
{
- this.maxIOSemaphore = new Semaphore(maxIO);
+ maxIOSemaphore = new Semaphore(maxIO);
this.maxIO = maxIO;
}
}
@@ -130,18 +137,19 @@
}
}
+ @Override
public synchronized void close() throws Exception
{
super.close();
-
+
if (maxIOSemaphore != null)
{
while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
{
-            log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.getFileName());
+            log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + getFileName());
}
}
-
+
maxIOSemaphore = null;
if (channel != null)
@@ -213,6 +221,7 @@
}
}
+ @Override
public void position(final long pos) throws Exception
{
super.position(pos);
@@ -252,6 +261,16 @@
internalWrite(bytes, sync, null);
}
+ @Override
+ protected ByteBuffer newBuffer(int size, final int limit)
+ {
+      // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO
+
+ size = limit;
+
+ return super.newBuffer(size, limit);
+ }
+
   private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
{
if (!isOpen())
@@ -266,7 +285,7 @@
}
return;
}
-
+
if (writerExecutor == null)
{
doInternalWrite(bytes, sync, callback);
@@ -275,7 +294,7 @@
{
// This is a flow control on writing, just like maxAIO on libaio
maxIOSemaphore.acquire();
-
+
writerExecutor.execute(new Runnable()
{
public void run()
@@ -309,7 +328,7 @@
* @throws Exception
*/
   private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
- {
+ {
position.addAndGet(bytes.limit());
channel.write(bytes);
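
The comment in the hunk above ("This is a flow control on writing, just like maxAIO on libaio") describes capping the number of writes queued on the writer executor with a semaphore. Below is a minimal, self-contained sketch of that flow-control pattern, assuming a single-threaded executor; ThrottledAsyncWriter and its constructor are hypothetical, not the HornetQ class.

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

final class ThrottledAsyncWriter
{
   private final FileChannel channel;

   private final ExecutorService writerExecutor = Executors.newSingleThreadExecutor();

   // Caps the number of writes that may be queued but not yet performed
   private final Semaphore maxIOSemaphore;

   ThrottledAsyncWriter(final FileChannel channel, final int maxIO)
   {
      this.channel = channel;
      this.maxIOSemaphore = new Semaphore(maxIO);
   }

   void write(final ByteBuffer bytes, final boolean sync) throws InterruptedException
   {
      // Blocks the caller once maxIO writes are already outstanding
      maxIOSemaphore.acquire();

      writerExecutor.execute(new Runnable()
      {
         public void run()
         {
            try
            {
               channel.write(bytes);

               if (sync)
               {
                  channel.force(false);
               }
            }
            catch (Exception e)
            {
               e.printStackTrace();
            }
            finally
            {
               // The permit is only returned once the write has actually been performed
               maxIOSemaphore.release();
            }
         }
      });
   }
}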
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -28,7 +28,7 @@
 * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
-public class NIOSequentialFileFactory extends AbstractSequentialFactory implements SequentialFileFactory
+public class NIOSequentialFileFactory extends AbstractSequentialFileFactory implements SequentialFileFactory
{
private static final Logger log = Logger.getLogger(NIOSequentialFileFactory.class);
@@ -37,9 +37,8 @@
{
this(journalDir,
false,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
false);
}
@@ -47,26 +46,23 @@
{
this(journalDir,
buffered,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
false);
}
public NIOSequentialFileFactory(final String journalDir,
final boolean buffered,
final int bufferSize,
- final long bufferTimeout,
- final boolean flushOnSync,
+ final int bufferTimeout,
final boolean logRates)
{
- super(journalDir, buffered, bufferSize, bufferTimeout, flushOnSync, logRates);
+ super(journalDir, buffered, bufferSize, bufferTimeout, logRates);
}
- // maxIO is ignored on NIO
public SequentialFile createSequentialFile(final String fileName, int maxIO)
{
- if (maxIO < 0)
+ if (maxIO < 1)
{
// A single threaded IO
maxIO = 1;
Modified: trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -71,7 +71,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.IOCompletion#linedUp()
*/
- public void lineUp()
+ public void storeLineUp()
{
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -10,15 +10,20 @@
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
-
package org.hornetq.core.journal.impl;
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
/**
@@ -34,139 +39,303 @@
public class SyncSpeedTest
{
private static final Logger log = Logger.getLogger(SyncSpeedTest.class);
-
+
public static void main(final String[] args)
{
try
{
- new SyncSpeedTest().run();
+ new SyncSpeedTest().testScaleAIO();
}
catch (Exception e)
{
e.printStackTrace();
}
}
-
+
+ protected SequentialFileFactory fileFactory;
+
+ public boolean AIO = true;
+
+ protected void setupFactory()
+ {
+ if (AIO)
+ {
+ fileFactory = new AIOSequentialFileFactory(".", 0, 0, false);
+ }
+ else
+ {
+ fileFactory = new NIOSequentialFileFactory(".", false, 0, 0, false);
+ }
+ }
+
+ protected SequentialFile createSequentialFile(String fileName)
+ {
+ if (AIO)
+ {
+ return new AIOSequentialFile(fileFactory,
+ 0,
+ 0,
+ ".",
+ fileName,
+ 100000,
+ null,
+ null,
+ Executors.newSingleThreadExecutor());
+ }
+ else
+ {
+ return new NIOSequentialFile(fileFactory, new File(fileName), 1000, null);
+ }
+ }
+
+ public void run2() throws Exception
+ {
+ setupFactory();
+
+ int recordSize = 128 * 1024;
+
+ while (true)
+ {
+ System.out.println("** record size is " + recordSize);
+
+ int warmup = 500;
+
+ int its = 500;
+
+ int fileSize = (its + warmup) * recordSize;
+
+ SequentialFile file = createSequentialFile("sync-speed-test.dat");
+
+ if (file.exists())
+ {
+ file.delete();
+ }
+
+ file.open();
+
+ file.fill(0, fileSize, (byte)'X');
+
+ if (!AIO)
+ {
+ file.sync();
+ }
+
+ ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
+
+ long start = 0;
+
+ for (int i = 0; i < its + warmup; i++)
+ {
+ if (i == warmup)
+ {
+ start = System.currentTimeMillis();
+ }
+
+ bb1.rewind();
+
+ file.writeDirect(bb1, true);
+ }
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * ((double)its) / (end - start);
+
+ double throughput = recordSize * rate;
+
+ System.out.println("Rate of " + rate + " syncs per sec");
+         System.out.println("Throughput " + throughput + " bytes per sec");
+ System.out.println("*************");
+
+ recordSize *= 2;
+ }
+ }
+
public void run() throws Exception
{
- int fileSize = 1024 * 1024 * 100;
-
- int recordSize = 1024;
-
- int its = 10 * 1024;
-
- File file = new File("sync-speed-test.dat");
-
- if (file.exists())
+ int recordSize = 256;
+
+ while (true)
{
- file.delete();
+ System.out.println("** record size is " + recordSize);
+
+ int warmup = 500;
+
+ int its = 500;
+
+ int fileSize = (its + warmup) * recordSize;
+
+ File file = new File("sync-speed-test.dat");
+
+ if (file.exists())
+ {
+ file.delete();
+ }
+
+ file.createNewFile();
+
+ RandomAccessFile rfile = new RandomAccessFile(file, "rw");
+
+ FileChannel channel = rfile.getChannel();
+
+ ByteBuffer bb = generateBuffer(fileSize, (byte)'x');
+
+ write(bb, channel, fileSize);
+
+ channel.force(true);
+
+ channel.position(0);
+
+ ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
+
+ long start = 0;
+
+ for (int i = 0; i < its + warmup; i++)
+ {
+ if (i == warmup)
+ {
+ start = System.currentTimeMillis();
+ }
+
+ bb1.flip();
+ channel.write(bb1);
+ channel.force(false);
+ }
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * ((double)its) / (end - start);
+
+ double throughput = recordSize * rate;
+
+ System.out.println("Rate of " + rate + " syncs per sec");
+         System.out.println("Throughput " + throughput + " bytes per sec");
+
+ recordSize *= 2;
}
-
- RandomAccessFile rfile = new RandomAccessFile(file, "rw");
-
- FileChannel channel = rfile.getChannel();
-
- ByteBuffer bb = generateBuffer(fileSize, (byte)'x');
-
- write(bb, channel, fileSize);
-
- channel.force(false);
-
- channel.position(0);
-
-      MappedByteBuffer mappedBB = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
-
- mappedBB.load();
- // mappedBB.order(java.nio.ByteOrder.LITTLE_ENDIAN);
-      System.out.println("isLoaded=" + mappedBB.isLoaded() + "; isDirect=" + mappedBB.isDirect() + "; byteOrder=" + mappedBB.order());
-
- ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
-
- System.out.println("Measuring");
-
- long start = System.currentTimeMillis();
-
- for (int i = 0; i < its; i++)
+ }
+
+ public void testScaleAIO() throws Exception
+ {
+ setupFactory();
+
+ final int recordSize = 1024;
+
+ System.out.println("** record size is " + recordSize);
+
+ final int its = 10;
+
+ for (int numThreads = 1; numThreads <= 10; numThreads++)
{
- bb1.flip();
- mappedBB.position(0);
- mappedBB.put(bb1);
- mappedBB.force();
-
- //write(bb1, channel, recordSize);
- // channel.force(false);
+
+ int fileSize = its * recordSize * numThreads;
+
+         final SequentialFile file = createSequentialFile("sync-speed-test.dat");
+
+ if (file.exists())
+ {
+ file.delete();
+ }
+
+ file.open();
+
+ file.fill(0, fileSize, (byte)'X');
+
+ if (!AIO)
+ {
+ file.sync();
+ }
+
+ final CountDownLatch latch = new CountDownLatch(its * numThreads);
+
+ class MyIOAsyncTask implements IOAsyncTask
+ {
+ public void done()
+ {
+ latch.countDown();
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+
+ }
+ }
+
+ final MyIOAsyncTask task = new MyIOAsyncTask();
+
+ class MyRunner implements Runnable
+ {
+ private ByteBuffer bb1;
+
+ MyRunner()
+ {
+ bb1 = generateBuffer(recordSize, (byte)'h');
+ }
+
+ public void run()
+ {
+ for (int i = 0; i < its; i++)
+ {
+ bb1.rewind();
+
+ file.writeDirect(bb1, true, task);
+// try
+// {
+// file.writeDirect(bb1, true);
+// }
+// catch (Exception e)
+// {
+// e.printStackTrace();
+// }
+ }
+ }
+ }
+
+ Set<Thread> threads = new HashSet<Thread>();
+
+ for (int i = 0; i < numThreads; i++)
+ {
+ MyRunner runner = new MyRunner();
+
+ Thread t = new Thread(runner);
+
+ threads.add(t);
+ }
+
+ long start = System.currentTimeMillis();
+
+ for (Thread t : threads)
+ {
+ log.info("starting thread");
+ t.start();
+ }
+
+ for (Thread t : threads)
+ {
+ t.join();
+ }
+
+ latch.await();
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * ((double)its * numThreads) / (end - start);
+
+ double throughput = recordSize * rate;
+
+ System.out.println("For " + numThreads + " threads:");
+ System.out.println("Rate of " + rate + " records per sec");
+         System.out.println("Throughput " + throughput + " bytes per sec");
+ System.out.println("*************");
}
-
- long end = System.currentTimeMillis();
-
- double rate = 1000 * ((double)its) / (end - start);
-
- System.out.println("Rate of " + rate + " syncs per sec");
- file.delete();
}
-
-// public void run() throws Exception
-// {
-// log.info("******* Starting file sync speed test *******");
-//
-// int fileSize = 1024 * 1024 * 10;
-//
-// int recordSize = 1024;
-//
-// int its = 10 * 1024;
-//
-// File file = new File("sync-speed-test.dat");
-//
-// if (file.exists())
-// {
-// file.delete();
-// }
-//
-// RandomAccessFile rfile = new RandomAccessFile(file, "rw");
-//
-// FileChannel channel = rfile.getChannel();
-//
-// ByteBuffer bb = generateBuffer(fileSize, (byte)'x');
-//
-// write(bb, channel, fileSize);
-//
-// channel.force(false);
-//
-// channel.position(0);
-//
-// ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
-//
-// log.info("Measuring");
-//
-// long start = System.currentTimeMillis();
-//
-// for (int i = 0; i < its; i++)
-// {
-// write(bb1, channel, recordSize);
-//
-// channel.force(false);
-// }
-//
-// long end = System.currentTimeMillis();
-//
-// double rate = 1000 * ((double)its) / (end - start);
-//
-// log.info("Rate of " + rate + " syncs per sec");
-//
-// rfile.close();
-//
-// file.delete();
-//
-// log.info("****** test complete *****");
-// }
-
+
   private void write(final ByteBuffer buffer, final FileChannel channel, final int size) throws Exception
{
buffer.flip();
-
+
channel.write(buffer);
}
-
+
private ByteBuffer generateBuffer(final int size, final byte ch)
{
ByteBuffer bb = ByteBuffer.allocateDirect(size);
@@ -175,7 +344,7 @@
{
bb.put(ch);
}
-
+
return bb;
- }
-}
+ }
+}
\ No newline at end of file
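
The rewritten test above measures fsync rate by writing a fixed-size record in a loop, forcing it to disk each time, and only timing the iterations after a warm-up phase. A compact, self-contained sketch of that measurement loop follows, restricted to plain NIO; the class and file names are hypothetical, not part of the test.

import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

final class MiniSyncBench
{
   public static void main(final String[] args) throws Exception
   {
      final int recordSize = 1024;
      final int warmup = 500;
      final int its = 500;

      File file = new File("mini-sync-bench.dat");
      RandomAccessFile rfile = new RandomAccessFile(file, "rw");
      FileChannel channel = rfile.getChannel();

      ByteBuffer record = ByteBuffer.allocateDirect(recordSize);
      for (int i = 0; i < recordSize; i++)
      {
         record.put((byte)'h');
      }

      long start = 0;

      for (int i = 0; i < its + warmup; i++)
      {
         if (i == warmup)
         {
            // Only time the iterations after the warm-up
            start = System.currentTimeMillis();
         }

         record.rewind();
         channel.write(record);
         channel.force(false);
      }

      long end = System.currentTimeMillis();

      double rate = 1000 * ((double)its) / (end - start);

      System.out.println("Rate: " + rate + " syncs/sec, throughput: " + recordSize * rate + " bytes/sec");

      rfile.close();
      file.delete();
   }
}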
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -19,17 +19,15 @@
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.VariableLatch;
/**
* A TimedBuffer
@@ -48,9 +46,10 @@
private TimedBufferObserver bufferObserver;
- // This is used to pause and resume the timer
- // This is a reusable Latch, that uses java.util.concurrent base classes
- private final VariableLatch latchTimer = new VariableLatch();
+   // If the TimedBuffer is idle - i.e. no records are being added - then it's pointless for the timer flush thread
+   // to spin checking the time - and use up CPU in the process - so this semaphore is used to
+   // prevent that
+ private final Semaphore spinLimiter = new Semaphore(1);
private CheckTimer timerRunnable = new CheckTimer();
@@ -62,13 +61,8 @@
private List<IOAsyncTask> callbacks;
- private final Lock lock = new ReentrantReadWriteLock().writeLock();
+ private volatile int timeout;
-   // used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
- private volatile boolean active = false;
-
- private final long timeout;
-
   // used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
private volatile boolean pendingSync = false;
@@ -76,21 +70,24 @@
private volatile boolean started;
- private final boolean flushOnSync;
+   // We use this flag to prevent a flush occurring between calling checkSize and addBytes.
+   // checkSize must always be followed by its corresponding addBytes, otherwise the buffer
+   // can get in an inconsistent state
+ private boolean delayFlush;
// for logging write rates
private final boolean logRates;
- private volatile long bytesFlushed;
+ private final AtomicLong bytesFlushed = new AtomicLong(0);
- private volatile long flushesDone;
+ private final AtomicLong flushesDone = new AtomicLong(0);
private Timer logRatesTimer;
private TimerTask logRatesTimerTask;
- private long lastExecution;
+ private final AtomicLong lastFlushTime = new AtomicLong(0);
// Static --------------------------------------------------------
@@ -98,23 +95,29 @@
// Public --------------------------------------------------------
-   public TimedBuffer(final int size, final long timeout, final boolean flushOnSync, final boolean logRates)
+ public TimedBuffer(final int size, final int timeout, final boolean logRates)
{
- this.bufferSize = size;
+ log.info("timed buffer size " + size);
+ log.info("timed buffer timeout " + timeout);
+
+ bufferSize = size;
+
this.logRates = logRates;
+
if (logRates)
{
- this.logRatesTimer = new Timer(true);
+ logRatesTimer = new Timer(true);
}
// Setting the interval for nano-sleeps
buffer = HornetQBuffers.fixedBuffer(bufferSize);
+
buffer.clear();
+
bufferLimit = 0;
callbacks = new ArrayList<IOAsyncTask>();
- this.flushOnSync = flushOnSync;
- latchTimer.up();
+
this.timeout = timeout;
}
@@ -127,7 +130,7 @@
timerRunnable = new CheckTimer();
- timerThread = new Thread(timerRunnable, "hornetq-async-buffer");
+ timerThread = new Thread(timerRunnable, "hornetq-buffer-timeout");
timerThread.start();
@@ -148,11 +151,11 @@
return;
}
- this.flush();
+ flush();
- this.bufferObserver = null;
+ bufferObserver = null;
- latchTimer.down();
+ spinLimiter.release();
timerRunnable.close();
@@ -175,26 +178,16 @@
started = false;
}
- public synchronized void setObserver(TimedBufferObserver observer)
+ public synchronized void setObserver(final TimedBufferObserver observer)
{
- if (this.bufferObserver != null)
+ if (bufferObserver != null)
{
flush();
}
- this.bufferObserver = observer;
+ bufferObserver = observer;
}
- public void disableAutoFlush()
- {
- lock.lock();
- }
-
- public void enableAutoFlush()
- {
- lock.unlock();
- }
-
/**
* Verify if the size fits the buffer
* @param sizeChecked
@@ -210,23 +203,36 @@
if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit)
{
+ // Either there is not enough space left in the buffer for the sized record
+         // Or a flush has just been performed and we need to re-calculate bufferLimit
+
flush();
- final int remaining = bufferObserver.getRemainingBytes();
+ delayFlush = true;
- if (sizeChecked > remaining)
+ final int remainingInFile = bufferObserver.getRemainingBytes();
+
+ if (sizeChecked > remainingInFile)
{
+            // Need to move to a new file - not enough space in the file for this size
+
return false;
}
else
{
- buffer.clear();
- bufferLimit = Math.min(remaining, bufferSize);
+ // There is enough space in the file for this size
+
+ // Need to re-calculate buffer limit
+
+ bufferLimit = Math.min(remainingInFile, bufferSize);
+
return true;
}
}
else
{
+ delayFlush = true;
+
return true;
}
}
@@ -238,87 +244,84 @@
   public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
{
+ delayFlush = false;
+
if (buffer.writerIndex() == 0)
{
- // Resume latch
- latchTimer.down();
+ // More bytes have been added so the timer flush thread can resume
+
+ spinLimiter.release();
}
bytes.encode(buffer);
callbacks.add(callback);
- active = true;
-
if (sync)
{
- if (!pendingSync)
- {
- pendingSync = true;
- }
+ pendingSync = true;
- if (flushOnSync)
- {
- flush();
- }
+// if (System.nanoTime() - lastFlushTime.get() > timeout)
+// {
+//         // This might happen if there is low activity in the buffer - the timer hasn't fired because no sync records
+//         // have been recently added, and suddenly a sync record is added
+//         // In this case we do a flush immediately, which can reduce latency in this case
+//
+// flush();
+// }
}
- if (buffer.writerIndex() == bufferLimit)
- {
- flush();
- }
}
public void flush()
{
- ByteBuffer bufferToFlush = null;
-
- boolean useSync = false;
-
- List<IOAsyncTask> callbacksToCall = null;
-
synchronized (this)
{
- if (buffer.writerIndex() > 0)
+ if (!delayFlush && buffer.writerIndex() > 0)
{
- latchTimer.up();
-
int pos = buffer.writerIndex();
if (logRates)
{
- bytesFlushed += pos;
+ bytesFlushed.addAndGet(pos);
}
- bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
+ ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
            // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
            // Using bufferToFlush.put(buffer) would make several append calls for each byte
bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
- callbacksToCall = callbacks;
+ if (bufferToFlush != null)
+ {
+ bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
+ }
- callbacks = new LinkedList<IOAsyncTask>();
+ try
+ {
+            // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning
+ // when the buffer is inactive
+ spinLimiter.acquire();
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
- useSync = pendingSync;
+ lastFlushTime.set(System.nanoTime());
- active = false;
pendingSync = false;
+ callbacks = new LinkedList<IOAsyncTask>();
+
buffer.clear();
+
bufferLimit = 0;
- flushesDone++;
+ flushesDone.incrementAndGet();
}
}
-
- // Execute the flush outside of the lock
- // This is important for NIO performance while we are using NIO Callbacks
- if (bufferToFlush != null)
- {
- bufferObserver.flushBuffer(bufferToFlush, useSync, callbacksToCall);
- }
}
// Package protected ---------------------------------------------
@@ -327,36 +330,18 @@
// Private -------------------------------------------------------
- private void checkTimer()
- {
- // if inactive for more than the timeout
- // of if a sync happened at more than the the timeout ago
- if (!active || pendingSync)
- {
- lock.lock();
- try
- {
- if (bufferObserver != null)
- {
- flush();
- }
- }
- finally
- {
- lock.unlock();
- }
- }
-
-      // Set the buffer as inactive.. we will flush the buffer next tick if nothing change this
- active = false;
- }
-
// Inner classes -------------------------------------------------
private class LogRatesTimerTask extends TimerTask
{
private boolean closed;
+ private long lastExecution;
+
+ private long lastBytesFlushed;
+
+ private long lastFlushesDone;
+
@Override
public synchronized void run()
{
@@ -364,22 +349,26 @@
{
long now = System.currentTimeMillis();
+ long bytesF = bytesFlushed.get();
+ long flushesD = flushesDone.get();
+
if (lastExecution != 0)
{
- double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
+               double rate = 1000 * (double)(bytesF - lastBytesFlushed) / (now - lastExecution);
               log.info("Write rate = " + rate + " bytes / sec or " + (long)(rate / (1024 * 1024)) + " MiB / sec");
- double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);
+               double flushRate = 1000 * (double)(flushesD - lastFlushesDone) / (now - lastExecution);
               log.info("Flush rate = " + flushRate + " flushes / sec");
}
lastExecution = now;
- bytesFlushed = 0;
+ lastBytesFlushed = bytesF;
- flushesDone = 0;
+ lastFlushesDone = flushesD;
}
}
+ @Override
public synchronized boolean cancel()
{
closed = true;
@@ -396,33 +385,29 @@
{
while (!closed)
{
- try
+            // We flush on the timer if there are pending syncs there and we've waited at least one
+            // timeout since the time of the last flush
+            // Effectively flushing "resets" the timer
+
+            if (pendingSync && bufferObserver != null && (System.nanoTime() > lastFlushTime.get() + timeout))
{
- latchTimer.waitCompletion();
+ flush();
}
- catch (InterruptedException ignored)
+
+ try
{
- }
+ spinLimiter.acquire();
- sleep();
+ Thread.yield();
- checkTimer();
-
+ spinLimiter.release();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
}
}
- /**
- *
- */
- private void sleep()
- {
- long time = System.nanoTime() + timeout;
- while (time > System.nanoTime())
- {
- Thread.yield();
- }
- }
-
public void close()
{
closed = true;
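
The TimedBuffer changes above replace the old latch and lock arrangement with a Semaphore ("spinLimiter") that parks the timer thread while the buffer is idle, plus a lastFlushTime/pendingSync check so the timer only forces a flush once a sync is outstanding and a full timeout has elapsed. A greatly simplified, self-contained sketch of that timer-thread pattern follows; TimedFlusher and its methods are hypothetical, and the actual buffering of bytes is elided.

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

final class TimedFlusher extends Thread
{
   private final long timeoutNanos;

   // One permit while the buffer is active; taken away at flush time so the checker parks
   private final Semaphore spinLimiter = new Semaphore(1);

   private final AtomicLong lastFlushTime = new AtomicLong(0);

   private volatile boolean pendingSync;

   private volatile boolean closed;

   TimedFlusher(final long timeoutNanos)
   {
      this.timeoutNanos = timeoutNanos;
   }

   // Called when the first bytes are added to an empty buffer
   void onBytesAdded(final boolean sync)
   {
      if (sync)
      {
         pendingSync = true;
      }
      spinLimiter.release();
   }

   void flush()
   {
      // ... write the buffered bytes to the file (elided) ...
      lastFlushTime.set(System.nanoTime());
      pendingSync = false;
      try
      {
         // Re-acquire so the timer thread parks again until new bytes arrive
         spinLimiter.acquire();
      }
      catch (InterruptedException ignore)
      {
      }
   }

   void close()
   {
      closed = true;
      spinLimiter.release();
   }

   @Override
   public void run()
   {
      while (!closed)
      {
         if (pendingSync && System.nanoTime() > lastFlushTime.get() + timeoutNanos)
         {
            flush();
         }
         try
         {
            spinLimiter.acquire();
            Thread.yield();
            spinLimiter.release();
         }
         catch (InterruptedException ignore)
         {
         }
      }
   }
}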
Modified: trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -59,8 +59,12 @@
int getJournalMinFiles();
- int getJournalMaxAIO();
+ int getJournalMaxIO();
+ int getJournalBufferSize();
+
+ int getJournalBufferTimeout();
+
int getJournalCompactMinFiles();
int getJournalCompactPercentage();
@@ -87,10 +91,6 @@
boolean isSharedStore();
- int getAIOBufferSize();
-
- int getAIOBufferTimeout();
-
String getPagingDirectory();
boolean isPersistDeliveryCountBeforeDelivery();
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -47,6 +47,7 @@
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
@@ -88,12 +89,12 @@
// Constructors --------------------------------------------------
public HornetQServerControlImpl(final PostOffice postOffice,
- final Configuration configuration,
- final ResourceManager resourceManager,
- final RemotingService remotingService,
- final HornetQServer messagingServer,
- final MessageCounterManager messageCounterManager,
-                                   final NotificationBroadcasterSupport broadcaster) throws Exception
+ final Configuration configuration,
+ final ResourceManager resourceManager,
+ final RemotingService remotingService,
+ final HornetQServer messagingServer,
+ final MessageCounterManager messageCounterManager,
+                                   final NotificationBroadcasterSupport broadcaster) throws Exception
{
super(HornetQServerControl.class);
this.postOffice = postOffice;
@@ -128,7 +129,7 @@
{
return configuration.isBackup();
}
-
+
public boolean isSharedStore()
{
return configuration.isSharedStore();
@@ -146,19 +147,28 @@
public String[] getInterceptorClassNames()
{
-      return configuration.getInterceptorClassNames().toArray(new String[configuration.getInterceptorClassNames().size()]);
+      return configuration.getInterceptorClassNames().toArray(new String[configuration.getInterceptorClassNames()
+                                                                                                                  .size()]);
}
- public int getAIOBufferSize()
+ public int getJournalBufferSize()
{
- return configuration.getJournalBufferSize();
+      return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferSize_AIO()
+                                                                   : configuration.getJournalBufferSize_NIO();
}
-
- public int getAIOBufferTimeout()
+
+ public int getJournalBufferTimeout()
{
- return configuration.getJournalBufferTimeout();
+      return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferTimeout_AIO()
+                                                                   : configuration.getJournalBufferTimeout_NIO();
}
-
+
+ public int getJournalMaxIO()
+ {
+      return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalMaxIO_AIO()
+                                                                   : configuration.getJournalMaxIO_NIO();
+ }
+
public String getJournalDirectory()
{
return configuration.getJournalDirectory();
@@ -169,16 +179,11 @@
return configuration.getJournalFileSize();
}
- public int getJournalMaxAIO()
- {
- return configuration.getJournalMaxAIO();
- }
-
public int getJournalMinFiles()
{
return configuration.getJournalMinFiles();
}
-
+
public int getJournalCompactMinFiles()
{
return configuration.getJournalCompactMinFiles();
@@ -208,7 +213,7 @@
{
return configuration.getScheduledThreadPoolMaxSize();
}
-
+
public int getThreadPoolMaxSize()
{
return configuration.getThreadPoolMaxSize();
@@ -265,7 +270,7 @@
{
server.createQueue(new SimpleString(address), new SimpleString(name), null, true,
false);
}
-
+
public void createQueue(final String address, final String name, final boolean
durable) throws Exception
{
server.createQueue(new SimpleString(address), new SimpleString(name), null,
durable, false);
@@ -291,7 +296,7 @@
QueueControl queue = (QueueControl)queues[i];
names[i] = queue.getName();
}
-
+
return names;
}
@@ -304,7 +309,7 @@
AddressControl address = (AddressControl)addresses[i];
names[i] = address.getAddress();
}
-
+
return names;
}
@@ -319,7 +324,7 @@
{
return server.getConnectionCount();
}
-
+
public void enableMessageCounters()
{
setMessageCounterEnabled(true);
@@ -382,7 +387,6 @@
{
      DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
-
      Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
      ArrayList<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Map.Entry<Xid, Long>>(xids.entrySet());
      Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>()
@@ -403,7 +407,7 @@
}
return s;
}
-
+
public String[] listHeuristicCommittedTransactions()
{
List<Xid> xids = resourceManager.getHeuristicCommittedTransactions();
@@ -415,7 +419,7 @@
}
return s;
}
-
+
public String[] listHeuristicRolledBackTransactions()
{
List<Xid> xids = resourceManager.getHeuristicRolledbackTransactions();
@@ -505,7 +509,7 @@
{
remotingService.removeConnection(connection.getID());
               connection.fail(new HornetQException(HornetQException.INTERNAL_ERROR, "connections for " + ipAddress +
-                                                                                     " closed by management"));
+                                                                                        " closed by management"));
closed = true;
}
}
@@ -540,33 +544,33 @@
public Object[] getConnectors() throws Exception
{
      Collection<TransportConfiguration> connectorConfigurations = configuration.getConnectorConfigurations().values();
-
+
Object[] ret = new Object[connectorConfigurations.size()];
-
+
int i = 0;
- for (TransportConfiguration config: connectorConfigurations)
+ for (TransportConfiguration config : connectorConfigurations)
{
Object[] tc = new Object[3];
-
+
tc[0] = config.getName();
tc[1] = config.getFactoryClassName();
tc[2] = config.getParams();
-
+
ret[i++] = tc;
}
-
+
return ret;
}
-
+
public String getConnectorsAsJSON() throws Exception
{
JSONArray array = new JSONArray();
-
- for (TransportConfiguration config:
configuration.getConnectorConfigurations().values())
+
+ for (TransportConfiguration config :
configuration.getConnectorConfigurations().values())
{
array.put(new JSONObject(config));
}
-
+
return array.toString();
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -93,7 +93,6 @@
*/
public class JournalStorageManager implements StorageManager
{
-
private static final Logger log = Logger.getLogger(JournalStorageManager.class);
private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
@@ -145,7 +144,7 @@
private final SequentialFileFactory largeMessagesFactory;
private volatile boolean started;
-
+
/** Used to create Operation Contexts */
private final ExecutorFactory executorFactory;
@@ -172,7 +171,9 @@
this(config, executorFactory, null);
}
-   public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ReplicationManager replicator)
+ public JournalStorageManager(final Configuration config,
+ final ExecutorFactory executorFactory,
+ final ReplicationManager replicator)
{
this.executorFactory = executorFactory;
@@ -229,37 +230,32 @@
SequentialFileFactory journalFF = null;
- if (config.getJournalType() == JournalType.ASYNCIO)
+ JournalType journalTypeToUse = config.getJournalType();
+
+      if (config.getJournalType() == JournalType.ASYNCIO && !AIOSequentialFileFactory.isSupported())
{
+         log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO journal");
+
+ journalTypeToUse = JournalType.NIO;
+ }
+
+ if (journalTypeToUse == JournalType.ASYNCIO)
+ {
log.info("AIO journal selected");
- if (!AIOSequentialFileFactory.isSupported())
- {
-         log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO journal");
- journalFF = new NIOSequentialFileFactory(journalDir,
- true,
- config.getJournalBufferSize(),
- config.getJournalBufferTimeout(),
- config.isJournalFlushOnSync(),
- config.isLogJournalWriteRate());
- }
- else
- {
- journalFF = new AIOSequentialFileFactory(journalDir,
- config.getJournalBufferSize(),
- config.getJournalBufferTimeout(),
- config.isJournalFlushOnSync(),
- config.isLogJournalWriteRate());
- log.info("AIO loaded successfully");
- }
+
+ journalFF = new AIOSequentialFileFactory(journalDir,
+ config.getJournalBufferSize_AIO(),
+ config.getJournalBufferTimeout_AIO(),
+ config.isLogJournalWriteRate());
+ log.info("AIO loaded successfully");
}
else if (config.getJournalType() == JournalType.NIO)
{
log.info("NIO Journal selected");
journalFF = new NIOSequentialFileFactory(journalDir,
true,
- config.getJournalBufferSize(),
- config.getJournalBufferTimeout(),
- config.isJournalFlushOnSync(),
+ config.getJournalBufferSize_NIO(),
+ config.getJournalBufferTimeout_NIO(),
config.isLogJournalWriteRate());
}
else
@@ -283,7 +279,8 @@
journalFF,
"hornetq-data",
"hq",
- config.getJournalMaxAIO());
+                                          config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
+                                                                                         : config.getJournalMaxIO_NIO());
if (replicator != null)
{
@@ -308,7 +305,7 @@
{
getContext().complete();
}
-
+
public void clearContext()
{
OperationContextImpl.clearContext();
@@ -319,7 +316,6 @@
return replicator != null;
}
-
public void waitOnOperations() throws Exception
{
waitOnOperations(-1);
@@ -337,8 +333,7 @@
{
waitCallback.waitCompletion();
}
- else
- if (!waitCallback.waitCompletion(timeout))
+ else if (!waitCallback.waitCompletion(timeout))
{
            throw new IllegalStateException("no response received from replication");
}
@@ -383,7 +378,6 @@
// TODO: shouldn't those page methods be on the PageManager? ^^^^
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#getContext()
*/
@@ -391,7 +385,7 @@
{
return OperationContextImpl.getContext(executorFactory);
}
-
+
public void setContext(OperationContext context)
{
OperationContextImpl.setContext(context);
@@ -478,12 +472,12 @@
public void storeMessage(final ServerMessage message) throws Exception
{
- //TODO - how can this be less than zero?
+ // TODO - how can this be less than zero?
if (message.getMessageID() <= 0)
{
         throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
}
-
+
      // Note that we don't sync, the add reference that comes immediately after will sync if appropriate
if (message.isLargeMessage())
@@ -491,50 +485,64 @@
messageJournal.appendAddRecord(message.getMessageID(),
ADD_LARGE_MESSAGE,
new
LargeMessageEncoding((LargeServerMessage)message),
- false, getContext(false));
+ false,
+ getContext(false));
}
else
- {
+ {
messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message,
false, getContext(false));
}
}
+   public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
-   public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
- {
- messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID),
last && syncNonTransactional, getContext(syncNonTransactional));
+ {
+ messageJournal.appendUpdateRecord(messageID,
+ ADD_REF,
+ new RefEncoding(queueID),
+ last && syncNonTransactional,
+ getContext(last &&
syncNonTransactional));
}
public void storeAcknowledge(final long queueID, final long messageID) throws
Exception
- {
- messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new
RefEncoding(queueID), syncNonTransactional, getContext(syncNonTransactional));
+ {
+ messageJournal.appendUpdateRecord(messageID,
+ ACKNOWLEDGE_REF,
+ new RefEncoding(queueID),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void deleteMessage(final long messageID) throws Exception
- {
+ {
messageJournal.appendDeleteRecord(messageID, syncNonTransactional,
getContext(syncNonTransactional));
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
- {
+ {
ScheduledDeliveryEncoding encoding = new
ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
.getID());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
encoding,
- syncNonTransactional,
getContext(syncNonTransactional));
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final
long recordID) throws Exception
- {
+ {
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding,
syncNonTransactional, getContext(syncNonTransactional));
+ messageJournal.appendAddRecord(recordID,
+ DUPLICATE_ID,
+ encoding,
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void deleteDuplicateID(long recordID) throws Exception
- {
+ {
messageJournal.appendDeleteRecord(recordID, syncNonTransactional,
getContext(syncNonTransactional));
}
@@ -591,7 +599,12 @@
public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
{
long id = generateUniqueID();
- messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new
HeuristicCompletionEncoding(xid, isCommit), true, getContext(true));
+
+ messageJournal.appendAddRecord(id,
+ HEURISTIC_COMPLETION,
+ new HeuristicCompletionEncoding(xid, isCommit),
+ true,
+ getContext(true));
return id;
}
@@ -668,10 +681,10 @@
DeliveryCountUpdateEncoding updateInfo = new
DeliveryCountUpdateEncoding(ref.getQueue().getID(),
ref.getDeliveryCount());
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
- UPDATE_DELIVERY_COUNT,
- updateInfo,
- syncNonTransactional,
getContext(syncNonTransactional));
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
UPDATE_DELIVERY_COUNT, updateInfo,
+
+ syncNonTransactional, getContext(syncNonTransactional));
+
}
private static final class AddMessageRecord
@@ -738,7 +751,7 @@
JournalLoadInformation info = messageJournal.load(records,
preparedTransactions,
new
LargeMessageTXFailureCallback(messages));
-
+
      ArrayList<LargeServerMessage> largeMessages = new ArrayList<LargeServerMessage>();
      Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
@@ -764,7 +777,7 @@
break;
}
case ADD_MESSAGE:
- {
+ {
ServerMessage message = new ServerMessageImpl(record.id, 50);
message.decode(buff);
@@ -967,6 +980,11 @@
messageJournal.perfBlast(perfBlastPages);
}
+ if (System.getProperty("org.hornetq.opt.directblast") != null)
+ {
+ messageJournal.runDirectJournalBlast();
+ }
+
return info;
}
@@ -1355,7 +1373,7 @@
return info;
}
-
+
// Public
-----------------------------------------------------------------------------------
public Journal getMessageJournal()
@@ -1416,7 +1434,7 @@
}
// Private
----------------------------------------------------------------------------------
-
+
private void checkAndCreateDir(final String dir, final boolean create)
{
File f = new File(dir);
@@ -1464,18 +1482,15 @@
return DummyOperationContext.getInstance();
}
}
-
-
// Inner Classes
// ----------------------------------------------------------------------------
-
static class DummyOperationContext implements OperationContext
{
-
+
private static DummyOperationContext instance = new DummyOperationContext();
-
+
public static OperationContext getInstance()
{
return instance;
@@ -1512,7 +1527,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.IOCompletion#lineUp()
*/
- public void lineUp()
+ public void storeLineUp()
{
}
@@ -1529,9 +1544,9 @@
public void onError(int errorCode, String errorMessage)
{
}
-
+
}
-
+
private static class XidEncoding implements EncodingSupport
{
final Xid xid;
@@ -2018,5 +2033,4 @@
}
-
}
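
One of the changes in this file replaces the nested AIO/NIO branching with an upfront check: if an ASYNCIO journal is configured but libaio is not available on the platform, the journal type is downgraded to NIO before the factory is created. The following is a minimal, self-contained sketch of that capability-check-then-fallback shape; the enum, interface and factory classes here are hypothetical stand-ins, not the HornetQ types.

final class JournalFactorySelector
{
   enum JournalType { ASYNCIO, NIO }

   interface FileFactory
   {
   }

   static final class AsyncFileFactory implements FileFactory
   {
      static boolean isSupported()
      {
         // The real check would attempt to load the native AIO wrapper
         return false;
      }
   }

   static final class NioFileFactory implements FileFactory
   {
   }

   static FileFactory select(final JournalType configured)
   {
      JournalType toUse = configured;

      if (toUse == JournalType.ASYNCIO && !AsyncFileFactory.isSupported())
      {
         System.out.println("AIO not available on this platform, falling back to NIO");

         toUse = JournalType.NIO;
      }

      return toUse == JournalType.ASYNCIO ? new AsyncFileFactory() : new NioFileFactory();
   }
}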
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.utils.ExecutorFactory;
@@ -38,6 +39,7 @@
*/
public class OperationContextImpl implements OperationContext
{
+ private static final Logger log = Logger.getLogger(OperationContextImpl.class);
   private static final ThreadLocal<OperationContext> threadLocalContext = new ThreadLocal<OperationContext>();
@@ -61,8 +63,7 @@
{
threadLocalContext.set(context);
}
-
-
+
private List<TaskHolder> tasks;
private volatile int storeLineUp = 0;
@@ -90,10 +91,9 @@
super();
this.executor = executor;
}
-
-   /** To be called by the replication manager, when new replication is added to the queue */
- public void lineUp()
- {
+
+ public void storeLineUp()
+ {
storeLineUp++;
}
@@ -108,7 +108,6 @@
checkTasks();
}
-   /** You may have several actions to be done after a replication operation is completed. */
public void executeOnCompletion(final IOAsyncTask completion)
{
if (errorCode != -1)
@@ -159,7 +158,6 @@
}
- /** To be called by the storage manager, when data is confirmed on the channel */
public synchronized void done()
{
stored++;
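
The renamed storeLineUp() above is half of a counting protocol: each persistent operation lines itself up before it is handed to the store, and completion tasks registered on the context only run once every lined-up operation has reported done(). A minimal, self-contained sketch of that protocol follows, with hypothetical names and without the executor and error handling of the real class.

import java.util.ArrayList;
import java.util.List;

final class SimpleOperationContext
{
   private int linedUp;

   private int completed;

   private final List<Runnable> tasks = new ArrayList<Runnable>();

   // Called before an operation is handed to the journal
   synchronized void storeLineUp()
   {
      linedUp++;
   }

   // Called when an operation has been confirmed on disk
   synchronized void done()
   {
      completed++;

      checkTasks();
   }

   // Registers work to run once everything lined up so far has completed
   synchronized void executeOnCompletion(final Runnable task)
   {
      tasks.add(task);

      checkTasks();
   }

   private void checkTasks()
   {
      if (completed >= linedUp)
      {
         for (Runnable task : tasks)
         {
            task.run();
         }

         tasks.clear();
      }
   }
}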
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -519,6 +519,12 @@
return localJournal.getNumberOfRecords();
}
+ public void runDirectJournalBlast() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -695,12 +695,24 @@
}
catch (HornetQException e)
{
+ csf.close();
+
// the session was created while its server was starting, retry it:
if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
{
            log.warn("Server is starting, retry to create the session for bridge " + name);
+ //Sleep a little to prevent spinning too much
+ try
+ {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
retry = true;
+
continue;
}
else
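
The added csf.close() and short sleep above harden the bridge's reconnect loop: on a "server still starting" rejection the stale session factory is closed and the thread backs off briefly instead of retrying in a tight loop. A simplified, self-contained sketch of that retry shape follows, with a hypothetical SessionFactory interface and without the specific error-code check the real code performs.

final class RetryConnector
{
   // Hypothetical stand-in for the bridge's session factory
   interface SessionFactory
   {
      void connect() throws Exception;

      void close();
   }

   static void connectWithRetry(final SessionFactory csf, final int maxAttempts) throws Exception
   {
      for (int attempt = 1; attempt <= maxAttempts; attempt++)
      {
         try
         {
            csf.connect();

            return;
         }
         catch (Exception e)
         {
            // Discard the failed factory before retrying
            csf.close();

            if (attempt == maxAttempts)
            {
               throw e;
            }

            // Back off a little to avoid retrying in a tight loop
            try
            {
               Thread.sleep(10);
            }
            catch (InterruptedException ignore)
            {
            }
         }
      }
   }
}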
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -22,6 +22,7 @@
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -52,7 +53,10 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.impl.SyncSpeedTest;
import org.hornetq.core.logging.LogDelegateFactory;
@@ -90,11 +94,16 @@
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.security.impl.SecurityStoreImpl;
import org.hornetq.core.server.ActivateCallback;
+import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.MemoryManager;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.RoutingContext;
+import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.cluster.ClusterManager;
@@ -630,7 +639,7 @@
         throw new HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
                                    "Server and client versions incompatible");
}
-
+
if (!checkActivate())
{
// Backup server is not ready to accept connections
@@ -970,7 +979,7 @@
if (replicationEndpoint == null)
{
            log.warn("There is no replication endpoint, can't activate this backup server");
-
+
            throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
}
@@ -1189,6 +1198,8 @@
}
initialised = true;
+
+ log.info("********** initialised");
if (System.getProperty("org.hornetq.opt.routeblast") != null)
{
@@ -1482,14 +1493,50 @@
}
}
+
+// private void runRouteBlastNoWait() throws Exception
+// {
+// SimpleString address = new SimpleString("rbnw_address");
+// SimpleString queueName = new SimpleString("rbnw_name");
+//
+// createQueue(address, queueName, null, true, false, true);
+//
+// Queue queue = (Queue)postOffice.getBinding(queueName).getBindable();
+//
+// RBConsumer consumer = new RBConsumer(queue);
+//
+// queue.addConsumer(consumer);
+//
+// final int bodySize = 1024;
+//
+// byte[] body = new byte[bodySize];
+//
+// final int numMessages = 10000000;
+//
+// for (int i = 0; i < numMessages; i++)
+// {
+//         final ServerMessage msg = new ServerMessageImpl(storageManager.generateUniqueID(), 1500);
+//
+// msg.getBodyBuffer().writeBytes(body);
+//
+// msg.setDestination(address);
+//
+// msg.setDurable(true);
+//
+// postOffice.route(msg);
+// }
+// }
+
private LinkedBlockingQueue<RouteBlastRunner> available = new
LinkedBlockingQueue<RouteBlastRunner>();
+
private void runRouteBlast() throws Exception
{
log.info("*** running route blast");
- final int numThreads = 2;
+
+ final int numThreads = 1;
- final int numClients = 200;
+ final int numClients = 1000;
for (int i = 0; i < numClients; i++)
{
@@ -1499,7 +1546,7 @@
available.add(run);
}
-
+
log.info("setup, now running");
Set<Thread> runners = new HashSet<Thread>();
@@ -1518,43 +1565,39 @@
t.join();
}
}
-
- class Foo implements Runnable
- {
- public void run()
- {
- for (int i = 0; i < 1000000; i++)
- {
- try
- {
- RouteBlastRunner runner = available.take();
-
- runner.run();
- }
- catch (InterruptedException e)
- {
- log.error("Interrupted", e);
- }
- }
- }
- }
-
+
class RouteBlastRunner implements Runnable
{
private SimpleString address;
+ private Set<Consumer> consumers = new HashSet<Consumer>();
+
RouteBlastRunner(SimpleString address)
{
this.address = address;
}
+
+
public void setup() throws Exception
{
final int numQueues = 1;
for (int i = 0; i < numQueues; i++)
{
-            createQueue(address, new SimpleString(address + ".hq.route_blast_queue" + i), null, true, false, true);
+            SimpleString queueName = new SimpleString(address + ".hq.route_blast_queue" + i);
+
+ createQueue(address, queueName, null, true, false, true);
+
+ Queue queue = (Queue)postOffice.getBinding(queueName).getBindable();
+
+ RBConsumer consumer = new RBConsumer(queue);
+
+ queue.addConsumer(consumer);
+
+ //log.info("added consumer to queue " + queue);
+
+ consumers.add(consumer);
}
}
@@ -1573,7 +1616,7 @@
msg.setDestination(address);
msg.setDurable(true);
-
+
postOffice.route(msg);
storageManager.afterCompleteOperations(new IOAsyncTask()
@@ -1584,7 +1627,7 @@
}
public void done()
- {
+ {
available.add(RouteBlastRunner.this);
}
});
@@ -1596,7 +1639,59 @@
}
}
+
+
+
+
+ class Foo implements Runnable
+ {
+ public void run()
+ {
+ for (int i = 0; i < 1000000; i++)
+ {
+ try
+ {
+ RouteBlastRunner runner = available.take();
+ runner.run();
+ }
+ catch (InterruptedException e)
+ {
+ log.error("Interrupted", e);
+ }
+ }
+ }
+ }
+
+ private class RBConsumer implements Consumer
+ {
+ private Queue queue;
+
+ RBConsumer(Queue queue)
+ {
+ this.queue = queue;
+ }
+
+ public Filter getFilter()
+ {
+ return null;
+ }
+
+ public HandleStatus handle(MessageReference reference) throws Exception
+ {
+ reference.handled();
+
+ queue.acknowledge(reference);
+
+ //log.info("acking");
+
+ return HandleStatus.HANDLED;
+ }
+
+ }
+
+
+
// Inner classes
// --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -1227,7 +1227,7 @@
}
private synchronized boolean directDeliver(final MessageReference reference)
- {
+ {
if (paused || handlers.isEmpty())
{
return false;
@@ -1241,7 +1241,7 @@
MessageHandler handler = getHandlerRoundRobin();
Consumer consumer = handler.getConsumer();
-
+
if (!checkExpired(reference))
{
         SimpleString groupID = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
@@ -1303,7 +1303,7 @@
}
protected synchronized void add(final MessageReference ref, final boolean first)
- {
+ {
if (dontAdd)
{
return;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -61,7 +61,9 @@
*/
public ServerMessageImpl(final long messageID, final int initialMessageBufferSize)
{
- super(messageID, initialMessageBufferSize);
+ super(initialMessageBufferSize);
+
+ this.messageID = messageID;
}
protected ServerMessageImpl(final int initialMessageBufferSize)
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -186,7 +186,7 @@
private final HornetQServer server;
private final SimpleString managementAddress;
-
+
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
@@ -241,7 +241,7 @@
this.resourceManager = resourceManager;
this.securityStore = securityStore;
-
+
this.executor = executor;
if (!xa)
@@ -1455,52 +1455,11 @@
sendResponse(packet, null, false, false);
}
}
-
-
-
- public void handleSend2(final ServerMessage message)
- {
- try
- {
- long id = storageManager.generateUniqueID();
- message.setMessageID(id);
- message.encodeMessageIDToBuffer();
-
- if (message.getDestination().equals(managementAddress))
- {
- // It's a management message
-
- handleManagementMessage(message);
- }
- else
- {
- send(message);
- }
- }
- catch (Exception e)
- {
- log.error("Failed to send message", e);
-
- }
- finally
- {
- try
- {
- releaseOutStanding(message, message.getEncodeSize());
- }
- catch (Exception e)
- {
- log.error("Failed to release outstanding credits", e);
- }
- }
-
- }
-
public void handleSend(final SessionSendMessage packet)
{
Packet response = null;
-
+
ServerMessage message = (ServerMessage)packet.getMessage();
try
@@ -1553,8 +1512,8 @@
log.error("Failed to release outstanding credits", e);
}
}
-
- sendResponse(packet, response, false, false);
+
+ sendResponse(packet, response, false, false);
}
public void handleSendContinuations(final SessionSendContinuationMessage packet)
@@ -1638,7 +1597,7 @@
}
}
});
-
+
if (gotCredits > 0)
{
sendProducerCredits(holder, gotCredits, address);
@@ -1752,30 +1711,26 @@
// Private
// ----------------------------------------------------------------------------
- /**
- * Respond to client after replication
- * @param packet
- * @param response
- */
private void sendResponse(final Packet confirmPacket,
final Packet response,
final boolean flush,
final boolean closeChannel)
{
storageManager.afterCompleteOperations(new IOAsyncTask()
- {
+ {
public void onError(int errorCode, String errorMessage)
{
            log.warn("Error processing IOCallback code = " + errorCode + " message = " + errorMessage);
-            HornetQExceptionMessage exceptionMessage = new HornetQExceptionMessage(new HornetQException(errorCode, errorMessage));
-
- doSendResponse(confirmPacket, exceptionMessage, flush, closeChannel);
+            HornetQExceptionMessage exceptionMessage = new HornetQExceptionMessage(new HornetQException(errorCode,
+                                                                                                        errorMessage));
+
+ doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
}
public void done()
{
- doSendResponse(confirmPacket, response, flush, closeChannel);
+ doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
}
});
}
@@ -1786,11 +1741,11 @@
* @param flush
* @param closeChannel
*/
- private void doSendResponse(final Packet confirmPacket,
- final Packet response,
- final boolean flush,
- final boolean closeChannel)
- {
+ private void doConfirmAndResponse(final Packet confirmPacket,
+ final Packet response,
+ final boolean flush,
+ final boolean closeChannel)
+ {
if (confirmPacket != null)
{
channel.confirm(confirmPacket);
@@ -1802,7 +1757,7 @@
}
if (response != null)
- {
+ {
channel.send(response);
}
Modified: trunk/src/main/org/hornetq/utils/TokenBucketLimiterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/utils/TokenBucketLimiterImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/utils/TokenBucketLimiterImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -53,17 +53,21 @@
{
while (!check())
{
- if (!spin)
+ if (spin)
{
- try
- {
- Thread.sleep(1);
- }
- catch (Exception e)
- {
- //Ignore
- }
+ Thread.yield();
}
+ else
+ {
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
}
}
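
The change above swaps the limiter's busy wait so that a "spin" limiter yields instead of sleeping, while the non-spinning variant keeps the 1 ms sleep. Below is a minimal, self-contained token-bucket sketch using that same spin-or-sleep wait strategy; the class and its one-second refill window are hypothetical simplifications, not the HornetQ implementation.

final class SimpleTokenBucketLimiter
{
   private final int ratePerSecond;

   private final boolean spin;

   private long windowStart = System.currentTimeMillis();

   private int tokensUsed;

   SimpleTokenBucketLimiter(final int ratePerSecond, final boolean spin)
   {
      this.ratePerSecond = ratePerSecond;
      this.spin = spin;
   }

   void limit()
   {
      while (!check())
      {
         if (spin)
         {
            // Low latency, but burns CPU while waiting for the next token
            Thread.yield();
         }
         else
         {
            try
            {
               Thread.sleep(1);
            }
            catch (Exception e)
            {
               // Ignore
            }
         }
      }
   }

   private synchronized boolean check()
   {
      long now = System.currentTimeMillis();

      if (now - windowStart >= 1000)
      {
         // New one-second window: refill the bucket
         windowStart = now;
         tokensUsed = 0;
      }

      if (tokensUsed < ratePerSecond)
      {
         tokensUsed++;
         return true;
      }

      return false;
   }
}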
Modified: trunk/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-full-config.xml 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/config/ConfigurationTest-full-config.xml 2009-12-01 20:18:47 UTC (rev 8483)
@@ -38,8 +38,7 @@
<create-journal-dir>false</create-journal-dir>
<journal-type>NIO</journal-type>
<journal-compact-min-files>123</journal-compact-min-files>
- <journal-compact-percentage>33</journal-compact-percentage>
- <journal-flush-on-sync>true</journal-flush-on-sync>
+ <journal-compact-percentage>33</journal-compact-percentage>
<journal-buffer-timeout>1000</journal-buffer-timeout>
<journal-buffer-size>10000</journal-buffer-size>
<journal-sync-transactional>false</journal-sync-transactional>
Modified: trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -51,7 +51,7 @@
{
server.getConfiguration().setJournalSyncNonTransactional(false);
- server.getConfiguration().setJournalBufferTimeout(15000);
+ server.getConfiguration().setJournalBufferTimeout_AIO(15000);
server.start();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -1138,10 +1138,9 @@
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
- configuration.setJournalMaxAIO(1000);
+ configuration.setJournalMaxIO_AIO(1000);
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(JournalType.ASYNCIO);
- configuration.setJournalMaxAIO(1000);
configuration.setSharedStore(sharedStorage);
if (sharedStorage)
{
@@ -1254,7 +1253,7 @@
configuration.setJournalDirectory(getJournalDir(node, false));
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(JournalType.ASYNCIO);
- configuration.setJournalMaxAIO(1000);
+ configuration.setJournalMaxIO_AIO(1000);
configuration.setPagingDirectory(getPageDir(node, false));
configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
configuration.setClustered(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -42,11 +42,10 @@
configuration.setSecurityEnabled(false);
configuration.setBindingsDirectory(getBindingsDir(node, false));
configuration.setJournalMinFiles(2);
- configuration.setJournalMaxAIO(1000);
+ configuration.setJournalMaxIO_AIO(1000);
configuration.setJournalDirectory(getJournalDir(node, false));
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(JournalType.ASYNCIO);
- configuration.setJournalMaxAIO(1000);
configuration.setPagingDirectory(getPageDir(node, false));
configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
configuration.setClustered(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -12,18 +12,15 @@
*/
package org.hornetq.tests.integration.cluster.failover;
-import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQ;
-import org.hornetq.core.message.impl.MessageImpl;
+import java.util.Map;
+
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.utils.SimpleString;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
-import java.util.Map;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Oct 26, 2009
@@ -42,11 +39,10 @@
configuration.setSecurityEnabled(false);
configuration.setBindingsDirectory(getBindingsDir(backupNode, false));
configuration.setJournalMinFiles(2);
- configuration.setJournalMaxAIO(1000);
+ configuration.setJournalMaxIO_AIO(1000);
configuration.setJournalDirectory(getJournalDir(backupNode, false));
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(JournalType.ASYNCIO);
- configuration.setJournalMaxAIO(1000);
configuration.setPagingDirectory(getPageDir(backupNode, false));
configuration.setLargeMessagesDirectory(getLargeMessagesDir(backupNode, false));
configuration.setClustered(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -16,6 +16,7 @@
import java.util.HashMap;
import java.util.Map;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
@@ -41,6 +42,7 @@
import org.hornetq.jms.HornetQQueue;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.SimpleString;
@@ -122,11 +124,15 @@
MessageConsumer consumer = sess.createConsumer(queue);
+ byte[] body = RandomUtil.randomBytes(bodySize);
+
for (int i = 0; i < numMessages; i++)
{
- TextMessage tm = sess.createTextMessage("message" + i);
+ BytesMessage bm = sess.createBytesMessage();
+
+ bm.writeBytes(body);
- producer.send(tm);
+ producer.send(bm);
}
conn.start();
@@ -143,11 +149,11 @@
{
log.info("got message " + i);
- TextMessage tm = (TextMessage)consumer.receive(1000);
+ BytesMessage bm = (BytesMessage)consumer.receive(1000);
- assertNotNull(tm);
+ assertNotNull(bm);
- assertEquals("message" + i, tm.getText());
+ assertEquals(body.length, bm.getBodyLength());
}
TextMessage tm = (TextMessage)consumer.receiveNoWait();
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -30,15 +30,12 @@
*
*/
public class AIOJournalCompactTest extends NIOJournalCompactTest
-{
-
-
+{
public static TestSuite suite()
{
return createAIOTestSuite(AIOJournalCompactTest.class);
}
-
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -63,9 +60,8 @@
file.mkdir();
return new AIOSequentialFileFactory(getTestDir(),
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- 1000000,
- true,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ 1000000,
false
);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -70,9 +70,8 @@
file.mkdir();
return new AIOSequentialFileFactory(getTestDir(),
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- 1000000,
- true,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ 1000000,
false
);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -388,9 +388,8 @@
if (factoryType.equals("aio"))
{
return new AIOSequentialFileFactory(directory,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
false);
}
else
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -106,9 +106,9 @@
assertEquals(conf.isJournalSyncNonTransactional(), serverControl.isJournalSyncNonTransactional());
assertEquals(conf.getJournalFileSize(), serverControl.getJournalFileSize());
assertEquals(conf.getJournalMinFiles(), serverControl.getJournalMinFiles());
- assertEquals(conf.getJournalMaxAIO(), serverControl.getJournalMaxAIO());
- assertEquals(conf.getJournalBufferSize(), serverControl.getAIOBufferSize());
- assertEquals(conf.getJournalBufferTimeout(), serverControl.getAIOBufferTimeout());
+ assertEquals(conf.getJournalMaxIO_AIO(), serverControl.getJournalMaxIO());
+ assertEquals(conf.getJournalBufferSize_AIO(), serverControl.getJournalBufferSize());
+ assertEquals(conf.getJournalBufferTimeout_AIO(), serverControl.getJournalBufferTimeout());
assertEquals(conf.isCreateBindingsDir(), serverControl.isCreateBindingsDir());
assertEquals(conf.isCreateJournalDir(), serverControl.isCreateJournalDir());
assertEquals(conf.getPagingDirectory(), serverControl.getPagingDirectory());
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -203,9 +203,9 @@
return (Integer)proxy.retrieveAttributeValue("journalFileSize");
}
- public int getJournalMaxAIO()
+ public int getJournalMaxIO()
{
- return (Integer)proxy.retrieveAttributeValue("journalMaxAIO");
+ return (Integer)proxy.retrieveAttributeValue("journalMaxIO");
}
public int getJournalMinFiles()
@@ -423,14 +423,14 @@
proxy.invokeOperation("setMessageCounterSamplePeriod", newPeriod);
}
- public int getAIOBufferSize()
+ public int getJournalBufferSize()
{
- return (Integer)proxy.retrieveAttributeValue("AIOBufferSize");
+ return (Integer)proxy.retrieveAttributeValue("JournalBufferSize");
}
- public int getAIOBufferTimeout()
+ public int getJournalBufferTimeout()
{
- return (Integer)proxy.retrieveAttributeValue("AIOBufferTimeout");
+ return (Integer)proxy.retrieveAttributeValue("JournalBufferTimeout");
}
public int getJournalCompactMinFiles()
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -418,7 +418,7 @@
{
OperationContext ctx = OperationContextImpl.getContext(factory);
- ctx.lineUp();
+ ctx.storeLineUp();
String msg = "I'm an exception";
@@ -1076,5 +1076,11 @@
{
}
+ public void runDirectJournalBlast() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
}
Modified: trunk/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/opt/SendTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/opt/SendTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -74,9 +74,10 @@
{
log.info("*** Starting server");
- System.setProperty("org.hornetq.opt.dontadd", "true");
- System.setProperty("org.hornetq.opt.routeblast", "true");
+ //System.setProperty("org.hornetq.opt.dontadd", "true");
+ // System.setProperty("org.hornetq.opt.routeblast", "true");
//System.setProperty("org.hornetq.opt.generatemessages", "true");
+ System.setProperty("org.hornetq.opt.directblast", "true");
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
@@ -91,9 +92,19 @@
configuration.setJournalType(JournalType.ASYNCIO);
+ configuration.setJournalBufferTimeout_NIO(1000000000 / 100); // this is in nanoseconds
+ configuration.setJournalBufferSize_NIO(490 * 1024);
+ configuration.setJournalMaxIO_NIO(1);
+
+ configuration.setJournalBufferTimeout_AIO(1000000000 / 1000); // this is in nanoseconds
+ configuration.setJournalBufferSize_AIO(490 * 1024);
+ configuration.setJournalMaxIO_AIO(500);
+
configuration.setLogJournalWriteRate(true);
- //configuration.setRunSyncSpeedTest(true);
+
+ // configuration.setRunSyncSpeedTest(true);
+
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
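As a quick sanity check on the constants above (the values are nanoseconds, as the inline comments note): 1000000000 / 100 is 10,000,000 ns, i.e. a 10 ms buffer timeout for the NIO journal, and 1000000000 / 1000 is 1,000,000 ns, i.e. a 1 ms timeout for the AIO journal. A minimal sketch of the same tuning block, assuming the Configuration setters named in this hunk keep the signatures shown:

import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;

public class JournalTuningSketch
{
   public static Configuration tunedConfiguration()
   {
      Configuration configuration = new ConfigurationImpl();

      // NIO journal: 10,000,000 ns (10 ms) buffer timeout, 490 KiB buffer, one outstanding write.
      configuration.setJournalBufferTimeout_NIO(1000000000 / 100);
      configuration.setJournalBufferSize_NIO(490 * 1024);
      configuration.setJournalMaxIO_NIO(1);

      // AIO journal: 1,000,000 ns (1 ms) buffer timeout, 490 KiB buffer, up to 500 outstanding writes.
      configuration.setJournalBufferTimeout_AIO(1000000000 / 1000);
      configuration.setJournalBufferSize_AIO(490 * 1024);
      configuration.setJournalMaxIO_AIO(500);

      return configuration;
   }
}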
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -59,7 +59,8 @@
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_SYNC_NON_TRANSACTIONAL, conf.isJournalSyncNonTransactional());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE, conf.getJournalFileSize());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES, conf.getJournalMinFiles());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_AIO, conf.getJournalMaxAIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO, conf.getJournalMaxIO_AIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO, conf.getJournalMaxIO_NIO());
assertEquals(ConfigurationImpl.DEFAULT_WILDCARD_ROUTING_ENABLED, conf.isWildcardRoutingEnabled());
assertEquals(ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT, conf.getTransactionTimeout());
assertEquals(ConfigurationImpl.DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD, conf.getMessageExpiryScanPeriod()); // OK
@@ -82,8 +83,10 @@
assertEquals(ConfigurationImpl.DEFAULT_PAGING_DIR, conf.getPagingDirectory());
assertEquals(ConfigurationImpl.DEFAULT_LARGE_MESSAGES_DIR, conf.getLargeMessagesDirectory());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE, conf.getJournalCompactPercentage());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC, conf.isJournalFlushOnSync());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT, conf.getJournalBufferTimeout());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, conf.getJournalBufferTimeout_AIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, conf.getJournalBufferTimeout_NIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, conf.getJournalBufferSize_AIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, conf.getJournalBufferSize_NIO());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_LOG_WRITE_RATE, conf.isLogJournalWriteRate());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_PERF_BLAST_PAGES, conf.getJournalPerfBlastPages());
assertEquals(ConfigurationImpl.DEFAULT_MESSAGE_COUNTER_ENABLED, conf.isMessageCounterEnabled());
@@ -164,8 +167,12 @@
assertEquals(i, conf.getJournalMinFiles());
i = randomInt();
- conf.setJournalMaxAIO(i);
- assertEquals(i, conf.getJournalMaxAIO());
+ conf.setJournalMaxIO_AIO(i);
+ assertEquals(i, conf.getJournalMaxIO_AIO());
+
+ i = randomInt();
+ conf.setJournalMaxIO_NIO(i);
+ assertEquals(i, conf.getJournalMaxIO_NIO());
s = randomString();
conf.setManagementAddress(new SimpleString(s));
@@ -244,16 +251,20 @@
assertEquals(i, conf.getJournalCompactPercentage());
i = randomInt();
- conf.setJournalBufferSize(i);
- assertEquals(i, conf.getJournalBufferSize());
+ conf.setJournalBufferSize_AIO(i);
+ assertEquals(i, conf.getJournalBufferSize_AIO());
i = randomInt();
- conf.setJournalBufferTimeout(i);
- assertEquals(i, conf.getJournalBufferTimeout());
+ conf.setJournalBufferTimeout_AIO(i);
+ assertEquals(i, conf.getJournalBufferTimeout_AIO());
+
+ i = randomInt();
+ conf.setJournalBufferSize_NIO(i);
+ assertEquals(i, conf.getJournalBufferSize_NIO());
- b = randomBoolean();
- conf.setJournalFlushOnSync(b);
- assertEquals(b, conf.isJournalFlushOnSync());
+ i = randomInt();
+ conf.setJournalBufferTimeout_NIO(i);
+ assertEquals(i, conf.getJournalBufferTimeout_NIO());
b = randomBoolean();
conf.setLogJournalWriteRate(b);
@@ -381,8 +392,12 @@
assertEquals(i, conf.getJournalMinFiles());
i = randomInt();
- conf.setJournalMaxAIO(i);
- assertEquals(i, conf.getJournalMaxAIO());
+ conf.setJournalMaxIO_AIO(i);
+ assertEquals(i, conf.getJournalMaxIO_AIO());
+
+ i = randomInt();
+ conf.setJournalMaxIO_NIO(i);
+ assertEquals(i, conf.getJournalMaxIO_NIO());
s = randomString();
conf.setManagementAddress(new SimpleString(s));
@@ -461,16 +476,20 @@
assertEquals(i, conf.getJournalCompactPercentage());
i = randomInt();
- conf.setJournalBufferSize(i);
- assertEquals(i, conf.getJournalBufferSize());
+ conf.setJournalBufferSize_AIO(i);
+ assertEquals(i, conf.getJournalBufferSize_AIO());
i = randomInt();
- conf.setJournalBufferTimeout(i);
- assertEquals(i, conf.getJournalBufferTimeout());
+ conf.setJournalBufferTimeout_AIO(i);
+ assertEquals(i, conf.getJournalBufferTimeout_AIO());
+
+ i = randomInt();
+ conf.setJournalBufferSize_NIO(i);
+ assertEquals(i, conf.getJournalBufferSize_NIO());
- b = randomBoolean();
- conf.setJournalFlushOnSync(b);
- assertEquals(b, conf.isJournalFlushOnSync());
+ i = randomInt();
+ conf.setJournalBufferTimeout_NIO(i);
+ assertEquals(i, conf.getJournalBufferTimeout_NIO());
b = randomBoolean();
conf.setLogJournalWriteRate(b);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -105,12 +105,18 @@
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES, conf.getJournalMinFiles());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_AIO, conf.getJournalMaxAIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO, conf.getJournalMaxIO_AIO());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT, conf.getJournalBufferTimeout());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, conf.getJournalBufferTimeout_AIO());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE, conf.getJournalBufferSize());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, conf.getJournalBufferSize_AIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO, conf.getJournalMaxIO_NIO());
+
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, conf.getJournalBufferTimeout_NIO());
+
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, conf.getJournalBufferSize_NIO());
+
assertEquals(ConfigurationImpl.DEFAULT_CREATE_BINDINGS_DIR, conf.isCreateBindingsDir());
assertEquals(ConfigurationImpl.DEFAULT_CREATE_JOURNAL_DIR, conf.isCreateJournalDir());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -69,17 +69,19 @@
assertEquals(false, conf.isCreateBindingsDir());
assertEquals("somedir2", conf.getJournalDirectory());
assertEquals(false, conf.isCreateJournalDir());
+
assertEquals(JournalType.NIO, conf.getJournalType());
- assertEquals(10000, conf.getJournalBufferSize());
- assertEquals(true, conf.isJournalFlushOnSync());
- assertEquals(1000, conf.getJournalBufferTimeout());
+ assertEquals(10000, conf.getJournalBufferSize_NIO());
+ assertEquals(1000, conf.getJournalBufferTimeout_NIO());
+ assertEquals(56546, conf.getJournalMaxIO_NIO());
+
assertEquals(false, conf.isJournalSyncTransactional());
assertEquals(true, conf.isJournalSyncNonTransactional());
assertEquals(12345678, conf.getJournalFileSize());
assertEquals(100, conf.getJournalMinFiles());
assertEquals(123, conf.getJournalCompactMinFiles());
assertEquals(33, conf.getJournalCompactPercentage());
- assertEquals(56546, conf.getJournalMaxAIO());
+
assertEquals("largemessagesdir", conf.getLargeMessagesDirectory());
assertEquals(95, conf.getMemoryWarningThreshold());
assertEquals(1024, conf.getBackupWindowSize());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -275,7 +275,7 @@
return 0;
}
- public int getJournalMaxAIO()
+ public int getJournalMaxIO()
{
return 0;
@@ -526,13 +526,13 @@
}
- public int getAIOBufferSize()
+ public int getJournalBufferSize()
{
return 0;
}
- public int getAIOBufferTimeout()
+ public int getJournalBufferTimeout()
{
return 0;
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -87,7 +87,7 @@
}
}
- TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false, false); // Any big timeout
+ TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false); // Any big timeout
timedBuffer.setObserver(new TestObserver());
@@ -106,8 +106,10 @@
timedBuffer.addBytes(buff, false, dummyCallback);
}
+ timedBuffer.checkSize(1);
+
assertEquals(1, flushTimes.get());
-
+
ByteBuffer flushedBuffer = buffers.get(0);
assertEquals(100, flushedBuffer.limit());
Modified: trunk/tests/src/org/hornetq/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -63,7 +63,7 @@
new NIOSequentialFileFactory(fileConf.getJournalDirectory()),
"hornetq-data",
"hq",
- fileConf.getJournalMaxAIO());
+ fileConf.getJournalMaxIO_NIO());
ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();