JBoss hornetq SVN: r8486 - in trunk/src/main/org/hornetq/core/server: impl and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-01 18:13:07 -0500 (Tue, 01 Dec 2009)
New Revision: 8486
Modified:
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
removed some unnecessary locking on QueueImpl
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-12-01 22:43:53 UTC (rev 8485)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-12-01 23:13:07 UTC (rev 8486)
@@ -126,11 +126,7 @@
Collection<Consumer> getConsumers();
boolean checkDLQ(MessageReference ref) throws Exception;
-
- void lockDelivery();
-
- void unlockDelivery();
-
+
/**
* @return an immutable iterator which does not allow to remove references
*/
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-01 22:43:53 UTC (rev 8485)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-01 23:13:07 UTC (rev 8486)
@@ -206,23 +206,6 @@
// Queue implementation ----------------------------------------------------------------------------------------
- public void lockDelivery()
- {
- try
- {
- lock.acquire();
- }
- catch (InterruptedException e)
- {
- log.warn(e.getMessage(), e);
- }
- }
-
- public void unlockDelivery()
- {
- lock.release();
- }
-
public boolean isDurable()
{
return durable;
@@ -1475,15 +1458,7 @@
// Must be set to false *before* executing to avoid race
waitingToDeliver.set(false);
- lockDelivery();
- try
- {
- deliver();
- }
- finally
- {
- unlockDelivery();
- }
+ deliver();
}
}
15 years, 1 month
JBoss hornetq SVN: r8485 - in trunk: src/main/org/hornetq/core/server/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-01 17:43:53 -0500 (Tue, 01 Dec 2009)
New Revision: 8485
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/src/org/hornetq/tests/stress/client/SendStressTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
trunk/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
trunk/tests/src/org/hornetq/tests/stress/paging/PageStressTest.java
Log:
Fixing stress tests
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-12-01 20:38:26 UTC (rev 8484)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-12-01 22:43:53 UTC (rev 8485)
@@ -17,7 +17,9 @@
import java.text.DecimalFormat;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -398,6 +400,21 @@
if (running)
{
running = false;
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ latch.countDown();
+ }
+ });
+
+ if (!latch.await(60, TimeUnit.SECONDS))
+ {
+ log.warn("Timed out on waiting PagingStore " + this.address + " to shutdown");
+ }
if (currentPage != null)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 20:38:26 UTC (rev 8484)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 22:43:53 UTC (rev 8485)
@@ -385,6 +385,11 @@
managementService.stop();
+ if (pagingManager != null)
+ {
+ pagingManager.stop();
+ }
+
if (storageManager != null)
{
storageManager.stop();
@@ -423,8 +428,6 @@
postOffice.stop();
}
- // Need to shutdown pools before shutting down paging manager to make sure everything is written ok
-
List<Runnable> tasks = scheduledPool.shutdownNow();
for (Runnable task : tasks)
@@ -436,11 +439,6 @@
scheduledPool = null;
- if (pagingManager != null)
- {
- pagingManager.stop();
- }
-
if (memoryManager != null)
{
memoryManager.stop();
Modified: trunk/tests/src/org/hornetq/tests/stress/client/SendStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/client/SendStressTest.java 2009-12-01 20:38:26 UTC (rev 8484)
+++ trunk/tests/src/org/hornetq/tests/stress/client/SendStressTest.java 2009-12-01 22:43:53 UTC (rev 8485)
@@ -13,8 +13,6 @@
package org.hornetq.tests.stress.client;
-import junit.framework.TestSuite;
-
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
@@ -44,11 +42,6 @@
// Public --------------------------------------------------------
// Remove this method to re-enable those tests
- public static TestSuite suite()
- {
- return new TestSuite();
- }
-
public void testStressSendNetty() throws Exception
{
doTestStressSend(true);
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2009-12-01 20:38:26 UTC (rev 8484)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/LargeJournalStressTest.java 2009-12-01 22:43:53 UTC (rev 8485)
@@ -16,7 +16,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
-import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
Modified: trunk/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java 2009-12-01 20:38:26 UTC (rev 8484)
+++ trunk/tests/src/org/hornetq/tests/stress/journal/MultiThreadConsumerStressTest.java 2009-12-01 22:43:53 UTC (rev 8485)
@@ -16,8 +16,11 @@
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
-import org.hornetq.core.buffers.HornetQBuffers;
-import org.hornetq.core.client.*;
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
@@ -28,6 +31,8 @@
/**
* A MultiThreadConsumerStressTest
+ *
+ * This test validates consuming / sending messages while compacting is working
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
@@ -153,7 +158,6 @@
private void setupServer(JournalType journalType) throws Exception, HornetQException
{
Configuration config = createDefaultConfig(true);
- config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);
config.setJournalType(journalType);
config.setJMXManagementEnabled(true);
@@ -169,6 +173,12 @@
server.start();
sf = createNettyFactory();
+
+ sf.setBlockOnPersistentSend(false);
+
+ sf.setBlockOnNonPersistentSend(false);
+
+ sf.setBlockOnAcknowledge(false);
ClientSession sess = sf.createSession();
Modified: trunk/tests/src/org/hornetq/tests/stress/paging/PageStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/paging/PageStressTest.java 2009-12-01 20:38:26 UTC (rev 8484)
+++ trunk/tests/src/org/hornetq/tests/stress/paging/PageStressTest.java 2009-12-01 22:43:53 UTC (rev 8485)
@@ -29,7 +29,7 @@
import org.hornetq.utils.SimpleString;
/**
- * This is an integration-tests that will take some time to run. TODO: Maybe this test belongs somewhere else?
+ * This is an integration-tests that will take some time to run.
*
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*/
@@ -51,6 +51,9 @@
public void testStopDuringDepage() throws Exception
{
Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+ config.setJournalSyncTransactional(false);
HashMap<String, AddressSettings> settings = new HashMap<String, AddressSettings>();
@@ -63,6 +66,8 @@
ClientSessionFactory factory = createInVMFactory();
factory.setBlockOnAcknowledge(true);
+ factory.setBlockOnPersistentSend(false);
+ factory.setBlockOnNonPersistentSend(false);
ClientSession session = null;
try
@@ -119,7 +124,8 @@
System.out.println("server stopped, nr msgs: " + msgs);
- messagingService = createServer(true, config, 20 * 1024 * 1024, 10 * 1024 * 1024, settings);
+ messagingService = createServer(true, config, 10 * 1024 * 1024, 20 * 1024 * 1024, settings);
+
messagingService.start();
factory = createInVMFactory();
@@ -153,7 +159,13 @@
finally
{
session.close();
- messagingService.stop();
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
}
}
15 years, 1 month
JBoss hornetq SVN: r8484 - in trunk/src/main/org/hornetq/core/server: impl and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-01 15:38:26 -0500 (Tue, 01 Dec 2009)
New Revision: 8484
Modified:
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
allow bridges to be deployed if clustered is false
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-12-01 20:38:26 UTC (rev 8484)
@@ -87,6 +87,8 @@
private boolean backup;
+ private final boolean clustered;
+
public ClusterManagerImpl(final org.hornetq.utils.ExecutorFactory executorFactory,
final HornetQServer server,
final PostOffice postOffice,
@@ -94,7 +96,8 @@
final ManagementService managementService,
final Configuration configuration,
final UUID nodeUUID,
- final boolean backup)
+ final boolean backup,
+ final boolean clustered)
{
if (nodeUUID == null)
{
@@ -116,6 +119,8 @@
this.nodeUUID = nodeUUID;
this.backup = backup;
+
+ this.clustered = clustered;
}
public synchronized void start() throws Exception
@@ -125,14 +130,22 @@
return;
}
- for (BroadcastGroupConfiguration config : configuration.getBroadcastGroupConfigurations())
+ if (clustered)
{
- deployBroadcastGroup(config);
- }
+ for (BroadcastGroupConfiguration config : configuration.getBroadcastGroupConfigurations())
+ {
+ deployBroadcastGroup(config);
+ }
- for (DiscoveryGroupConfiguration config : configuration.getDiscoveryGroupConfigurations().values())
- {
- deployDiscoveryGroup(config);
+ for (DiscoveryGroupConfiguration config : configuration.getDiscoveryGroupConfigurations().values())
+ {
+ deployDiscoveryGroup(config);
+ }
+
+ for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
+ {
+ deployClusterConnection(config);
+ }
}
for (BridgeConfiguration config : configuration.getBridgeConfigurations())
@@ -140,11 +153,6 @@
deployBridge(config);
}
- for (ClusterConnectionConfiguration config : configuration.getClusterConfigurations())
- {
- deployClusterConnection(config);
- }
-
started = true;
}
@@ -155,16 +163,29 @@
return;
}
- for (BroadcastGroup group : broadcastGroups.values())
+ if (clustered)
{
- group.stop();
- managementService.unregisterBroadcastGroup(group.getName());
- }
+ for (BroadcastGroup group : broadcastGroups.values())
+ {
+ group.stop();
+ managementService.unregisterBroadcastGroup(group.getName());
+ }
- for (DiscoveryGroup group : discoveryGroups.values())
- {
- group.stop();
- managementService.unregisterDiscoveryGroup(group.getName());
+ for (DiscoveryGroup group : discoveryGroups.values())
+ {
+ group.stop();
+ managementService.unregisterDiscoveryGroup(group.getName());
+ }
+
+ for (ClusterConnection clusterConnection : clusters.values())
+ {
+ clusterConnection.stop();
+ managementService.unregisterCluster(clusterConnection.getName().toString());
+ }
+
+ broadcastGroups.clear();
+
+ discoveryGroups.clear();
}
for (Bridge bridge : bridges.values())
@@ -173,16 +194,6 @@
managementService.unregisterBridge(bridge.getName().toString());
}
- for (ClusterConnection clusterConnection : clusters.values())
- {
- clusterConnection.stop();
- managementService.unregisterCluster(clusterConnection.getName().toString());
- }
-
- broadcastGroups.clear();
-
- discoveryGroups.clear();
-
bridges.clear();
started = false;
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 20:38:26 UTC (rev 8484)
@@ -22,7 +22,6 @@
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -53,10 +52,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
-import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.IOCompletion;
-import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.impl.SyncSpeedTest;
import org.hornetq.core.logging.LogDelegateFactory;
@@ -102,8 +98,6 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
-import org.hornetq.core.server.RoutingContext;
-import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.cluster.ClusterManager;
@@ -1160,20 +1154,18 @@
// Deply any pre-defined diverts
deployDiverts();
- if (configuration.isClustered())
- {
- // This can't be created until node id is set
- clusterManager = new ClusterManagerImpl(executorFactory,
- this,
- postOffice,
- scheduledPool,
- managementService,
- configuration,
- uuid,
- configuration.isBackup());
+ // This can't be created until node id is set
+ clusterManager = new ClusterManagerImpl(executorFactory,
+ this,
+ postOffice,
+ scheduledPool,
+ managementService,
+ configuration,
+ uuid,
+ configuration.isBackup(),
+ configuration.isClustered());
- clusterManager.start();
- }
+ clusterManager.start();
if (deploymentManager != null)
{
@@ -1198,7 +1190,7 @@
}
initialised = true;
-
+
log.info("********** initialised");
if (System.getProperty("org.hornetq.opt.routeblast") != null)
@@ -1493,47 +1485,45 @@
}
}
-
-// private void runRouteBlastNoWait() throws Exception
-// {
-// SimpleString address = new SimpleString("rbnw_address");
-// SimpleString queueName = new SimpleString("rbnw_name");
-//
-// createQueue(address, queueName, null, true, false, true);
-//
-// Queue queue = (Queue)postOffice.getBinding(queueName).getBindable();
-//
-// RBConsumer consumer = new RBConsumer(queue);
-//
-// queue.addConsumer(consumer);
-//
-// final int bodySize = 1024;
-//
-// byte[] body = new byte[bodySize];
-//
-// final int numMessages = 10000000;
-//
-// for (int i = 0; i < numMessages; i++)
-// {
-// final ServerMessage msg = new ServerMessageImpl(storageManager.generateUniqueID(), 1500);
-//
-// msg.getBodyBuffer().writeBytes(body);
-//
-// msg.setDestination(address);
-//
-// msg.setDurable(true);
-//
-// postOffice.route(msg);
-// }
-// }
-
+ // private void runRouteBlastNoWait() throws Exception
+ // {
+ // SimpleString address = new SimpleString("rbnw_address");
+ // SimpleString queueName = new SimpleString("rbnw_name");
+ //
+ // createQueue(address, queueName, null, true, false, true);
+ //
+ // Queue queue = (Queue)postOffice.getBinding(queueName).getBindable();
+ //
+ // RBConsumer consumer = new RBConsumer(queue);
+ //
+ // queue.addConsumer(consumer);
+ //
+ // final int bodySize = 1024;
+ //
+ // byte[] body = new byte[bodySize];
+ //
+ // final int numMessages = 10000000;
+ //
+ // for (int i = 0; i < numMessages; i++)
+ // {
+ // final ServerMessage msg = new ServerMessageImpl(storageManager.generateUniqueID(), 1500);
+ //
+ // msg.getBodyBuffer().writeBytes(body);
+ //
+ // msg.setDestination(address);
+ //
+ // msg.setDurable(true);
+ //
+ // postOffice.route(msg);
+ // }
+ // }
+
private LinkedBlockingQueue<RouteBlastRunner> available = new LinkedBlockingQueue<RouteBlastRunner>();
-
private void runRouteBlast() throws Exception
{
log.info("*** running route blast");
-
+
final int numThreads = 1;
final int numClients = 1000;
@@ -1565,7 +1555,7 @@
t.join();
}
}
-
+
class RouteBlastRunner implements Runnable
{
private SimpleString address;
@@ -1577,8 +1567,6 @@
this.address = address;
}
-
-
public void setup() throws Exception
{
final int numQueues = 1;
@@ -1595,7 +1583,7 @@
queue.addConsumer(consumer);
- //log.info("added consumer to queue " + queue);
+ // log.info("added consumer to queue " + queue);
consumers.add(consumer);
}
@@ -1639,10 +1627,7 @@
}
}
-
-
-
-
+
class Foo implements Runnable
{
public void run()
@@ -1662,7 +1647,7 @@
}
}
}
-
+
private class RBConsumer implements Consumer
{
private Queue queue;
@@ -1682,16 +1667,14 @@
reference.handled();
queue.acknowledge(reference);
-
- //log.info("acking");
+ // log.info("acking");
+
return HandleStatus.HANDLED;
}
}
-
-
// Inner classes
// --------------------------------------------------------------------------------
}
15 years, 1 month
JBoss hornetq SVN: r8483 - in trunk: src/config/common/schema and 25 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-01 15:18:47 -0500 (Tue, 01 Dec 2009)
New Revision: 8483
Added:
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java
Removed:
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
Modified:
trunk/build-hornetq.xml
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
trunk/src/main/org/hornetq/core/config/Configuration.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/core/journal/IOCompletion.java
trunk/src/main/org/hornetq/core/journal/Journal.java
trunk/src/main/org/hornetq/core/journal/SequentialFile.java
trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/utils/TokenBucketLimiterImpl.java
trunk/tests/config/ConfigurationTest-full-config.xml
trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java
trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
trunk/tests/src/org/hornetq/tests/opt/SendTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
trunk/tests/src/org/hornetq/tests/util/ListJournal.java
Log:
Mainly tuning of journal/timed buffer and various other small changes
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/build-hornetq.xml 2009-12-01 20:18:47 UTC (rev 8483)
@@ -1226,7 +1226,7 @@
<fileset dir="${test.classes.dir}">
<!-- exlcuded because of https://jira.jboss.org/jira/browse/HORNETQ-65 -->
<exclude name="**/cluster/failover/*StaticClusterWithBackupFailoverTest.class" />
- <exclude name="**/cluster/failover/*WithBackupFailoverTest.class" />
+ <exclude name="**/cluster/failover/*WithBackupFailoverTest.class" />
<include name="${tests.param}"/>
</fileset>
</batchtest>
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-12-01 20:18:47 UTC (rev 8483)
@@ -151,9 +151,7 @@
<xsd:element maxOccurs="1" minOccurs="0" name="journal-buffer-timeout" type="xsd:long">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-buffer-size" type="xsd:long">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="journal-flush-on-sync" type="xsd:boolean">
- </xsd:element>
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-sync-transactional" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-sync-non-transactional" type="xsd:boolean">
Modified: trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/asyncio/impl/AsynchronousFileImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -290,12 +290,12 @@
if (writeExecutor != null)
{
+ maxIOSemaphore.acquireUninterruptibly();
+
writeExecutor.execute(new Runnable()
{
public void run()
{
- maxIOSemaphore.acquireUninterruptibly();
-
long sequence = nextWritingSequence.getAndIncrement();
try
@@ -445,7 +445,7 @@
private void callbackDone(final AIOCallback callback, final long sequence, final ByteBuffer buffer)
{
maxIOSemaphore.release();
-
+
pendingWrites.down();
callbackLock.lock();
Modified: trunk/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/Configuration.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/config/Configuration.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -224,22 +224,34 @@
void setJournalMinFiles(int files);
- int getJournalMaxAIO();
+ //AIO and NIO need different values for these params
+
+ int getJournalMaxIO_AIO();
- void setJournalMaxAIO(int maxAIO);
+ void setJournalMaxIO_AIO(int journalMaxIO);
- void setJournalBufferSize(int size);
+ int getJournalBufferTimeout_AIO();
+
+ void setJournalBufferTimeout_AIO(int journalBufferTimeout);
+
+ int getJournalBufferSize_AIO();
+
+ void setJournalBufferSize_AIO(int journalBufferSize);
- int getJournalBufferSize();
- void setJournalBufferTimeout(int timeout);
+ int getJournalMaxIO_NIO();
+
+ void setJournalMaxIO_NIO(int journalMaxIO);
+
+ int getJournalBufferTimeout_NIO();
+
+ void setJournalBufferTimeout_NIO(int journalBufferTimeout);
+
+ int getJournalBufferSize_NIO();
+
+ void setJournalBufferSize_NIO(int journalBufferSize);
- int getJournalBufferTimeout();
-
- void setJournalFlushOnSync(boolean flush);
-
- boolean isJournalFlushOnSync();
-
+
boolean isCreateBindingsDir();
void setCreateBindingsDir(boolean create);
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -100,14 +100,22 @@
public static final int DEFAULT_JOURNAL_MIN_FILES = 2;
- public static final int DEFAULT_JOURNAL_MAX_AIO = 500;
+ // AIO and NIO need to have different defaults for some values
- public static final boolean DEFAULT_JOURNAL_FLUSH_SYNC = false;
+ public static final int DEFAULT_JOURNAL_MAX_IO_AIO = 500;
- public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT = 20000;
+ public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = (int)(1000000000d / 2000);
- public static final int DEFAULT_JOURNAL_BUFFER_SIZE = 128 * 1024;
+ public static final int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = 490 * 1024;
+ public static final int DEFAULT_JOURNAL_MAX_IO_NIO = 1;
+
+ public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO = (int)(1000000000d / 300);
+
+ public static final int DEFAULT_JOURNAL_BUFFER_SIZE_NIO = 490 * 1024;
+
+
+
public static final boolean DEFAULT_JOURNAL_LOG_WRITE_RATE = false;
public static final int DEFAULT_JOURNAL_PERF_BLAST_PAGES = -1;
@@ -266,14 +274,23 @@
protected int journalMinFiles = DEFAULT_JOURNAL_MIN_FILES;
- protected int journalMaxAIO = DEFAULT_JOURNAL_MAX_AIO;
+
+ //AIO and NIO need different values for these attributes
+
+ protected int journalMaxIO_AIO = DEFAULT_JOURNAL_MAX_IO_AIO;
- protected boolean journalFlushSync = DEFAULT_JOURNAL_FLUSH_SYNC;
+ protected int journalBufferTimeout_AIO = DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO;
- protected int journalBufferTimeout = DEFAULT_JOURNAL_BUFFER_TIMEOUT;
+ protected int journalBufferSize_AIO = DEFAULT_JOURNAL_BUFFER_SIZE_AIO;
+
+ protected int journalMaxIO_NIO = DEFAULT_JOURNAL_MAX_IO_NIO;
- protected int journalBufferSize = DEFAULT_JOURNAL_BUFFER_SIZE;
+ protected int journalBufferTimeout_NIO = DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO;
+ protected int journalBufferSize_NIO = DEFAULT_JOURNAL_BUFFER_SIZE_NIO;
+
+
+
protected boolean logJournalWriteRate = DEFAULT_JOURNAL_LOG_WRITE_RATE;
protected int journalPerfBlastPages = DEFAULT_JOURNAL_PERF_BLAST_PAGES;
@@ -665,16 +682,6 @@
journalFileSize = size;
}
- public int getJournalMaxAIO()
- {
- return journalMaxAIO;
- }
-
- public void setJournalMaxAIO(final int maxAIO)
- {
- journalMaxAIO = maxAIO;
- }
-
public int getJournalMinFiles()
{
return journalMinFiles;
@@ -815,36 +822,6 @@
jmxDomain = domain;
}
- public void setJournalBufferTimeout(int timeout)
- {
- this.journalBufferTimeout = timeout;
- }
-
- public int getJournalBufferTimeout()
- {
- return journalBufferTimeout;
- }
-
- public void setJournalFlushOnSync(boolean flush)
- {
- journalFlushSync = flush;
- }
-
- public boolean isJournalFlushOnSync()
- {
- return journalFlushSync;
- }
-
- public int getJournalBufferSize()
- {
- return journalBufferSize;
- }
-
- public void setJournalBufferSize(int size)
- {
- this.journalBufferSize = size;
- }
-
public String getLargeMessagesDirectory()
{
return largeMessagesDirectory;
@@ -930,6 +907,131 @@
this.managementRequestTimeout = managementRequestTimeout;
}
+ public int getJournalCompactMinFiles()
+ {
+ return journalCompactMinFiles;
+ }
+
+ public int getJournalCompactPercentage()
+ {
+ return journalCompactPercentage;
+ }
+
+ public void setJournalCompactMinFiles(int minFiles)
+ {
+ this.journalCompactMinFiles = minFiles;
+ }
+
+ public void setJournalCompactPercentage(int percentage)
+ {
+ this.journalCompactPercentage = percentage;
+ }
+
+ public long getServerDumpInterval()
+ {
+ return serverDumpInterval;
+ }
+
+ public void setServerDumpInterval(long intervalInMilliseconds)
+ {
+ this.serverDumpInterval = intervalInMilliseconds;
+ }
+
+ public int getMemoryWarningThreshold()
+ {
+ return memoryWarningThreshold;
+ }
+
+ public void setMemoryWarningThreshold(int memoryWarningThreshold)
+ {
+ this.memoryWarningThreshold = memoryWarningThreshold;
+ }
+
+ public long getMemoryMeasureInterval()
+ {
+ return memoryMeasureInterval;
+ }
+
+ public void setMemoryMeasureInterval(long memoryMeasureInterval)
+ {
+ this.memoryMeasureInterval = memoryMeasureInterval;
+ }
+
+ public String getLogDelegateFactoryClassName()
+ {
+ return logDelegateFactoryClassName;
+ }
+
+ public void setLogDelegateFactoryClassName(String className)
+ {
+ this.logDelegateFactoryClassName = className;
+ }
+
+
+
+ public int getJournalMaxIO_AIO()
+ {
+ return journalMaxIO_AIO;
+ }
+
+ public void setJournalMaxIO_AIO(int journalMaxIO)
+ {
+ this.journalMaxIO_AIO = journalMaxIO;
+ }
+
+ public int getJournalBufferTimeout_AIO()
+ {
+ return journalBufferTimeout_AIO;
+ }
+
+ public void setJournalBufferTimeout_AIO(int journalBufferTimeout)
+ {
+ this.journalBufferTimeout_AIO = journalBufferTimeout;
+ }
+
+ public int getJournalBufferSize_AIO()
+ {
+ return journalBufferSize_AIO;
+ }
+
+ public void setJournalBufferSize_AIO(int journalBufferSize)
+ {
+ this.journalBufferSize_AIO = journalBufferSize;
+ }
+
+
+ public int getJournalMaxIO_NIO()
+ {
+ return journalMaxIO_NIO;
+ }
+
+ public void setJournalMaxIO_NIO(int journalMaxIO)
+ {
+ this.journalMaxIO_NIO = journalMaxIO;
+ }
+
+ public int getJournalBufferTimeout_NIO()
+ {
+ return journalBufferTimeout_NIO;
+ }
+
+ public void setJournalBufferTimeout_NIO(int journalBufferTimeout)
+ {
+ this.journalBufferTimeout_NIO = journalBufferTimeout;
+ }
+
+ public int getJournalBufferSize_NIO()
+ {
+ return journalBufferSize_NIO;
+ }
+
+ public void setJournalBufferSize_NIO(int journalBufferSize)
+ {
+ this.journalBufferSize_NIO = journalBufferSize;
+ }
+
+
+
@Override
public boolean equals(Object obj)
{
@@ -965,7 +1067,7 @@
{
return false;
}
-
+
if (clustered != other.clustered)
return false;
if (connectionTTLOverride != other.connectionTTLOverride)
@@ -983,12 +1085,18 @@
return false;
if (jmxManagementEnabled != other.jmxManagementEnabled)
return false;
- if (journalBufferSize != other.journalBufferSize)
+ if (this.journalBufferSize_AIO != other.journalBufferSize_AIO)
return false;
- if (journalBufferTimeout != other.journalBufferTimeout)
+ if (journalBufferTimeout_AIO != other.journalBufferTimeout_AIO)
return false;
- if (journalFlushSync != other.journalFlushSync)
+ if (journalMaxIO_AIO != other.journalMaxIO_AIO)
+ return false;
+ if (this.journalBufferSize_NIO != other.journalBufferSize_NIO)
return false;
+ if (journalBufferTimeout_NIO != other.journalBufferTimeout_NIO)
+ return false;
+ if (journalMaxIO_NIO != other.journalMaxIO_NIO)
+ return false;
if (journalCompactMinFiles != other.journalCompactMinFiles)
return false;
if (journalCompactPercentage != other.journalCompactPercentage)
@@ -1002,8 +1110,7 @@
return false;
if (journalFileSize != other.journalFileSize)
return false;
- if (journalMaxAIO != other.journalMaxAIO)
- return false;
+
if (journalMinFiles != other.journalMinFiles)
return false;
if (journalPerfBlastPages != other.journalPerfBlastPages)
@@ -1100,64 +1207,4 @@
return true;
}
- public int getJournalCompactMinFiles()
- {
- return journalCompactMinFiles;
- }
-
- public int getJournalCompactPercentage()
- {
- return journalCompactPercentage;
- }
-
- public void setJournalCompactMinFiles(int minFiles)
- {
- this.journalCompactMinFiles = minFiles;
- }
-
- public void setJournalCompactPercentage(int percentage)
- {
- this.journalCompactPercentage = percentage;
- }
-
- public long getServerDumpInterval()
- {
- return serverDumpInterval;
- }
-
- public void setServerDumpInterval(long intervalInMilliseconds)
- {
- this.serverDumpInterval = intervalInMilliseconds;
- }
-
- public int getMemoryWarningThreshold()
- {
- return memoryWarningThreshold;
- }
-
- public void setMemoryWarningThreshold(int memoryWarningThreshold)
- {
- this.memoryWarningThreshold = memoryWarningThreshold;
- }
-
- public long getMemoryMeasureInterval()
- {
- return memoryMeasureInterval;
- }
-
- public void setMemoryMeasureInterval(long memoryMeasureInterval)
- {
- this.memoryMeasureInterval = memoryMeasureInterval;
- }
-
- public String getLogDelegateFactoryClassName()
- {
- return logDelegateFactoryClassName;
- }
-
- public void setLogDelegateFactoryClassName(String className)
- {
- this.logDelegateFactoryClassName = className;
- }
-
}
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -73,9 +73,9 @@
private static final String DEFAULT_CONFIGURATION_URL = "hornetq-configuration.xml";
private static final String CONFIGURATION_SCHEMA_URL = "schema/hornetq-configuration.xsd";
-
- //For a bridge confirmations must be activated or send acknowledgements won't return
-
+
+ // For a bridge confirmations must be activated or send acknowledgements won't return
+
public static final int DEFAULT_CONFIRMATION_WINDOW_SIZE = 1024 * 1024;
// Static --------------------------------------------------------------------------
@@ -335,11 +335,36 @@
journalFileSize = getInteger(e, "journal-file-size", journalFileSize, GT_ZERO);
- journalFlushSync = getBoolean(e, "journal-flush-on-sync", DEFAULT_JOURNAL_FLUSH_SYNC);
+ int journalBufferTimeout = getInteger(e,
+ "journal-buffer-timeout",
+ journalType == JournalType.ASYNCIO ? DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO
+ : DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
+ GT_ZERO);
- journalBufferTimeout = getInteger(e, "journal-buffer-timeout", DEFAULT_JOURNAL_BUFFER_TIMEOUT, GT_ZERO);
+ int journalBufferSize = getInteger(e,
+ "journal-buffer-size",
+ journalType == JournalType.ASYNCIO ? DEFAULT_JOURNAL_BUFFER_SIZE_AIO
+ : DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
+ GT_ZERO);
- journalBufferSize = getInteger(e, "journal-buffer-size", DEFAULT_JOURNAL_BUFFER_SIZE, GT_ZERO);
+ int journalMaxIO = getInteger(e,
+ "journal-max-aio",
+ journalType == JournalType.ASYNCIO ? DEFAULT_JOURNAL_MAX_IO_AIO
+ : DEFAULT_JOURNAL_MAX_IO_NIO,
+ GT_ZERO);
+
+ if (journalType == JournalType.ASYNCIO)
+ {
+ journalBufferTimeout_AIO = journalBufferTimeout;
+ journalBufferSize_AIO = journalBufferSize;
+ journalMaxIO_AIO = journalMaxIO;
+ }
+ else
+ {
+ journalBufferTimeout_NIO = journalBufferTimeout;
+ journalBufferSize_NIO = journalBufferSize;
+ journalMaxIO_NIO = journalMaxIO;
+ }
journalMinFiles = getInteger(e, "journal-min-files", journalMinFiles, GT_ZERO);
@@ -347,8 +372,6 @@
journalCompactPercentage = getInteger(e, "journal-compact-percentage", journalCompactPercentage, PERCENTAGE);
- journalMaxAIO = getInteger(e, "journal-max-aio", journalMaxAIO, GT_ZERO);
-
logJournalWriteRate = getBoolean(e, "log-journal-write-rate", DEFAULT_JOURNAL_LOG_WRITE_RATE);
journalPerfBlastPages = getInteger(e, "perf-blast-pages", DEFAULT_JOURNAL_PERF_BLAST_PAGES, MINUS_ONE_OR_GT_ZERO);
@@ -367,12 +390,12 @@
GT_ZERO);
serverDumpInterval = getLong(e, "server-dump-interval", serverDumpInterval, MINUS_ONE_OR_GT_ZERO); // in
- // milliseconds
+ // milliseconds
memoryWarningThreshold = getInteger(e, "memory-warning-threshold", memoryWarningThreshold, PERCENTAGE);
memoryMeasureInterval = getLong(e, "memory-measure-interval", memoryMeasureInterval, MINUS_ONE_OR_GT_ZERO); // in
- // milliseconds
+ // milliseconds
backupWindowSize = getInteger(e, "backup-window-size", DEFAULT_BACKUP_WINDOW_SIZE, MINUS_ONE_OR_GT_ZERO);
@@ -523,7 +546,7 @@
long retryInterval = getLong(e, "retry-interval", DEFAULT_CLUSTER_RETRY_INTERVAL, GT_ZERO);
int confirmationWindowSize = getInteger(e, "confirmation-window-size", DEFAULT_CONFIRMATION_WINDOW_SIZE, GT_ZERO);
-
+
String discoveryGroupName = null;
List<Pair<String, String>> connectorPairs = new ArrayList<Pair<String, String>>();
@@ -609,10 +632,13 @@
String transformerClassName = getString(brNode, "transformer-class-name", null, NO_CHECK);
long retryInterval = getLong(brNode, "retry-interval", DEFAULT_RETRY_INTERVAL, GT_ZERO);
-
- //Default bridge conf
- int confirmationWindowSize = getInteger(brNode, "confirmation-window-size", DEFAULT_CONFIRMATION_WINDOW_SIZE, GT_ZERO);
-
+
+ // Default bridge conf
+ int confirmationWindowSize = getInteger(brNode,
+ "confirmation-window-size",
+ DEFAULT_CONFIRMATION_WINDOW_SIZE,
+ GT_ZERO);
+
double retryIntervalMultiplier = getDouble(brNode,
"retry-interval-multiplier",
DEFAULT_RETRY_INTERVAL_MULTIPLIER,
Modified: trunk/src/main/org/hornetq/core/journal/IOCompletion.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/IOCompletion.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/IOCompletion.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -22,5 +22,5 @@
*/
public interface IOCompletion extends IOAsyncTask
{
- void lineUp();
+ void storeLineUp();
}
Modified: trunk/src/main/org/hornetq/core/journal/Journal.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/Journal.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/Journal.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -112,5 +112,6 @@
void perfBlast(int pages) throws Exception;
+ void runDirectJournalBlast() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFile.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -88,10 +88,6 @@
void renameTo(String newFileName) throws Exception;
- void disableAutoFlush();
-
- void enableAutoFlush();
-
SequentialFile copy();
void setTimedBuffer(TimedBuffer buffer);
Modified: trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/SequentialFileFactory.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -36,9 +36,6 @@
void releaseBuffer(ByteBuffer buffer);
- /** The factory may need to do some initialization before the file is activated.
- * this was added as a hook for AIO to initialize the Observer on TimedBuffer.
- * It could be eventually done the same on NIO if we implement TimedBuffer on NIO */
void activateBuffer(SequentialFile file);
void deactivateBuffer();
@@ -61,7 +58,5 @@
*/
void createDirs() throws Exception;
- void flush();
-
-
+ void flush();
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFile.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -46,7 +46,6 @@
/** The pool for Thread pollers */
private final Executor pollerExecutor;
-
public AIOSequentialFile(final SequentialFileFactory factory,
final int bufferSize,
@@ -87,18 +86,27 @@
public SequentialFile copy()
{
- return new AIOSequentialFile(factory, -1, -1, getFile().getParent(), getFileName(), maxIO, bufferCallback, writerExecutor, pollerExecutor);
+ return new AIOSequentialFile(factory,
+ -1,
+ -1,
+ getFile().getParent(),
+ getFileName(),
+ maxIO,
+ bufferCallback,
+ writerExecutor,
+ pollerExecutor);
}
+ @Override
public synchronized void close() throws Exception
{
if (!opened)
{
return;
}
-
+
super.close();
-
+
opened = false;
timedBuffer = null;
@@ -106,7 +114,7 @@
aioFile.close();
aioFile = null;
- this.notifyAll();
+ notifyAll();
}
/* (non-Javadoc)
@@ -165,7 +173,7 @@
aioFile.fill(filePosition, blocks, blockSize, fillCharacter);
- this.fileSize = aioFile.size();
+ fileSize = aioFile.size();
}
public void open() throws Exception
@@ -176,16 +184,16 @@
public synchronized void open(final int maxIO) throws Exception
{
opened = true;
-
+
aioFile = new AsynchronousFileImpl(writerExecutor, pollerExecutor);
-
+
aioFile.open(getFile().getAbsolutePath(), maxIO);
-
+
position.set(0);
-
+
aioFile.setBufferCallback(bufferCallback);
-
- this.fileSize = aioFile.size();
+
+ fileSize = aioFile.size();
}
public void setBufferCallback(final BufferCallback callback)
@@ -216,7 +224,6 @@
return bytesRead;
}
-
public void sync() throws Exception
{
@@ -244,18 +251,14 @@
// Public methods
// -----------------------------------------------------------------------------------------------------
- // Protected methods
- // -----------------------------------------------------------------------------------------------------
-
-
public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
{
if (sync)
{
SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
-
+
writeDirect(bytes, true, completion);
-
+
completion.waitCompletion();
}
else
@@ -264,12 +267,11 @@
}
}
-
/**
*
* @param sync Not used on AIO
* */
- public void writeDirect(final ByteBuffer bytes, final boolean sync, IOAsyncTask callback)
+ public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
{
final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
@@ -278,7 +280,20 @@
aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
}
+ // Protected methods
+ // -----------------------------------------------------------------------------------------------------
+ @Override
+ protected ByteBuffer newBuffer(int size, int limit)
+ {
+ size = factory.calculateBlockSize(size);
+ limit = factory.calculateBlockSize(limit);
+
+ ByteBuffer buffer = factory.newBuffer(size);
+ buffer.limit(limit);
+ return buffer;
+ }
+
// Private methods
// -----------------------------------------------------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/AIOSequentialFileFactory.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -33,7 +33,7 @@
* @author clebert.suconic(a)jboss.com
*
*/
-public class AIOSequentialFileFactory extends AbstractSequentialFactory
+public class AIOSequentialFileFactory extends AbstractSequentialFileFactory
{
private static final Logger log = Logger.getLogger(AIOSequentialFileFactory.class);
@@ -55,19 +55,17 @@
public AIOSequentialFileFactory(final String journalDir)
{
this(journalDir,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
false);
}
public AIOSequentialFileFactory(final String journalDir,
final int bufferSize,
- final long bufferTimeout,
- final boolean flushOnSync,
+ final int bufferTimeout,
final boolean logRates)
{
- super(journalDir, true, bufferSize, bufferTimeout, flushOnSync, logRates);
+ super(journalDir, true, bufferSize, bufferTimeout, logRates);
}
public SequentialFile createSequentialFile(final String fileName, final int maxIO)
@@ -154,18 +152,21 @@
{
buffersControl.stop();
- pollerExecutor.shutdown();
+ if (pollerExecutor != null)
+ {
+ pollerExecutor.shutdown();
- try
- {
- if (!pollerExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ try
{
- log.warn("Timed out on AIO poller shutdown", new Exception("Timed out on AIO writer shutdown"));
+ if (!pollerExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ {
+ log.warn("Timed out on AIO poller shutdown", new Exception("Timed out on AIO writer shutdown"));
+ }
}
+ catch (InterruptedException e)
+ {
+ }
}
- catch (InterruptedException e)
- {
- }
super.stop();
}
Deleted: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -1,200 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.journal.impl;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.hornetq.core.journal.SequentialFile;
-import org.hornetq.core.journal.SequentialFileFactory;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.HornetQThreadFactory;
-
-/**
- *
- * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
- *
- */
-public abstract class AbstractSequentialFactory implements SequentialFileFactory
-{
-
- // Timeout used to wait executors to shutdown
- protected static final int EXECUTOR_TIMEOUT = 60;
-
- private static final Logger log = Logger.getLogger(AbstractSequentialFactory.class);
-
- protected final String journalDir;
-
- protected final TimedBuffer timedBuffer;
-
- protected final int bufferSize;
-
- protected final long bufferTimeout;
-
- /**
- * Asynchronous writes need to be done at another executor.
- * This needs to be done at NIO, or else we would have the callers thread blocking for the return.
- * At AIO this is necessary as context switches on writes would fire flushes at the kernel.
- * */
- protected ExecutorService writeExecutor;
-
- public AbstractSequentialFactory(final String journalDir,
- final boolean buffered,
- final int bufferSize,
- final long bufferTimeout,
- final boolean flushOnSync,
- final boolean logRates)
- {
- this.journalDir = journalDir;
-
- if (buffered)
- {
- timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync, logRates);
- }
- else
- {
- timedBuffer = null;
- }
- this.bufferSize = bufferSize;
- this.bufferTimeout = bufferTimeout;
- }
-
- public void stop()
- {
- if (timedBuffer != null)
- {
- timedBuffer.stop();
- }
-
- if (isSupportsCallbacks())
- {
- writeExecutor.shutdown();
-
- try
- {
- if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
- {
- log.warn("Timed out on AIO writer shutdown", new Exception("Timed out on AIO writer shutdown"));
- }
- }
- catch (InterruptedException e)
- {
- }
- }
-
-
- }
-
- public void start()
- {
- if (timedBuffer != null)
- {
- timedBuffer.start();
- }
-
- if (isSupportsCallbacks())
- {
- writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
- true));
- }
-
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
- */
- public void activateBuffer(final SequentialFile file)
- {
- if (timedBuffer != null)
- {
- timedBuffer.disableAutoFlush();
- try
- {
- file.setTimedBuffer(timedBuffer);
- }
- finally
- {
- file.enableAutoFlush();
- }
- }
- }
-
- public void flush()
- {
- if (timedBuffer != null)
- {
- timedBuffer.flush();
- }
- }
-
- public void deactivateBuffer()
- {
- if (timedBuffer != null)
- {
- timedBuffer.flush();
- timedBuffer.setObserver(null);
- }
- }
-
- public void releaseBuffer(ByteBuffer buffer)
- {
- }
-
- /**
- * Create the directory if it doesn't exist yet
- */
- public void createDirs() throws Exception
- {
- File file = new File(journalDir);
- boolean ok = file.mkdirs();
- if (!ok)
- {
- throw new IOException("Failed to create directory " + journalDir);
- }
- }
-
- public List<String> listFiles(final String extension) throws Exception
- {
- File dir = new File(journalDir);
-
- FilenameFilter fnf = new FilenameFilter()
- {
- public boolean accept(final File file, final String name)
- {
- return name.endsWith("." + extension);
- }
- };
-
- String[] fileNames = dir.list(fnf);
-
- if (fileNames == null)
- {
- throw new IOException("Failed to list: " + journalDir);
- }
-
- return Arrays.asList(fileNames);
- }
-
-}
Modified: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFile.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -21,9 +21,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.buffers.HornetQBuffers;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -162,22 +162,6 @@
}
}
- public final void disableAutoFlush()
- {
- if (timedBuffer != null)
- {
- timedBuffer.disableAutoFlush();
- }
- }
-
- public final void enableAutoFlush()
- {
- if (timedBuffer != null)
- {
- timedBuffer.enableAutoFlush();
- }
- }
-
public void setTimedBuffer(final TimedBuffer buffer)
{
if (timedBuffer != null)
@@ -315,7 +299,17 @@
}
}
}
+
+ protected ByteBuffer newBuffer(int size, int limit)
+ {
+ size = factory.calculateBlockSize(size);
+ limit = factory.calculateBlockSize(limit);
+ ByteBuffer buffer = factory.newBuffer(size);
+ buffer.limit(limit);
+ return buffer;
+ }
+
protected class LocalBufferObserver implements TimedBufferObserver
{
public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOAsyncTask> callbacks)
@@ -334,12 +328,7 @@
public ByteBuffer newBuffer(int size, int limit)
{
- size = factory.calculateBlockSize(size);
- limit = factory.calculateBlockSize(limit);
-
- ByteBuffer buffer = factory.newBuffer(size);
- buffer.limit(limit);
- return buffer;
+ return AbstractSequentialFile.this.newBuffer(size, limit);
}
public int getRemainingBytes()
Copied: trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java (from rev 8467, trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java)
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java (rev 0)
+++ trunk/src/main/org/hornetq/core/journal/impl/AbstractSequentialFileFactory.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -0,0 +1,187 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.journal.impl;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.utils.HornetQThreadFactory;
+
+/**
+ *
+ * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ * @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
+ *
+ */
+public abstract class AbstractSequentialFileFactory implements SequentialFileFactory
+{
+
+ // Timeout used to wait executors to shutdown
+ protected static final int EXECUTOR_TIMEOUT = 60;
+
+ private static final Logger log = Logger.getLogger(AbstractSequentialFileFactory.class);
+
+ protected final String journalDir;
+
+ protected final TimedBuffer timedBuffer;
+
+ protected final int bufferSize;
+
+ protected final long bufferTimeout;
+
+ /**
+ * Asynchronous writes need to be done at another executor.
+ * This needs to be done at NIO, or else we would have the callers thread blocking for the return.
+ * At AIO this is necessary as context switches on writes would fire flushes at the kernel.
+ * */
+ protected ExecutorService writeExecutor;
+
+ public AbstractSequentialFileFactory(final String journalDir,
+ final boolean buffered,
+ final int bufferSize,
+ final int bufferTimeout,
+ final boolean logRates)
+ {
+ this.journalDir = journalDir;
+
+ if (buffered)
+ {
+ timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates);
+ }
+ else
+ {
+ timedBuffer = null;
+ }
+ this.bufferSize = bufferSize;
+ this.bufferTimeout = bufferTimeout;
+ }
+
+ public void stop()
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.stop();
+ }
+
+ if (isSupportsCallbacks() && writeExecutor != null)
+ {
+ writeExecutor.shutdown();
+
+ try
+ {
+ if (!writeExecutor.awaitTermination(EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
+ {
+ log.warn("Timed out on AIO writer shutdown", new Exception("Timed out on AIO writer shutdown"));
+ }
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+
+ public void start()
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.start();
+ }
+
+ if (isSupportsCallbacks())
+ {
+ writeExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
+ true));
+ }
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.SequentialFileFactory#activate(org.hornetq.core.journal.SequentialFile)
+ */
+ public void activateBuffer(final SequentialFile file)
+ {
+ if (timedBuffer != null)
+ {
+ file.setTimedBuffer(timedBuffer);
+ }
+ }
+
+ public void flush()
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.flush();
+ }
+ }
+
+ public void deactivateBuffer()
+ {
+ if (timedBuffer != null)
+ {
+ timedBuffer.flush();
+ timedBuffer.setObserver(null);
+ }
+ }
+
+ public void releaseBuffer(final ByteBuffer buffer)
+ {
+ }
+
+ /**
+ * Create the directory if it doesn't exist yet
+ */
+ public void createDirs() throws Exception
+ {
+ File file = new File(journalDir);
+ boolean ok = file.mkdirs();
+ if (!ok)
+ {
+ throw new IOException("Failed to create directory " + journalDir);
+ }
+ }
+
+ public List<String> listFiles(final String extension) throws Exception
+ {
+ File dir = new File(journalDir);
+
+ FilenameFilter fnf = new FilenameFilter()
+ {
+ public boolean accept(final File file, final String name)
+ {
+ return name.endsWith("." + extension);
+ }
+ };
+
+ String[] fileNames = dir.list(fnf);
+
+ if (fileNames == null)
+ {
+ throw new IOException("Failed to list: " + journalDir);
+ }
+
+ return Arrays.asList(fileNames);
+ }
+}
Modified: trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/DummyCallback.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -50,7 +50,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.IOCompletion#linedUp()
*/
- public void lineUp()
+ public void storeLineUp()
{
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -103,7 +103,6 @@
private static final void trace(final String message)
{
log.trace(message);
- //System.out.println("JournalImpl::" + message);
}
// The sizes of primitive types
@@ -166,7 +165,6 @@
private final AtomicInteger nextFileID = new AtomicInteger(0);
- // used for Asynchronous IO only (ignored on NIO).
private final int maxAIO;
private final int fileSize;
@@ -219,9 +217,70 @@
private volatile int state;
private final Reclaimer reclaimer = new Reclaimer();
-
+
// Constructors --------------------------------------------------
+ public void runDirectJournalBlast() throws Exception
+ {
+ final int numIts = 100000000;
+
+ log.info("*** running direct journal blast: " + numIts);
+
+ final CountDownLatch latch = new CountDownLatch(numIts * 2);
+
+ class MyIOAsyncTask implements IOCompletion
+ {
+ public void done()
+ {
+ latch.countDown();
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+
+ }
+
+ public void storeLineUp()
+ {
+ }
+ }
+
+ final MyIOAsyncTask task = new MyIOAsyncTask();
+
+ final int recordSize = 1024;
+
+ final byte[] bytes = new byte[recordSize];
+
+ class MyRecord implements EncodingSupport
+ {
+
+ public void decode(HornetQBuffer buffer)
+ {
+ }
+
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeBytes(bytes);
+ }
+
+ public int getEncodeSize()
+ {
+ return recordSize;
+ }
+
+ }
+
+ MyRecord record = new MyRecord();
+
+ for (int i = 0; i < numIts; i++)
+ {
+ appendAddRecord(i, (byte)1, record, true, task);
+ appendDeleteRecord(i, true, task);
+ }
+
+ latch.await();
+ }
+
public JournalImpl(final int fileSize,
final int minFiles,
final int compactMinFiles,
@@ -229,7 +288,7 @@
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
- final int maxAIO)
+ final int maxIO)
{
if (fileFactory == null)
{
@@ -257,7 +316,7 @@
{
throw new NullPointerException("fileExtension is null");
}
- if (maxAIO <= 0)
+ if (maxIO <= 0)
{
throw new IllegalStateException("maxAIO should aways be a positive number");
}
@@ -274,7 +333,9 @@
else
{
this.compactPercentage = (float)compactPercentage / 100f;
- }
+ }
+
+ log.info("creating journal with max io " + maxIO);
this.compactMinFiles = compactMinFiles;
@@ -288,7 +349,7 @@
this.fileExtension = fileExtension;
- this.maxAIO = maxAIO;
+ this.maxAIO = maxIO;
}
public Map<Long, JournalRecord> getRecords()
@@ -533,13 +594,13 @@
switch (recordType)
{
case ADD_RECORD:
- {
+ {
reader.onReadAddRecord(new RecordInfo(recordID, userRecordType, record, false));
break;
}
case UPDATE_RECORD:
- {
+ {
reader.onReadUpdateRecord(new RecordInfo(recordID, userRecordType, record, true));
break;
}
@@ -641,25 +702,33 @@
{
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
-
- public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync, final IOCompletion callback) throws Exception
+
+ public void appendAddRecord(final long id,
+ final byte recordType,
+ final byte[] record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception
{
appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
}
-
+
public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
{
SyncIOCompletion callback = getSyncCallback(sync);
-
+
appendAddRecord(id, recordType, record, sync, callback);
-
+
if (callback != null)
{
callback.waitCompletion();
}
}
- public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+ public void appendAddRecord(final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception
{
if (LOAD_TRACE)
{
@@ -669,16 +738,16 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
-
+
compactingLock.readLock().lock();
try
- {
+ {
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
lockAppend.lock();
@@ -704,7 +773,11 @@
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
}
- public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync, final IOCompletion callback) throws Exception
+ public void appendUpdateRecord(final long id,
+ final byte recordType,
+ final byte[] record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception
{
appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
}
@@ -712,16 +785,20 @@
public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
{
SyncIOCompletion callback = getSyncCallback(sync);
-
+
appendUpdateRecord(id, recordType, record, sync, callback);
-
+
if (callback != null)
{
callback.waitCompletion();
}
}
-
- public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync, final IOCompletion callback) throws Exception
+
+ public void appendUpdateRecord(final long id,
+ final byte recordType,
+ final EncodingSupport record,
+ final boolean sync,
+ final IOCompletion callback) throws Exception
{
if (LOAD_TRACE)
{
@@ -731,7 +808,7 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
-
+
compactingLock.readLock().lock();
try
@@ -747,12 +824,12 @@
}
JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
-
+
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
-
+
lockAppend.lock();
try
{
@@ -780,19 +857,18 @@
}
}
-
public void appendDeleteRecord(final long id, final boolean sync) throws Exception
{
SyncIOCompletion callback = getSyncCallback(sync);
-
+
appendDeleteRecord(id, sync, callback);
-
+
if (callback != null)
{
callback.waitCompletion();
}
}
-
+
public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception
{
if (LOAD_TRACE)
@@ -803,7 +879,7 @@
{
throw new IllegalStateException("Journal must be loaded first");
}
-
+
compactingLock.readLock().lock();
try
@@ -818,12 +894,12 @@
throw new IllegalStateException("Cannot find add info " + id);
}
}
-
+
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
lockAppend.lock();
@@ -997,8 +1073,7 @@
{
appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
}
-
-
+
public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion completion) throws Exception
{
appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync, completion);
@@ -1015,9 +1090,9 @@
public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
{
SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
+
appendPrepareRecord(txID, transactionData, sync, syncCompletion);
-
+
if (syncCompletion != null)
{
syncCompletion.waitCompletion();
@@ -1037,7 +1112,10 @@
* @param transactionData - extra user data for the prepare
* @throws Exception
*/
- public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync, IOCompletion callback) throws Exception
+ public void appendPrepareRecord(final long txID,
+ final EncodingSupport transactionData,
+ final boolean sync,
+ IOCompletion callback) throws Exception
{
if (LOAD_TRACE)
{
@@ -1060,7 +1138,7 @@
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
lockAppend.lock();
@@ -1081,22 +1159,19 @@
compactingLock.readLock().unlock();
}
}
-
-
-
+
public void appendCommitRecord(final long txID, final boolean sync) throws Exception
{
SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
+
appendCommitRecord(txID, sync, syncCompletion);
-
+
if (syncCompletion != null)
{
syncCompletion.waitCompletion();
}
}
-
/**
* <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
* <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
@@ -1114,7 +1189,6 @@
*
* @see JournalImpl#writeTransaction(byte, long, org.hornetq.core.journal.impl.JournalImpl.JournalTransaction, EncodingSupport)
*/
-
public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
{
@@ -1130,7 +1204,6 @@
try
{
-
if (tx == null)
{
throw new IllegalStateException("Cannot find tx with id " + txID);
@@ -1140,7 +1213,7 @@
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
lockAppend.lock();
@@ -1162,20 +1235,19 @@
}
}
-
public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
{
SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
+
appendRollbackRecord(txID, sync, syncCompletion);
-
+
if (syncCompletion != null)
{
syncCompletion.waitCompletion();
}
}
-
+
public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
{
if (state != STATE_LOADED)
@@ -1195,12 +1267,12 @@
{
throw new IllegalStateException("Cannot find tx with id " + txID);
}
-
+
JournalInternalRecord rollbackRecord = new JournalRollbackRecordTX(txID);
if (callback != null)
{
- callback.lineUp();
+ callback.storeLineUp();
}
lockAppend.lock();
@@ -1276,7 +1348,7 @@
}
public void addRecord(final RecordInfo info)
- {
+ {
records.add(info);
}
@@ -1288,7 +1360,7 @@
public void deleteRecord(final long id)
{
recordsToDelete.add(id);
-
+
// Clean up when the list is too large, or it won't be possible to load large sets of files
// Done as part of JBMESSAGING-1678
if (recordsToDelete.size() == DELETE_FLUSH)
@@ -1324,7 +1396,7 @@
committedRecords.add(record);
}
}
-
+
return info;
}
@@ -2695,7 +2767,6 @@
}
/**
- * Note: You should aways guarantee locking the semaphore lock.
*
* @param completeTransaction If the appendRecord is for a prepare or commit, where we should update the number of pendingTransactions on the current file
* */
@@ -2705,109 +2776,91 @@
final JournalTransaction tx,
final IOAsyncTask parameterCallback) throws Exception
{
- try
+ if (state != STATE_LOADED)
{
- if (state != STATE_LOADED)
- {
- throw new IllegalStateException("The journal is not loaded " + state);
- }
-
- final IOAsyncTask callback;
+ throw new IllegalStateException("The journal is not loaded " + state);
+ }
- int size = encoder.getEncodeSize();
+ final IOAsyncTask callback;
- // We take into account the fileID used on the Header
- if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
- {
- throw new IllegalArgumentException("Record is too large to store " + size);
- }
+ int size = encoder.getEncodeSize();
- // Disable auto flush on the timer. The Timer should'nt flush anything
- currentFile.getFile().disableAutoFlush();
+ // We take into account the fileID used on the Header
+ if (size > fileSize - currentFile.getFile().calculateBlockStart(SIZE_HEADER))
+ {
+ throw new IllegalArgumentException("Record is too large to store " + size);
+ }
+ if (!currentFile.getFile().fits(size))
+ {
+ moveNextFile(false);
+
+ // The same check needs to be done at the new file also
if (!currentFile.getFile().fits(size))
{
- currentFile.getFile().enableAutoFlush();
-
- moveNextFile(false);
-
- currentFile.getFile().disableAutoFlush();
-
- // The same check needs to be done at the new file also
- if (!currentFile.getFile().fits(size))
- {
- // Sanity check, this should never happen
- throw new IllegalStateException("Invalid logic on buffer allocation");
- }
+ // Sanity check, this should never happen
+ throw new IllegalStateException("Invalid logic on buffer allocation");
}
+ }
- if (currentFile == null)
- {
- throw new NullPointerException("Current file = null");
- }
+ if (currentFile == null)
+ {
+ throw new NullPointerException("Current file = null");
+ }
- if (tx != null)
+ if (tx != null)
+ {
+ // The callback of a transaction has to be taken inside the lock,
+ // when we guarantee the currentFile will not be changed,
+ // since we individualize the callback per file
+ if (fileFactory.isSupportsCallbacks())
{
- // The callback of a transaction has to be taken inside the lock,
- // when we guarantee the currentFile will not be changed,
- // since we individualize the callback per file
- if (fileFactory.isSupportsCallbacks())
+ // Set the delegated callback as a parameter
+ TransactionCallback txcallback = tx.getCallback(currentFile);
+ if (parameterCallback != null)
{
- // Set the delegated callback as a parameter
- TransactionCallback txcallback = tx.getCallback(currentFile);
- if (parameterCallback != null)
- {
- txcallback.setDelegateCompletion(parameterCallback);
- }
- callback = txcallback;
+ txcallback.setDelegateCompletion(parameterCallback);
}
- else
- {
- callback = null;
- }
-
- if (sync)
- {
- // In an edge case the transaction could still have pending data from previous files.
- // This shouldn't cause any blocking issues, as this is here to guarantee we cover all possibilities
- // on guaranteeing the data is on the disk
- tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
- }
-
- // We need to add the number of records on currentFile if prepare or commit
- if (completeTransaction)
- {
- // Filling the number of pendingTransactions at the current file
- tx.fillNumberOfRecords(currentFile, encoder);
- }
+ callback = txcallback;
}
else
{
- callback = parameterCallback;
+ callback = null;
}
- // Adding fileID
- encoder.setFileID(currentFile.getFileID());
-
- if (callback != null)
+ if (sync)
{
- currentFile.getFile().write(encoder, sync, callback);
+ // In an edge case the transaction could still have pending data from previous files.
+ // This shouldn't cause any blocking issues, as this is here to guarantee we cover all possibilities
+ // on guaranteeing the data is on the disk
+ tx.syncPreviousFiles(fileFactory.isSupportsCallbacks(), currentFile);
}
- else
+
+ // We need to add the number of records on currentFile if prepare or commit
+ if (completeTransaction)
{
- currentFile.getFile().write(encoder, sync);
+ // Filling the number of pendingTransactions at the current file
+ tx.fillNumberOfRecords(currentFile, encoder);
}
+ }
+ else
+ {
+ callback = parameterCallback;
+ }
- return currentFile;
+ // Adding fileID
+ encoder.setFileID(currentFile.getFileID());
+
+ if (callback != null)
+ {
+ currentFile.getFile().write(encoder, sync, callback);
}
- finally
+ else
{
- if (currentFile != null)
- {
- currentFile.getFile().enableAutoFlush();
- }
+ currentFile.getFile().write(encoder, sync);
}
+ return currentFile;
}
/** Get the ID part of the name */
@@ -3352,7 +3405,7 @@
return id1 < id2 ? -1 : id1 == id2 ? 0 : 1;
}
}
-
+
private class PerfBlast extends Thread
{
private final int pages;
@@ -3371,7 +3424,7 @@
lockAppend.lock();
final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
-
+
JournalInternalRecord blastRecord = new JournalInternalRecord()
{
@@ -3386,7 +3439,6 @@
byteEncoder.encode(buffer);
}
};
-
for (int i = 0; i < pages; i++)
{
@@ -3401,11 +3453,5 @@
}
}
}
-
-
-
-
-
-
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -46,21 +46,28 @@
/** The write semaphore here is only used when writing asynchronously */
private Semaphore maxIOSemaphore;
-
+
private final int defaultMaxIO;
-
+
private int maxIO;
- public NIOSequentialFile(final SequentialFileFactory factory, final String directory, final String fileName, final int maxIO, final Executor writerExecutor)
+ public NIOSequentialFile(final SequentialFileFactory factory,
+ final String directory,
+ final String fileName,
+ final int maxIO,
+ final Executor writerExecutor)
{
super(directory, new File(directory + "/" + fileName), factory, writerExecutor);
- this.defaultMaxIO = maxIO;
+ defaultMaxIO = maxIO;
}
- public NIOSequentialFile(final SequentialFileFactory factory, final File file, final int maxIO, final Executor writerExecutor)
+ public NIOSequentialFile(final SequentialFileFactory factory,
+ final File file,
+ final int maxIO,
+ final Executor writerExecutor)
{
super(file.getParent(), new File(file.getPath()), factory, writerExecutor);
- this.defaultMaxIO = maxIO;
+ defaultMaxIO = maxIO;
}
public int getAlignment()
@@ -82,7 +89,7 @@
* Some operations while initializing files on the journal may require a different maxIO */
public synchronized void open() throws Exception
{
- open(this.defaultMaxIO);
+ open(defaultMaxIO);
}
public void open(final int maxIO) throws Exception
@@ -92,10 +99,10 @@
channel = rfile.getChannel();
fileSize = channel.size();
-
+
if (writerExecutor != null)
{
- this.maxIOSemaphore = new Semaphore(maxIO);
+ maxIOSemaphore = new Semaphore(maxIO);
this.maxIO = maxIO;
}
}
@@ -130,18 +137,19 @@
}
}
+ @Override
public synchronized void close() throws Exception
{
super.close();
-
+
if (maxIOSemaphore != null)
{
while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
{
- log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + this.getFileName());
+ log.warn("Couldn't get lock after 60 seconds on closing AsynchronousFileImpl::" + getFileName());
}
}
-
+
maxIOSemaphore = null;
if (channel != null)
@@ -213,6 +221,7 @@
}
}
+ @Override
public void position(final long pos) throws Exception
{
super.position(pos);
@@ -252,6 +261,16 @@
internalWrite(bytes, sync, null);
}
+ @Override
+ protected ByteBuffer newBuffer(int size, final int limit)
+ {
+ // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO
+
+ size = limit;
+
+ return super.newBuffer(size, limit);
+ }
+
private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
{
if (!isOpen())
@@ -266,7 +285,7 @@
}
return;
}
-
+
if (writerExecutor == null)
{
doInternalWrite(bytes, sync, callback);
@@ -275,7 +294,7 @@
{
// This is a flow control on writing, just like maxAIO on libaio
maxIOSemaphore.acquire();
-
+
writerExecutor.execute(new Runnable()
{
public void run()
@@ -309,7 +328,7 @@
* @throws Exception
*/
private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws Exception
- {
+ {
position.addAndGet(bytes.limit());
channel.write(bytes);
Modified: trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/NIOSequentialFileFactory.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -28,7 +28,7 @@
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
-public class NIOSequentialFileFactory extends AbstractSequentialFactory implements SequentialFileFactory
+public class NIOSequentialFileFactory extends AbstractSequentialFileFactory implements SequentialFileFactory
{
private static final Logger log = Logger.getLogger(NIOSequentialFileFactory.class);
@@ -37,9 +37,8 @@
{
this(journalDir,
false,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
false);
}
@@ -47,26 +46,23 @@
{
this(journalDir,
buffered,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
false);
}
public NIOSequentialFileFactory(final String journalDir,
final boolean buffered,
final int bufferSize,
- final long bufferTimeout,
- final boolean flushOnSync,
+ final int bufferTimeout,
final boolean logRates)
{
- super(journalDir, buffered, bufferSize, bufferTimeout, flushOnSync, logRates);
+ super(journalDir, buffered, bufferSize, bufferTimeout, logRates);
}
- // maxIO is ignored on NIO
public SequentialFile createSequentialFile(final String fileName, int maxIO)
{
- if (maxIO < 0)
+ if (maxIO < 1)
{
// A single threaded IO
maxIO = 1;
Modified: trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -71,7 +71,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.IOCompletion#linedUp()
*/
- public void lineUp()
+ public void storeLineUp()
{
}
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -10,15 +10,20 @@
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
-
package org.hornetq.core.journal.impl;
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.SequentialFile;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
/**
@@ -34,139 +39,303 @@
public class SyncSpeedTest
{
private static final Logger log = Logger.getLogger(SyncSpeedTest.class);
-
+
public static void main(final String[] args)
{
try
{
- new SyncSpeedTest().run();
+ new SyncSpeedTest().testScaleAIO();
}
catch (Exception e)
{
e.printStackTrace();
}
}
-
+
+ protected SequentialFileFactory fileFactory;
+
+ public boolean AIO = true;
+
+ protected void setupFactory()
+ {
+ if (AIO)
+ {
+ fileFactory = new AIOSequentialFileFactory(".", 0, 0, false);
+ }
+ else
+ {
+ fileFactory = new NIOSequentialFileFactory(".", false, 0, 0, false);
+ }
+ }
+
+ protected SequentialFile createSequentialFile(String fileName)
+ {
+ if (AIO)
+ {
+ return new AIOSequentialFile(fileFactory,
+ 0,
+ 0,
+ ".",
+ fileName,
+ 100000,
+ null,
+ null,
+ Executors.newSingleThreadExecutor());
+ }
+ else
+ {
+ return new NIOSequentialFile(fileFactory, new File(fileName), 1000, null);
+ }
+ }
+
+ public void run2() throws Exception
+ {
+ setupFactory();
+
+ int recordSize = 128 * 1024;
+
+ while (true)
+ {
+ System.out.println("** record size is " + recordSize);
+
+ int warmup = 500;
+
+ int its = 500;
+
+ int fileSize = (its + warmup) * recordSize;
+
+ SequentialFile file = createSequentialFile("sync-speed-test.dat");
+
+ if (file.exists())
+ {
+ file.delete();
+ }
+
+ file.open();
+
+ file.fill(0, fileSize, (byte)'X');
+
+ if (!AIO)
+ {
+ file.sync();
+ }
+
+ ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
+
+ long start = 0;
+
+ for (int i = 0; i < its + warmup; i++)
+ {
+ if (i == warmup)
+ {
+ start = System.currentTimeMillis();
+ }
+
+ bb1.rewind();
+
+ file.writeDirect(bb1, true);
+ }
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * ((double)its) / (end - start);
+
+ double throughput = recordSize * rate;
+
+ System.out.println("Rate of " + rate + " syncs per sec");
+ System.out.println("Throughput " + throughput + " bytes per sec");
+ System.out.println("*************");
+
+ recordSize *= 2;
+ }
+ }
+
public void run() throws Exception
{
- int fileSize = 1024 * 1024 * 100;
-
- int recordSize = 1024;
-
- int its = 10 * 1024;
-
- File file = new File("sync-speed-test.dat");
-
- if (file.exists())
+ int recordSize = 256;
+
+ while (true)
{
- file.delete();
+ System.out.println("** record size is " + recordSize);
+
+ int warmup = 500;
+
+ int its = 500;
+
+ int fileSize = (its + warmup) * recordSize;
+
+ File file = new File("sync-speed-test.dat");
+
+ if (file.exists())
+ {
+ file.delete();
+ }
+
+ file.createNewFile();
+
+ RandomAccessFile rfile = new RandomAccessFile(file, "rw");
+
+ FileChannel channel = rfile.getChannel();
+
+ ByteBuffer bb = generateBuffer(fileSize, (byte)'x');
+
+ write(bb, channel, fileSize);
+
+ channel.force(true);
+
+ channel.position(0);
+
+ ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
+
+ long start = 0;
+
+ for (int i = 0; i < its + warmup; i++)
+ {
+ if (i == warmup)
+ {
+ start = System.currentTimeMillis();
+ }
+
+ bb1.flip();
+ channel.write(bb1);
+ channel.force(false);
+ }
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * ((double)its) / (end - start);
+
+ double throughput = recordSize * rate;
+
+ System.out.println("Rate of " + rate + " syncs per sec");
+ System.out.println("Throughput " + throughput + " bytes per sec");
+
+ recordSize *= 2;
}
-
- RandomAccessFile rfile = new RandomAccessFile(file, "rw");
-
- FileChannel channel = rfile.getChannel();
-
- ByteBuffer bb = generateBuffer(fileSize, (byte)'x');
-
- write(bb, channel, fileSize);
-
- channel.force(false);
-
- channel.position(0);
-
- MappedByteBuffer mappedBB = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
-
- mappedBB.load();
- // mappedBB.order(java.nio.ByteOrder.LITTLE_ENDIAN);
- System.out.println("isLoaded=" + mappedBB.isLoaded() + "; isDirect=" + mappedBB.isDirect() + "; byteOrder=" + mappedBB.order());
-
- ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
-
- System.out.println("Measuring");
-
- long start = System.currentTimeMillis();
-
- for (int i = 0; i < its; i++)
+ }
+
+ public void testScaleAIO() throws Exception
+ {
+ setupFactory();
+
+ final int recordSize = 1024;
+
+ System.out.println("** record size is " + recordSize);
+
+ final int its = 10;
+
+ for (int numThreads = 1; numThreads <= 10; numThreads++)
{
- bb1.flip();
- mappedBB.position(0);
- mappedBB.put(bb1);
- mappedBB.force();
-
- //write(bb1, channel, recordSize);
- // channel.force(false);
+
+ int fileSize = its * recordSize * numThreads;
+
+ final SequentialFile file = createSequentialFile("sync-speed-test.dat");
+
+ if (file.exists())
+ {
+ file.delete();
+ }
+
+ file.open();
+
+ file.fill(0, fileSize, (byte)'X');
+
+ if (!AIO)
+ {
+ file.sync();
+ }
+
+ final CountDownLatch latch = new CountDownLatch(its * numThreads);
+
+ class MyIOAsyncTask implements IOAsyncTask
+ {
+ public void done()
+ {
+ latch.countDown();
+ }
+
+ public void onError(int errorCode, String errorMessage)
+ {
+
+ }
+ }
+
+ final MyIOAsyncTask task = new MyIOAsyncTask();
+
+ class MyRunner implements Runnable
+ {
+ private ByteBuffer bb1;
+
+ MyRunner()
+ {
+ bb1 = generateBuffer(recordSize, (byte)'h');
+ }
+
+ public void run()
+ {
+ for (int i = 0; i < its; i++)
+ {
+ bb1.rewind();
+
+ file.writeDirect(bb1, true, task);
+// try
+// {
+// file.writeDirect(bb1, true);
+// }
+// catch (Exception e)
+// {
+// e.printStackTrace();
+// }
+ }
+ }
+ }
+
+ Set<Thread> threads = new HashSet<Thread>();
+
+ for (int i = 0; i < numThreads; i++)
+ {
+ MyRunner runner = new MyRunner();
+
+ Thread t = new Thread(runner);
+
+ threads.add(t);
+ }
+
+ long start = System.currentTimeMillis();
+
+ for (Thread t : threads)
+ {
+ log.info("starting thread");
+ t.start();
+ }
+
+ for (Thread t : threads)
+ {
+ t.join();
+ }
+
+ latch.await();
+
+ long end = System.currentTimeMillis();
+
+ double rate = 1000 * ((double)its * numThreads) / (end - start);
+
+ double throughput = recordSize * rate;
+
+ System.out.println("For " + numThreads + " threads:");
+ System.out.println("Rate of " + rate + " records per sec");
+ System.out.println("Throughput " + throughput + " bytes per sec");
+ System.out.println("*************");
}
-
- long end = System.currentTimeMillis();
-
- double rate = 1000 * ((double)its) / (end - start);
-
- System.out.println("Rate of " + rate + " syncs per sec");
- file.delete();
}
-
-// public void run() throws Exception
-// {
-// log.info("******* Starting file sync speed test *******");
-//
-// int fileSize = 1024 * 1024 * 10;
-//
-// int recordSize = 1024;
-//
-// int its = 10 * 1024;
-//
-// File file = new File("sync-speed-test.dat");
-//
-// if (file.exists())
-// {
-// file.delete();
-// }
-//
-// RandomAccessFile rfile = new RandomAccessFile(file, "rw");
-//
-// FileChannel channel = rfile.getChannel();
-//
-// ByteBuffer bb = generateBuffer(fileSize, (byte)'x');
-//
-// write(bb, channel, fileSize);
-//
-// channel.force(false);
-//
-// channel.position(0);
-//
-// ByteBuffer bb1 = generateBuffer(recordSize, (byte)'h');
-//
-// log.info("Measuring");
-//
-// long start = System.currentTimeMillis();
-//
-// for (int i = 0; i < its; i++)
-// {
-// write(bb1, channel, recordSize);
-//
-// channel.force(false);
-// }
-//
-// long end = System.currentTimeMillis();
-//
-// double rate = 1000 * ((double)its) / (end - start);
-//
-// log.info("Rate of " + rate + " syncs per sec");
-//
-// rfile.close();
-//
-// file.delete();
-//
-// log.info("****** test complete *****");
-// }
-
+
private void write(final ByteBuffer buffer, final FileChannel channel, final int size) throws Exception
{
buffer.flip();
-
+
channel.write(buffer);
}
-
+
private ByteBuffer generateBuffer(final int size, final byte ch)
{
ByteBuffer bb = ByteBuffer.allocateDirect(size);
@@ -175,7 +344,7 @@
{
bb.put(ch);
}
-
+
return bb;
- }
-}
+ }
+}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -19,17 +19,15 @@
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.logging.Logger;
-import org.hornetq.utils.VariableLatch;
/**
* A TimedBuffer
@@ -48,9 +46,10 @@
private TimedBufferObserver bufferObserver;
- // This is used to pause and resume the timer
- // This is a reusable Latch, that uses java.util.concurrent base classes
- private final VariableLatch latchTimer = new VariableLatch();
+ // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
+ // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
+ // prevent that
+ private final Semaphore spinLimiter = new Semaphore(1);
private CheckTimer timerRunnable = new CheckTimer();
@@ -62,13 +61,8 @@
private List<IOAsyncTask> callbacks;
- private final Lock lock = new ReentrantReadWriteLock().writeLock();
+ private volatile int timeout;
- // used to measure inactivity. This buffer will be automatically flushed when more than timeout inactive
- private volatile boolean active = false;
-
- private final long timeout;
-
// used to measure sync requests. When a sync is requested, it shouldn't take more than timeout to happen
private volatile boolean pendingSync = false;
@@ -76,21 +70,24 @@
private volatile boolean started;
- private final boolean flushOnSync;
+ // We use this flag to prevent flush occuring between calling checkSize and addBytes
+ // CheckSize must always be followed by it's corresponding addBytes otherwise the buffer
+ // can get in an inconsistent state
+ private boolean delayFlush;
// for logging write rates
private final boolean logRates;
- private volatile long bytesFlushed;
+ private final AtomicLong bytesFlushed = new AtomicLong(0);
- private volatile long flushesDone;
+ private final AtomicLong flushesDone = new AtomicLong(0);
private Timer logRatesTimer;
private TimerTask logRatesTimerTask;
- private long lastExecution;
+ private final AtomicLong lastFlushTime = new AtomicLong(0);
// Static --------------------------------------------------------
@@ -98,23 +95,29 @@
// Public --------------------------------------------------------
- public TimedBuffer(final int size, final long timeout, final boolean flushOnSync, final boolean logRates)
+ public TimedBuffer(final int size, final int timeout, final boolean logRates)
{
- this.bufferSize = size;
+ log.info("timed buffer size " + size);
+ log.info("timed buffer timeout " + timeout);
+
+ bufferSize = size;
+
this.logRates = logRates;
+
if (logRates)
{
- this.logRatesTimer = new Timer(true);
+ logRatesTimer = new Timer(true);
}
// Setting the interval for nano-sleeps
buffer = HornetQBuffers.fixedBuffer(bufferSize);
+
buffer.clear();
+
bufferLimit = 0;
callbacks = new ArrayList<IOAsyncTask>();
- this.flushOnSync = flushOnSync;
- latchTimer.up();
+
this.timeout = timeout;
}
@@ -127,7 +130,7 @@
timerRunnable = new CheckTimer();
- timerThread = new Thread(timerRunnable, "hornetq-async-buffer");
+ timerThread = new Thread(timerRunnable, "hornetq-buffer-timeout");
timerThread.start();
@@ -148,11 +151,11 @@
return;
}
- this.flush();
+ flush();
- this.bufferObserver = null;
+ bufferObserver = null;
- latchTimer.down();
+ spinLimiter.release();
timerRunnable.close();
@@ -175,26 +178,16 @@
started = false;
}
- public synchronized void setObserver(TimedBufferObserver observer)
+ public synchronized void setObserver(final TimedBufferObserver observer)
{
- if (this.bufferObserver != null)
+ if (bufferObserver != null)
{
flush();
}
- this.bufferObserver = observer;
+ bufferObserver = observer;
}
- public void disableAutoFlush()
- {
- lock.lock();
- }
-
- public void enableAutoFlush()
- {
- lock.unlock();
- }
-
/**
* Verify if the size fits the buffer
* @param sizeChecked
@@ -210,23 +203,36 @@
if (bufferLimit == 0 || buffer.writerIndex() + sizeChecked > bufferLimit)
{
+ // Either there is not enough space left in the buffer for the sized record
+ // Or a flush has just been performed and we need to re-calcualate bufferLimit
+
flush();
- final int remaining = bufferObserver.getRemainingBytes();
+ delayFlush = true;
- if (sizeChecked > remaining)
+ final int remainingInFile = bufferObserver.getRemainingBytes();
+
+ if (sizeChecked > remainingInFile)
{
+ // Need to move to a new file -not enough space in file for this size
+
return false;
}
else
{
- buffer.clear();
- bufferLimit = Math.min(remaining, bufferSize);
+ // There is enough space in the file for this size
+
+ // Need to re-calculate buffer limit
+
+ bufferLimit = Math.min(remainingInFile, bufferSize);
+
return true;
}
}
else
{
+ delayFlush = true;
+
return true;
}
}
@@ -238,87 +244,84 @@
public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
{
+ delayFlush = false;
+
if (buffer.writerIndex() == 0)
{
- // Resume latch
- latchTimer.down();
+ // More bytes have been added so the timer flush thread can resume
+
+ spinLimiter.release();
}
bytes.encode(buffer);
callbacks.add(callback);
- active = true;
-
if (sync)
{
- if (!pendingSync)
- {
- pendingSync = true;
- }
+ pendingSync = true;
- if (flushOnSync)
- {
- flush();
- }
+// if (System.nanoTime() - lastFlushTime.get() > timeout)
+// {
+// // This might happen if there is low activity in the buffer - the timer hasn't fired because no sync records
+// // have been recently added, and suddenly a sync record is added
+// // In this case we do a flush immediately, which can reduce latency in this case
+//
+// flush();
+// }
}
- if (buffer.writerIndex() == bufferLimit)
- {
- flush();
- }
}
public void flush()
{
- ByteBuffer bufferToFlush = null;
-
- boolean useSync = false;
-
- List<IOAsyncTask> callbacksToCall = null;
-
synchronized (this)
{
- if (buffer.writerIndex() > 0)
+ if (!delayFlush && buffer.writerIndex() > 0)
{
- latchTimer.up();
-
int pos = buffer.writerIndex();
if (logRates)
{
- bytesFlushed += pos;
+ bytesFlushed.addAndGet(pos);
}
- bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
+ ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
// Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
// Using bufferToFlush.put(buffer) would make several append calls for each byte
bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
- callbacksToCall = callbacks;
+ if (bufferToFlush != null)
+ {
+ bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
+ }
- callbacks = new LinkedList<IOAsyncTask>();
+ try
+ {
+ // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning
+ // when the buffer is inactive
+ spinLimiter.acquire();
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
- useSync = pendingSync;
+ lastFlushTime.set(System.nanoTime());
- active = false;
pendingSync = false;
+ callbacks = new LinkedList<IOAsyncTask>();
+
buffer.clear();
+
bufferLimit = 0;
- flushesDone++;
+ flushesDone.incrementAndGet();
}
}
-
- // Execute the flush outside of the lock
- // This is important for NIO performance while we are using NIO Callbacks
- if (bufferToFlush != null)
- {
- bufferObserver.flushBuffer(bufferToFlush, useSync, callbacksToCall);
- }
}
// Package protected ---------------------------------------------
@@ -327,36 +330,18 @@
// Private -------------------------------------------------------
- private void checkTimer()
- {
- // if inactive for more than the timeout
- // of if a sync happened at more than the the timeout ago
- if (!active || pendingSync)
- {
- lock.lock();
- try
- {
- if (bufferObserver != null)
- {
- flush();
- }
- }
- finally
- {
- lock.unlock();
- }
- }
-
- // Set the buffer as inactive.. we will flush the buffer next tick if nothing change this
- active = false;
- }
-
// Inner classes -------------------------------------------------
private class LogRatesTimerTask extends TimerTask
{
private boolean closed;
+ private long lastExecution;
+
+ private long lastBytesFlushed;
+
+ private long lastFlushesDone;
+
@Override
public synchronized void run()
{
@@ -364,22 +349,26 @@
{
long now = System.currentTimeMillis();
+ long bytesF = bytesFlushed.get();
+ long flushesD = flushesDone.get();
+
if (lastExecution != 0)
{
- double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
+ double rate = 1000 * (double)(bytesF - lastBytesFlushed) / (now - lastExecution);
log.info("Write rate = " + rate + " bytes / sec or " + (long)(rate / (1024 * 1024)) + " MiB / sec");
- double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);
+ double flushRate = 1000 * (double)(flushesD - lastFlushesDone) / (now - lastExecution);
log.info("Flush rate = " + flushRate + " flushes / sec");
}
lastExecution = now;
- bytesFlushed = 0;
+ lastBytesFlushed = bytesF;
- flushesDone = 0;
+ lastFlushesDone = flushesD;
}
}
+ @Override
public synchronized boolean cancel()
{
closed = true;
@@ -396,33 +385,29 @@
{
while (!closed)
{
- try
+ // We flush on the timer if there are pending syncs there and we've waited waited at least one
+ // timeout since the time of the last flush
+ // Effectively flushing "resets" the timer
+
+ if (pendingSync && bufferObserver != null && (System.nanoTime() > lastFlushTime.get() + timeout))
{
- latchTimer.waitCompletion();
+ flush();
}
- catch (InterruptedException ignored)
+
+ try
{
- }
+ spinLimiter.acquire();
- sleep();
+ Thread.yield();
- checkTimer();
-
+ spinLimiter.release();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
}
}
- /**
- *
- */
- private void sleep()
- {
- long time = System.nanoTime() + timeout;
- while (time > System.nanoTime())
- {
- Thread.yield();
- }
- }
-
public void close()
{
closed = true;
Modified: trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -59,8 +59,12 @@
int getJournalMinFiles();
- int getJournalMaxAIO();
+ int getJournalMaxIO();
+ int getJournalBufferSize();
+
+ int getJournalBufferTimeout();
+
int getJournalCompactMinFiles();
int getJournalCompactPercentage();
@@ -87,10 +91,6 @@
boolean isSharedStore();
- int getAIOBufferSize();
-
- int getAIOBufferTimeout();
-
String getPagingDirectory();
boolean isPersistDeliveryCountBeforeDelivery();
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -47,6 +47,7 @@
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
@@ -88,12 +89,12 @@
// Constructors --------------------------------------------------
public HornetQServerControlImpl(final PostOffice postOffice,
- final Configuration configuration,
- final ResourceManager resourceManager,
- final RemotingService remotingService,
- final HornetQServer messagingServer,
- final MessageCounterManager messageCounterManager,
- final NotificationBroadcasterSupport broadcaster) throws Exception
+ final Configuration configuration,
+ final ResourceManager resourceManager,
+ final RemotingService remotingService,
+ final HornetQServer messagingServer,
+ final MessageCounterManager messageCounterManager,
+ final NotificationBroadcasterSupport broadcaster) throws Exception
{
super(HornetQServerControl.class);
this.postOffice = postOffice;
@@ -128,7 +129,7 @@
{
return configuration.isBackup();
}
-
+
public boolean isSharedStore()
{
return configuration.isSharedStore();
@@ -146,19 +147,28 @@
public String[] getInterceptorClassNames()
{
- return configuration.getInterceptorClassNames().toArray(new String[configuration.getInterceptorClassNames().size()]);
+ return configuration.getInterceptorClassNames().toArray(new String[configuration.getInterceptorClassNames()
+ .size()]);
}
- public int getAIOBufferSize()
+ public int getJournalBufferSize()
{
- return configuration.getJournalBufferSize();
+ return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferSize_AIO()
+ : configuration.getJournalBufferSize_NIO();
}
-
- public int getAIOBufferTimeout()
+
+ public int getJournalBufferTimeout()
{
- return configuration.getJournalBufferTimeout();
+ return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferTimeout_AIO()
+ : configuration.getJournalBufferTimeout_NIO();
}
-
+
+ public int getJournalMaxIO()
+ {
+ return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalMaxIO_AIO()
+ : configuration.getJournalMaxIO_NIO();
+ }
+
public String getJournalDirectory()
{
return configuration.getJournalDirectory();
@@ -169,16 +179,11 @@
return configuration.getJournalFileSize();
}
- public int getJournalMaxAIO()
- {
- return configuration.getJournalMaxAIO();
- }
-
public int getJournalMinFiles()
{
return configuration.getJournalMinFiles();
}
-
+
public int getJournalCompactMinFiles()
{
return configuration.getJournalCompactMinFiles();
@@ -208,7 +213,7 @@
{
return configuration.getScheduledThreadPoolMaxSize();
}
-
+
public int getThreadPoolMaxSize()
{
return configuration.getThreadPoolMaxSize();
@@ -265,7 +270,7 @@
{
server.createQueue(new SimpleString(address), new SimpleString(name), null, true, false);
}
-
+
public void createQueue(final String address, final String name, final boolean durable) throws Exception
{
server.createQueue(new SimpleString(address), new SimpleString(name), null, durable, false);
@@ -291,7 +296,7 @@
QueueControl queue = (QueueControl)queues[i];
names[i] = queue.getName();
}
-
+
return names;
}
@@ -304,7 +309,7 @@
AddressControl address = (AddressControl)addresses[i];
names[i] = address.getAddress();
}
-
+
return names;
}
@@ -319,7 +324,7 @@
{
return server.getConnectionCount();
}
-
+
public void enableMessageCounters()
{
setMessageCounterEnabled(true);
@@ -382,7 +387,6 @@
{
DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
-
Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
ArrayList<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Map.Entry<Xid, Long>>(xids.entrySet());
Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>()
@@ -403,7 +407,7 @@
}
return s;
}
-
+
public String[] listHeuristicCommittedTransactions()
{
List<Xid> xids = resourceManager.getHeuristicCommittedTransactions();
@@ -415,7 +419,7 @@
}
return s;
}
-
+
public String[] listHeuristicRolledBackTransactions()
{
List<Xid> xids = resourceManager.getHeuristicRolledbackTransactions();
@@ -505,7 +509,7 @@
{
remotingService.removeConnection(connection.getID());
connection.fail(new HornetQException(HornetQException.INTERNAL_ERROR, "connections for " + ipAddress +
- " closed by management"));
+ " closed by management"));
closed = true;
}
}
@@ -540,33 +544,33 @@
public Object[] getConnectors() throws Exception
{
Collection<TransportConfiguration> connectorConfigurations = configuration.getConnectorConfigurations().values();
-
+
Object[] ret = new Object[connectorConfigurations.size()];
-
+
int i = 0;
- for (TransportConfiguration config: connectorConfigurations)
+ for (TransportConfiguration config : connectorConfigurations)
{
Object[] tc = new Object[3];
-
+
tc[0] = config.getName();
tc[1] = config.getFactoryClassName();
tc[2] = config.getParams();
-
+
ret[i++] = tc;
}
-
+
return ret;
}
-
+
public String getConnectorsAsJSON() throws Exception
{
JSONArray array = new JSONArray();
-
- for (TransportConfiguration config: configuration.getConnectorConfigurations().values())
+
+ for (TransportConfiguration config : configuration.getConnectorConfigurations().values())
{
array.put(new JSONObject(config));
}
-
+
return array.toString();
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -93,7 +93,6 @@
*/
public class JournalStorageManager implements StorageManager
{
-
private static final Logger log = Logger.getLogger(JournalStorageManager.class);
private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
@@ -145,7 +144,7 @@
private final SequentialFileFactory largeMessagesFactory;
private volatile boolean started;
-
+
/** Used to create Operation Contexts */
private final ExecutorFactory executorFactory;
@@ -172,7 +171,9 @@
this(config, executorFactory, null);
}
- public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory, final ReplicationManager replicator)
+ public JournalStorageManager(final Configuration config,
+ final ExecutorFactory executorFactory,
+ final ReplicationManager replicator)
{
this.executorFactory = executorFactory;
@@ -229,37 +230,32 @@
SequentialFileFactory journalFF = null;
- if (config.getJournalType() == JournalType.ASYNCIO)
+ JournalType journalTypeToUse = config.getJournalType();
+
+ if (config.getJournalType() == JournalType.ASYNCIO && !AIOSequentialFileFactory.isSupported())
{
+ log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO journal");
+
+ journalTypeToUse = JournalType.NIO;
+ }
+
+ if (journalTypeToUse == JournalType.ASYNCIO)
+ {
log.info("AIO journal selected");
- if (!AIOSequentialFileFactory.isSupported())
- {
- log.warn("AIO wasn't located on this platform, it will fall back to using pure Java NIO. If your platform is Linux, install LibAIO to enable the AIO journal");
- journalFF = new NIOSequentialFileFactory(journalDir,
- true,
- config.getJournalBufferSize(),
- config.getJournalBufferTimeout(),
- config.isJournalFlushOnSync(),
- config.isLogJournalWriteRate());
- }
- else
- {
- journalFF = new AIOSequentialFileFactory(journalDir,
- config.getJournalBufferSize(),
- config.getJournalBufferTimeout(),
- config.isJournalFlushOnSync(),
- config.isLogJournalWriteRate());
- log.info("AIO loaded successfully");
- }
+
+ journalFF = new AIOSequentialFileFactory(journalDir,
+ config.getJournalBufferSize_AIO(),
+ config.getJournalBufferTimeout_AIO(),
+ config.isLogJournalWriteRate());
+ log.info("AIO loaded successfully");
}
else if (config.getJournalType() == JournalType.NIO)
{
log.info("NIO Journal selected");
journalFF = new NIOSequentialFileFactory(journalDir,
true,
- config.getJournalBufferSize(),
- config.getJournalBufferTimeout(),
- config.isJournalFlushOnSync(),
+ config.getJournalBufferSize_NIO(),
+ config.getJournalBufferTimeout_NIO(),
config.isLogJournalWriteRate());
}
else
@@ -283,7 +279,8 @@
journalFF,
"hornetq-data",
"hq",
- config.getJournalMaxAIO());
+ config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
+ : config.getJournalMaxIO_NIO());
if (replicator != null)
{
@@ -308,7 +305,7 @@
{
getContext().complete();
}
-
+
public void clearContext()
{
OperationContextImpl.clearContext();
@@ -319,7 +316,6 @@
return replicator != null;
}
-
public void waitOnOperations() throws Exception
{
waitOnOperations(-1);
@@ -337,8 +333,7 @@
{
waitCallback.waitCompletion();
}
- else
- if (!waitCallback.waitCompletion(timeout))
+ else if (!waitCallback.waitCompletion(timeout))
{
throw new IllegalStateException("no response received from replication");
}
@@ -383,7 +378,6 @@
// TODO: shouldn't those page methods be on the PageManager? ^^^^
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#getContext()
*/
@@ -391,7 +385,7 @@
{
return OperationContextImpl.getContext(executorFactory);
}
-
+
public void setContext(OperationContext context)
{
OperationContextImpl.setContext(context);
@@ -478,12 +472,12 @@
public void storeMessage(final ServerMessage message) throws Exception
{
- //TODO - how can this be less than zero?
+ // TODO - how can this be less than zero?
if (message.getMessageID() <= 0)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
}
-
+
// Note that we don't sync, the add reference that comes immediately after will sync if appropriate
if (message.isLargeMessage())
@@ -491,50 +485,64 @@
messageJournal.appendAddRecord(message.getMessageID(),
ADD_LARGE_MESSAGE,
new LargeMessageEncoding((LargeServerMessage)message),
- false, getContext(false));
+ false,
+ getContext(false));
}
else
- {
+ {
messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message, false, getContext(false));
}
}
+ public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
- public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
- {
- messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, getContext(syncNonTransactional));
+ {
+ messageJournal.appendUpdateRecord(messageID,
+ ADD_REF,
+ new RefEncoding(queueID),
+ last && syncNonTransactional,
+ getContext(last && syncNonTransactional));
}
public void storeAcknowledge(final long queueID, final long messageID) throws Exception
- {
- messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getContext(syncNonTransactional));
+ {
+ messageJournal.appendUpdateRecord(messageID,
+ ACKNOWLEDGE_REF,
+ new RefEncoding(queueID),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void deleteMessage(final long messageID) throws Exception
- {
+ {
messageJournal.appendDeleteRecord(messageID, syncNonTransactional, getContext(syncNonTransactional));
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
- {
+ {
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
.getID());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
encoding,
- syncNonTransactional, getContext(syncNonTransactional));
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
- {
+ {
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional, getContext(syncNonTransactional));
+ messageJournal.appendAddRecord(recordID,
+ DUPLICATE_ID,
+ encoding,
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void deleteDuplicateID(long recordID) throws Exception
- {
+ {
messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
}
@@ -591,7 +599,12 @@
public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
{
long id = generateUniqueID();
- messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true, getContext(true));
+
+ messageJournal.appendAddRecord(id,
+ HEURISTIC_COMPLETION,
+ new HeuristicCompletionEncoding(xid, isCommit),
+ true,
+ getContext(true));
return id;
}
@@ -668,10 +681,10 @@
DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
ref.getDeliveryCount());
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
- UPDATE_DELIVERY_COUNT,
- updateInfo,
- syncNonTransactional, getContext(syncNonTransactional));
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), UPDATE_DELIVERY_COUNT, updateInfo,
+
+ syncNonTransactional, getContext(syncNonTransactional));
+
}
private static final class AddMessageRecord
@@ -738,7 +751,7 @@
JournalLoadInformation info = messageJournal.load(records,
preparedTransactions,
new LargeMessageTXFailureCallback(messages));
-
+
ArrayList<LargeServerMessage> largeMessages = new ArrayList<LargeServerMessage>();
Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
@@ -764,7 +777,7 @@
break;
}
case ADD_MESSAGE:
- {
+ {
ServerMessage message = new ServerMessageImpl(record.id, 50);
message.decode(buff);
@@ -967,6 +980,11 @@
messageJournal.perfBlast(perfBlastPages);
}
+ if (System.getProperty("org.hornetq.opt.directblast") != null)
+ {
+ messageJournal.runDirectJournalBlast();
+ }
+
return info;
}
@@ -1355,7 +1373,7 @@
return info;
}
-
+
// Public -----------------------------------------------------------------------------------
public Journal getMessageJournal()
@@ -1416,7 +1434,7 @@
}
// Private ----------------------------------------------------------------------------------
-
+
private void checkAndCreateDir(final String dir, final boolean create)
{
File f = new File(dir);
@@ -1464,18 +1482,15 @@
return DummyOperationContext.getInstance();
}
}
-
-
// Inner Classes
// ----------------------------------------------------------------------------
-
static class DummyOperationContext implements OperationContext
{
-
+
private static DummyOperationContext instance = new DummyOperationContext();
-
+
public static OperationContext getInstance()
{
return instance;
@@ -1512,7 +1527,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.journal.IOCompletion#lineUp()
*/
- public void lineUp()
+ public void storeLineUp()
{
}
@@ -1529,9 +1544,9 @@
public void onError(int errorCode, String errorMessage)
{
}
-
+
}
-
+
private static class XidEncoding implements EncodingSupport
{
final Xid xid;
@@ -2018,5 +2033,4 @@
}
-
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -20,6 +20,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.utils.ExecutorFactory;
@@ -38,6 +39,7 @@
*/
public class OperationContextImpl implements OperationContext
{
+ private static final Logger log = Logger.getLogger(OperationContextImpl.class);
private static final ThreadLocal<OperationContext> threadLocalContext = new ThreadLocal<OperationContext>();
@@ -61,8 +63,7 @@
{
threadLocalContext.set(context);
}
-
-
+
private List<TaskHolder> tasks;
private volatile int storeLineUp = 0;
@@ -90,10 +91,9 @@
super();
this.executor = executor;
}
-
- /** To be called by the replication manager, when new replication is added to the queue */
- public void lineUp()
- {
+
+ public void storeLineUp()
+ {
storeLineUp++;
}
@@ -108,7 +108,6 @@
checkTasks();
}
- /** You may have several actions to be done after a replication operation is completed. */
public void executeOnCompletion(final IOAsyncTask completion)
{
if (errorCode != -1)
@@ -159,7 +158,6 @@
}
- /** To be called by the storage manager, when data is confirmed on the channel */
public synchronized void done()
{
stored++;
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -519,6 +519,12 @@
return localJournal.getNumberOfRecords();
}
+ public void runDirectJournalBlast() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -695,12 +695,24 @@
}
catch (HornetQException e)
{
+ csf.close();
+
// the session was created while its server was starting, retry it:
if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
{
log.warn("Server is starting, retry to create the session for bridge " + name);
+ //Sleep a little to prevent spinning too much
+ try
+ {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
retry = true;
+
continue;
}
else
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -22,6 +22,7 @@
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -52,7 +53,10 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.impl.SyncSpeedTest;
import org.hornetq.core.logging.LogDelegateFactory;
@@ -90,11 +94,16 @@
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.security.impl.SecurityStoreImpl;
import org.hornetq.core.server.ActivateCallback;
+import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.MemoryManager;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.RoutingContext;
+import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.cluster.ClusterManager;
@@ -630,7 +639,7 @@
throw new HornetQException(HornetQException.INCOMPATIBLE_CLIENT_SERVER_VERSIONS,
"Server and client versions incompatible");
}
-
+
if (!checkActivate())
{
// Backup server is not ready to accept connections
@@ -970,7 +979,7 @@
if (replicationEndpoint == null)
{
log.warn("There is no replication endpoint, can't activate this backup server");
-
+
throw new HornetQException(HornetQException.INTERNAL_ERROR, "Can't activate the server");
}
@@ -1189,6 +1198,8 @@
}
initialised = true;
+
+ log.info("********** initialised");
if (System.getProperty("org.hornetq.opt.routeblast") != null)
{
@@ -1482,14 +1493,50 @@
}
}
+
+// private void runRouteBlastNoWait() throws Exception
+// {
+// SimpleString address = new SimpleString("rbnw_address");
+// SimpleString queueName = new SimpleString("rbnw_name");
+//
+// createQueue(address, queueName, null, true, false, true);
+//
+// Queue queue = (Queue)postOffice.getBinding(queueName).getBindable();
+//
+// RBConsumer consumer = new RBConsumer(queue);
+//
+// queue.addConsumer(consumer);
+//
+// final int bodySize = 1024;
+//
+// byte[] body = new byte[bodySize];
+//
+// final int numMessages = 10000000;
+//
+// for (int i = 0; i < numMessages; i++)
+// {
+// final ServerMessage msg = new ServerMessageImpl(storageManager.generateUniqueID(), 1500);
+//
+// msg.getBodyBuffer().writeBytes(body);
+//
+// msg.setDestination(address);
+//
+// msg.setDurable(true);
+//
+// postOffice.route(msg);
+// }
+// }
+
private LinkedBlockingQueue<RouteBlastRunner> available = new LinkedBlockingQueue<RouteBlastRunner>();
+
private void runRouteBlast() throws Exception
{
log.info("*** running route blast");
- final int numThreads = 2;
+
+ final int numThreads = 1;
- final int numClients = 200;
+ final int numClients = 1000;
for (int i = 0; i < numClients; i++)
{
@@ -1499,7 +1546,7 @@
available.add(run);
}
-
+
log.info("setup, now running");
Set<Thread> runners = new HashSet<Thread>();
@@ -1518,43 +1565,39 @@
t.join();
}
}
-
- class Foo implements Runnable
- {
- public void run()
- {
- for (int i = 0; i < 1000000; i++)
- {
- try
- {
- RouteBlastRunner runner = available.take();
-
- runner.run();
- }
- catch (InterruptedException e)
- {
- log.error("Interrupted", e);
- }
- }
- }
- }
-
+
class RouteBlastRunner implements Runnable
{
private SimpleString address;
+ private Set<Consumer> consumers = new HashSet<Consumer>();
+
RouteBlastRunner(SimpleString address)
{
this.address = address;
}
+
+
public void setup() throws Exception
{
final int numQueues = 1;
for (int i = 0; i < numQueues; i++)
{
- createQueue(address, new SimpleString(address + ".hq.route_blast_queue" + i), null, true, false, true);
+ SimpleString queueName = new SimpleString(address + ".hq.route_blast_queue" + i);
+
+ createQueue(address, queueName, null, true, false, true);
+
+ Queue queue = (Queue)postOffice.getBinding(queueName).getBindable();
+
+ RBConsumer consumer = new RBConsumer(queue);
+
+ queue.addConsumer(consumer);
+
+ //log.info("added consumer to queue " + queue);
+
+ consumers.add(consumer);
}
}
@@ -1573,7 +1616,7 @@
msg.setDestination(address);
msg.setDurable(true);
-
+
postOffice.route(msg);
storageManager.afterCompleteOperations(new IOAsyncTask()
@@ -1584,7 +1627,7 @@
}
public void done()
- {
+ {
available.add(RouteBlastRunner.this);
}
});
@@ -1596,7 +1639,59 @@
}
}
+
+
+
+
+ class Foo implements Runnable
+ {
+ public void run()
+ {
+ for (int i = 0; i < 1000000; i++)
+ {
+ try
+ {
+ RouteBlastRunner runner = available.take();
+ runner.run();
+ }
+ catch (InterruptedException e)
+ {
+ log.error("Interrupted", e);
+ }
+ }
+ }
+ }
+
+ private class RBConsumer implements Consumer
+ {
+ private Queue queue;
+
+ RBConsumer(Queue queue)
+ {
+ this.queue = queue;
+ }
+
+ public Filter getFilter()
+ {
+ return null;
+ }
+
+ public HandleStatus handle(MessageReference reference) throws Exception
+ {
+ reference.handled();
+
+ queue.acknowledge(reference);
+
+ //log.info("acking");
+
+ return HandleStatus.HANDLED;
+ }
+
+ }
+
+
+
// Inner classes
// --------------------------------------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -1227,7 +1227,7 @@
}
private synchronized boolean directDeliver(final MessageReference reference)
- {
+ {
if (paused || handlers.isEmpty())
{
return false;
@@ -1241,7 +1241,7 @@
MessageHandler handler = getHandlerRoundRobin();
Consumer consumer = handler.getConsumer();
-
+
if (!checkExpired(reference))
{
SimpleString groupID = reference.getMessage().getSimpleStringProperty(MessageImpl.HDR_GROUP_ID);
@@ -1303,7 +1303,7 @@
}
protected synchronized void add(final MessageReference ref, final boolean first)
- {
+ {
if (dontAdd)
{
return;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -61,7 +61,9 @@
*/
public ServerMessageImpl(final long messageID, final int initialMessageBufferSize)
{
- super(messageID, initialMessageBufferSize);
+ super(initialMessageBufferSize);
+
+ this.messageID = messageID;
}
protected ServerMessageImpl(final int initialMessageBufferSize)
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -186,7 +186,7 @@
private final HornetQServer server;
private final SimpleString managementAddress;
-
+
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
@@ -241,7 +241,7 @@
this.resourceManager = resourceManager;
this.securityStore = securityStore;
-
+
this.executor = executor;
if (!xa)
@@ -1455,52 +1455,11 @@
sendResponse(packet, null, false, false);
}
}
-
-
-
- public void handleSend2(final ServerMessage message)
- {
- try
- {
- long id = storageManager.generateUniqueID();
- message.setMessageID(id);
- message.encodeMessageIDToBuffer();
-
- if (message.getDestination().equals(managementAddress))
- {
- // It's a management message
-
- handleManagementMessage(message);
- }
- else
- {
- send(message);
- }
- }
- catch (Exception e)
- {
- log.error("Failed to send message", e);
-
- }
- finally
- {
- try
- {
- releaseOutStanding(message, message.getEncodeSize());
- }
- catch (Exception e)
- {
- log.error("Failed to release outstanding credits", e);
- }
- }
-
- }
-
public void handleSend(final SessionSendMessage packet)
{
Packet response = null;
-
+
ServerMessage message = (ServerMessage)packet.getMessage();
try
@@ -1553,8 +1512,8 @@
log.error("Failed to release outstanding credits", e);
}
}
-
- sendResponse(packet, response, false, false);
+
+ sendResponse(packet, response, false, false);
}
public void handleSendContinuations(final SessionSendContinuationMessage packet)
@@ -1638,7 +1597,7 @@
}
}
});
-
+
if (gotCredits > 0)
{
sendProducerCredits(holder, gotCredits, address);
@@ -1752,30 +1711,26 @@
// Private
// ----------------------------------------------------------------------------
- /**
- * Respond to client after replication
- * @param packet
- * @param response
- */
private void sendResponse(final Packet confirmPacket,
final Packet response,
final boolean flush,
final boolean closeChannel)
{
storageManager.afterCompleteOperations(new IOAsyncTask()
- {
+ {
public void onError(int errorCode, String errorMessage)
{
log.warn("Error processing IOCallback code = " + errorCode + " message = " + errorMessage);
- HornetQExceptionMessage exceptionMessage = new HornetQExceptionMessage(new HornetQException(errorCode, errorMessage));
-
- doSendResponse(confirmPacket, exceptionMessage, flush, closeChannel);
+ HornetQExceptionMessage exceptionMessage = new HornetQExceptionMessage(new HornetQException(errorCode,
+ errorMessage));
+
+ doConfirmAndResponse(confirmPacket, exceptionMessage, flush, closeChannel);
}
public void done()
{
- doSendResponse(confirmPacket, response, flush, closeChannel);
+ doConfirmAndResponse(confirmPacket, response, flush, closeChannel);
}
});
}
@@ -1786,11 +1741,11 @@
* @param flush
* @param closeChannel
*/
- private void doSendResponse(final Packet confirmPacket,
- final Packet response,
- final boolean flush,
- final boolean closeChannel)
- {
+ private void doConfirmAndResponse(final Packet confirmPacket,
+ final Packet response,
+ final boolean flush,
+ final boolean closeChannel)
+ {
if (confirmPacket != null)
{
channel.confirm(confirmPacket);
@@ -1802,7 +1757,7 @@
}
if (response != null)
- {
+ {
channel.send(response);
}
Modified: trunk/src/main/org/hornetq/utils/TokenBucketLimiterImpl.java
===================================================================
--- trunk/src/main/org/hornetq/utils/TokenBucketLimiterImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/src/main/org/hornetq/utils/TokenBucketLimiterImpl.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -53,17 +53,21 @@
{
while (!check())
{
- if (!spin)
+ if (spin)
{
- try
- {
- Thread.sleep(1);
- }
- catch (Exception e)
- {
- //Ignore
- }
+ Thread.yield();
}
+ else
+ {
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (Exception e)
+ {
+ //Ignore
+ }
+ }
}
}
Modified: trunk/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-full-config.xml 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/config/ConfigurationTest-full-config.xml 2009-12-01 20:18:47 UTC (rev 8483)
@@ -38,8 +38,7 @@
<create-journal-dir>false</create-journal-dir>
<journal-type>NIO</journal-type>
<journal-compact-min-files>123</journal-compact-min-files>
- <journal-compact-percentage>33</journal-compact-percentage>
- <journal-flush-on-sync>true</journal-flush-on-sync>
+ <journal-compact-percentage>33</journal-compact-percentage>
<journal-buffer-timeout>1000</journal-buffer-timeout>
<journal-buffer-size>10000</journal-buffer-size>
<journal-sync-transactional>false</journal-sync-transactional>
Modified: trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -51,7 +51,7 @@
{
server.getConfiguration().setJournalSyncNonTransactional(false);
- server.getConfiguration().setJournalBufferTimeout(15000);
+ server.getConfiguration().setJournalBufferTimeout_AIO(15000);
server.start();
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -1138,10 +1138,9 @@
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
- configuration.setJournalMaxAIO(1000);
+ configuration.setJournalMaxIO_AIO(1000);
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(JournalType.ASYNCIO);
- configuration.setJournalMaxAIO(1000);
configuration.setSharedStore(sharedStorage);
if (sharedStorage)
{
@@ -1254,7 +1253,7 @@
configuration.setJournalDirectory(getJournalDir(node, false));
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(JournalType.ASYNCIO);
- configuration.setJournalMaxAIO(1000);
+ configuration.setJournalMaxIO_AIO(1000);
configuration.setPagingDirectory(getPageDir(node, false));
configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
configuration.setClustered(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverReplicationTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -42,11 +42,10 @@
configuration.setSecurityEnabled(false);
configuration.setBindingsDirectory(getBindingsDir(node, false));
configuration.setJournalMinFiles(2);
- configuration.setJournalMaxAIO(1000);
+ configuration.setJournalMaxIO_AIO(1000);
configuration.setJournalDirectory(getJournalDir(node, false));
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(JournalType.ASYNCIO);
- configuration.setJournalMaxAIO(1000);
configuration.setPagingDirectory(getPageDir(node, false));
configuration.setLargeMessagesDirectory(getLargeMessagesDir(node, false));
configuration.setClustered(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverSharedServerTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -12,18 +12,15 @@
*/
package org.hornetq.tests.integration.cluster.failover;
-import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
-import org.hornetq.core.server.JournalType;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQ;
-import org.hornetq.core.message.impl.MessageImpl;
+import java.util.Map;
+
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.utils.SimpleString;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.JournalType;
-import java.util.Map;
-
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Oct 26, 2009
@@ -42,11 +39,10 @@
configuration.setSecurityEnabled(false);
configuration.setBindingsDirectory(getBindingsDir(backupNode, false));
configuration.setJournalMinFiles(2);
- configuration.setJournalMaxAIO(1000);
+ configuration.setJournalMaxIO_AIO(1000);
configuration.setJournalDirectory(getJournalDir(backupNode, false));
configuration.setJournalFileSize(100 * 1024);
configuration.setJournalType(JournalType.ASYNCIO);
- configuration.setJournalMaxAIO(1000);
configuration.setPagingDirectory(getPageDir(backupNode, false));
configuration.setLargeMessagesDirectory(getLargeMessagesDir(backupNode, false));
configuration.setClustered(true);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSFailoverTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -16,6 +16,7 @@
import java.util.HashMap;
import java.util.Map;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
@@ -41,6 +42,7 @@
import org.hornetq.jms.HornetQQueue;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.SimpleString;
@@ -122,11 +124,15 @@
MessageConsumer consumer = sess.createConsumer(queue);
+ byte[] body = RandomUtil.randomBytes(bodySize);
+
for (int i = 0; i < numMessages; i++)
{
- TextMessage tm = sess.createTextMessage("message" + i);
+ BytesMessage bm = sess.createBytesMessage();
+
+ bm.writeBytes(body);
- producer.send(tm);
+ producer.send(bm);
}
conn.start();
@@ -143,11 +149,11 @@
{
log.info("got message " + i);
- TextMessage tm = (TextMessage)consumer.receive(1000);
+ BytesMessage bm = (BytesMessage)consumer.receive(1000);
- assertNotNull(tm);
+ assertNotNull(bm);
- assertEquals("message" + i, tm.getText());
+ assertEquals(body.length, bm.getBodyLength());
}
TextMessage tm = (TextMessage)consumer.receiveNoWait();
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalCompactTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -30,15 +30,12 @@
*
*/
public class AIOJournalCompactTest extends NIOJournalCompactTest
-{
-
-
+{
public static TestSuite suite()
{
return createAIOTestSuite(AIOJournalCompactTest.class);
}
-
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -63,9 +60,8 @@
file.mkdir();
return new AIOSequentialFileFactory(getTestDir(),
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- 1000000,
- true,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ 1000000,
false
);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/AIOJournalImplTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -70,9 +70,8 @@
file.mkdir();
return new AIOSequentialFileFactory(getTestDir(),
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- 1000000,
- true,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ 1000000,
false
);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/ValidateTransactionHealthTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -388,9 +388,8 @@
if (factoryType.equals("aio"))
{
return new AIOSequentialFileFactory(directory,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE,
- ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT,
- ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
+ ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
false);
}
else
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -106,9 +106,9 @@
assertEquals(conf.isJournalSyncNonTransactional(), serverControl.isJournalSyncNonTransactional());
assertEquals(conf.getJournalFileSize(), serverControl.getJournalFileSize());
assertEquals(conf.getJournalMinFiles(), serverControl.getJournalMinFiles());
- assertEquals(conf.getJournalMaxAIO(), serverControl.getJournalMaxAIO());
- assertEquals(conf.getJournalBufferSize(), serverControl.getAIOBufferSize());
- assertEquals(conf.getJournalBufferTimeout(), serverControl.getAIOBufferTimeout());
+ assertEquals(conf.getJournalMaxIO_AIO(), serverControl.getJournalMaxIO());
+ assertEquals(conf.getJournalBufferSize_AIO(), serverControl.getJournalBufferSize());
+ assertEquals(conf.getJournalBufferTimeout_AIO(), serverControl.getJournalBufferTimeout());
assertEquals(conf.isCreateBindingsDir(), serverControl.isCreateBindingsDir());
assertEquals(conf.isCreateJournalDir(), serverControl.isCreateJournalDir());
assertEquals(conf.getPagingDirectory(), serverControl.getPagingDirectory());
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -203,9 +203,9 @@
return (Integer)proxy.retrieveAttributeValue("journalFileSize");
}
- public int getJournalMaxAIO()
+ public int getJournalMaxIO()
{
- return (Integer)proxy.retrieveAttributeValue("journalMaxAIO");
+ return (Integer)proxy.retrieveAttributeValue("journalMaxIO");
}
public int getJournalMinFiles()
@@ -423,14 +423,14 @@
proxy.invokeOperation("setMessageCounterSamplePeriod", newPeriod);
}
- public int getAIOBufferSize()
+ public int getJournalBufferSize()
{
- return (Integer)proxy.retrieveAttributeValue("AIOBufferSize");
+ return (Integer)proxy.retrieveAttributeValue("JournalBufferSize");
}
- public int getAIOBufferTimeout()
+ public int getJournalBufferTimeout()
{
- return (Integer)proxy.retrieveAttributeValue("AIOBufferTimeout");
+ return (Integer)proxy.retrieveAttributeValue("JournalBufferTimeout");
}
public int getJournalCompactMinFiles()
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -418,7 +418,7 @@
{
OperationContext ctx = OperationContextImpl.getContext(factory);
- ctx.lineUp();
+ ctx.storeLineUp();
String msg = "I'm an exception";
@@ -1076,5 +1076,11 @@
{
}
+ public void runDirectJournalBlast() throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
}
Modified: trunk/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/opt/SendTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/opt/SendTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -74,9 +74,10 @@
{
log.info("*** Starting server");
- System.setProperty("org.hornetq.opt.dontadd", "true");
- System.setProperty("org.hornetq.opt.routeblast", "true");
+ //System.setProperty("org.hornetq.opt.dontadd", "true");
+ // System.setProperty("org.hornetq.opt.routeblast", "true");
//System.setProperty("org.hornetq.opt.generatemessages", "true");
+ System.setProperty("org.hornetq.opt.directblast", "true");
Configuration configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
@@ -91,9 +92,19 @@
configuration.setJournalType(JournalType.ASYNCIO);
+ configuration.setJournalBufferTimeout_NIO(1000000000 / 100); // this is in nanoseconds
+ configuration.setJournalBufferSize_NIO(490 * 1024);
+ configuration.setJournalMaxIO_NIO(1);
+
+ configuration.setJournalBufferTimeout_AIO(1000000000 / 1000); // this is in nanoseconds
+ configuration.setJournalBufferSize_AIO(490 * 1024);
+ configuration.setJournalMaxIO_AIO(500);
+
configuration.setLogJournalWriteRate(true);
- //configuration.setRunSyncSpeedTest(true);
+
+ // configuration.setRunSyncSpeedTest(true);
+
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.USE_NIO_PROP_NAME, Boolean.FALSE);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -59,7 +59,8 @@
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_SYNC_NON_TRANSACTIONAL, conf.isJournalSyncNonTransactional());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE, conf.getJournalFileSize());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES, conf.getJournalMinFiles());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_AIO, conf.getJournalMaxAIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO, conf.getJournalMaxIO_AIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO, conf.getJournalMaxIO_NIO());
assertEquals(ConfigurationImpl.DEFAULT_WILDCARD_ROUTING_ENABLED, conf.isWildcardRoutingEnabled());
assertEquals(ConfigurationImpl.DEFAULT_TRANSACTION_TIMEOUT, conf.getTransactionTimeout());
assertEquals(ConfigurationImpl.DEFAULT_MESSAGE_EXPIRY_SCAN_PERIOD, conf.getMessageExpiryScanPeriod()); // OK
@@ -82,8 +83,10 @@
assertEquals(ConfigurationImpl.DEFAULT_PAGING_DIR, conf.getPagingDirectory());
assertEquals(ConfigurationImpl.DEFAULT_LARGE_MESSAGES_DIR, conf.getLargeMessagesDirectory());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_COMPACT_PERCENTAGE, conf.getJournalCompactPercentage());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_FLUSH_SYNC, conf.isJournalFlushOnSync());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT, conf.getJournalBufferTimeout());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, conf.getJournalBufferTimeout_AIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, conf.getJournalBufferTimeout_NIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, conf.getJournalBufferSize_AIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, conf.getJournalBufferSize_NIO());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_LOG_WRITE_RATE, conf.isLogJournalWriteRate());
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_PERF_BLAST_PAGES, conf.getJournalPerfBlastPages());
assertEquals(ConfigurationImpl.DEFAULT_MESSAGE_COUNTER_ENABLED, conf.isMessageCounterEnabled());
@@ -164,8 +167,12 @@
assertEquals(i, conf.getJournalMinFiles());
i = randomInt();
- conf.setJournalMaxAIO(i);
- assertEquals(i, conf.getJournalMaxAIO());
+ conf.setJournalMaxIO_AIO(i);
+ assertEquals(i, conf.getJournalMaxIO_AIO());
+
+ i = randomInt();
+ conf.setJournalMaxIO_NIO(i);
+ assertEquals(i, conf.getJournalMaxIO_NIO());
s = randomString();
conf.setManagementAddress(new SimpleString(s));
@@ -244,16 +251,20 @@
assertEquals(i, conf.getJournalCompactPercentage());
i = randomInt();
- conf.setJournalBufferSize(i);
- assertEquals(i, conf.getJournalBufferSize());
+ conf.setJournalBufferSize_AIO(i);
+ assertEquals(i, conf.getJournalBufferSize_AIO());
i = randomInt();
- conf.setJournalBufferTimeout(i);
- assertEquals(i, conf.getJournalBufferTimeout());
+ conf.setJournalBufferTimeout_AIO(i);
+ assertEquals(i, conf.getJournalBufferTimeout_AIO());
+
+ i = randomInt();
+ conf.setJournalBufferSize_NIO(i);
+ assertEquals(i, conf.getJournalBufferSize_NIO());
- b = randomBoolean();
- conf.setJournalFlushOnSync(b);
- assertEquals(b, conf.isJournalFlushOnSync());
+ i = randomInt();
+ conf.setJournalBufferTimeout_NIO(i);
+ assertEquals(i, conf.getJournalBufferTimeout_NIO());
b = randomBoolean();
conf.setLogJournalWriteRate(b);
@@ -381,8 +392,12 @@
assertEquals(i, conf.getJournalMinFiles());
i = randomInt();
- conf.setJournalMaxAIO(i);
- assertEquals(i, conf.getJournalMaxAIO());
+ conf.setJournalMaxIO_AIO(i);
+ assertEquals(i, conf.getJournalMaxIO_AIO());
+
+ i = randomInt();
+ conf.setJournalMaxIO_NIO(i);
+ assertEquals(i, conf.getJournalMaxIO_NIO());
s = randomString();
conf.setManagementAddress(new SimpleString(s));
@@ -461,16 +476,20 @@
assertEquals(i, conf.getJournalCompactPercentage());
i = randomInt();
- conf.setJournalBufferSize(i);
- assertEquals(i, conf.getJournalBufferSize());
+ conf.setJournalBufferSize_AIO(i);
+ assertEquals(i, conf.getJournalBufferSize_AIO());
i = randomInt();
- conf.setJournalBufferTimeout(i);
- assertEquals(i, conf.getJournalBufferTimeout());
+ conf.setJournalBufferTimeout_AIO(i);
+ assertEquals(i, conf.getJournalBufferTimeout_AIO());
+
+ i = randomInt();
+ conf.setJournalBufferSize_NIO(i);
+ assertEquals(i, conf.getJournalBufferSize_NIO());
- b = randomBoolean();
- conf.setJournalFlushOnSync(b);
- assertEquals(b, conf.isJournalFlushOnSync());
+ i = randomInt();
+ conf.setJournalBufferTimeout_NIO(i);
+ assertEquals(i, conf.getJournalBufferTimeout_NIO());
b = randomBoolean();
conf.setLogJournalWriteRate(b);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -105,12 +105,18 @@
assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MIN_FILES, conf.getJournalMinFiles());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_AIO, conf.getJournalMaxAIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_AIO, conf.getJournalMaxIO_AIO());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT, conf.getJournalBufferTimeout());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, conf.getJournalBufferTimeout_AIO());
- assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE, conf.getJournalBufferSize());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, conf.getJournalBufferSize_AIO());
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_MAX_IO_NIO, conf.getJournalMaxIO_NIO());
+
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, conf.getJournalBufferTimeout_NIO());
+
+ assertEquals(ConfigurationImpl.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, conf.getJournalBufferSize_NIO());
+
assertEquals(ConfigurationImpl.DEFAULT_CREATE_BINDINGS_DIR, conf.isCreateBindingsDir());
assertEquals(ConfigurationImpl.DEFAULT_CREATE_JOURNAL_DIR, conf.isCreateJournalDir());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -69,17 +69,19 @@
assertEquals(false, conf.isCreateBindingsDir());
assertEquals("somedir2", conf.getJournalDirectory());
assertEquals(false, conf.isCreateJournalDir());
+
assertEquals(JournalType.NIO, conf.getJournalType());
- assertEquals(10000, conf.getJournalBufferSize());
- assertEquals(true, conf.isJournalFlushOnSync());
- assertEquals(1000, conf.getJournalBufferTimeout());
+ assertEquals(10000, conf.getJournalBufferSize_NIO());
+ assertEquals(1000, conf.getJournalBufferTimeout_NIO());
+ assertEquals(56546, conf.getJournalMaxIO_NIO());
+
assertEquals(false, conf.isJournalSyncTransactional());
assertEquals(true, conf.isJournalSyncNonTransactional());
assertEquals(12345678, conf.getJournalFileSize());
assertEquals(100, conf.getJournalMinFiles());
assertEquals(123, conf.getJournalCompactMinFiles());
assertEquals(33, conf.getJournalCompactPercentage());
- assertEquals(56546, conf.getJournalMaxAIO());
+
assertEquals("largemessagesdir", conf.getLargeMessagesDirectory());
assertEquals(95, conf.getMemoryWarningThreshold());
assertEquals(1024, conf.getBackupWindowSize());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -275,7 +275,7 @@
return 0;
}
- public int getJournalMaxAIO()
+ public int getJournalMaxIO()
{
return 0;
@@ -526,13 +526,13 @@
}
- public int getAIOBufferSize()
+ public int getJournalBufferSize()
{
return 0;
}
- public int getAIOBufferTimeout()
+ public int getJournalBufferTimeout()
{
return 0;
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/unit/core/journal/impl/TimedBufferTest.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -87,7 +87,7 @@
}
}
- TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false, false); // Any big timeout
+ TimedBuffer timedBuffer = new TimedBuffer(100, 3600 * 1000, false); // Any big timeout
timedBuffer.setObserver(new TestObserver());
@@ -106,8 +106,10 @@
timedBuffer.addBytes(buff, false, dummyCallback);
}
+ timedBuffer.checkSize(1);
+
assertEquals(1, flushTimes.get());
-
+
ByteBuffer flushedBuffer = buffers.get(0);
assertEquals(100, flushedBuffer.limit());
Modified: trunk/tests/src/org/hornetq/tests/util/ListJournal.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2009-12-01 19:40:40 UTC (rev 8482)
+++ trunk/tests/src/org/hornetq/tests/util/ListJournal.java 2009-12-01 20:18:47 UTC (rev 8483)
@@ -63,7 +63,7 @@
new NIOSequentialFileFactory(fileConf.getJournalDirectory()),
"hornetq-data",
"hq",
- fileConf.getJournalMaxAIO());
+ fileConf.getJournalMaxIO_NIO());
ArrayList<RecordInfo> records = new ArrayList<RecordInfo>();
ArrayList<PreparedTransactionInfo> prepared = new ArrayList<PreparedTransactionInfo>();
15 years, 1 month
JBoss hornetq SVN: r8482 - trunk/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-01 14:40:40 -0500 (Tue, 01 Dec 2009)
New Revision: 8482
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
failover issue
* fixed retry loop by rethrowing the exception if it is not a SESSION_CREATION_REJECTED
* added a sleep of 10ms before retry to send the request to avoid spinning too much
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 17:44:47 UTC (rev 8481)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 19:40:40 UTC (rev 8482)
@@ -839,7 +839,13 @@
{
log.warn("Server is starting, retry to create the session " + name);
retry = true;
+ // sleep a little bit to avoid spinning too much
+ Thread.sleep(10);
}
+ else
+ {
+ throw e;
+ }
}
} while(retry);
15 years, 1 month
JBoss hornetq SVN: r8481 - trunk/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-01 12:44:47 -0500 (Tue, 01 Dec 2009)
New Revision: 8481
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
fixed previous commit
* if a SESSION_CREATION_REJECTED was thrown, the do block was looping for ever...
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 16:59:49 UTC (rev 8480)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 17:44:47 UTC (rev 8481)
@@ -830,19 +830,20 @@
try
{
channel1.sendBlocking(createRequest);
+ retry = false;
}
catch(HornetQException e)
{
// the session was created while its server was starting, retry it:
if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
{
- log.warn("Server is starting, retry to create the session");
-
+ log.warn("Server is starting, retry to create the session " + name);
retry = true;
- continue;
}
}
} while(retry);
+
+ log.info("created session " + name);
channel.clearCommands();
15 years, 1 month
JBoss hornetq SVN: r8480 - in trunk/src/main/org/hornetq/core: server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-01 11:59:49 -0500 (Tue, 01 Dec 2009)
New Revision: 8480
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
failover issues
* in HornetQServerImpl.return a false ReattachSessionResponseMessage is the server is not started. Returning null was throwing a swallowed NPE
and the (blocking) client was not receiving any response
* in ClientSessionImple.handleFailover(), retry sending the CreateSessionMessage in case the server is accepting connections but not properly
started (thus refusing to create the session)
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 16:36:31 UTC (rev 8479)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 16:59:49 UTC (rev 8480)
@@ -824,8 +824,25 @@
autoCommitAcks,
preAcknowledge,
confirmationWindowSize);
+ boolean retry = false;
+ do
+ {
+ try
+ {
+ channel1.sendBlocking(createRequest);
+ }
+ catch(HornetQException e)
+ {
+ // the session was created while its server was starting, retry it:
+ if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+ {
+ log.warn("Server is starting, retry to create the session");
- channel1.sendBlocking(createRequest);
+ retry = true;
+ continue;
+ }
+ }
+ } while(retry);
channel.clearCommands();
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 16:36:31 UTC (rev 8479)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 16:59:49 UTC (rev 8480)
@@ -554,7 +554,7 @@
{
if (!started)
{
- return null;
+ return new ReattachSessionResponseMessage(-1, false);
}
ServerSession session = sessions.get(name);
15 years, 1 month
JBoss hornetq SVN: r8479 - trunk/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-01 11:36:31 -0500 (Tue, 01 Dec 2009)
New Revision: 8479
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Ignoring callbacks and lineups on sync=false operations
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-01 16:33:35 UTC (rev 8478)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-01 16:36:31 UTC (rev 8479)
@@ -37,7 +37,6 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.PreparedTransactionInfo;
@@ -492,28 +491,28 @@
messageJournal.appendAddRecord(message.getMessageID(),
ADD_LARGE_MESSAGE,
new LargeMessageEncoding((LargeServerMessage)message),
- false, getContext());
+ false, getContext(false));
}
else
{
- messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message, false, getContext());
+ messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message, false, getContext(false));
}
}
public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
{
- messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, getContext());
+ messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID), last && syncNonTransactional, getContext(syncNonTransactional));
}
public void storeAcknowledge(final long queueID, final long messageID) throws Exception
{
- messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getContext());
+ messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional, getContext(syncNonTransactional));
}
public void deleteMessage(final long messageID) throws Exception
{
- messageJournal.appendDeleteRecord(messageID, syncNonTransactional, getContext());
+ messageJournal.appendDeleteRecord(messageID, syncNonTransactional, getContext(syncNonTransactional));
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
@@ -524,19 +523,19 @@
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
encoding,
- syncNonTransactional, getContext());
+ syncNonTransactional, getContext(syncNonTransactional));
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional, getContext());
+ messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional, getContext(syncNonTransactional));
}
public void deleteDuplicateID(long recordID) throws Exception
{
- messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext());
+ messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
}
// Transactional operations
@@ -592,13 +591,13 @@
public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
{
long id = generateUniqueID();
- messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true, getContext());
+ messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new HeuristicCompletionEncoding(xid, isCommit), true, getContext(true));
return id;
}
public void deleteHeuristicCompletion(long id) throws Exception
{
- messageJournal.appendDeleteRecord(id, true, getContext());
+ messageJournal.appendDeleteRecord(id, true, getContext(true));
}
public void deletePageTransactional(final long txID, final long recordID) throws Exception
@@ -624,17 +623,17 @@
public void prepare(final long txID, final Xid xid) throws Exception
{
- messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getContext());
+ messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getContext(syncTransactional));
}
public void commit(final long txID) throws Exception
{
- messageJournal.appendCommitRecord(txID, syncTransactional, getContext());
+ messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional));
}
public void rollback(final long txID) throws Exception
{
- messageJournal.appendRollbackRecord(txID, syncTransactional, getContext());
+ messageJournal.appendRollbackRecord(txID, syncTransactional, getContext(syncTransactional));
}
public void storeDuplicateIDTransactional(final long txID,
@@ -672,7 +671,7 @@
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
UPDATE_DELIVERY_COUNT,
updateInfo,
- syncNonTransactional, getContext());
+ syncNonTransactional, getContext(syncNonTransactional));
}
private static final class AddMessageRecord
@@ -1454,9 +1453,85 @@
}
}
+ private OperationContext getContext(final boolean sync)
+ {
+ if (sync)
+ {
+ return getContext();
+ }
+ else
+ {
+ return DummyOperationContext.getInstance();
+ }
+ }
+
+
+
// Inner Classes
// ----------------------------------------------------------------------------
+
+ static class DummyOperationContext implements OperationContext
+ {
+
+ private static DummyOperationContext instance = new DummyOperationContext();
+
+ public static OperationContext getInstance()
+ {
+ return instance;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#complete()
+ */
+ public void complete()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#executeOnCompletion(org.hornetq.core.journal.IOAsyncTask)
+ */
+ public void executeOnCompletion(IOAsyncTask runnable)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#replicationDone()
+ */
+ public void replicationDone()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#replicationLineUp()
+ */
+ public void replicationLineUp()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.IOCompletion#lineUp()
+ */
+ public void lineUp()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.asyncio.AIOCallback#done()
+ */
+ public void done()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
+ */
+ public void onError(int errorCode, String errorMessage)
+ {
+ }
+
+ }
+
private static class XidEncoding implements EncodingSupport
{
final Xid xid;
15 years, 1 month
JBoss hornetq SVN: r8478 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-01 11:33:35 -0500 (Tue, 01 Dec 2009)
New Revision: 8478
Added:
trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java
Log:
Added: trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/BlockingSendTest.java 2009-12-01 16:33:35 UTC (rev 8478)
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.core.client.ClientConsumer;
+import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.client.ClientProducer;
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.ClientSessionFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.tests.util.ServiceTestBase;
+
+/**
+ * A BlockingSendTest
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class BlockingSendTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testSinglePersistentBlockingNonSync() throws Exception
+ {
+ HornetQServer server = createServer(true);
+ ClientSession session = null;
+ ClientSessionFactory factory = null;
+
+ try
+ {
+
+ server.getConfiguration().setJournalSyncNonTransactional(false);
+ server.getConfiguration().setJournalBufferTimeout(15000);
+
+ server.start();
+
+ System.out.println("sync = " + server.getConfiguration().isJournalSyncNonTransactional());
+
+ factory = createFactory(false);
+ factory.setBlockOnPersistentSend(true);
+
+ session = factory.createSession();
+
+ session.createQueue("address", "queue");
+
+ ClientProducer prod = session.createProducer("address");
+
+ ClientMessage message = session.createClientMessage(true);
+
+ prod.send(message);
+
+ ClientConsumer consumer = session.createConsumer("queue");
+
+ session.start();
+
+ ClientMessage msg = consumer.receive(5000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+
+ }
+ finally
+ {
+ if (factory != null)
+ {
+ factory.close();
+ }
+
+ if (session != null)
+ {
+ session.close();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
15 years, 1 month
JBoss hornetq SVN: r8477 - in trunk: docs/user-manual/en and 26 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-01 04:01:01 -0500 (Tue, 01 Dec 2009)
New Revision: 8477
Added:
trunk/examples/jms/message-group2/
trunk/examples/jms/message-group2/build.bat
trunk/examples/jms/message-group2/build.sh
trunk/examples/jms/message-group2/build.xml
trunk/examples/jms/message-group2/readme.html
trunk/examples/jms/message-group2/server0/
trunk/examples/jms/message-group2/server0/client-jndi.properties
trunk/examples/jms/message-group2/server0/hornetq-beans.xml
trunk/examples/jms/message-group2/server0/hornetq-configuration.xml
trunk/examples/jms/message-group2/server0/hornetq-jms.xml
trunk/examples/jms/message-group2/server0/hornetq-users.xml
trunk/examples/jms/message-group2/src/
trunk/examples/jms/message-group2/src/org/
trunk/examples/jms/message-group2/src/org/hornetq/
trunk/examples/jms/message-group2/src/org/hornetq/jms/
trunk/examples/jms/message-group2/src/org/hornetq/jms/example/
trunk/examples/jms/message-group2/src/org/hornetq/jms/example/MessageGroup2Example.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java
Modified:
trunk/docs/user-manual/en/message-grouping.xml
trunk/examples/jms/hornetq-jms-examples.iml
trunk/hornetq.iws
trunk/src/config/common/schema/hornetq-jms.xsd
trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-229 - added the ability to control grouping at the connection factry level
Modified: trunk/docs/user-manual/en/message-grouping.xml
===================================================================
--- trunk/docs/user-manual/en/message-grouping.xml 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/docs/user-manual/en/message-grouping.xml 2009-12-01 09:01:01 UTC (rev 8477)
@@ -71,6 +71,22 @@
</entries>
<autogroup>true</autogroup>
</connection-factory></programlisting>
+ <para>Alternatively you can set the group id via the connection factory. All messages sent with producers created
+ via this connection factory will set the <literal>JMSXGroupID</literal> to the specified value on all messages
+ sent. To configure the group id set it on the connection factory in the <literal>hornetq-jms.xml</literal>
+ config file as follows
+ <programlisting>
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ <group-id>Group-0</group-id>
+ </connection-factory>
+ </programlisting>
+ </para>
</section>
<section>
<title>Example</title>
@@ -78,6 +94,11 @@
groups are configured and used with JMS.</para>
</section>
<section>
+ <title>Example</title>
+ <para>See <xref linkend="examples.message-group2"/> for an example which shows how message
+ groups are configured via a connection factory.</para>
+ </section>
+ <section>
<title> Clustered Grouping</title>
<para>Using the Grouping function in a cluster is a bit more complex. This is because messages
with a particular group id can arrive on any node so each node needs to know about which
Modified: trunk/examples/jms/hornetq-jms-examples.iml
===================================================================
--- trunk/examples/jms/hornetq-jms-examples.iml 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/examples/jms/hornetq-jms-examples.iml 2009-12-01 09:01:01 UTC (rev 8477)
@@ -32,6 +32,7 @@
<sourceFolder url="file://$MODULE_DIR$/management/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/message-counters/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/message-group/src" isTestSource="false" />
+ <sourceFolder url="file://$MODULE_DIR$/message-group2/src" isTestSource="true" />
<sourceFolder url="file://$MODULE_DIR$/message-priority/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/no-consumer-buffering/src" isTestSource="false" />
<sourceFolder url="file://$MODULE_DIR$/paging/src" isTestSource="false" />
Copied: trunk/examples/jms/message-group2/build.bat (from rev 8456, trunk/examples/jms/message-group/build.bat)
===================================================================
--- trunk/examples/jms/message-group2/build.bat (rev 0)
+++ trunk/examples/jms/message-group2/build.bat 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Copied: trunk/examples/jms/message-group2/build.sh (from rev 8456, trunk/examples/jms/message-group/build.sh)
===================================================================
--- trunk/examples/jms/message-group2/build.sh (rev 0)
+++ trunk/examples/jms/message-group2/build.sh 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../../bin/build.sh "$@"
+fi
+
+
+
Copied: trunk/examples/jms/message-group2/build.xml (from rev 8456, trunk/examples/jms/message-group/build.xml)
===================================================================
--- trunk/examples/jms/message-group2/build.xml (rev 0)
+++ trunk/examples/jms/message-group2/build.xml 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+<project default="run" name="HornetQ JMS Message Group Example">
+
+ <import file="../../common/build.xml"/>
+
+ <target name="run">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.MessageGroup2Example"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.MessageGroup2Example"/>
+ <param name="hornetq.example.runServer" value="false"/>
+ </antcall>
+ </target>
+
+</project>
Copied: trunk/examples/jms/message-group2/readme.html (from rev 8456, trunk/examples/jms/message-group/readme.html)
===================================================================
--- trunk/examples/jms/message-group2/readme.html (rev 0)
+++ trunk/examples/jms/message-group2/readme.html 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,142 @@
+<html>
+ <head>
+ <title>HornetQ Message Group Example</title>
+ <link rel="stylesheet" type="text/css" href="../../common/common.css" />
+ <link rel="stylesheet" type="text/css" href="../../common/prettify.css" />
+ <script type="text/javascript" src="../../common/prettify.js"></script>
+ </head>
+ <body onload="prettyPrint()">
+ <h1>Message Group Example</h1>
+ <br>
+ <p>This example shows you how to configure and use message groups via a connection factory with HornetQ.</p>
+
+ <p>Message groups are sets of messages that has the following characteristics: </p>
+ <li>Messages in a message group share the same group id, i.e. they have same JMSXGroupID string property values.</li>
+ <li>Messages in a message group will be all delivered to no more than one of the queue's consumers. The consumer that receives the
+ first message of a group will receive all the messages that belongs to the group.</li>
+
+ <p>You can make any message belong to a message group by setting a 'group-id' on the connection factory. All producers created via this connection factory will set that group id on its messages.
+ In this example we set the group id 'Group-0'on a connection factory and send messages via 2 different producers and check that only 1 consumer receives them. </p>
+
+ <p>Alternatively, HornetQ's connection factories can be configured to <em>auto group</em> messages. By setting <code>autogroup</code> to </code>true</code> on the <code>HornetQConnectionFactory</code>
+ (or setting <code><autogroup>true</autogroup></code> in <code>hornetq-jms.xml</code>'s connection factory settings), a random unique id
+ will be picked to create a message group. <em>Every messages</em> sent by a producer created from this connection factory will automatically
+ be part of this message group.</p>
+
+ <br>
+ <h2>Example step-by-step</h2>
+ <p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on windows) from this directory</i></p>
+ <br>
+ <ol>
+ <li>First we need to get an initial context so we can look-up the JMS connection factory and destination objects from JNDI. This initial context will get it's properties from the <code>client-jndi.properties</code> file in the directory <code>../common/config</code></li>
+ <pre class="prettyprint">
+ <code>InitialContext initialContext = getContext();</code>
+ </pre>
+
+ <li>We look-up the JMS queue object from JNDI</li>
+ <pre class="prettyprint">
+ <code>Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");</code>
+ </pre>
+
+ <li>We look-up the JMS connection factory object from JNDI</li>
+ <pre class="prettyprint">
+ <code>ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");</code>
+ </pre>
+
+ <li>We create a JMS connection</li>
+ <pre class="prettyprint">
+ <code>connection = cf.createConnection();</code>
+ </pre>
+
+ <li>We create a JMS session. The session is created as non transacted and will auto acknowledge messages.</li>
+ <pre class="prettyprint">
+ <code>Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);</code>
+ </pre>
+
+ <li>We create 2 JMS message producers on the session. This will be used to send the messages.</li>
+ <pre class="prettyprint">
+ <code>
+ MessageProducer producer1 = session.createProducer(queue);
+
+ MessageProducer producer2 = session.createProducer(queue);</code>
+ </pre>
+
+ <li>We create two consumers.</li>
+ <pre class="prettyprint">
+ <code>
+ MessageConsumer consumer1 = session.createConsumer(queue);
+ consumer1.setMessageListener(new SimpleMessageListener("consumer-1"));
+ MessageConsumer consumer2 = session.createConsumer(queue);
+ consumer2.setMessageListener(new SimpleMessageListener("consumer-2"));
+ </code>
+ </pre>
+
+ <li>We create and send 10 text messages using each producer</li>
+ <pre class="prettyprint">
+ <code>
+ int msgCount = 10;
+ for (int i = 0; i < msgCount; i++)
+ {
+ TextMessage m = session.createTextMessage("producer1 message " + i);
+ producer1.send(m);
+ System.out.println("Sent message: " + m.getText());
+ TextMessage m2 = session.createTextMessage("producer2 message " + i);
+ producer2.send(m2);
+ System.out.println("Sent message: " + m2.getText());
+ }
+ </code>
+ </pre>
+
+ <li>We start the connection.</li>
+ <pre class="prettyprint">
+ <code>connection.start();</code>
+ </pre>
+
+ <li>We check the group messages are received by only one consumer</li>
+ <pre class="prettyprint">
+ <code>
+ String trueReceiver = messageReceiverMap.get("producer1 message " + 0);
+ for (int i = 0; i < msgCount; i++)
+ {
+ String receiver = messageReceiverMap.get("producer1 message " + i);
+ if (!trueReceiver.equals(receiver))
+ {
+ System.out.println("Group message [producer1 message " + i + "] went to wrong receiver: " + receiver);
+ result = false;
+ }
+ receiver = messageReceiverMap.get("producer2 message " + i);
+ if (!trueReceiver.equals(receiver))
+ {
+ System.out.println("Group message [producer2 message " + i + "] went to wrong receiver: " + receiver);
+ result = false;
+ }
+ }
+
+ </code>
+ </pre>
+
+ <li>And finally, <b>always</b> remember to close your JMS connections and resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>
+
+ <pre class="prettyprint">
+ <code>finally
+ {
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }</code>
+ </pre>
+ </ol>
+
+ <h2>More information</h2>
+
+ <ul>
+ <li>User Manual's <a href="../../../docs/user-manual/en/html_single/index.html#message-grouping2">Message Grouping chapter</a></li>
+ </ul>
+
+ </body>
+</html>
\ No newline at end of file
Copied: trunk/examples/jms/message-group2/server0/client-jndi.properties (from rev 8456, trunk/examples/jms/message-group/server0/client-jndi.properties)
===================================================================
--- trunk/examples/jms/message-group2/server0/client-jndi.properties (rev 0)
+++ trunk/examples/jms/message-group2/server0/client-jndi.properties 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Copied: trunk/examples/jms/message-group2/server0/hornetq-beans.xml (from rev 8456, trunk/examples/jms/message-group/server0/hornetq-beans.xml)
===================================================================
--- trunk/examples/jms/message-group2/server0/hornetq-beans.xml (rev 0)
+++ trunk/examples/jms/message-group2/server0/hornetq-beans.xml 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.core.security.impl.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Copied: trunk/examples/jms/message-group2/server0/hornetq-configuration.xml (from rev 8456, trunk/examples/jms/message-group/server0/hornetq-configuration.xml)
===================================================================
--- trunk/examples/jms/message-group2/server0/hornetq-configuration.xml (rev 0)
+++ trunk/examples/jms/message-group2/server0/hornetq-configuration.xml 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,33 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+
+ <!-- Connectors -->
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.integration.transports.netty.NettyConnectorFactory</factory-class>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ </acceptor>
+ </acceptors>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createTempQueue" roles="guest"/>
+ <permission type="deleteTempQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Copied: trunk/examples/jms/message-group2/server0/hornetq-jms.xml (from rev 8456, trunk/examples/jms/message-group/server0/hornetq-jms.xml)
===================================================================
--- trunk/examples/jms/message-group2/server0/hornetq-jms.xml (rev 0)
+++ trunk/examples/jms/message-group2/server0/hornetq-jms.xml 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,20 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ <group-id>Group-0</group-id>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/message-group2/server0/hornetq-users.xml (from rev 8456, trunk/examples/jms/message-group/server0/hornetq-users.xml)
===================================================================
--- trunk/examples/jms/message-group2/server0/hornetq-users.xml (rev 0)
+++ trunk/examples/jms/message-group2/server0/hornetq-users.xml 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Copied: trunk/examples/jms/message-group2/src/org/hornetq/jms/example/MessageGroup2Example.java (from rev 8456, trunk/examples/jms/message-group/src/org/hornetq/jms/example/MessageGroupExample.java)
===================================================================
--- trunk/examples/jms/message-group2/src/org/hornetq/jms/example/MessageGroup2Example.java (rev 0)
+++ trunk/examples/jms/message-group2/src/org/hornetq/jms/example/MessageGroup2Example.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,160 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+import org.hornetq.jms.client.HornetQMessage;
+
+/**
+ * A simple JMS Queue example that sends and receives message groups.
+ *
+ * @author <a href="hgao(a)redhat.com">Howard Gao</a>
+ */
+public class MessageGroup2Example extends HornetQExample
+{
+ private Map<String, String> messageReceiverMap = new ConcurrentHashMap<String, String>();
+ private boolean result = true;
+
+ public static void main(String[] args)
+ {
+ new MessageGroup2Example().run(args);
+ }
+
+ public boolean runExample() throws Exception
+ {
+ Connection connection = null;
+ InitialContext initialContext = null;
+ try
+ {
+ //Step 1. Create an initial context to perform the JNDI lookup.
+ initialContext = getContext(0);
+
+ //Step 2. Perform a lookup on the queue
+ Queue queue = (Queue) initialContext.lookup("/queue/exampleQueue");
+
+ //Step 3. Perform a lookup on the Connection Factory
+ ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("/ConnectionFactory");
+
+ //Step 4. Create a JMS Connection
+ connection = cf.createConnection();
+
+ //Step 5. Create a JMS Session
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Step 6. Create 2 JMS Message Producers
+ MessageProducer producer1 = session.createProducer(queue);
+
+ MessageProducer producer2 = session.createProducer(queue);
+
+ //Step 7. Create two consumers
+ MessageConsumer consumer1 = session.createConsumer(queue);
+ consumer1.setMessageListener(new SimpleMessageListener("consumer-1"));
+ MessageConsumer consumer2 = session.createConsumer(queue);
+ consumer2.setMessageListener(new SimpleMessageListener("consumer-2"));
+
+ //Step 8. Create and send 10 text messages with each producer
+ int msgCount = 10;
+ for (int i = 0; i < msgCount; i++)
+ {
+ TextMessage m = session.createTextMessage("producer1 message " + i);
+ producer1.send(m);
+ System.out.println("Sent message: " + m.getText());
+ TextMessage m2 = session.createTextMessage("producer2 message " + i);
+ producer2.send(m2);
+ System.out.println("Sent message: " + m2.getText());
+ }
+
+ System.out.println("all messages are sent");
+
+ //Step 9. Start the connection
+ connection.start();
+
+ Thread.sleep(2000);
+
+ //Step 10. check the group messages are received by only one consumer
+
+ String trueReceiver = messageReceiverMap.get("producer1 message " + 0);
+ for (int i = 0; i < msgCount; i++)
+ {
+ String receiver = messageReceiverMap.get("producer1 message " + i);
+ if (!trueReceiver.equals(receiver))
+ {
+ System.out.println("Group message [producer1 message " + i + "] went to wrong receiver: " + receiver);
+ result = false;
+ }
+ receiver = messageReceiverMap.get("producer2 message " + i);
+ if (!trueReceiver.equals(receiver))
+ {
+ System.out.println("Group message [producer2 message " + i + "] went to wrong receiver: " + receiver);
+ result = false;
+ }
+ }
+
+ return result;
+ }
+ finally
+ {
+ //Step 11. Be sure to close our JMS resources!
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ if(connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private class SimpleMessageListener implements MessageListener
+ {
+ private String name;
+
+ public SimpleMessageListener(String listenerName)
+ {
+ name = listenerName;
+ }
+
+ public void onMessage(Message message)
+ {
+ try
+ {
+ TextMessage msg = (TextMessage)message;
+ System.out.format("Message: [%s] received by %s\n",
+ msg.getText(),
+ name);
+ messageReceiverMap.put(msg.getText(), name);
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}
\ No newline at end of file
Modified: trunk/hornetq.iws
===================================================================
--- trunk/hornetq.iws 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/hornetq.iws 2009-12-01 09:01:01 UTC (rev 8477)
@@ -28,6 +28,7 @@
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java" afterPath="$PROJECT_DIR$/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java" afterPath="$PROJECT_DIR$/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java" />
+ <change type="MODIFICATION" beforePath="$PROJECT_DIR$/hornetq.iws" afterPath="$PROJECT_DIR$/hornetq.iws" />
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/docs/user-manual/en/message-grouping.xml" afterPath="$PROJECT_DIR$/docs/user-manual/en/message-grouping.xml" />
<change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/jms/message-group2/src/org/hornetq" />
<change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/jms/message-group2/src/org" />
@@ -54,10 +55,6 @@
<change type="MODIFICATION" beforePath="$PROJECT_DIR$/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java" afterPath="$PROJECT_DIR$/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java" />
<change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/examples/jms/message-group2/server0/hornetq-jms.xml" />
</list>
- <list name="intellij" comment="https://jira.jboss.org/jira/browse/HORNETQ-229 - added the ability to control grouping at the connection factry level">
- <change type="MODIFICATION" beforePath="$PROJECT_DIR$/hornetq.ipr" afterPath="$PROJECT_DIR$/hornetq.ipr" />
- <change type="NEW" beforePath="" afterPath="$PROJECT_DIR$/hornetq.iws" />
- </list>
<ignored path="messaging.iws" />
<ignored path=".idea/workspace.xml" />
</component>
@@ -2188,7 +2185,7 @@
<frame x="0" y="25" width="1920" height="1150" extended-state="0" />
<editor active="false" />
<layout>
- <window_info id="Changes" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.23558648" sideWeight="0.0" order="7" side_tool="false" />
+ <window_info id="Changes" active="true" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.23558648" sideWeight="0.0" order="7" side_tool="false" />
<window_info id="Palette" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="3" side_tool="false" />
<window_info id="Ant Build" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.25" sideWeight="0.5" order="1" side_tool="false" />
<window_info id="Find" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.21704657" sideWeight="0.5" order="1" side_tool="false" />
@@ -2199,7 +2196,7 @@
<window_info id="TODO" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="6" side_tool="false" />
<window_info id="Structure" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.24959914" sideWeight="0.7006937" order="1" side_tool="false" />
<window_info id="Maven Projects" active="false" anchor="right" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="3" side_tool="false" />
- <window_info id="Project" active="true" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.1769437" sideWeight="0.7574553" order="0" side_tool="false" />
+ <window_info id="Project" active="false" anchor="left" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="true" weight="0.1769437" sideWeight="0.7574553" order="0" side_tool="false" />
<window_info id="Dependency Viewer" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.33" sideWeight="0.5" order="7" side_tool="false" />
<window_info id="Inspection" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.4" sideWeight="0.5" order="5" side_tool="false" />
<window_info id="Run" active="false" anchor="bottom" auto_hide="false" internal_type="DOCKED" type="DOCKED" visible="false" weight="0.26540756" sideWeight="0.5" order="2" side_tool="false" />
Modified: trunk/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-jms.xsd 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/config/common/schema/hornetq-jms.xsd 2009-12-01 09:01:01 UTC (rev 8477)
@@ -127,6 +127,9 @@
<xsd:element name="thread-pool-max-size" type="xsd:int"
maxOccurs="1" minOccurs="0">
</xsd:element>
+ <xsd:element name="group-id" type="xsd:string"
+ maxOccurs="1" minOccurs="0">
+ </xsd:element>
</xsd:all>
<xsd:attribute name="name" type="xsd:string"></xsd:attribute>
</xsd:complexType>
Modified: trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/ClientSessionFactory.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -181,4 +181,8 @@
void close();
ClientSessionFactory copy();
+
+ void setGroupID(String groupID);
+
+ String getGroupID();
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -83,6 +83,7 @@
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
final boolean autoGroup,
+ final SimpleString groupID,
final int minLargeMessageSize,
final Channel channel)
{
@@ -104,7 +105,7 @@
}
else
{
- this.groupID = null;
+ this.groupID = groupID;
}
this.minLargeMessageSize = minLargeMessageSize;
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -209,6 +209,8 @@
private static ScheduledExecutorService globalScheduledThreadPool;
+ private String groupID;
+
private static synchronized ExecutorService getGlobalThreadPool()
{
if (globalThreadPool == null)
@@ -381,6 +383,8 @@
cacheLargeMessagesClient = other.isCacheLargeMessagesClient();
initialMessagePacketSize = other.getInitialMessagePacketSize();
+
+ groupID = other.getGroupID();
}
public ClientSessionFactoryImpl()
@@ -980,7 +984,17 @@
{
return new ClientSessionFactoryImpl(this);
}
-
+
+ public void setGroupID(final String groupID)
+ {
+ this.groupID = groupID;
+ }
+
+ public String getGroupID()
+ {
+ return groupID;
+ }
+
// DiscoveryListener implementation --------------------------------------------------------
public synchronized void connectorsChanged()
@@ -1125,7 +1139,8 @@
consumerMaxRate,
blockOnNonPersistentSend,
blockOnPersistentSend,
- initialMessagePacketSize);
+ initialMessagePacketSize,
+ groupID);
return session;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -177,6 +177,8 @@
private volatile boolean workDone;
+ private final String groupID;
+
// Constructors ----------------------------------------------------------------------------
public ClientSessionImpl(final FailoverManager connectionManager,
@@ -200,6 +202,7 @@
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final int initialMessagePacketSize,
+ final String groupID,
final RemotingConnection remotingConnection,
final int version,
final Channel channel,
@@ -253,6 +256,8 @@
this.initialMessagePacketSize = initialMessagePacketSize;
+ this.groupID = groupID;
+
producerCreditManager = new ClientProducerCreditManagerImpl(this, producerWindowSize);
}
@@ -1390,6 +1395,7 @@
autoCommitSends && blockOnNonPersistentSend,
autoCommitSends && blockOnPersistentSend,
autoGroup,
+ groupID == null?null:new SimpleString(groupID),
minLargeMessageSize,
channel);
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManager.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -47,7 +47,8 @@
final int consumerMaxRate,
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
- final int initialMessagePacketSize) throws HornetQException;
+ final int initialMessagePacketSize,
+ final String groupID) throws HornetQException;
void removeSession(final ClientSessionInternal session);
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -311,7 +311,8 @@
final int consumerMaxRate,
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend,
- final int initialMessagePacketSize) throws HornetQException
+ final int initialMessagePacketSize,
+ final String groupID) throws HornetQException
{
synchronized (createSessionLock)
{
@@ -423,6 +424,7 @@
cacheLargeMessageClient,
minLargeMessageSize,
initialMessagePacketSize,
+ groupID,
theConnection,
response.getServerVersion(),
sessionChannel,
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnectionFactory.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -557,6 +557,16 @@
return sessionFactory;
}
+ public void setGroupID(final String groupID)
+ {
+ sessionFactory.setGroupID(groupID);
+ }
+
+ public String getGroupID()
+ {
+ return sessionFactory.getGroupID();
+ }
+
public void close()
{
sessionFactory.close();
Modified: trunk/src/main/org/hornetq/jms/server/JMSServerManager.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/server/JMSServerManager.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -165,6 +165,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
List<String> jndiBindings) throws Exception;
void createConnectionFactory(String name,
@@ -199,6 +200,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
List<String> jndiBindings) throws Exception;
/**
Modified: trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/server/config/ConnectionFactoryConfiguration.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -159,4 +159,7 @@
void setFailoverOnServerShutdown(boolean failoverOnServerShutdown);
+ String getGroupID();
+
+ void setGroupID(String groupID);
}
Modified: trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/server/config/impl/ConnectionFactoryConfigurationImpl.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -102,6 +102,8 @@
private boolean failoverOnServerShutdown = ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN;
+ private String groupID = null;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -471,6 +473,16 @@
this.failoverOnServerShutdown = failoverOnServerShutdown;
}
+ public String getGroupID()
+ {
+ return groupID;
+ }
+
+ public void setGroupID(String groupID)
+ {
+ this.groupID = groupID;
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -154,7 +154,7 @@
int threadPoolMaxSize = getInteger(e, "thread-pool-max-size", ClientSessionFactoryImpl.DEFAULT_THREAD_POOL_MAX_SIZE, MINUS_ONE_OR_GT_ZERO);
String connectionLoadBalancingPolicyClassName = getString(e, "connection-load-balancing-policy-class-name", ClientSessionFactoryImpl.DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME, Validators.NOT_NULL_OR_EMPTY);
long discoveryInitialWaitTimeout = getLong(e, "discovery-initial-wait-timeout", ClientSessionFactoryImpl.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT, GT_ZERO);
-
+ String groupid = getString(e, "group-id", null, Validators.NO_CHECK);
List<String> jndiBindings = new ArrayList<String>();
List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs = new ArrayList<Pair<TransportConfiguration, TransportConfiguration>>();
DiscoveryGroupConfiguration discoveryGroupConfiguration = null;
@@ -265,6 +265,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupid,
jndiBindings);
}
else
@@ -298,6 +299,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupid,
jndiBindings);
}
}
Modified: trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -408,6 +408,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
+ final String groupId,
final List<String> jndiBindings) throws Exception
{
checkInitialised();
@@ -442,6 +443,7 @@
cf.setMaxRetryInterval(maxRetryInterval);
cf.setReconnectAttempts(reconnectAttempts);
cf.setFailoverOnServerShutdown(failoverOnServerShutdown);
+ cf.setGroupID(groupId);
}
bindConnectionFactory(cf, name, jndiBindings);
@@ -479,6 +481,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
+ final String groupId,
final List<String> jndiBindings) throws Exception
{
checkInitialised();
@@ -752,7 +755,7 @@
if (config == null)
{
return;
- }
+ }
if (config.getContext() != null)
{
@@ -796,6 +799,7 @@
config.getMaxRetryInterval(),
config.getReconnectAttempts(),
config.isFailoverOnServerShutdown(),
+ config.getGroupID(),
Arrays.asList(config.getBindings()));
}
else
@@ -829,6 +833,7 @@
config.getMaxRetryInterval(),
config.getReconnectAttempts(),
config.isFailoverOnServerShutdown(),
+ config.getGroupID(),
Arrays.asList(config.getBindings()));
}
}
Modified: trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -122,6 +122,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupID,
Object[] jndiBindings) throws Exception;
void createConnectionFactory(@Parameter(name = "name") String name,
@@ -156,6 +157,7 @@
@Parameter(name = "maxRetryInterval") long maxRetryInterval,
@Parameter(name = "reconnectAttempts") int reconnectAttempts,
@Parameter(name = "failoverOnServerShutdown") boolean failoverOnServerShutdown,
+ @Parameter(name = "groupID") String groupID,
@Parameter(name = "jndiBindings", desc = "comma-separated list of JNDI bindings") String jndiBindings) throws Exception;
void createConnectionFactory(String name,
@@ -203,6 +205,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupID,
Object[] jndiBindings) throws Exception;
@Operation(desc = "Create a JMS ConnectionFactory", impact = ACTION)
@@ -238,6 +241,7 @@
@Parameter(name = "maxRetryInterval") long maxRetryInterval,
@Parameter(name = "reconnectAttempts") int reconnectAttempts,
@Parameter(name = "failoverOnServerShutdown") boolean failoverOnServerShutdown,
+ @Parameter(name = "groupID") String groupID,
@Parameter(name = "jndiBindings", desc = "comma-separated list of JNDI bindings") String jndiBindings) throws Exception;
void createConnectionFactory(String name,
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -249,6 +249,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
+ final String groupID,
final Object[] jndiBindings) throws Exception
{
List<Pair<TransportConfiguration, TransportConfiguration>> pairs = convertToConnectorPairs(liveConnectorsTransportClassNames,
@@ -287,6 +288,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupID,
jndiBindingsList);
sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
@@ -324,6 +326,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
+ final String groupID,
final String jndiBindings) throws Exception
{
Object[] liveClassNames = toArray(liveTransportClassNames);
@@ -364,6 +367,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupID,
bindings);
}
@@ -423,6 +427,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
+ final String groupID,
final Object[] jndiBindings) throws Exception
{
List<String> jndiBindingsList = convert(jndiBindings);
@@ -459,6 +464,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupID,
jndiBindingsList);
sendNotification(NotificationType.CONNECTION_FACTORY_CREATED, name);
@@ -496,6 +502,7 @@
final long maxRetryInterval,
final int reconnectAttempts,
final boolean failoverOnServerShutdown,
+ final String groupID,
final String jndiBindings) throws Exception
{
Object[] bindings = toArray(jndiBindings);
@@ -532,6 +539,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupID,
bindings);
}
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/CTSMiscellaneousTest.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -117,6 +117,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
DEFAULT_RECONNECT_ATTEMPTS,
DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
jndiBindings);
cf = (HornetQConnectionFactory)getInitialContext().lookup("/StrictTCKConnectionFactory");
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/JMSTestCase.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -113,6 +113,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
DEFAULT_RECONNECT_ATTEMPTS,
DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
jndiBindings);
cf = (HornetQConnectionFactory)getInitialContext().lookup("/testsuitecf");
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -342,6 +342,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
DEFAULT_RECONNECT_ATTEMPTS,
DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
jndiBindings);
}
Added: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingConnectionFactoryTest.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.integration.client;
+
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.client.*;
+import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.utils.SimpleString;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created Dec 1, 2009
+ */
+public class MessageGroupingConnectionFactoryTest extends UnitTestCase
+{
+ private static final Logger log = Logger.getLogger(MessageGroupingTest.class);
+
+ private HornetQServer server;
+
+ private ClientSession clientSession;
+
+ private SimpleString qName = new SimpleString("MessageGroupingTestQueue");
+
+
+ public void testBasicGroupingUsingConnection() throws Exception
+ {
+ doTestBasicGroupingUsingConnectionFactory(false);
+ }
+
+ public void testBasicGroupingUsingConnectionDirect() throws Exception
+ {
+ doTestBasicGroupingUsingConnectionFactory(true);
+ }
+
+ public void testBasicGroupingMultipleProducers() throws Exception
+ {
+ doTestBasicGroupingMultipleProducers(false);
+ }
+
+ public void testBasicGroupingMultipleProducersDirect() throws Exception
+ {
+ doTestBasicGroupingMultipleProducers(true);
+ }
+
+ private void doTestBasicGroupingUsingConnectionFactory(boolean directDelivery) throws Exception
+ {
+ ClientProducer clientProducer = clientSession.createProducer(qName);
+ ClientConsumer consumer = clientSession.createConsumer(qName);
+ ClientConsumer consumer2 = clientSession.createConsumer(qName);
+ if (directDelivery)
+ {
+ clientSession.start();
+ }
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, clientSession);
+ clientProducer.send(message);
+ }
+ if (!directDelivery)
+ {
+ clientSession.start();
+ }
+ CountDownLatch latch = new CountDownLatch(numMessages);
+ DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+ consumer.setMessageHandler(dummyMessageHandler);
+ DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+ consumer2.setMessageHandler(dummyMessageHandler2);
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(100, dummyMessageHandler.list.size());
+ assertEquals(0, dummyMessageHandler2.list.size());
+ consumer.close();
+ consumer2.close();
+ }
+
+ private void doTestBasicGroupingMultipleProducers(boolean directDelivery) throws Exception
+ {
+ ClientProducer clientProducer = clientSession.createProducer(qName);
+ ClientProducer clientProducer2 = clientSession.createProducer(qName);
+ ClientProducer clientProducer3 = clientSession.createProducer(qName);
+ ClientConsumer consumer = clientSession.createConsumer(qName);
+ ClientConsumer consumer2 = clientSession.createConsumer(qName);
+ if (directDelivery)
+ {
+ clientSession.start();
+ }
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, clientSession);
+ clientProducer.send(message);
+ clientProducer2.send(message);
+ clientProducer3.send(message);
+ }
+ if (!directDelivery)
+ {
+ clientSession.start();
+ }
+ CountDownLatch latch = new CountDownLatch(numMessages * 3);
+ DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+ consumer.setMessageHandler(dummyMessageHandler);
+ DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+ consumer2.setMessageHandler(dummyMessageHandler2);
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(300, dummyMessageHandler.list.size());
+ assertEquals(0, dummyMessageHandler2.list.size());
+ consumer.close();
+ consumer2.close();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (clientSession != null)
+ {
+ try
+ {
+ clientSession.close();
+ }
+ catch (HornetQException e1)
+ {
+ //
+ }
+ }
+ if (server != null && server.isStarted())
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Exception e1)
+ {
+ //
+ }
+ }
+ server = null;
+ clientSession = null;
+
+ super.tearDown();
+ }
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ ConfigurationImpl configuration = new ConfigurationImpl();
+ configuration.setSecurityEnabled(false);
+ TransportConfiguration transportConfig = new TransportConfiguration(INVM_ACCEPTOR_FACTORY);
+ configuration.getAcceptorConfigurations().add(transportConfig);
+ server = HornetQ.newHornetQServer(configuration, false);
+ // start the server
+ server.start();
+
+ // then we create a client as normal
+ ClientSessionFactory sessionFactory = new ClientSessionFactoryImpl(new TransportConfiguration(INVM_CONNECTOR_FACTORY));
+ sessionFactory.setGroupID("grp1");
+ clientSession = sessionFactory.createSession(false, true, true);
+ clientSession.createQueue(qName, qName, null, false);
+ }
+
+ private static class DummyMessageHandler implements MessageHandler
+ {
+ ArrayList<ClientMessage> list = new ArrayList<ClientMessage>();
+
+ private CountDownLatch latch;
+
+ private final boolean acknowledge;
+
+ public DummyMessageHandler(CountDownLatch latch, boolean acknowledge)
+ {
+ this.latch = latch;
+ this.acknowledge = acknowledge;
+ }
+
+ public void onMessage(ClientMessage message)
+ {
+ list.add(message);
+ if (acknowledge)
+ {
+ try
+ {
+ message.acknowledge();
+ }
+ catch (HornetQException e)
+ {
+ // ignore
+ }
+ }
+ latch.countDown();
+ }
+
+ public void reset(CountDownLatch latch)
+ {
+ list.clear();
+ this.latch = latch;
+ }
+ }
+}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/integration/client/MessageGroupingTest.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -153,6 +153,39 @@
consumer2.close();
}
+ private void doTestBasicGroupingUsingConnectionFactory(boolean directDelivery) throws Exception
+ {
+ ClientProducer clientProducer = clientSession.createProducer(qName);
+ ClientConsumer consumer = clientSession.createConsumer(qName);
+ ClientConsumer consumer2 = clientSession.createConsumer(qName);
+ if (directDelivery)
+ {
+ clientSession.start();
+ }
+ SimpleString groupId = new SimpleString("grp1");
+ int numMessages = 100;
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = createTextMessage("m" + i, clientSession);
+ message.putStringProperty(MessageImpl.HDR_GROUP_ID, groupId);
+ clientProducer.send(message);
+ }
+ if (!directDelivery)
+ {
+ clientSession.start();
+ }
+ CountDownLatch latch = new CountDownLatch(numMessages);
+ DummyMessageHandler dummyMessageHandler = new DummyMessageHandler(latch, true);
+ consumer.setMessageHandler(dummyMessageHandler);
+ DummyMessageHandler dummyMessageHandler2 = new DummyMessageHandler(latch, true);
+ consumer2.setMessageHandler(dummyMessageHandler2);
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertEquals(100, dummyMessageHandler.list.size());
+ assertEquals(0, dummyMessageHandler2.list.size());
+ consumer.close();
+ consumer2.close();
+ }
+
public void testMultipleGroupingConsumeHalf() throws Exception
{
ClientProducer clientProducer = clientSession.createProducer(qName);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/FloodServerTest.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -175,6 +175,7 @@
1000,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/PreACKJMSTest.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -248,6 +248,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/ReSendLargeMessageTest.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -287,6 +287,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/SessionClosedOnRemotingConnectionFailureTest.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -118,6 +118,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
0,
false,
+ null,
jndiBindings);
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/client/TextMessageTest.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -275,6 +275,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -187,6 +187,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlTest.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -453,6 +453,7 @@
ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL,
ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS,
ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
bindings);
}
});
@@ -499,6 +500,7 @@
ClientSessionFactoryImpl.DEFAULT_MAX_RETRY_INTERVAL,
ClientSessionFactoryImpl.DEFAULT_RECONNECT_ATTEMPTS,
ClientSessionFactoryImpl.DEFAULT_FAILOVER_ON_SERVER_SHUTDOWN,
+ null,
jndiBindings);
}
});
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -124,6 +124,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
Object[] jndiBindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory",
@@ -158,6 +159,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupId,
jndiBindings);
}
@@ -193,6 +195,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
String jndiBindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory",
@@ -227,6 +230,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupId,
jndiBindings);
}
@@ -410,6 +414,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
Object[] jndiBindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory",
@@ -445,6 +450,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupId,
jndiBindings);
}
@@ -481,6 +487,7 @@
long maxRetryInterval,
int reconnectAttempts,
boolean failoverOnServerShutdown,
+ String groupId,
String jndiBindings) throws Exception
{
proxy.invokeOperation("createConnectionFactory",
@@ -516,6 +523,7 @@
maxRetryInterval,
reconnectAttempts,
failoverOnServerShutdown,
+ groupId,
jndiBindings);
}
Modified: trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-12-01 08:55:10 UTC (rev 8476)
+++ trunk/tests/src/org/hornetq/tests/util/JMSTestBase.java 2009-12-01 09:01:01 UTC (rev 8477)
@@ -226,6 +226,7 @@
DEFAULT_MAX_RETRY_INTERVAL,
reconnectAttempts,
failoverOnServerShutdown,
+ null,
jndiBindings);
}
15 years, 1 month