[hornetq-commits] JBoss hornetq SVN: r9194 - in trunk: examples/jms/stomp and 6 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon May 3 09:10:21 EDT 2010
Author: jmesnil
Date: 2010-05-03 09:10:21 -0400 (Mon, 03 May 2010)
New Revision: 9194
Modified:
trunk/docs/user-manual/en/interoperability.xml
trunk/examples/jms/stomp-websockets/readme.html
trunk/examples/jms/stomp-websockets/src/org/hornetq/jms/example/StompWebSocketExample.java
trunk/examples/jms/stomp/
trunk/examples/jms/stomp/readme.html
trunk/examples/jms/stomp/server0/
trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
[HORNETQ-379] improve Stomp / JMS mapping
* map a Stomp message without a content-length header to a JMS TextMessage
or a Core message with a nullable SimpleString in the body buffer
* map a Stomp message with a content-length header to a JMS BytesMessage
or a Core message with a byte[] in the body buffer
* updated test and examples
* added documentation about this mapping
Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml 2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/docs/user-manual/en/interoperability.xml 2010-05-03 13:10:21 UTC (rev 9194)
@@ -89,23 +89,18 @@
</section>
<section>
- <title>Send and consuming Stomp message from JMS</title>
- <para>Stomp messages can be sent and consumed from a JMS Destination by using <literal>BytesMessage</literal> where
- the Stomp message body is stored in the JMS BytesMessage body.</para>
- <para>If the Stomp message contained a UTF-8 String, the corresponding code to read the string from a JMS BytesMessage is:</para>
- <programlisting>
-BytesMessage message = (BytesMessage)consumer.receive();
-byte[] data = new byte[1024];
-int size = message.readBytes(data);
-String text = new String(data, 0, size, "UTF-8");
- </programlisting>
- <para>Conversely, to send a JMS BytesMessage destined to be consumed by Stomp as a UTF-8 String, the code is:</para>
- <programlisting>
-String text = ...
-BytesMessage message = session.createBytesMessage();
-message.writeBytes(text.getBytes("UTF-8"));
-producer.send(message);
- </programlisting>
+ <title>Sending and consuming Stomp messages from JMS or HornetQ Core API</title>
+ <para>Stomp is mainly a text-oriented protocol. To make it simpler to interoperate with JMS and the HornetQ Core API,
+ our Stomp implementation checks for the presence of the <literal>content-length</literal> header to decide how to map a Stomp message
+ to a JMS Message or a Core message.
+ </para>
+ <para>If the Stomp message does <emphasis>not</emphasis> have a <literal>content-length</literal> header, it will be mapped to a JMS <emphasis>TextMessage</emphasis>
+ or a Core message with a <emphasis>single nullable SimpleString in the body buffer</emphasis>.</para>
+ <para>Alternatively, if the Stomp message <emphasis>has</emphasis> a <literal>content-length</literal> header,
+ it will be mapped to a JMS <emphasis>BytesMessage</emphasis>
+ or a Core message with a <emphasis>byte[] in the body buffer</emphasis>.</para>
+ <para>The same logic applies when mapping a JMS message or a Core message to Stomp. A Stomp client can check for the presence
+ of the <literal>content-length</literal> header to determine the type of the message body (UTF-8 String or bytes).</para>
</section>
</section>
@@ -118,8 +113,8 @@
<programlisting>
<acceptor name="stomp-ws-acceptor">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
- <param key="protocol" value="stomp_ws"/>
- <param key="port" value="61614"/>
+ <param key="protocol" value="stomp_ws"/>
+ <param key="port" value="61614"/>
</acceptor>
</programlisting>
<para>With this configuration, HornetQ will accept Stomp connections over Web Sockets on
Property changes on: trunk/examples/jms/stomp
___________________________________________________________________
Name: svn:ignore
+ build
Modified: trunk/examples/jms/stomp/readme.html
===================================================================
--- trunk/examples/jms/stomp/readme.html 2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/examples/jms/stomp/readme.html 2010-05-03 13:10:21 UTC (rev 9194)
@@ -82,13 +82,10 @@
<code>connection.start();</code>
</pre>
- <li>We receive the message. Stomp messages are mapped to JMS BytesMessages.</li>
+ <li>We receive the message. Stomp messages are mapped to JMS TextMessage.</li>
<pre class="prettyprint">
- BytesMessage messageReceived = (BytesMessage)consumer.receive(5000);
- byte[] data = new byte[1024];
- int size = messageReceived.readBytes(data);
- String receivedText = new String(data, 0, size, "UTF-8");
- System.out.println("Received JMS message: " + receivedText);
+ TextMessage messageReceived = (TextMessage)consumer.receive(5000);
+ System.out.println("Received JMS message: " + messageReceived.getText());
</pre>
<li>And finally, <b>always</b> remember to close your JMS connections and resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>
Property changes on: trunk/examples/jms/stomp/server0
___________________________________________________________________
Name: svn:ignore
+ data
Modified: trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java
===================================================================
--- trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java 2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java 2010-05-03 13:10:21 UTC (rev 9194)
@@ -15,12 +15,12 @@
import java.io.OutputStream;
import java.net.Socket;
-import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.naming.InitialContext;
import org.hornetq.common.example.HornetQExample;
@@ -61,7 +61,7 @@
sendFrame(socket, connectFrame);
// Step 3. Send a SEND frame (a Stomp message) to the
- // queue /queue/exampleQueue with a text body
+ // jms.queue.exampleQueue address with a text body
String text = "Hello, world from Stomp!";
String message = "SEND\n" +
"destination: jms.queue.exampleQueue\n" +
@@ -98,11 +98,8 @@
connection.start();
// Step 10. Receive the message
- BytesMessage messageReceived = (BytesMessage)consumer.receive(5000);
- byte[] data = new byte[1024];
- int size = messageReceived.readBytes(data);
- String receivedText = new String(data, 0, size, "UTF-8");
- System.out.println("Received JMS message: " + receivedText);
+ TextMessage messageReceived = (TextMessage)consumer.receive(5000);
+ System.out.println("Received JMS message: " + messageReceived.getText());
return true;
}
Modified: trunk/examples/jms/stomp-websockets/readme.html
===================================================================
--- trunk/examples/jms/stomp-websockets/readme.html 2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/examples/jms/stomp-websockets/readme.html 2010-05-03 13:10:21 UTC (rev 9194)
@@ -32,6 +32,15 @@
<h2>Example Setup</h2>
<p>The example will start a HornetQ server configured with Stomp over Web Sockets and JMS. Web browser clients and
a Java application will exchange messages using a JMS Topic.</p>
+ <p>To enable Stomp over Web Sockets, the server must have a <code>NettyAcceptor</code> configured with a <code>protocol</code>
+ parameter set to <code>stomp_ws</code>:</p>
+ <pre class="prettyprint">
+<acceptor name="stomp-websocket">
+ <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
+ <param key="protocol" value="stomp_ws"/>
+ <param key="port" value="61614"/>
+</acceptor>
+ </pre>
<h2>Example step-by-step</h2>
<p>To run the example, you need to start HornetQ server from the <code>bin</code> directory and specify this example's
Modified: trunk/examples/jms/stomp-websockets/src/org/hornetq/jms/example/StompWebSocketExample.java
===================================================================
--- trunk/examples/jms/stomp-websockets/src/org/hornetq/jms/example/StompWebSocketExample.java 2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/examples/jms/stomp-websockets/src/org/hornetq/jms/example/StompWebSocketExample.java 2010-05-03 13:10:21 UTC (rev 9194)
@@ -14,7 +14,6 @@
import java.util.Date;
-import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
@@ -57,19 +56,17 @@
// use a JMS text message to send a text to Stomp clients
String text = "message sent from a Java application at " + new Date();
- BytesMessage message = session.createBytesMessage();
- message.writeBytes(text.getBytes("UTF-8"));
+ //BytesMessage message = session.createBytesMessage();
+ //message.writeBytes(text.getBytes("UTF-8"));
+ TextMessage message = session.createTextMessage(text);
System.out.println("Sent message: " + text);
producer.send(message);
connection.start();
- message = (BytesMessage)consumer.receive();
- byte[] data = new byte[1024];
- int size = message.readBytes(data);
- text = new String(data, 0, size, "UTF-8");
- System.out.println("Received message: " + text);
+ message = (TextMessage)consumer.receive();
+ System.out.println("Received message: " + message.getText());
return true;
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-05-03 13:10:21 UTC (rev 9194)
@@ -504,15 +504,23 @@
Map<String, Object> headers = frame.getHeaders();
String destination = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
String txID = (String)headers.remove(Stomp.Headers.TRANSACTION);
- byte type = Message.BYTES_TYPE;
long timestamp = System.currentTimeMillis();
ServerMessageImpl message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
- message.setType(type);
message.setTimestamp(timestamp);
message.setAddress(SimpleString.toSimpleString(destination));
StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
- message.getBodyBuffer().writeBytes(frame.getContent());
+ if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+ {
+ message.setType(Message.BYTES_TYPE);
+ message.getBodyBuffer().writeBytes(frame.getContent());
+ }
+ else
+ {
+ message.setType(Message.TEXT_TYPE);
+ String text = new String(frame.getContent(), "UTF-8");
+ message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
+ }
StompSession stompSession = null;
if (txID == null)
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-05-03 13:10:21 UTC (rev 9194)
@@ -19,6 +19,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
@@ -95,10 +96,23 @@
: serverMessage.getEndOfBodyPosition();
int size = bodyPos - buffer.readerIndex();
byte[] data = new byte[size];
- buffer.readBytes(data);
- headers.put(Headers.CONTENT_LENGTH, data.length);
+ if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
+ {
+ headers.put(Headers.CONTENT_LENGTH, data.length);
+ buffer.readBytes(data);
+ }
+ else
+ {
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes("UTF-8");
+ } else
+ {
+ data = new byte[0];
+ }
+ }
serverMessage.getBodyBuffer().resetReaderIndex();
-
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-05-03 13:10:21 UTC (rev 9194)
@@ -41,6 +41,7 @@
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.jms.Topic;
import junit.framework.Assert;
@@ -188,10 +189,10 @@
sendFrame(frame);
- BytesMessage message = (BytesMessage)consumer.receive(1000);
+ TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
-
+ Assert.assertEquals("Hello World", message.getText());
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
@@ -224,10 +225,10 @@
Assert.assertTrue(f.startsWith("RECEIPT"));
Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
- BytesMessage message = (BytesMessage)consumer.receive(1000);
+ TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
-
+ Assert.assertEquals("Hello World", message.getText());
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
@@ -292,9 +293,9 @@
sendFrame(frame);
- BytesMessage message = (BytesMessage)consumer.receive(1000);
+ TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
+ Assert.assertEquals("Hello World", message.getText());
// differ from StompConnect
Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
}
@@ -321,9 +322,9 @@
sendFrame(frame);
- BytesMessage message = (BytesMessage)consumer.receive(1000);
+ TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
+ Assert.assertEquals("Hello World", message.getText());
Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
}
@@ -355,9 +356,9 @@
sendFrame(frame);
- BytesMessage message = (BytesMessage)consumer.receive(1000);
+ TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", readContent(message));
+ Assert.assertEquals("Hello World", message.getText());
Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
Assert.assertEquals("getJMSType", "t345", message.getJMSType());
Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
@@ -992,9 +993,9 @@
waitForReceipt();
// only second msg should be received since first msg was rolled back
- BytesMessage message = (BytesMessage)consumer.receive(1000);
+ TextMessage message = (TextMessage)consumer.receive(1000);
Assert.assertNotNull(message);
- Assert.assertEquals("second message", readContent(message));
+ Assert.assertEquals("second message", message.getText());
}
public void testSubscribeToTopic() throws Exception
@@ -1411,12 +1412,14 @@
public void sendMessage(String msg) throws Exception
{
- sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", queue);
+ sendMessage(msg, queue);
}
public void sendMessage(String msg, Destination destination) throws Exception
{
- sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", destination);
+ MessageProducer producer = session.createProducer(destination);
+ TextMessage message = session.createTextMessage(msg);
+ producer.send(message);
}
public void sendMessage(byte[] data, Destination destination) throws Exception
@@ -1438,13 +1441,6 @@
producer.send(message);
}
- public String readContent(BytesMessage message) throws Exception
- {
- byte[] data = new byte[1024];
- int size = message.readBytes(data);
- return new String(data, 0, size, "UTF-8");
- }
-
protected void waitForReceipt() throws Exception
{
String frame = receiveFrame(50000);
More information about the hornetq-commits
mailing list