Author: jmesnil
Date: 2010-01-29 08:43:32 -0500 (Fri, 29 Jan 2010)
New Revision: 8858
Modified:
trunk/docs/user-manual/en/examples.xml
trunk/docs/user-manual/en/interoperability.xml
trunk/docs/user-manual/en/messaging-concepts.xml
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* add documentation
* replace sysout calls by log.trace
Modified: trunk/docs/user-manual/en/examples.xml
===================================================================
--- trunk/docs/user-manual/en/examples.xml 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/docs/user-manual/en/examples.xml 2010-01-29 13:43:32 UTC (rev 8858)
@@ -416,6 +416,11 @@
HornetQ queue with static message selectors (filters) using
JMS.</para>
</section>
<section>
+ <title>Stomp</title>
+ <para>The <literal>stomp</literal> example shows you how to configure a
+ HornetQ server to send and receive Stomp messages.</para>
+ </section>
+ <section>
<title>Symmetric Cluster</title>
<para>The <literal>symmetric-cluster</literal> example
demonstrates a symmetric cluster
set-up with HornetQ.</para>
Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/docs/user-manual/en/interoperability.xml 2010-01-29 13:43:32 UTC (rev 8858)
@@ -18,28 +18,58 @@
<!-- =============================================================================
-->
<chapter id="interoperability">
<title>Interoperability</title>
- <section>
- <title>Stomp and StompConnect</title>
- <para><ulink
url="http://stomp.codehaus.org/">Stomp</ulink> is a wire protocol that
allows
- Stomp clients to communicate with Stomp Brokers. <ulink
-
url="http://stomp.codehaus.org/StompConnect">StompConnect<... is a
server that
+ <section id="stomp">
+ <title>Stomp</title>
+ <para><ulink url="http://stomp.codehaus.org/">Stomp</ulink> is a text-orientated wire protocol that allows
+ Stomp clients to communicate with Stomp Brokers.</para>
+ <para><ulink url="http://stomp.codehaus.org/Clients">Stomp clients</ulink> are available for
+ several languages and platforms making it a good choice for interoperability.</para>
+ <section id="stomp.native">
+ <title>Native Stomp support</title>
+ <para>HornetQ provides native support for Stomp. To be able to send and receive Stomp messages,
+ you must configure a <literal>NettyAcceptor</literal> with a <literal>protocol</literal>
+ parameter set to <literal>stomp</literal>:</para>
+<programlisting>
+ <acceptor name="stomp-acceptor">
+ <factory-class>org.hornetq.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="protocol" value="stomp"/>
+ <param key="port" value="61613"/>
+ </acceptor>
+</programlisting>
+ <para>With this configuration, HornetQ will accept Stomp connections on
+ the port <literal>61613</literal> (which is the default port of the Stomp brokers).</para>
+ <para>See the <literal>stomp</literal> example which shows how to configure a HornetQ server with Stomp.</para>
+ <section>
+ <title>Limitations</title>
+ <para>Message acknowledgements are not transactional. The ACK frame cannot be part of a transaction
+ (it will be ignored if its <literal>transaction</literal> header is set).</para>
+ </section>
+ <section>
+ <title>Destination Mapping</title>
+ <para>Stomp messages are sent and received by specifying "destinations".
+ If the Stomp destinations start with <literal>/queue/</literal>, <literal>/topic/</literal>,
+ <literal>/temp-queue/</literal> or <literal>/temp-topic/</literal>, they will be mapped to the corresponding
+ JMS Destinations. Otherwise, they will be treated as regular HornetQ addresses (for sent messages) and
+ queues (for subscriptions and received messages).</para>
+ </section>
+ </section>
+ <section id="stompconnect">
+ <title>StompConnect</title>
+ <para><ulink
url="http://stomp.codehaus.org/StompConnect">StompConnect<... is a
server that
can act as a Stomp broker and proxy the Stomp protocol to the standard JMS
API.
Consequently, using StompConnect it is possible to turn HornetQ into a Stomp
Broker and
use any of the available stomp clients. These include clients written in C,
C++, c# and
.net etc.</para>
- <para>To run StompConnect first start the HornetQ server and make sure that
it is using
+ <para>To run StompConnect first start the HornetQ server and make sure
that it is using
JNDI.</para>
- <para>Stomp requires the file
<literal>jndi.properties</literal> to be available on the
+ <para>Stomp requires the file
<literal>jndi.properties</literal> to be available on the
classpath. This should look something like:</para>
-
<programlisting>java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
+
<programlisting>java.naming.factory.initial=org.jnp.interfaces.NamingContextFactory
java.naming.provider.url=jnp://localhost:1099
java.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces</programlisting>
- <para>Make sure this file is in the classpath along with the StompConnect
jar and the
+ <para>Make sure this file is in the classpath along with the StompConnect
jar and the
HornetQ jars and simply run <literal>java
org.codehaus.stomp.jms.Main</literal>.</para>
- <para>HornetQ will shortly be implementing the Stomp protocol directly, so
you won't have to
- use StompConnect to be able to use HornetQ with Stomp clients.</para>
- <para>A list of STOMP clients is available <ulink
url="http://stomp.codehaus.org/Clients"
- >here.</ulink></para>
+ </section>
</section>
<section>
<title>REST</title>
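
Note on the Destination Mapping paragraph added to interoperability.xml above: the prefix rule it describes can be illustrated with a minimal sketch. The class StompDestinationSketch, its Kind enum, and the classify method are hypothetical names invented for this illustration. They are not taken from the HornetQ sources, and the real mapping code is not part of this diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, NOT part of HornetQ: classifies a Stomp destination
// by the four prefixes listed in the documentation added in this commit.
final class StompDestinationSketch {

    enum Kind { JMS_QUEUE, JMS_TOPIC, JMS_TEMP_QUEUE, JMS_TEMP_TOPIC, CORE_ADDRESS }

    // Prefixes documented as mapping to JMS destinations.
    private static final Map<String, Kind> PREFIXES = new LinkedHashMap<>();
    static {
        PREFIXES.put("/queue/", Kind.JMS_QUEUE);
        PREFIXES.put("/topic/", Kind.JMS_TOPIC);
        PREFIXES.put("/temp-queue/", Kind.JMS_TEMP_QUEUE);
        PREFIXES.put("/temp-topic/", Kind.JMS_TEMP_TOPIC);
    }

    // Returns the JMS kind for a known prefix; anything else is treated
    // as a plain (non-JMS) address, as the documentation states.
    static Kind classify(String destination) {
        for (Map.Entry<String, Kind> entry : PREFIXES.entrySet()) {
            if (destination.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return Kind.CORE_ADDRESS;
    }

    public static void main(String[] args) {
        System.out.println(classify("/queue/orders"));
        System.out.println(classify("/temp-topic/replies"));
        System.out.println(classify("some.core.address"));
    }
}
```

The sketch only covers the classification step. How the server then resolves the name to an actual queue or address is outside what the documentation in this commit describes.
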
Modified: trunk/docs/user-manual/en/messaging-concepts.xml
===================================================================
--- trunk/docs/user-manual/en/messaging-concepts.xml 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/docs/user-manual/en/messaging-concepts.xml 2010-01-29 13:43:32 UTC (rev 8858)
@@ -203,15 +203,14 @@
<title>STOMP</title>
<para><ulink
url="http://en.wikipedia.org/wiki/Streaming_Text_Orientated_Messagin...
- >STOMP</ulink> is a very simple protocol for interoperating with messaging
- systems. It defines a wire format, so theoretically any STOMP client can work with
- any messaging system that supports STOMP. STOMP clients are available in many
+ >Stomp</ulink> is a very simple protocol for interoperating with messaging
+ systems. It defines a wire format, so theoretically any Stomp client can work with
+ any messaging system that supports Stomp. Stomp clients are available in many
different programming languages.</para>
- <para>HornetQ can be used by any STOMP client when using the <ulink
+ <para>HornetQ can be used by any Stomp client when using the <ulink
url="http://stomp.codehaus.org/StompConnect">StompConnect<...
broker which
translates the STOMP protocol to the JMS API.</para>
- <para>HornetQ will be shortly implementing the STOMP protocol on the broker, thus
- avoiding having to use StompConnect.</para>
+ <para>Please see <xref linkend="stomp"/> for using STOMP with HornetQ.</para>
</section>
<section>
<title>AMQP</title>
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-29 13:43:32 UTC (rev 8858)
@@ -126,8 +126,11 @@
try
{
request = marshaller.unmarshal(buffer);
- System.out.println("<<< " + request);
-
+ if (log.isTraceEnabled())
+ {
+ log.trace("received " + request);
+ }
+
String command = request.getCommand();
StompFrame response = null;
@@ -514,7 +517,10 @@
public int send(StompConnection connection, StompFrame frame)
{
- System.out.println(">>> " + frame);
+ if (log.isTraceEnabled())
+ {
+ log.trace("sent " + frame);
+ }
synchronized (connection)
{
if (connection.isDestroyed() || !connection.isValid())
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-01-29 13:43:32 UTC (rev 8858)
@@ -80,6 +80,7 @@
headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
}
byte[] data = new byte[] {};
+ serverMessage.getBodyBuffer().markReaderIndex();
if (serverMessage.getType() == Message.TEXT_TYPE)
{
SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
@@ -97,6 +98,7 @@
buffer.readBytes(data);
headers.put(Headers.CONTENT_LENGTH, data.length);
}
+ serverMessage.getBodyBuffer().resetReaderIndex();
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame,
deliveryCount);
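
Note on the StompSession.java change above: the two added lines bracket the body read with markReaderIndex() and resetReaderIndex(), presumably so that building the Stomp frame leaves the message body readable afterwards. Below is a minimal sketch of the same mark/read/reset pattern. It uses java.nio.ByteBuffer as a stand-in for the HornetQ buffer, which is an assumption made for illustration; the class BufferMarkResetSketch and the method peekBody are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, NOT HornetQ code: java.nio.ByteBuffer stands in
// for the message body buffer touched in StompSession.java.
final class BufferMarkResetSketch {

    // Reads all remaining bytes as UTF-8 text, then restores the read
    // position so that a later reader sees the same bytes again.
    static String peekBody(ByteBuffer body) {
        body.mark();                          // remember the current position
        try {
            byte[] data = new byte[body.remaining()];
            body.get(data);                   // advances the position to the limit
            return new String(data, StandardCharsets.UTF_8);
        } finally {
            body.reset();                     // rewind to the marked position
        }
    }

    public static void main(String[] args) {
        ByteBuffer body = ByteBuffer.wrap("Hello World".getBytes(StandardCharsets.UTF_8));
        String first = peekBody(body);
        String second = peekBody(body);       // still readable after the first pass
        System.out.println(first.equals(second));
    }
}
```

Without the mark/reset pair, the first read would leave the position at the end of the buffer and a second read would return an empty body.
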
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-28 15:16:36 UTC (rev 8857)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-29 13:43:32 UTC (rev 8858)
@@ -769,6 +769,7 @@
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
+ System.out.println(frame);
Assert.assertTrue(frame.contains("shouldBeNextMessage"));
}
@@ -801,12 +802,12 @@
frame =
"UNSUBSCRIBE\n" +
"destination:/queue/" + getQueueName() + "\n" +
+ "receipt:567\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+ waitForReceipt();
- waitForFrameToTakeEffect();
-
//send a message to our queue
sendMessage("second message");
@@ -850,12 +851,12 @@
frame =
"UNSUBSCRIBE\n" +
"id:mysubid\n" +
+ "receipt: 345\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+ waitForReceipt();
- waitForFrameToTakeEffect();
-
//send a message to our queue
sendMessage("second message");
@@ -893,24 +894,25 @@
"SEND\n" +
"destination:/queue/" + getQueueName() + "\n" +
"transaction: tx1\n" +
+ "receipt: 123\n" +
"\n\n" +
"Hello World" +
Stomp.NULL;
sendFrame(frame);
-
- waitForFrameToTakeEffect();
+ waitForReceipt();
+
// check the message is not committed
assertNull(consumer.receive(100));
frame =
"COMMIT\n" +
"transaction: tx1\n" +
+ "receipt:456\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+ waitForReceipt();
- waitForFrameToTakeEffect();
-
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
}
@@ -1070,13 +1072,12 @@
frame =
"COMMIT\n" +
"transaction: tx1\n" +
+ "receipt:789\n" +
"\n\n" +
Stomp.NULL;
sendFrame(frame);
+ waitForReceipt();
- // This test case is currently failing
- waitForFrameToTakeEffect();
-
//only second msg should be received since first msg was rolled back
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1366,6 +1367,12 @@
producer.send(message);
}
+ protected void waitForReceipt() throws Exception {
+ String frame = receiveFrame(50000);
+ assertNotNull(frame);
+ assertTrue(frame.indexOf("RECEIPT") > -1);
+ }
+
protected void waitForFrameToTakeEffect() throws InterruptedException {
// bit of a dirty hack :)
// another option would be to force some kind of receipt to be returned