JBoss hornetq SVN: r8496 - trunk/tests/src/org/hornetq/tests/integration/remoting.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-02 05:36:54 -0500 (Wed, 02 Dec 2009)
New Revision: 8496
Modified:
trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
Log:
fixed ping test
Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2009-12-02 10:12:45 UTC (rev 8495)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/PingTest.java 2009-12-02 10:36:54 UTC (rev 8496)
@@ -228,7 +228,7 @@
ClientSession session = csf.createSession(false, true, true);
- assertEquals(1, ((ClientSessionFactoryInternal)csf).numConnections());
+ assertEquals(1, csf.numConnections());
session.addFailureListener(clientListener);
@@ -296,7 +296,7 @@
{
// server must received at least one ping from the client to pass
// so that the server connection TTL is configured with the client value
- final CountDownLatch pingOnServerLatch = new CountDownLatch(1);
+ final CountDownLatch pingOnServerLatch = new CountDownLatch(2);
server.getRemotingService().addInterceptor(new Interceptor()
{
@@ -363,14 +363,14 @@
serverConn.addCloseListener(serverListener);
- assertTrue("server has not received any ping from the client" , pingOnServerLatch.await(2000, TimeUnit.MILLISECONDS));
+ assertTrue("server has not received any ping from the client" , pingOnServerLatch.await(4000, TimeUnit.MILLISECONDS));
// we let the server receives at least 1 ping (so that it uses the client ConnectionTTL value)
//Setting the handler to null will prevent server sending pings back to client
serverConn.getChannel(0, -1).setHandler(null);
- assertTrue(clientLatch.await(4 * CLIENT_FAILURE_CHECK_PERIOD, TimeUnit.MILLISECONDS));
+ assertTrue(clientLatch.await(8 * CLIENT_FAILURE_CHECK_PERIOD, TimeUnit.MILLISECONDS));
//Server connection will be closed too, when client closes client side connection after failure is detected
assertTrue(serverLatch.await(2 * RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL, TimeUnit.MILLISECONDS));
15 years, 1 month
JBoss hornetq SVN: r8495 - in trunk: src/config/common/schema and 2 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-02 05:12:45 -0500 (Wed, 02 Dec 2009)
New Revision: 8495
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/persistence.xml
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/tests/config/ConfigurationTest-full-config.xml
Log:
changed journal-max-aio to journal-max-io
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2009-12-02 10:07:13 UTC (rev 8494)
+++ trunk/docs/user-manual/en/configuration-index.xml 2009-12-02 10:12:45 UTC (rev 8495)
@@ -172,8 +172,8 @@
<entry>128 * 1024</entry>
</row>
<row>
- <entry><link linkend="configuring.message.journal.journal-max-aio"
- >journal-max-aio</link></entry>
+ <entry><link linkend="configuring.message.journal.journal-max-io"
+ >journal-max-io</link></entry>
<entry>Integer</entry>
<entry>the maximum number of write requests that can be in the AIO queue
at any one time</entry>
Modified: trunk/docs/user-manual/en/persistence.xml
===================================================================
--- trunk/docs/user-manual/en/persistence.xml 2009-12-02 10:07:13 UTC (rev 8494)
+++ trunk/docs/user-manual/en/persistence.xml 2009-12-02 10:12:45 UTC (rev 8495)
@@ -183,8 +183,8 @@
<para>Depending on how much data you expect your queues to contain at steady state
you should tune this number of files to match that total amount of data.</para>
</listitem>
- <listitem id="configuring.message.journal.journal-max-aio">
- <para><literal>journal-max-aio</literal></para>
+ <listitem id="configuring.message.journal.journal-max-io">
+ <para><literal>journal-max-io</literal></para>
<para>When using an AIO journal, write requests are queued up before being submitted
to AIO for execution. Then when AIO has completed them it calls HornetQ back.
This parameter controls the maximum number of write requests that can be in the
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-12-02 10:07:13 UTC (rev 8494)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-12-02 10:12:45 UTC (rev 8495)
@@ -166,7 +166,7 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="journal-compact-min-files" type="xsd:int">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="journal-max-aio" type="xsd:int">
+ <xsd:element maxOccurs="1" minOccurs="0" name="journal-max-io" type="xsd:int">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="perf-blast-pages" type="xsd:int">
</xsd:element>
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-12-02 10:07:13 UTC (rev 8494)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-12-02 10:12:45 UTC (rev 8495)
@@ -348,7 +348,7 @@
GT_ZERO);
int journalMaxIO = getInteger(e,
- "journal-max-aio",
+ "journal-max-io",
journalType == JournalType.ASYNCIO ? DEFAULT_JOURNAL_MAX_IO_AIO
: DEFAULT_JOURNAL_MAX_IO_NIO,
GT_ZERO);
Modified: trunk/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-full-config.xml 2009-12-02 10:07:13 UTC (rev 8494)
+++ trunk/tests/config/ConfigurationTest-full-config.xml 2009-12-02 10:12:45 UTC (rev 8495)
@@ -45,7 +45,7 @@
<journal-sync-non-transactional>true</journal-sync-non-transactional>
<journal-file-size>12345678</journal-file-size>
<journal-min-files>100</journal-min-files>
- <journal-max-aio>56546</journal-max-aio>
+ <journal-max-io>56546</journal-max-io>
<large-messages-directory>largemessagesdir</large-messages-directory>
<memory-warning-threshold>95</memory-warning-threshold>
<memory-measure-interval>54321</memory-measure-interval>
15 years, 1 month
JBoss hornetq SVN: r8494 - in trunk: examples/jms/bridge/server0 and 1 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-02 05:07:13 -0500 (Wed, 02 Dec 2009)
New Revision: 8494
Modified:
trunk/docs/user-manual/en/core-bridges.xml
trunk/examples/jms/bridge/server0/hornetq-configuration.xml
trunk/examples/jms/bridge/server1/hornetq-configuration.xml
Log:
allow bridges to be deployed if clustered is false
* updated doc & examples
Modified: trunk/docs/user-manual/en/core-bridges.xml
===================================================================
--- trunk/docs/user-manual/en/core-bridges.xml 2009-12-02 09:47:39 UTC (rev 8493)
+++ trunk/docs/user-manual/en/core-bridges.xml 2009-12-02 10:07:13 UTC (rev 8494)
@@ -63,9 +63,6 @@
backup-connector-name="backup-remote-connector"/>
</bridge>
</programlisting>
- <para>Please also note that in order for bridges to be deployed on a server, the <literal
- >clustered</literal> attribute needs to be set to <literal>true</literal> in
- <literal>hornetq-configuration.xml</literal>.</para>
<para>In the above example we have shown all the parameters its possible to configure for a
bridge. In practice you might use many of the defaults so it won't be necessary to
specify them all explicitly.</para>
Modified: trunk/examples/jms/bridge/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/bridge/server0/hornetq-configuration.xml 2009-12-02 09:47:39 UTC (rev 8493)
+++ trunk/examples/jms/bridge/server0/hornetq-configuration.xml 2009-12-02 10:07:13 UTC (rev 8494)
@@ -1,7 +1,6 @@
<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
- <clustered>true</clustered>
<!-- Connectors -->
<connectors>
Modified: trunk/examples/jms/bridge/server1/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/bridge/server1/hornetq-configuration.xml 2009-12-02 09:47:39 UTC (rev 8493)
+++ trunk/examples/jms/bridge/server1/hornetq-configuration.xml 2009-12-02 10:07:13 UTC (rev 8494)
@@ -2,8 +2,6 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
- <clustered>true</clustered>
-
<!-- Connectors -->
<connectors>
15 years, 1 month
JBoss hornetq SVN: r8493 - in trunk: examples/jms/message-group and 2 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-02 04:47:39 -0500 (Wed, 02 Dec 2009)
New Revision: 8493
Modified:
trunk/docs/user-manual/en/message-grouping.xml
trunk/examples/jms/message-group/readme.html
trunk/examples/jms/message-group/src/org/hornetq/jms/example/MessageGroupExample.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
Log:
removed JMSXGroupSeq references
Modified: trunk/docs/user-manual/en/message-grouping.xml
===================================================================
--- trunk/docs/user-manual/en/message-grouping.xml 2009-12-02 09:45:17 UTC (rev 8492)
+++ trunk/docs/user-manual/en/message-grouping.xml 2009-12-02 09:47:39 UTC (rev 8493)
@@ -44,19 +44,15 @@
<title>Using JMS</title>
<para>The property name used to identify the message group is <literal
>JMSXGroupID</literal>.</para>
- <para>Within the same group, messages can also set a <literal>JMSXGroupSeq</literal>
- <literal>int</literal> property (starting at 1).</para>
<programlisting>
// send 2 messages in the same group to ensure the same
// consumer will receive both
Message message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
- message.setIntProperty("JMSXGroupSeq", 1);
producer.send(message);
message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
- message.setIntProperty("JMSXGroupSeq", 2);
producer.send(message);
</programlisting>
<para>Alternatively, you can set <literal>autogroup</literal> to true on the <literal
Modified: trunk/examples/jms/message-group/readme.html
===================================================================
--- trunk/examples/jms/message-group/readme.html 2009-12-02 09:45:17 UTC (rev 8492)
+++ trunk/examples/jms/message-group/readme.html 2009-12-02 09:47:39 UTC (rev 8493)
@@ -79,7 +79,6 @@
{
groupMessages[i] = session.createTextMessage("Group-0 message " + i);
groupMessages[i].setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
- groupMessages[i].setIntProperty("JMSXGroupSeq", i + 1);
producer.send(groupMessages[i]);
System.out.println("Sent message: " + groupMessages[i].getText());
}
Modified: trunk/examples/jms/message-group/src/org/hornetq/jms/example/MessageGroupExample.java
===================================================================
--- trunk/examples/jms/message-group/src/org/hornetq/jms/example/MessageGroupExample.java 2009-12-02 09:45:17 UTC (rev 8492)
+++ trunk/examples/jms/message-group/src/org/hornetq/jms/example/MessageGroupExample.java 2009-12-02 09:47:39 UTC (rev 8493)
@@ -82,7 +82,6 @@
{
groupMessages[i] = session.createTextMessage("Group-0 message " + i);
groupMessages[i].setStringProperty(HornetQMessage.JMSXGROUPID, "Group-0");
- groupMessages[i].setIntProperty("JMSXGroupSeq", i + 1);
producer.send(groupMessages[i]);
System.out.println("Sent message: " + groupMessages[i].getText());
}
@@ -136,10 +135,9 @@
try
{
TextMessage msg = (TextMessage)message;
- System.out.format("Message: [%s] received by %s, (%s in the group)\n",
+ System.out.format("Message: [%s] received by %s\n",
msg.getText(),
- name,
- msg.getIntProperty("JMSXGroupSeq"));
+ name);
messageReceiverMap.put(msg.getText(), name);
}
catch (JMSException e)
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2009-12-02 09:45:17 UTC (rev 8492)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/JMSBridgeTest.java 2009-12-02 09:47:39 UTC (rev 8493)
@@ -772,7 +772,6 @@
//We add some JMSX ones too
tm.setStringProperty("JMSXGroupID", "mygroup543");
- tm.setIntProperty("JMSXGroupSeq", 777);
prod.send(tm);
@@ -802,7 +801,6 @@
assertEquals(23, tm.getIntProperty("Sausages"));
assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
- assertEquals(777, tm.getIntProperty("JMSXGroupSeq"));
if (on)
{
@@ -847,8 +845,7 @@
assertTrue(tm.getBooleanProperty("cheese"));
assertEquals(23, tm.getIntProperty("Sausages"));
- assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
- assertEquals(777, tm.getIntProperty("JMSXGroupSeq"));
+ assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
String header = tm.getStringProperty(HornetQMessage.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
15 years, 1 month
JBoss hornetq SVN: r8492 - in trunk: src/main/org/hornetq/core/remoting/impl/invm and 6 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-02 04:45:17 -0500 (Wed, 02 Dec 2009)
New Revision: 8492
Modified:
trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java
trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
fixed connection cleanup ordering issue
* on the client side, use an ordered executor to ensure that runnables which
disconnect & destroy the connection are called in the correct order
Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2009-12-02 09:45:17 UTC (rev 8492)
@@ -20,6 +20,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -177,6 +178,8 @@
private final ScheduledExecutorService scheduledThreadPool;
+ private final Executor closeExecutor;
+
private RemotingConnection connection;
private final long retryInterval;
@@ -271,6 +274,8 @@
this.orderedExecutorFactory = new OrderedExecutorFactory(threadPool);
+ this.closeExecutor = orderedExecutorFactory.getExecutor();
+
this.interceptors = interceptors;
}
@@ -912,6 +917,7 @@
connector = connectorFactory.createConnector(transportParams,
handler,
this,
+ closeExecutor,
threadPool,
scheduledThreadPool);
@@ -1075,7 +1081,7 @@
if (type == PacketImpl.DISCONNECT)
{
- threadPool.execute(new Runnable()
+ closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
// cause reconnect loop
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java 2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnectorFactory.java 2009-12-02 09:45:17 UTC (rev 8492)
@@ -33,6 +33,7 @@
public Connector createConnector(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
+ final Executor closExecutor,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
Modified: trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java 2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java 2009-12-02 09:45:17 UTC (rev 8492)
@@ -29,6 +29,7 @@
{
Connector createConnector(Map<String, Object> configuration, BufferHandler handler,
ConnectionLifeCycleListener listener,
+ Executor closeExecutor,
Executor threadPool,
ScheduledExecutorService scheduledThreadPool);
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2009-12-02 09:45:17 UTC (rev 8492)
@@ -134,6 +134,8 @@
private final ScheduledExecutorService scheduledThreadPool;
+ private final Executor closeExecutor;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -143,6 +145,7 @@
public NettyConnector(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
+ final Executor closeExecutor,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
@@ -225,6 +228,8 @@
TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE,
configuration);
+ this.closeExecutor = closeExecutor;
+
virtualExecutor = new VirtualExecutorService(threadPool);
this.scheduledThreadPool = scheduledThreadPool;
@@ -607,28 +612,26 @@
if (connections.remove(connectionID) != null)
{
// Execute on different thread to avoid deadlocks
- new Thread()
+ closeExecutor.execute(new Runnable()
{
- @Override
public void run()
{
listener.connectionDestroyed(connectionID);
}
- }.start();
+ });
}
}
public void connectionException(final Object connectionID, final HornetQException me)
{
// Execute on different thread to avoid deadlocks
- new Thread()
+ closeExecutor.execute(new Runnable()
{
- @Override
public void run()
{
listener.connectionException(connectionID, me);
}
- }.start();
+ });
}
}
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java 2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnectorFactory.java 2009-12-02 09:45:17 UTC (rev 8492)
@@ -33,10 +33,11 @@
public Connector createConnector(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
+ final Executor closeExecutor,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new NettyConnector(configuration, handler, listener, threadPool, scheduledThreadPool);
+ return new NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool);
}
public Set<String> getAllowableProperties()
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java 2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithBackupTest.java 2009-12-02 09:45:17 UTC (rev 8492)
@@ -41,4 +41,15 @@
{
return false;
}
+
+ public void _test() throws Exception
+ {
+ for (int i = 0; i < 50; i++)
+ {
+ System.out.println("\n\n" + i + "\n\n");
+ testStartStopServers();
+ tearDown();
+ setUp();
+ }
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java 2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/NettySymmetricClusterWithDiscoveryTest.java 2009-12-02 09:45:17 UTC (rev 8492)
@@ -34,5 +34,16 @@
{
return false;
}
+
+ public void _test() throws Exception
+ {
+ for (int i = 0; i < 50; i++)
+ {
+ System.out.println("\n\n" + i + "\n\n");
+ testStartStopServers();
+ tearDown();
+ setUp();
+ }
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2009-12-02 09:45:17 UTC (rev 8492)
@@ -101,7 +101,7 @@
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -151,7 +151,7 @@
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -204,7 +204,7 @@
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -258,7 +258,7 @@
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -311,7 +311,7 @@
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -360,7 +360,7 @@
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
@@ -402,7 +402,7 @@
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
- connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, scheduledThreadPool);
+ connector = new NettyConnector(conf, connectorHandler, new DummyConnectionLifeCycleListener(null), threadPool, threadPool, scheduledThreadPool);
connector.start();
Connection conn = connector.createConnection();
connCreatedLatch.await(5, TimeUnit.SECONDS);
Modified: trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java 2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/tests/src/org/hornetq/tests/integration/largemessage/mock/MockConnectorFactory.java 2009-12-02 09:45:17 UTC (rev 8492)
@@ -52,6 +52,7 @@
public Connector createConnector(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
+ final Executor closeExecutor,
final Executor executor, ScheduledExecutorService scheduledThreadPool)
{
return new MockConnector(configuration, handler, listener);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2009-12-02 09:43:08 UTC (rev 8491)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2009-12-02 09:45:17 UTC (rev 8492)
@@ -69,7 +69,7 @@
}
};
- NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
+ NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
connector.start();
assertTrue(connector.isStarted());
@@ -103,7 +103,7 @@
try
{
- new NettyConnector(params, null, listener, Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
+ new NettyConnector(params, null, listener, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
fail("Should throw Exception");
}
@@ -114,7 +114,7 @@
try
{
- new NettyConnector(params, handler, null, Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
+ new NettyConnector(params, handler, null, Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), Executors.newScheduledThreadPool(5));
fail("Should throw Exception");
}
15 years, 1 month
JBoss hornetq SVN: r8491 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-02 04:43:08 -0500 (Wed, 02 Dec 2009)
New Revision: 8491
Modified:
trunk/docs/user-manual/en/examples.xml
Log:
doc fix
Modified: trunk/docs/user-manual/en/examples.xml
===================================================================
--- trunk/docs/user-manual/en/examples.xml 2009-12-02 08:59:02 UTC (rev 8490)
+++ trunk/docs/user-manual/en/examples.xml 2009-12-02 09:43:08 UTC (rev 8491)
@@ -292,6 +292,11 @@
</itemizedlist>
</para>
</section>
+ <section id="examples.message-group2">
+ <title>Message Group</title>
+ <para>The <literal>message-group2</literal> example shows you how to configure and use
+ message groups with HornetQ via a connection factory.</para>
+ </section>
<section id="producer-rate-limiting-example">
<title>Message Producer Rate Limiting</title>
<para>The <literal>producer-rte-limit</literal> example demonstrates how, with HornetQ,
15 years, 1 month
JBoss hornetq SVN: r8490 - trunk/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-02 03:59:02 -0500 (Wed, 02 Dec 2009)
New Revision: 8490
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Journal type selection
* fallback to NIO if ASYNCIO has been configured but libaio is not installed
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-02 05:04:05 UTC (rev 8489)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-02 08:59:02 UTC (rev 8490)
@@ -248,7 +248,7 @@
config.getJournalBufferTimeout_AIO(),
config.isLogJournalWriteRate());
}
- else if (config.getJournalType() == JournalType.NIO)
+ else if (journalTypeToUse == JournalType.NIO)
{
log.info("Using NIO Journal");
journalFF = new NIOSequentialFileFactory(journalDir,
15 years, 1 month
JBoss hornetq SVN: r8489 - in trunk/tests/src/org/hornetq/tests/unit/util: sizeof and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-02 00:04:05 -0500 (Wed, 02 Dec 2009)
New Revision: 8489
Added:
trunk/tests/src/org/hornetq/tests/unit/util/sizeof/
trunk/tests/src/org/hornetq/tests/unit/util/sizeof/ObjectSizeOf.java
trunk/tests/src/org/hornetq/tests/unit/util/sizeof/ServerMessageSizeOf.java
trunk/tests/src/org/hornetq/tests/unit/util/sizeof/SizeOfBase.java
Log:
Adding sizeof tests (first commit)
Added: trunk/tests/src/org/hornetq/tests/unit/util/sizeof/ObjectSizeOf.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/sizeof/ObjectSizeOf.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/sizeof/ObjectSizeOf.java 2009-12-02 05:04:05 UTC (rev 8489)
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util.sizeof;
+
+/**
+ * Calculate the size of objects on the heap
+ * based on this article:
+ * http://www.javaworld.com/javaworld/javatips/jw-javatip130.html
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ObjectSizeOf extends SizeOfBase
+{
+
+ @Override
+ protected Object newObject()
+ {
+ return new Object();
+ }
+
+}
Added: trunk/tests/src/org/hornetq/tests/unit/util/sizeof/ServerMessageSizeOf.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/sizeof/ServerMessageSizeOf.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/sizeof/ServerMessageSizeOf.java 2009-12-02 05:04:05 UTC (rev 8489)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util.sizeof;
+
+import org.hornetq.core.server.impl.ServerMessageImpl;
+
+/**
+ * A ServerMessageSizeOf
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ServerMessageSizeOf extends SizeOfBase
+{
+
+ @Override
+ protected Object newObject()
+ {
+ return new ServerMessageImpl(1, 2000);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/tests/src/org/hornetq/tests/unit/util/sizeof/SizeOfBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/util/sizeof/SizeOfBase.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/util/sizeof/SizeOfBase.java 2009-12-02 05:04:05 UTC (rev 8489)
@@ -0,0 +1,100 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.util.sizeof;
+
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A Base class for tests that are calculating size of objects
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class SizeOfBase extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ private final Runtime runtime = Runtime.getRuntime();
+
+ private static final int numberOfObjects = 10000;
+
+ public void testCalculateSize()
+ {
+ getMemorySize();
+ newObject();
+
+ int i = 0;
+ long heap1 = 0;
+ long heap2 = 0;
+ long totalMemory1 = 0;
+ long totalMemory2 = 0;
+
+
+ final Object obj[] = new Object[numberOfObjects];
+ // make sure we load the classes before
+
+ heap1 = getMemorySize();
+
+ totalMemory1 = runtime.totalMemory();
+
+ for (i = 0; i < numberOfObjects; i++)
+ {
+ obj[i] = newObject();
+ }
+
+ heap2 = getMemorySize();
+
+ totalMemory2 = runtime.totalMemory();
+
+ final int size = Math.round(((float)(heap2 - heap1)) / numberOfObjects);
+
+ if (totalMemory1 != totalMemory2)
+ {
+ System.out.println("Warning: JVM allocated more data what would make results invalid");
+ }
+
+ System.out.println("heap1 = " + heap1 + ", heap2 = " + heap2 + ", size = " + size);
+
+ }
+
+ private long getMemorySize()
+ {
+ for (int i = 0; i < 5; i++)
+ {
+ forceGC();
+ }
+ return runtime.totalMemory() - runtime.freeMemory();
+ }
+
+ protected abstract Object newObject();
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
15 years, 1 month
JBoss hornetq SVN: r8488 - in trunk/src/main/org/hornetq/core: journal/impl and 2 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-01 18:48:59 -0500 (Tue, 01 Dec 2009)
New Revision: 8488
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
mainly remove log.infos
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 23:32:48 UTC (rev 8487)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-01 23:48:59 UTC (rev 8488)
@@ -153,7 +153,7 @@
private final boolean blockOnPersistentSend;
private final int minLargeMessageSize;
-
+
private final int initialMessagePacketSize;
private final boolean cacheLargeMessageClient;
@@ -253,7 +253,7 @@
this.cacheLargeMessageClient = cacheLargeMessageClient;
this.minLargeMessageSize = minLargeMessageSize;
-
+
this.initialMessagePacketSize = initialMessagePacketSize;
this.groupID = groupID;
@@ -694,9 +694,9 @@
if (consumer != null)
{
ClientMessageInternal clMessage = (ClientMessageInternal)message.getMessage();
-
+
clMessage.setDeliveryCount(message.getDeliveryCount());
-
+
clMessage.setFlowControlSize(message.getPacketSize());
consumer.handleMessage(clMessage);
@@ -779,7 +779,7 @@
{
return;
}
-
+
boolean resetCreditManager = false;
// We lock the channel to prevent any packets to be added to the resend
@@ -801,13 +801,13 @@
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
if (response.isReattached())
- {
+ {
// The session was found on the server - we reattached transparently ok
channel.replayCommands(response.getLastReceivedCommandID(), channel.getID());
}
else
- {
+ {
// The session wasn't found on the server - probably we're failing over onto a backup server where the
// session
// won't exist or the target server has been restarted - in this case the session will need to be recreated,
@@ -831,8 +831,8 @@
{
channel1.sendBlocking(createRequest);
retry = false;
- }
- catch(HornetQException e)
+ }
+ catch (HornetQException e)
{
// the session was created while its server was starting, retry it:
if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
@@ -847,9 +847,8 @@
throw e;
}
}
- } while(retry);
-
- log.info("created session " + name);
+ }
+ while (retry);
channel.clearCommands();
@@ -869,16 +868,16 @@
Connection conn = channel.getConnection().getTransportConnection();
HornetQBuffer buffer = createConsumerRequest.encode(channel.getConnection());
-
+
conn.write(buffer, false);
int clientWindowSize = entry.getValue().getClientWindowSize();
-
+
if (clientWindowSize != 0)
{
SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
clientWindowSize);
-
+
packet.setChannelID(channel.getID());
buffer = packet.encode(channel.getConnection());
@@ -931,9 +930,9 @@
if (resetCreditManager)
{
producerCreditManager.reset();
-
- //Also need to send more credits for consumers, otherwise the system could hand with the server
- //not having any credits to send
+
+ // Also need to send more credits for consumers, otherwise the system could hand with the server
+ // not having any credits to send
}
}
@@ -1400,15 +1399,14 @@
// consumer
if (windowSize != 0)
- {
+ {
channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
}
return consumer;
}
- private ClientProducer internalCreateProducer(final SimpleString address,
- final int maxRate) throws HornetQException
+ private ClientProducer internalCreateProducer(final SimpleString address, final int maxRate) throws HornetQException
{
checkClosed();
@@ -1419,7 +1417,7 @@
autoCommitSends && blockOnNonPersistentSend,
autoCommitSends && blockOnPersistentSend,
autoGroup,
- groupID == null?null:new SimpleString(groupID),
+ groupID == null ? null : new SimpleString(groupID),
minLargeMessageSize,
channel);
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-12-01 23:32:48 UTC (rev 8487)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-12-01 23:48:59 UTC (rev 8488)
@@ -335,8 +335,6 @@
this.compactPercentage = (float)compactPercentage / 100f;
}
- log.info("creating journal with max io " + maxIO);
-
this.compactMinFiles = compactMinFiles;
this.fileSize = fileSize;
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-01 23:32:48 UTC (rev 8487)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-01 23:48:59 UTC (rev 8488)
@@ -97,9 +97,6 @@
public TimedBuffer(final int size, final int timeout, final boolean logRates)
{
- log.info("timed buffer size " + size);
- log.info("timed buffer timeout " + timeout);
-
bufferSize = size;
this.logRates = logRates;
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-01 23:32:48 UTC (rev 8487)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-01 23:48:59 UTC (rev 8488)
@@ -241,17 +241,16 @@
if (journalTypeToUse == JournalType.ASYNCIO)
{
- log.info("AIO journal selected");
+ log.info("Using AIO Journal");
journalFF = new AIOSequentialFileFactory(journalDir,
config.getJournalBufferSize_AIO(),
config.getJournalBufferTimeout_AIO(),
config.isLogJournalWriteRate());
- log.info("AIO loaded successfully");
}
else if (config.getJournalType() == JournalType.NIO)
{
- log.info("NIO Journal selected");
+ log.info("Using NIO Journal");
journalFF = new NIOSequentialFileFactory(journalDir,
true,
config.getJournalBufferSize_NIO(),
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 23:32:48 UTC (rev 8487)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 23:48:59 UTC (rev 8488)
@@ -1187,13 +1187,6 @@
}
initialised = true;
-
- log.info("********** initialised");
-
- if (System.getProperty("org.hornetq.opt.routeblast") != null)
- {
- runRouteBlast();
- }
}
/**
@@ -1481,197 +1474,7 @@
throw new IllegalArgumentException("Error instantiating transformer class \"" + className + "\"", e);
}
}
-
- // private void runRouteBlastNoWait() throws Exception
- // {
- // SimpleString address = new SimpleString("rbnw_address");
- // SimpleString queueName = new SimpleString("rbnw_name");
- //
- // createQueue(address, queueName, null, true, false, true);
- //
- // Queue queue = (Queue)postOffice.getBinding(queueName).getBindable();
- //
- // RBConsumer consumer = new RBConsumer(queue);
- //
- // queue.addConsumer(consumer);
- //
- // final int bodySize = 1024;
- //
- // byte[] body = new byte[bodySize];
- //
- // final int numMessages = 10000000;
- //
- // for (int i = 0; i < numMessages; i++)
- // {
- // final ServerMessage msg = new ServerMessageImpl(storageManager.generateUniqueID(), 1500);
- //
- // msg.getBodyBuffer().writeBytes(body);
- //
- // msg.setDestination(address);
- //
- // msg.setDurable(true);
- //
- // postOffice.route(msg);
- // }
- // }
-
- private LinkedBlockingQueue<RouteBlastRunner> available = new LinkedBlockingQueue<RouteBlastRunner>();
-
- private void runRouteBlast() throws Exception
- {
- log.info("*** running route blast");
-
- final int numThreads = 1;
-
- final int numClients = 1000;
-
- for (int i = 0; i < numClients; i++)
- {
- RouteBlastRunner run = new RouteBlastRunner(new SimpleString("fooaddress" + i));
-
- run.setup();
-
- available.add(run);
- }
-
- log.info("setup, now running");
-
- Set<Thread> runners = new HashSet<Thread>();
-
- for (int i = 0; i < numThreads; i++)
- {
- Thread t = new Thread(new Foo());
-
- runners.add(t);
-
- t.start();
- }
-
- for (Thread t : runners)
- {
- t.join();
- }
- }
-
- class RouteBlastRunner implements Runnable
- {
- private SimpleString address;
-
- private Set<Consumer> consumers = new HashSet<Consumer>();
-
- RouteBlastRunner(SimpleString address)
- {
- this.address = address;
- }
-
- public void setup() throws Exception
- {
- final int numQueues = 1;
-
- for (int i = 0; i < numQueues; i++)
- {
- SimpleString queueName = new SimpleString(address + ".hq.route_blast_queue" + i);
-
- createQueue(address, queueName, null, true, false, true);
-
- Queue queue = (Queue)postOffice.getBinding(queueName).getBindable();
-
- RBConsumer consumer = new RBConsumer(queue);
-
- queue.addConsumer(consumer);
-
- // log.info("added consumer to queue " + queue);
-
- consumers.add(consumer);
- }
- }
-
- public void run()
- {
- try
- {
- final int bodySize = 1024;
-
- byte[] body = new byte[bodySize];
-
- final ServerMessage msg = new ServerMessageImpl(storageManager.generateUniqueID(), 1500);
-
- msg.getBodyBuffer().writeBytes(body);
-
- msg.setDestination(address);
-
- msg.setDurable(true);
-
- postOffice.route(msg);
-
- storageManager.afterCompleteOperations(new IOAsyncTask()
- {
- public void onError(int errorCode, String errorMessage)
- {
- log.error("Error processing IOCallback code = " + errorCode + " message = " + errorMessage);
- }
-
- public void done()
- {
- available.add(RouteBlastRunner.this);
- }
- });
- }
- catch (Exception e)
- {
- log.error("Failed to run runner", e);
- }
-
- }
- }
-
- class Foo implements Runnable
- {
- public void run()
- {
- for (int i = 0; i < 1000000; i++)
- {
- try
- {
- RouteBlastRunner runner = available.take();
-
- runner.run();
- }
- catch (InterruptedException e)
- {
- log.error("Interrupted", e);
- }
- }
- }
- }
-
- private class RBConsumer implements Consumer
- {
- private Queue queue;
-
- RBConsumer(Queue queue)
- {
- this.queue = queue;
- }
-
- public Filter getFilter()
- {
- return null;
- }
-
- public HandleStatus handle(MessageReference reference) throws Exception
- {
- reference.handled();
-
- queue.acknowledge(reference);
-
- // log.info("acking");
-
- return HandleStatus.HANDLED;
- }
-
- }
-
+
// Inner classes
// --------------------------------------------------------------------------------
}
15 years, 1 month
JBoss hornetq SVN: r8487 - in trunk/src/main/org/hornetq/core/server: impl and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-01 18:32:48 -0500 (Tue, 01 Dec 2009)
New Revision: 8487
Modified:
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
a few tweaks
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-12-01 23:13:07 UTC (rev 8486)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-12-01 23:32:48 UTC (rev 8487)
@@ -120,7 +120,6 @@
boolean hasMatchingConsumer(ServerMessage message);
- // Only used in testing
void deliverNow();
Collection<Consumer> getConsumers();
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 23:13:07 UTC (rev 8486)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-01 23:32:48 UTC (rev 8487)
@@ -674,8 +674,7 @@
securityStore,
sessionExecutor,
channel,
- managementService,
- // queueFactory,
+ managementService,
this,
configuration.getManagementAddress());
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-01 23:13:07 UTC (rev 8486)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-01 23:32:48 UTC (rev 8487)
@@ -26,7 +26,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -107,8 +106,6 @@
private final Runnable deliverRunner = new DeliverRunner();
- private final Semaphore lock = new Semaphore(1);
-
private final StorageManager storageManager;
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
@@ -1342,10 +1339,7 @@
{
// We have consumers with filters which don't match, so we need
// to prompt delivery every time
- // a new message arrives - this is why you really shouldn't use
- // filters with queues - in most cases
- // it's an ant-pattern since it would cause a queue scan on each
- // message
+ // a new message arrives
deliver();
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-01 23:13:07 UTC (rev 8486)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-01 23:32:48 UTC (rev 8487)
@@ -105,7 +105,7 @@
private Runnable browserDeliverer;
- private final boolean updateDeliveries;
+ private final boolean strictUpdateDeliveryCount;
private final StorageManager storageManager;
@@ -132,7 +132,7 @@
final StorageManager storageManager,
final Channel channel,
final boolean preAcknowledge,
- final boolean updateDeliveries,
+ final boolean strictUpdateDeliveryCount,
final Executor executor,
final ManagementService managementService) throws Exception
{
@@ -163,7 +163,7 @@
minLargeMessageSize = session.getMinLargeMessageSize();
- this.updateDeliveries = updateDeliveries;
+ this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
if (browseOnly)
{
@@ -229,7 +229,7 @@
// If updateDeliveries = false (set by strict-update),
// the updateDeliveryCount would still be updated after cancel
- if (updateDeliveries)
+ if (strictUpdateDeliveryCount)
{
if (ref.getMessage().isDurable() && ref.getQueue().isDurable())
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-01 23:13:07 UTC (rev 8486)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-01 23:32:48 UTC (rev 8487)
@@ -155,7 +155,7 @@
private final boolean preAcknowledge;
- private final boolean updateDeliveries;
+ private final boolean strictUpdateDeliveryCount;
private RemotingConnection remotingConnection;
@@ -205,7 +205,7 @@
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
- final boolean updateDeliveries,
+ final boolean strictUpdateDeliveryCount,
final boolean xa,
final RemotingConnection remotingConnection,
final StorageManager storageManager,
@@ -249,7 +249,7 @@
tx = new TransactionImpl(storageManager);
}
- this.updateDeliveries = updateDeliveries;
+ this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
this.channel = channel;
@@ -404,7 +404,7 @@
storageManager,
channel,
preAcknowledge,
- updateDeliveries,
+ strictUpdateDeliveryCount,
executor,
managementService);
15 years, 1 month