JBoss hornetq SVN: r11554 - trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-17 07:07:39 -0400 (Mon, 17 Oct 2011)
New Revision: 11554
Modified:
trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/ConfigurationValidationTest.java
Log:
FIX: Correct the reference path of searched resource.
When using Ant the classpath would be manually modified to include the
"schema" folder. In practice, leaving "schema/" out tests only for the
manual classpath modification.
Modified: trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/ConfigurationValidationTest.java
===================================================================
--- trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/ConfigurationValidationTest.java 2011-10-17 09:13:09 UTC (rev 11553)
+++ trunk/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/config/impl/ConfigurationValidationTest.java 2011-10-17 11:07:39 UTC (rev 11554)
@@ -24,7 +24,7 @@
* A ConfigurationValidationTr
*
* @author jmesnil
- *
+ *
* Created 22 janv. 2009 14:53:19
*
*
@@ -51,7 +51,7 @@
String xml = "<configuration xmlns='urn:hornetq'>" + "</configuration>";
Element element = XMLUtil.stringToElement(xml);
Assert.assertNotNull(element);
- XMLUtil.validate(element, "hornetq-configuration.xsd");
+ XMLUtil.validate(element, "schema/hornetq-configuration.xsd");
}
public void testFullConfiguration() throws Exception
13 years, 2 months
JBoss hornetq SVN: r11553 - trunk.
by do-not-reply@jboss.org
Author: borges
Date: 2011-10-17 05:13:09 -0400 (Mon, 17 Oct 2011)
New Revision: 11553
Modified:
trunk/pom.xml
Log:
Set findbugs failOnError=false
Modified: trunk/pom.xml
===================================================================
--- trunk/pom.xml 2011-10-17 07:57:52 UTC (rev 11552)
+++ trunk/pom.xml 2011-10-17 09:13:09 UTC (rev 11553)
@@ -562,6 +562,7 @@
<configuration>
<xmlOutput>true</xmlOutput>
<effort>Max</effort>
+ <failOnError>false</failOnError>
</configuration>
</plugin>
13 years, 2 months
JBoss hornetq SVN: r11552 - in branches: STOMP11_PERF and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-10-17 03:57:52 -0400 (Mon, 17 Oct 2011)
New Revision: 11552
Added:
branches/STOMP11_PERF/
Log:
branch for stomp perf
Property changes on: branches/STOMP11_PERF
___________________________________________________________________
Added: svn:ignore
+ build
eclipse-output
thirdparty
logs
ObjectStore
tmp
data
junit*.properties
target
.metadata
Added: svn:mergeinfo
+ /branches/HORNETQ-720_Replication:10878-11528
/branches/STOMP11:11225-11517
13 years, 2 months
JBoss hornetq SVN: r11551 - branches.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-10-17 03:56:49 -0400 (Mon, 17 Oct 2011)
New Revision: 11551
Removed:
branches/STOMP11_PERF/
Log:
remove branch
13 years, 2 months
JBoss hornetq SVN: r11550 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-10-17 01:37:10 -0400 (Mon, 17 Oct 2011)
New Revision: 11550
Modified:
trunk/docs/user-manual/en/interoperability.xml
Log:
added notes for stomp 1.1 impl
Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml 2011-10-17 03:11:44 UTC (rev 11549)
+++ trunk/docs/user-manual/en/interoperability.xml 2011-10-17 05:37:10 UTC (rev 11550)
@@ -49,6 +49,21 @@
<para>Message acknowledgements are not transactional. The ACK frame can not be part of a transaction
(it will be ignored if its <literal>transaction</literal> header is set).</para>
</section>
+ <section>
+ <title>Stomp 1.1 Notes</title>
+ <section>
+ <title>Virtual Hosting</title>
+ <para>HornetQ currently doesn't support virtual hosting, which means the 'host' header
+ in CONNECT fram will be ignored.</para>
+ </section>
+ <section>
+ <title>Heart-beating</title>
+ <para>HornetQ specifies a minimum value for both client and server heart-beat intervals.
+ The minimum interval for both client and server heartbeats is 500 milliseconds. That means if
+ a client sends a CONNECT frame with heartbeat values lower than 500, the server will defaults
+ the value to 500 milliseconds regardless the values of the 'heart-beat' header in the frame.</para>
+ </section>
+ </section>
</section>
<section>
13 years, 2 months
JBoss hornetq SVN: r11549 - in trunk/examples/jms: stomp1.1 and 6 other directories.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-10-16 23:11:44 -0400 (Sun, 16 Oct 2011)
New Revision: 11549
Added:
trunk/examples/jms/stomp1.1/
trunk/examples/jms/stomp1.1/build.bat
trunk/examples/jms/stomp1.1/build.sh
trunk/examples/jms/stomp1.1/build.xml
trunk/examples/jms/stomp1.1/readme.html
trunk/examples/jms/stomp1.1/server0/
trunk/examples/jms/stomp1.1/server0/client-jndi.properties
trunk/examples/jms/stomp1.1/server0/hornetq-beans.xml
trunk/examples/jms/stomp1.1/server0/hornetq-configuration.xml
trunk/examples/jms/stomp1.1/server0/hornetq-jms.xml
trunk/examples/jms/stomp1.1/server0/hornetq-users.xml
trunk/examples/jms/stomp1.1/src/
trunk/examples/jms/stomp1.1/src/org/
trunk/examples/jms/stomp1.1/src/org/hornetq/
trunk/examples/jms/stomp1.1/src/org/hornetq/jms/
trunk/examples/jms/stomp1.1/src/org/hornetq/jms/example/
trunk/examples/jms/stomp1.1/src/org/hornetq/jms/example/StompExample.java
Log:
Adding a Stomp 1.1 example
Added: trunk/examples/jms/stomp1.1/build.bat
===================================================================
--- trunk/examples/jms/stomp1.1/build.bat (rev 0)
+++ trunk/examples/jms/stomp1.1/build.bat 2011-10-17 03:11:44 UTC (rev 11549)
@@ -0,0 +1,13 @@
+@echo off
+
+set "OVERRIDE_ANT_HOME=..\..\..\tools\ant"
+
+if exist "..\..\..\src\bin\build.bat" (
+ rem running from TRUNK
+ call ..\..\..\src\bin\build.bat %*
+) else (
+ rem running from the distro
+ call ..\..\..\bin\build.bat %*
+)
+
+set "OVERRIDE_ANT_HOME="
Added: trunk/examples/jms/stomp1.1/build.sh
===================================================================
--- trunk/examples/jms/stomp1.1/build.sh (rev 0)
+++ trunk/examples/jms/stomp1.1/build.sh 2011-10-17 03:11:44 UTC (rev 11549)
@@ -0,0 +1,15 @@
+#!/bin/sh
+
+OVERRIDE_ANT_HOME=../../tools/ant
+export OVERRIDE_ANT_HOME
+
+if [ -f "../../../src/bin/build.sh" ]; then
+ # running from TRUNK
+ ../../../src/bin/build.sh "$@"
+else
+ # running from the distro
+ ../../bin/build.sh "$@"
+fi
+
+
+
Property changes on: trunk/examples/jms/stomp1.1/build.sh
___________________________________________________________________
Added: svn:executable
+ *
Added: trunk/examples/jms/stomp1.1/build.xml
===================================================================
--- trunk/examples/jms/stomp1.1/build.xml (rev 0)
+++ trunk/examples/jms/stomp1.1/build.xml 2011-10-17 03:11:44 UTC (rev 11549)
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE project [
+ <!ENTITY libraries SYSTEM "../../../thirdparty/libraries.ent">
+ ]>
+<!--
+ ~ Copyright 2009 Red Hat, Inc.
+ ~ Red Hat licenses this file to you under the Apache License, version
+ ~ 2.0 (the "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ ~ implied. See the License for the specific language governing
+ ~ permissions and limitations under the License.
+ -->
+<project default="run" name="HornetQ Stomp Example">
+
+ <import file="../../common/build.xml"/>
+
+ <target name="run">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.StompExample"/>
+ </antcall>
+ </target>
+
+ <target name="runRemote">
+ <antcall target="runExample">
+ <param name="example.classname" value="org.hornetq.jms.example.StompExample"/>
+ <param name="hornetq.example.runServer" value="false"/>
+ </antcall>
+ </target>
+
+</project>
\ No newline at end of file
Added: trunk/examples/jms/stomp1.1/readme.html
===================================================================
--- trunk/examples/jms/stomp1.1/readme.html (rev 0)
+++ trunk/examples/jms/stomp1.1/readme.html 2011-10-17 03:11:44 UTC (rev 11549)
@@ -0,0 +1,119 @@
+<html>
+ <head>
+ <title>HornetQ Stomp 1.1 Example</title>
+ <link rel="stylesheet" type="text/css" href="../../common/common.css" />
+ <link rel="stylesheet" type="text/css" href="../../common/prettify.css" />
+ <script type="text/javascript" src="../../common/prettify.js"></script>
+ </head>
+ <body onload="prettyPrint()">
+ <h1>Stomp 1.1 Example</h1>
+
+ <p>This example shows you how to configure HornetQ to send and receive Stomp messages using Stomp 1.1 protocol.</p>
+ <p>The example will start a HornetQ server configured with Stomp and JMS.</p>
+ <p>The client will open a socket to initiate a Stomp 1.1 connection and then send one Stomp message (using TCP directly).
+ The client will then consume a message from a JMS Queue and check it is the message sent with Stomp.</p>
+
+ <h2>Example step-by-step</h2>
+ <p><i>To run the example, simply type <code>./build.sh</code> (or <code>build.bat</code> on windows) from this directory</i></p>
+
+ <ol>
+ <li>We create a TCP socket to connect to the Stomp port
+ <pre class="prettyprint">
+ Socket socket = new Socket("localhost", 61613);
+ </pre>
+
+ <li>We negotiate a Stomp 1.1 connection to the server</li>
+ <pre class="prettyprint">
+ String connectFrame = "CONNECT\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "login:guest\n" +
+ "passcode:guest\n" +
+ "request-id:1\n" +
+ "\n" +
+ END_OF_FRAME;
+ sendFrame(socket, connectFrame);
+ </pre>
+
+ <li>We receive a response showing that the connection version</li>
+ <pre>
+ String response = receiveFrame(socket);
+ System.out.println("response: " + response);
+ </pre>
+
+ <li>We send a SEND frame (a Stomp message) to the destination <code>jms.queue.exampleQueue</code>
+ (which corresponds to the HornetQ address for the JMS Queue <code>exampleQueue</code>) with a text body</li>
+ <pre class="prettyprint">
+ String text = "Hello World from Stomp 1.1 !";
+ String message = "SEND\n" +
+ "destination:jms.queue.exampleQueue\n" +
+ "\n" +
+ text +
+ END_OF_FRAME;
+ sendFrame(socket, message);
+ System.out.println("Sent Stomp message: " + text);
+ </pre>
+
+ <li>We send a DISCONNECT frame to disconnect from the server</li>
+ <pre class="prettyprint">
+ String disconnectFrame = "DISCONNECT\n" +
+ "\n" +
+ Stomp.NULL;
+ sendFrame(socket, disconnectFrame);
+ </pre>
+
+ <li>We close the TCP socket</li>
+ <pre class="prettyprint">
+ socket.close();
+ </pre>
+
+ <li>We create an initial context to perform the JNDI lookup.</li>
+ <pre class="prettyprint">
+ initialContext = getContext(0);
+ </pre>
+
+ <li>We perform a lookup on the queue and the connection factory</li>
+ <pre class="prettyprint">
+ Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+ </pre>
+
+ <li>We create a JMS Connection, Session and a MessageConsumer on the queue</li>
+ <pre class="prettyprint">
+ connection = cf.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+ </pre>
+
+ <li>We start the connection</li>
+ <pre class="prettyprint">
+ <code>connection.start();</code>
+ </pre>
+
+ <li>We receive the message. Stomp messages are mapped to JMS TextMessage.</li>
+ <pre class="prettyprint">
+ TextMessage messageReceived = (TextMessage)consumer.receive(5000);
+ System.out.println("Received JMS message: " + messageReceived.getText());
+ </pre>
+
+ <li>And finally, <b>always</b> remember to close your JMS connections and resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>
+
+ <pre class="prettyprint">
+ <code>finally
+ {
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }</code>
+ </pre>
+
+
+
+ </ol>
+ </body>
+</html>
Added: trunk/examples/jms/stomp1.1/server0/client-jndi.properties
===================================================================
--- trunk/examples/jms/stomp1.1/server0/client-jndi.properties (rev 0)
+++ trunk/examples/jms/stomp1.1/server0/client-jndi.properties 2011-10-17 03:11:44 UTC (rev 11549)
@@ -0,0 +1,3 @@
+java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+java.naming.provider.url=jnp://localhost:1099
+java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces
Added: trunk/examples/jms/stomp1.1/server0/hornetq-beans.xml
===================================================================
--- trunk/examples/jms/stomp1.1/server0/hornetq-beans.xml (rev 0)
+++ trunk/examples/jms/stomp1.1/server0/hornetq-beans.xml 2011-10-17 03:11:44 UTC (rev 11549)
@@ -0,0 +1,59 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<deployment xmlns="urn:jboss:bean-deployer:2.0">
+
+ <bean name="Naming" class="org.jnp.server.NamingBeanImpl"/>
+
+ <!-- JNDI server. Disable this if you don't want JNDI -->
+ <bean name="JNDIServer" class="org.jnp.server.Main">
+ <property name="namingInfo">
+ <inject bean="Naming"/>
+ </property>
+ <property name="port">1099</property>
+ <property name="bindAddress">localhost</property>
+ <property name="rmiPort">1098</property>
+ <property name="rmiBindAddress">localhost</property>
+ </bean>
+
+ <!-- MBean server -->
+ <bean name="MBeanServer" class="javax.management.MBeanServer">
+ <constructor factoryClass="java.lang.management.ManagementFactory"
+ factoryMethod="getPlatformMBeanServer"/>
+ </bean>
+
+ <!-- The core configuration -->
+ <bean name="Configuration" class="org.hornetq.core.config.impl.FileConfiguration"/>
+
+ <!-- The security manager -->
+ <bean name="HornetQSecurityManager" class="org.hornetq.spi.core.security.HornetQSecurityManagerImpl">
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The core server -->
+ <bean name="HornetQServer" class="org.hornetq.core.server.impl.HornetQServerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="Configuration"/>
+ </parameter>
+ <parameter>
+ <inject bean="MBeanServer"/>
+ </parameter>
+ <parameter>
+ <inject bean="HornetQSecurityManager"/>
+ </parameter>
+ </constructor>
+ <start ignored="true"/>
+ <stop ignored="true"/>
+ </bean>
+
+ <!-- The JMS server -->
+ <bean name="JMSServerManager" class="org.hornetq.jms.server.impl.JMSServerManagerImpl">
+ <constructor>
+ <parameter>
+ <inject bean="HornetQServer"/>
+ </parameter>
+ </constructor>
+ </bean>
+
+</deployment>
Added: trunk/examples/jms/stomp1.1/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/jms/stomp1.1/server0/hornetq-configuration.xml (rev 0)
+++ trunk/examples/jms/stomp1.1/server0/hornetq-configuration.xml 2011-10-17 03:11:44 UTC (rev 11549)
@@ -0,0 +1,42 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
+
+ <!-- Connectors -->
+
+ <connectors>
+ <connector name="netty-connector">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
+ </connector>
+ </connectors>
+
+ <!-- Acceptors -->
+ <acceptors>
+ <!-- a regular Netty acceptor used by the JMS client -->
+ <acceptor name="netty-acceptor">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+ </acceptor>
+ <!-- the stomp-acceptor is configured for the Stomp protocol and -->
+ <!-- will listen on port 61613 (default Stomp port) -->
+ <acceptor name="stomp-acceptor">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+ <param key="protocol" value="stomp" />
+ <param key="port" value="61613" />
+ </acceptor>
+ </acceptors>
+
+ <!-- Other config -->
+
+ <security-settings>
+ <!--security for example queue-->
+ <security-setting match="jms.queue.exampleQueue">
+ <permission type="createDurableQueue" roles="guest"/>
+ <permission type="deleteDurableQueue" roles="guest"/>
+ <permission type="createNonDurableQueue" roles="guest"/>
+ <permission type="deleteNonDurableQueue" roles="guest"/>
+ <permission type="consume" roles="guest"/>
+ <permission type="send" roles="guest"/>
+ </security-setting>
+ </security-settings>
+
+</configuration>
Added: trunk/examples/jms/stomp1.1/server0/hornetq-jms.xml
===================================================================
--- trunk/examples/jms/stomp1.1/server0/hornetq-jms.xml (rev 0)
+++ trunk/examples/jms/stomp1.1/server0/hornetq-jms.xml 2011-10-17 03:11:44 UTC (rev 11549)
@@ -0,0 +1,19 @@
+<configuration xmlns="urn:hornetq"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">
+ <!--the connection factory used by the example-->
+ <connection-factory name="ConnectionFactory">
+ <connectors>
+ <connector-ref connector-name="netty-connector"/>
+ </connectors>
+ <entries>
+ <entry name="ConnectionFactory"/>
+ </entries>
+ </connection-factory>
+
+ <!--the queue used by the example-->
+ <queue name="exampleQueue">
+ <entry name="/queue/exampleQueue"/>
+ </queue>
+
+</configuration>
Added: trunk/examples/jms/stomp1.1/server0/hornetq-users.xml
===================================================================
--- trunk/examples/jms/stomp1.1/server0/hornetq-users.xml (rev 0)
+++ trunk/examples/jms/stomp1.1/server0/hornetq-users.xml 2011-10-17 03:11:44 UTC (rev 11549)
@@ -0,0 +1,7 @@
+<configuration xmlns="urn:hornetq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="urn:hornetq /schema/hornetq-users.xsd">
+ <!-- the default user. this is used where username is null-->
+ <defaultuser name="guest" password="guest">
+ <role name="guest"/>
+ </defaultuser>
+</configuration>
\ No newline at end of file
Added: trunk/examples/jms/stomp1.1/src/org/hornetq/jms/example/StompExample.java
===================================================================
--- trunk/examples/jms/stomp1.1/src/org/hornetq/jms/example/StompExample.java (rev 0)
+++ trunk/examples/jms/stomp1.1/src/org/hornetq/jms/example/StompExample.java 2011-10-17 03:11:44 UTC (rev 11549)
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.jms.example;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.naming.InitialContext;
+
+import org.hornetq.common.example.HornetQExample;
+
+/**
+ * An example where a Stomp 1.1 client sends a message on a TCP socket
+ * and consumes it from a JMS MessageConsumer.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class StompExample extends HornetQExample
+{
+ private static final String END_OF_FRAME = "\u0000";
+
+ public static void main(final String[] args)
+ {
+ new StompExample().run(args);
+ }
+
+ @Override
+ public boolean runExample() throws Exception
+ {
+ Connection connection = null;
+ InitialContext initialContext = null;
+
+ try
+ {
+ // Step 1. Create a TCP socket to connect to the Stomp port
+ Socket socket = new Socket("localhost", 61613);
+
+ // Step 2. Send a CONNECT frame to connect to the server
+ String connectFrame = "CONNECT\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "login:guest\n" +
+ "passcode:guest\n" +
+ "request-id:1\n" +
+ "\n" +
+ END_OF_FRAME;
+ sendFrame(socket, connectFrame);
+
+ String response = receiveFrame(socket);
+ System.out.println("response: " + response);
+
+ // Step 3. Send a SEND frame (a Stomp message) to the
+ // jms.queue.exampleQueue address with a text body
+ String text = "Hello World from Stomp 1.1 !";
+ String message = "SEND\n" +
+ "destination:jms.queue.exampleQueue\n" +
+ "\n" +
+ text +
+ END_OF_FRAME;
+ sendFrame(socket, message);
+ System.out.println("Sent Stomp message: " + text);
+
+ // Step 4. Send a DISCONNECT frame to disconnect from the server
+ String disconnectFrame = "DISCONNECT\n" +
+ "\n" +
+ END_OF_FRAME;
+ sendFrame(socket, disconnectFrame);
+
+ // Step 5. Slose the TCP socket
+ socket.close();
+
+ // We will now consume from JMS the message sent with Stomp.
+
+ // Step 6. Create an initial context to perform the JNDI lookup.
+ initialContext = getContext(0);
+
+ // Step 7. Perform a lookup on the queue and the connection factory
+ Queue queue = (Queue)initialContext.lookup("/queue/exampleQueue");
+ ConnectionFactory cf = (ConnectionFactory)initialContext.lookup("/ConnectionFactory");
+
+ // Step 8.Create a JMS Connection, Session and a MessageConsumer on the queue
+ connection = cf.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ // Step 9. Start the Connection
+ connection.start();
+
+ // Step 10. Receive the message
+ TextMessage messageReceived = (TextMessage)consumer.receive(5000);
+ System.out.println("Received JMS message: " + messageReceived.getText());
+
+ return true;
+ }
+ finally
+ {
+ // Step 11. Be sure to close our JMS resources!
+ if (initialContext != null)
+ {
+ initialContext.close();
+ }
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+
+ private static void sendFrame(Socket socket, String data) throws Exception
+ {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = socket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++)
+ {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ private static String receiveFrame(Socket socket) throws Exception
+ {
+ InputStream inputStream = socket.getInputStream();
+ byte[] buffer = new byte[1024];
+ int size = inputStream.read(buffer);
+
+ byte[] data = new byte[size];
+ System.arraycopy(buffer, 0, data, 0, size);
+
+ String frame = new String(data, "UTF-8");
+ return frame;
+
+ }
+
+}
13 years, 2 months
JBoss hornetq SVN: r11548 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-10-16 23:08:34 -0400 (Sun, 16 Oct 2011)
New Revision: 11548
Modified:
trunk/docs/user-manual/en/examples.xml
trunk/docs/user-manual/en/interoperability.xml
Log:
Update doc to add stomp 1.1 example and stomp 1.1 support
Modified: trunk/docs/user-manual/en/examples.xml
===================================================================
--- trunk/docs/user-manual/en/examples.xml 2011-10-14 18:53:33 UTC (rev 11547)
+++ trunk/docs/user-manual/en/examples.xml 2011-10-17 03:08:34 UTC (rev 11548)
@@ -455,6 +455,11 @@
HornetQ server to send and receive Stomp messages.</para>
</section>
<section>
+ <title>Stomp1.1</title>
+ <para>The <literal>stomp</literal> example shows you how to configure a
+ HornetQ server to send and receive Stomp messages via a Stomp 1.1 connection.</para>
+ </section>
+ <section>
<title>Stomp Over Web Sockets</title>
<para>The <literal>stomp-websockets</literal> example shows you how to configure a
HornetQ server to send and receive Stomp messages directly from Web browsers (provided
Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml 2011-10-14 18:53:33 UTC (rev 11547)
+++ trunk/docs/user-manual/en/interoperability.xml 2011-10-17 03:08:34 UTC (rev 11548)
@@ -26,7 +26,7 @@
<section id="stomp">
<title>Stomp</title>
<para><ulink url="http://stomp.codehaus.org/">Stomp</ulink> is a text-orientated wire protocol that allows
- Stomp clients to communicate with Stomp Brokers.</para>
+ Stomp clients to communicate with Stomp Brokers. HornetQ now supports both Stomp 1.0 and Stomp 1.1.</para>
<para><ulink url="http://stomp.codehaus.org/Clients">Stomp clients</ulink> are available for
several languages and platforms making it a good choice for interoperability.</para>
<section id="stomp.native">
13 years, 2 months
JBoss hornetq SVN: r11547 - branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-10-14 14:53:33 -0400 (Fri, 14 Oct 2011)
New Revision: 11547
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/config/common/hornetq-version.properties
Log:
version update
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/config/common/hornetq-version.properties
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/config/common/hornetq-version.properties 2011-10-14 18:52:56 UTC (rev 11546)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/config/common/hornetq-version.properties 2011-10-14 18:53:33 UTC (rev 11547)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=HQ_2_2_5_GA_EAP_JBPAPP-7242
+hornetq.version.versionName=HQ_2_2_5_GA_EAP_JBPAPP-7392
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=5
13 years, 2 months
JBoss hornetq SVN: r11546 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392: src/main/org/hornetq/core/protocol/core/impl/wireformat and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-10-14 14:52:56 -0400 (Fri, 14 Oct 2011)
New Revision: 11546
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7389 - flow control on large message
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2011-10-14 18:52:56 UTC (rev 11546)
@@ -574,7 +574,7 @@
largeMessageCache.deleteOnExit();
}
- currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);
+ currentLargeMessageController = new LargeMessageControllerImpl(this, packet.getLargeMessageSize(), 5, largeMessageCache);
if (currentChunkMessage.isCompressed())
{
@@ -596,7 +596,18 @@
{
return;
}
- currentLargeMessageController.addPacket(chunk);
+ if (currentLargeMessageController == null)
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace("Sending back credits for largeController = null " + chunk.getPacketSize());
+ }
+ flowControl(chunk.getPacketSize(), false);
+ }
+ else
+ {
+ currentLargeMessageController.addPacket(chunk);
+ }
}
public void clear(boolean waitForOnMessage) throws HornetQException
@@ -609,12 +620,39 @@
while (iter.hasNext())
{
- ClientMessageInternal message = iter.next();
+ try
+ {
+ ClientMessageInternal message = iter.next();
+
+ if (message.isLargeMessage())
+ {
+ ClientLargeMessageInternal largeMessage = (ClientLargeMessageInternal)message;
+ largeMessage.getLargeMessageController().cancel();
+ }
- flowControlBeforeConsumption(message);
+ flowControlBeforeConsumption(message);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
}
clearBuffer();
+
+ try
+ {
+ if (currentLargeMessageController != null)
+ {
+ currentLargeMessageController.cancel();
+ currentLargeMessageController = null;
+ }
+ }
+ catch (Throwable e)
+ {
+ // nothing that could be done here
+ log.warn(e.getMessage(), e);
+ }
}
// Need to send credits for the messages in the buffer
@@ -680,6 +718,11 @@
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::creditsToSend=" + creditsToSend + ", clientWindowSize = " + clientWindowSize + " messageBytes = " + messageBytes);
+ }
if (creditsToSend >= clientWindowSize)
{
@@ -687,7 +730,7 @@
{
if (ClientConsumerImpl.trace)
{
- ClientConsumerImpl.log.trace("Sending " + creditsToSend + " -1, for slow consumer");
+ ClientConsumerImpl.log.trace("FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
}
// sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/client/impl/LargeMessageControllerImpl.java 2011-10-14 18:52:56 UTC (rev 11546)
@@ -31,6 +31,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.core.Packet;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UTF8Util;
@@ -141,7 +142,7 @@
{
checkForPacket(totalSize - 1);
}
- catch (Exception ignored)
+ catch (Throwable ignored)
{
}
}
@@ -227,6 +228,24 @@
public synchronized void cancel()
{
+
+ int totalSize = 0;
+ Packet polledPacket = null;
+ while ((polledPacket = packets.poll()) != null)
+ {
+ totalSize += polledPacket.getPacketSize();
+ }
+
+ try
+ {
+ consumerInternal.flowControl(totalSize, false);
+ }
+ catch (Exception ignored)
+ {
+ // what else can we do here?
+ log.warn(ignored.getMessage(), ignored);
+ }
+
packets.offer(new SessionReceiveContinuationMessage());
streamEnded = true;
streamClosed = true;
@@ -279,6 +298,11 @@
public synchronized void saveBuffer(final OutputStream output) throws HornetQException
{
+ if (streamClosed)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "The large message lost connection with its session, either because of a rollback or a closed session");
+ }
setOutputStream(output);
waitCompletion(0);
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionContinuationMessage.java 2011-10-14 18:52:56 UTC (rev 11546)
@@ -64,7 +64,14 @@
*/
public byte[] getBody()
{
- return body;
+ if (size <= 0)
+ {
+ return new byte[0];
+ }
+ else
+ {
+ return body;
+ }
}
/**
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionReceiveContinuationMessage.java 2011-10-14 18:52:56 UTC (rev 11546)
@@ -77,6 +77,22 @@
super.encodeRest(buffer);
buffer.writeLong(consumerID);
}
+ @Override
+ public int getPacketSize()
+ {
+ if (size == -1)
+ {
+ // This packet was created by the LargeMessageController
+ // TODO: Get rid of this scenario
+ return 0;
+ }
+ else
+ {
+ return size;
+ }
+ }
+
+
@Override
public void decodeRest(final HornetQBuffer buffer)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ScheduledDeliveryHandlerImpl.java 2011-10-14 18:52:56 UTC (rev 11546)
@@ -17,7 +17,6 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2011-10-14 18:52:56 UTC (rev 11546)
@@ -61,7 +61,7 @@
// Constants ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(ServerConsumerImpl.class);
-
+
private static boolean isTrace = log.isTraceEnabled();
// Static ---------------------------------------------------------------------------------------
@@ -85,14 +85,12 @@
private boolean started;
private volatile LargeMessageDeliverer largeMessageDeliverer = null;
-
+
public String debug()
{
return toString() + "::Delivering " + this.deliveringRefs.size();
}
- private boolean largeMessageInDelivery;
-
/**
* if we are a browse only consumer we don't need to worry about acknowledgemenets or being started/stopeed by the session.
*/
@@ -117,7 +115,7 @@
private final Binding binding;
private boolean transferring = false;
-
+
/* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
* This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
* write queue when the TCP buffer is full, e.g. the client is slow or has died.
@@ -165,11 +163,11 @@
minLargeMessageSize = session.getMinLargeMessageSize();
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
-
+
this.callback.addReadyListener(this);
this.creationTime = System.currentTimeMillis();
-
+
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -187,7 +185,7 @@
{
return id;
}
-
+
public boolean isBrowseOnly()
{
return browseOnly;
@@ -197,12 +195,12 @@
{
return creationTime;
}
-
+
public String getConnectionID()
{
return this.session.getConnectionID().toString();
}
-
+
public String getSessionID()
{
return this.session.getName();
@@ -212,20 +210,23 @@
{
if (availableCredits != null && availableCredits.get() <= 0)
{
- if (log.isDebugEnabled() )
+ if (log.isDebugEnabled())
{
- log.debug(this + " is busy for the lack of credits!!!");
+ log.debug(this + " is busy for the lack of credits. Current credits = " +
+ availableCredits +
+ " Can't receive reference " +
+ ref);
}
-
+
return HandleStatus.BUSY;
}
-
-// TODO - https://jira.jboss.org/browse/HORNETQ-533
-// if (!writeReady.get())
-// {
-// return HandleStatus.BUSY;
-// }
-
+
+ // TODO - https://jira.jboss.org/browse/HORNETQ-533
+ // if (!writeReady.get())
+ // {
+ // return HandleStatus.BUSY;
+ // }
+
synchronized (lock)
{
// If the consumer is stopped then we don't accept the message, it
@@ -238,11 +239,23 @@
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
- if (largeMessageInDelivery)
+ if (largeMessageDeliverer != null)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + " is busy delivering large message " +
+ largeMessageDeliverer +
+ ", can't deliver reference " +
+ ref);
+ }
return HandleStatus.BUSY;
}
+ if (log.isTraceEnabled())
+ {
+ log.trace("Handling reference " + ref);
+ }
+
final ServerMessage message = ref.getMessage();
if (filter != null && !filter.match(message))
@@ -265,7 +278,9 @@
// the updateDeliveryCount would still be updated after c
if (strictUpdateDeliveryCount && !ref.isPaged())
{
- if (ref.getMessage().isDurable() && ref.getQueue().isDurable() && !ref.getQueue().isInternalQueue())
+ if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
+ !ref.getQueue().isInternalQueue() &&
+ !ref.isPaged())
{
storageManager.updateDeliveryCount(ref);
}
@@ -306,7 +321,7 @@
public void close(final boolean failed) throws Exception
{
callback.removeReadyListener(this);
-
+
setStarted(false);
if (largeMessageDeliverer != null)
@@ -352,8 +367,8 @@
props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
- props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filter == null ? null
- : filter.getFilterString());
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING,
+ filter == null ? null : filter.getFilterString());
props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
@@ -403,10 +418,28 @@
}
}
- public LinkedList<MessageReference> cancelRefs(final boolean failed, final boolean lastConsumedAsDelivered, final Transaction tx) throws Exception
+ public LinkedList<MessageReference> cancelRefs(final boolean failed,
+ final boolean lastConsumedAsDelivered,
+ final Transaction tx) throws Exception
{
boolean performACK = lastConsumedAsDelivered;
+ try
+ {
+ if (largeMessageDeliverer != null)
+ {
+ largeMessageDeliverer.finish();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error on resetting large message deliver - " + largeMessageDeliverer, e);
+ }
+ finally
+ {
+ largeMessageDeliverer = null;
+ }
+
LinkedList<MessageReference> refs = new LinkedList<MessageReference>();
if (!deliveringRefs.isEmpty())
@@ -427,8 +460,9 @@
{
if (!failed)
{
- //We don't decrement delivery count if the client failed, since there's a possibility that refs were actually delivered but we just didn't get any acks for them
- //before failure
+ // We don't decrement delivery count if the client failed, since there's a possibility that refs
+ // were actually delivered but we just didn't get any acks for them
+ // before failure
ref.decrementDeliveryCount();
}
@@ -461,21 +495,6 @@
synchronized (lock)
{
this.transferring = transferring;
-
- if (transferring)
- {
- // Now we must wait for any large message delivery to finish
- while (largeMessageInDelivery)
- {
- try
- {
- Thread.sleep(1);
- }
- catch (InterruptedException ignore)
- {
- }
- }
- }
}
// Outside the lock
@@ -504,18 +523,23 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
+ if (log.isDebugEnabled())
+ {
+ log.debug(this + ":: FlowControl::Received disable flow control message");
+ }
// No flow control
availableCredits = null;
-
- //There may be messages already in the queue
+
+ // There may be messages already in the queue
promptDelivery();
}
else if (credits == 0)
{
- //reset, used on slow consumers
+ // reset, used on slow consumers
+ log.debug(this + ":: FlowControl::Received reset flow control message");
availableCredits.set(0);
}
else
@@ -524,16 +548,17 @@
if (log.isDebugEnabled())
{
- log.debug(this + "::Received " + credits +
- " credits, previous value = " +
- previous +
- " currentValue = " +
- availableCredits.get());
+ log.debug(this + "::FlowControl::Received " +
+ credits +
+ " credits, previous value = " +
+ previous +
+ " currentValue = " +
+ availableCredits.get());
}
if (previous <= 0 && previous + credits > 0)
{
- if (log.isTraceEnabled() )
+ if (log.isTraceEnabled())
{
log.trace(this + "::calling promptDelivery from receiving credits");
}
@@ -553,7 +578,7 @@
{
return;
}
-
+
// Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
// acknowledged
@@ -585,21 +610,21 @@
}
while (ref.getMessage().getMessageID() != messageID);
}
-
+
public void individualAcknowledge(final boolean autoCommitAcks, final Transaction tx, final long messageID) throws Exception
{
if (browseOnly)
{
return;
}
-
+
MessageReference ref = removeReferenceByID(messageID);
-
+
if (ref == null)
{
throw new IllegalStateException("Cannot find ref to ack " + messageID);
}
-
+
if (autoCommitAcks)
{
ref.getQueue().acknowledge(ref);
@@ -639,13 +664,13 @@
return ref;
}
-
+
public void readyForWriting(final boolean ready)
{
if (ready)
{
writeReady.set(true);
-
+
promptDelivery();
}
else
@@ -664,28 +689,35 @@
private void promptDelivery()
{
- synchronized (lock)
+ if (largeMessageDeliverer != null)
{
- // largeMessageDeliverer is aways set inside a lock
- // if we don't acquire a lock, we will have NPE eventually
- if (largeMessageDeliverer != null)
+ resumeLargeMessage();
+ }
+ else
+ {
+ if (browseOnly)
{
- resumeLargeMessage();
+ messageQueue.getExecutor().execute(browserDeliverer);
}
else
{
- if (browseOnly)
- {
- messageQueue.getExecutor().execute(browserDeliverer);
- }
- else
- {
- messageQueue.forceDelivery();
- }
+ messageQueue.forceDelivery();
}
}
}
+ private void forceDelivery()
+ {
+ if (browseOnly)
+ {
+ messageQueue.getExecutor().execute(browserDeliverer);
+ }
+ else
+ {
+ messageQueue.deliverAsync();
+ }
+ }
+
private void resumeLargeMessage()
{
messageQueue.getExecutor().execute(resumeLargeMessageRunnable);
@@ -693,8 +725,6 @@
private void deliverLargeMessage(final MessageReference ref, final ServerMessage message) throws Exception
{
- largeMessageInDelivery = true;
-
final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
// it doesn't need lock because deliverLargeMesasge is already inside the lock()
@@ -713,6 +743,14 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery standard taking " +
+ packetSize +
+ " from credits, available now is " +
+ availableCredits);
+ }
}
}
@@ -729,16 +767,7 @@
{
if (largeMessageDeliverer == null || largeMessageDeliverer.deliver())
{
- if (browseOnly)
- {
- messageQueue.getExecutor().execute(browserDeliverer);
- }
- else
- {
- // prompt Delivery only if chunk was finished
-
- messageQueue.deliverAsync();
- }
+ forceDelivery();
}
}
catch (Exception e)
@@ -786,6 +815,12 @@
if (availableCredits != null && availableCredits.get() <= 0)
{
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
+ availableCredits);
+ }
+
return false;
}
@@ -794,7 +829,7 @@
context = largeMessage.getBodyEncoder();
sizePendingLargeMessage = context.getLargeBodySize();
-
+
context.open();
sentInitialPacket = true;
@@ -807,6 +842,15 @@
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::" +
+ " deliver initialpackage with " +
+ packetSize +
+ " delivered, available now = " +
+ availableCredits);
+ }
}
// Execute the rest of the large message on a different thread so as not to tie up the delivery thread
@@ -822,7 +866,8 @@
{
if (ServerConsumerImpl.isTrace)
{
- log.trace("deliverLargeMessage: Leaving loop of send LargeMessage because of credits");
+ log.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
+ availableCredits);
}
return false;
@@ -845,16 +890,17 @@
int chunkLen = body.length;
- if (ServerConsumerImpl.isTrace)
- {
- log.trace("deliverLargeMessage: Sending " + packetSize +
- " availableCredits now is " +
- availableCredits);
- }
-
if (availableCredits != null)
{
availableCredits.addAndGet(-packetSize);
+
+ if (log.isTraceEnabled())
+ {
+ log.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
+ packetSize +
+ " available now=" +
+ availableCredits);
+ }
}
positionPendingLargeMessage += chunkLen;
@@ -903,8 +949,6 @@
largeMessageDeliverer = null;
- largeMessageInDelivery = false;
-
largeMessage = null;
}
}
@@ -920,7 +964,7 @@
}
private final LinkedListIterator<MessageReference> iterator;
-
+
public synchronized void close()
{
iterator.close();
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/ConsumerWindowSizeTest.java 2011-10-14 18:52:56 UTC (rev 11546)
@@ -37,6 +37,7 @@
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.ServerConsumerImpl;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -911,6 +912,123 @@
internalTestSlowConsumerOnMessageHandlerNoBuffers(true);
}
+ public void testFlowControl() throws Exception
+ {
+ internalTestFlowControlOnRollback(false);
+ }
+
+ public void testFlowControlLargeMessage() throws Exception
+ {
+ internalTestFlowControlOnRollback(true);
+ }
+
+ private void internalTestFlowControlOnRollback(final boolean isLargeMessage) throws Exception
+ {
+
+ HornetQServer server = createServer(false, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ settings.setMaxDeliveryAttempts(-1);
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ ClientSession session = null;
+
+ try
+ {
+ final int numberOfMessages = 100;
+
+ server.start();
+
+ locator.setConsumerWindowSize(300000);
+
+ if (isLargeMessage)
+ {
+ // something to ensure we are using large messages
+ locator.setMinLargeMessageSize(100);
+ }
+ else
+ {
+ // To make sure large messages won't kick in, we set anything large
+ locator.setMinLargeMessageSize(Integer.MAX_VALUE);
+ }
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ SimpleString ADDRESS = new SimpleString("some-queue");
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("count", i);
+ msg.getBodyBuffer().writeBytes(new byte[1024]);
+ producer.send(msg);
+ }
+
+ session.commit();
+
+ ClientConsumerInternal consumer = (ClientConsumerInternal)session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int repeat = 0; repeat < 100; repeat ++)
+ {
+ System.out.println("Repeat " + repeat);
+ long timeout = System.currentTimeMillis() + 2000;
+ // At least 10 messages on the buffer
+ while (timeout > System.currentTimeMillis() && consumer.getBufferSize() <= 10)
+ {
+ Thread.sleep(10);
+ }
+ assertTrue(consumer.getBufferSize() >= 10);
+
+ ClientMessage msg = consumer.receive(500);
+ msg.getBodyBuffer().readByte();
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.rollback();
+ }
+
+
+ for (int i = 0 ; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ System.out.println("msg " + msg);
+ msg.getBodyBuffer().readByte();
+ msg.acknowledge();
+ session.commit();
+ }
+
+ }
+ finally
+ {
+ try
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ catch (Exception ignored)
+ {
+ }
+
+ if (server.isStarted())
+ {
+ server.stop();
+ }
+ }
+ }
+
+
+
public void internalTestSlowConsumerOnMessageHandlerNoBuffers(final boolean largeMessages) throws Exception
{
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 17:38:29 UTC (rev 11545)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-10-14 18:52:56 UTC (rev 11546)
@@ -15,6 +15,9 @@
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -70,7 +73,136 @@
{
return false;
}
+
+ public void testRollbackPartiallyConsumedBuffer() throws Exception
+ {
+ for (int i = 0 ; i < 1; i++)
+ {
+ log.info("#test " + i);
+ internalTestRollbackPartiallyConsumedBuffer(false);
+ tearDown();
+ setUp();
+
+ }
+
+ }
+
+ public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
+ {
+ internalTestRollbackPartiallyConsumedBuffer(true);
+ }
+
+
+ private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception
+ {
+ final int messageSize = 100 * 1024;
+
+ final ClientSession session;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ AddressSettings settings = new AddressSettings();
+ if (redeliveryDelay)
+ {
+ settings.setRedeliveryDelay(1000);
+ if (locator.isCompressLargeMessage())
+ {
+ locator.setConsumerWindowSize(0);
+ }
+ }
+ settings.setMaxDeliveryAttempts(-1);
+
+ server.getAddressSettingsRepository().addMatch("#", settings);
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.createQueue(ADDRESS, ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ for (int i = 0 ; i < 20; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ clientFile.putIntProperty("value", i);
+
+ producer.send(clientFile);
+ }
+
+ session.commit();
+
+ session.start();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
+
+ consumer.setMessageHandler(new MessageHandler()
+ {
+ int counter = 0;
+ public void onMessage(ClientMessage message)
+ {
+ message.getBodyBuffer().readByte();
+ System.out.println("message:" + message);
+ try
+ {
+ if (counter ++ < 20)
+ {
+ Thread.sleep(100);
+ System.out.println("Rollback");
+ message.acknowledge();
+ session.rollback();
+ }
+ else
+ {
+ message.acknowledge();
+ session.commit();
+ }
+
+ if (counter == 40)
+ {
+ latch.countDown();
+ }
+ }
+ catch (Exception e)
+ {
+ latch.countDown();
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ });
+
+ assertTrue(latch.await(40, TimeUnit.SECONDS));
+
+ consumer.close();
+
+ session.close();
+
+ validateNoFilesOnLargeDir();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testCloseConsumer() throws Exception
{
final int messageSize = (int)(3.5 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
@@ -124,7 +256,7 @@
{
try
{
- server.stop();
+ session.close();
}
catch (Throwable ignored)
{
@@ -132,7 +264,7 @@
try
{
- session.close();
+ server.stop();
}
catch (Throwable ignored)
{
@@ -500,16 +632,17 @@
ClientConsumer consumerExpiry = session.createConsumer(ADDRESS_EXPIRY);
ClientMessage msg1 = consumerExpiry.receive(5000);
+ assertTrue(msg1.isLargeMessage());
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -521,13 +654,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -638,13 +771,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
for (int i = 0; i < 10; i++)
@@ -655,13 +788,13 @@
Assert.assertNotNull(msg1);
msg1.acknowledge();
- session.rollback();
-
for (int j = 0; j < messageSize; j++)
{
Assert.assertEquals(UnitTestCase.getSamplebyte(j), msg1.getBodyBuffer().readByte());
}
+ session.rollback();
+
consumerExpiry.close();
}
@@ -1892,6 +2025,7 @@
ClientConsumer consumer = session.createConsumer(queue[1]);
ClientMessage msg = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
+ msg.getBodyBuffer().readByte();
Assert.assertNull(consumer.receiveImmediate());
Assert.assertNotNull(msg);
13 years, 2 months
JBoss hornetq SVN: r11545 - branches/one-offs.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-10-14 13:38:29 -0400 (Fri, 14 Oct 2011)
New Revision: 11545
Added:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7392/
Log:
creating one-off patch
13 years, 2 months