[hornetq-commits] JBoss hornetq SVN: r9194 - in trunk: examples/jms/stomp and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon May 3 09:10:21 EDT 2010


Author: jmesnil
Date: 2010-05-03 09:10:21 -0400 (Mon, 03 May 2010)
New Revision: 9194

Modified:
   trunk/docs/user-manual/en/interoperability.xml
   trunk/examples/jms/stomp-websockets/readme.html
   trunk/examples/jms/stomp-websockets/src/org/hornetq/jms/example/StompWebSocketExample.java
   trunk/examples/jms/stomp/
   trunk/examples/jms/stomp/readme.html
   trunk/examples/jms/stomp/server0/
   trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
[HORNETQ-379] improve Stomp / JMS mapping

* map a Stomp message with a content-length header to a JMS TextMessage
  or a Core message with a nullable SimpleString in the body buffer
* map a Stomp message without a content-length header to a JMS BytesMessage
  or a Core message with a byte[] in the body buffer
* updated test and examples
* added documentation about this mapping


Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml	2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/docs/user-manual/en/interoperability.xml	2010-05-03 13:10:21 UTC (rev 9194)
@@ -89,23 +89,18 @@
            </section>
 
            <section>
-             <title>Send and consuming Stomp message from JMS</title>
-             <para>Stomp messages can be sent and consumed from a JMS Destination by using <literal>BytesMessage</literal> where
-                the Stomp message body is stored in the JMS BytesMessage body.</para>
-             <para>If the Stomp message contained a UTF-8 String, the corresponding code to read the string from a JMS BytesMessage is:</para>
-             <programlisting>
-BytesMessage message = (BytesMessage)consumer.receive();
-byte[] data = new byte[1024];
-int size = message.readBytes(data);
-String text = new String(data, 0, size, "UTF-8");
-             </programlisting>
-             <para>Conversely, to send a JMS BytesMessage destined to be consumed by Stomp as a UTF-8 String, the code is:</para>
-             <programlisting>
-String text = ...
-BytesMessage message = session.createBytesMessage();
-message.writeBytes(text.getBytes("UTF-8"));
-producer.send(message);
-             </programlisting>
+             <title>Send and consuming Stomp message from JMS or HornetQ Core API</title>
+             <para>Stomp is mainly a text-orientated protocol. To make it simpler to interoperate with JMS and HornetQ Core API, 
+               our Stomp implementation checks for presence of the <literal>content-length</literal> header to decide how to map a Stomp message
+               to a JMS Message or a Core message.
+             </para>
+             <para>If the Stomp message has a <literal>content-length</literal> header, it will be mapped to a JMS <emphasis>TextMessage</emphasis>
+               or a Core message with a <emphasis>single nullable SimpleString in the body buffer</emphasis>.</para>
+             <para>Alternatively, if the Stomp message does <emphasis>not</emphasis> have a <literal>content-length</literal> header, 
+               it will be mapped to a JMS <emphasis>BytesMessage</emphasis>
+               or a Core message with a <emphasis>byte[] in the body buffer</emphasis>.</para>
+             <para>The same logic applies when mapping a JMS message or a Core message to Stomp. A Stomp client can check the presence
+                of the <literal>content-length</literal> header to determine the type of the message body (UTF-8 String or bytes).</para>
           </section>
         </section>
         
@@ -118,8 +113,8 @@
          <programlisting>
 &lt;acceptor name="stomp-ws-acceptor">
 	&lt;factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory&lt;/factory-class>
-	&lt;param key="protocol"  value="stomp_ws"/>
-	&lt;param key="port"  value="61614"/>
+	&lt;param key="protocol" value="stomp_ws"/>
+	&lt;param key="port" value="61614"/>
 &lt;/acceptor>
          </programlisting>
          <para>With this configuration, HornetQ will accept Stomp connections over Web Sockets on 


Property changes on: trunk/examples/jms/stomp
___________________________________________________________________
Name: svn:ignore
   + build


Modified: trunk/examples/jms/stomp/readme.html
===================================================================
--- trunk/examples/jms/stomp/readme.html	2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/examples/jms/stomp/readme.html	2010-05-03 13:10:21 UTC (rev 9194)
@@ -82,13 +82,10 @@
            <code>connection.start();</code>
         </pre>
 
-        <li>We receive the message. Stomp messages are mapped to JMS BytesMessages.</li>
+        <li>We receive the message. Stomp messages are mapped to JMS TextMessage.</li>
         <pre class="prettyprint">
-          BytesMessage messageReceived = (BytesMessage)consumer.receive(5000);
-          byte[] data = new byte[1024];
-          int size = messageReceived.readBytes(data);
-          String receivedText = new String(data, 0, size, "UTF-8");
-          System.out.println("Received JMS message: " + receivedText);
+          TextMessage messageReceived = (TextMessage)consumer.receive(5000);
+          System.out.println("Received JMS message: " + messageReceived.getText());
         </pre>
 
         <li>And finally, <b>always</b> remember to close your JMS connections and resources after use, in a <code>finally</code> block. Closing a JMS connection will automatically close all of its sessions, consumers, producer and browser objects</li>


Property changes on: trunk/examples/jms/stomp/server0
___________________________________________________________________
Name: svn:ignore
   + data


Modified: trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java
===================================================================
--- trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java	2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/examples/jms/stomp/src/org/hornetq/jms/example/StompExample.java	2010-05-03 13:10:21 UTC (rev 9194)
@@ -15,12 +15,12 @@
 import java.io.OutputStream;
 import java.net.Socket;
 
-import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.MessageConsumer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.naming.InitialContext;
 
 import org.hornetq.common.example.HornetQExample;
@@ -61,7 +61,7 @@
          sendFrame(socket, connectFrame);
 
          // Step 3. Send a SEND frame (a Stomp message) to the
-         // queue /queue/exampleQueue with a text body
+         // jms.queue.exampleQueue address with a text body
          String text = "Hello, world from Stomp!";
          String message = "SEND\n" + 
             "destination: jms.queue.exampleQueue\n" +
@@ -98,11 +98,8 @@
          connection.start();
 
          // Step 10. Receive the message
-         BytesMessage messageReceived = (BytesMessage)consumer.receive(5000);
-         byte[] data = new byte[1024];
-         int size = messageReceived.readBytes(data);
-         String receivedText = new String(data, 0, size, "UTF-8");
-         System.out.println("Received JMS message: " + receivedText);
+         TextMessage messageReceived = (TextMessage)consumer.receive(5000);
+         System.out.println("Received JMS message: " + messageReceived.getText());
 
          return true;
       }

Modified: trunk/examples/jms/stomp-websockets/readme.html
===================================================================
--- trunk/examples/jms/stomp-websockets/readme.html	2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/examples/jms/stomp-websockets/readme.html	2010-05-03 13:10:21 UTC (rev 9194)
@@ -32,6 +32,15 @@
      <h2>Example Setup</h2>
      <p>The example will start a HornetQ server configured with Stomp over Web Sockets and JMS. Web browsers clients and
        Java application will exchange message using a JMS Topic.</p>
+     <para>To enable Stomp over Web Sockets, the server must have a <literal>NettyAcceptor</literal> configured with a <literal>protocol</literal>
+       parameter set to <literal>stomp_ws</literal>:</para>
+     <pre class="prettyprint">
+&lt;acceptor name="stomp-websocket">
+   &lt;factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory&lt;/factory-class>
+   &lt;param key="protocol" value="stomp_ws"/>
+   &lt;param key="port" value="61614"/>
+&lt;/acceptor>
+     </pre>
      
      <h2>Example step-by-step</h2>
      <p>To run the example, you need to start HornetQ server from the <code>bin</code> directory and specify this example's

Modified: trunk/examples/jms/stomp-websockets/src/org/hornetq/jms/example/StompWebSocketExample.java
===================================================================
--- trunk/examples/jms/stomp-websockets/src/org/hornetq/jms/example/StompWebSocketExample.java	2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/examples/jms/stomp-websockets/src/org/hornetq/jms/example/StompWebSocketExample.java	2010-05-03 13:10:21 UTC (rev 9194)
@@ -14,7 +14,6 @@
 
 import java.util.Date;
 
-import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.MessageConsumer;
@@ -57,19 +56,17 @@
 
          // use JMS bytes message with UTF-8 String to send a text to Stomp clients
          String text = "message sent from a Java application at " + new Date();
-         BytesMessage message = session.createBytesMessage();
-         message.writeBytes(text.getBytes("UTF-8"));
+         //BytesMessage message = session.createBytesMessage();
+         //message.writeBytes(text.getBytes("UTF-8"));
+         TextMessage message = session.createTextMessage(text);
          System.out.println("Sent message: " + text);
 
          producer.send(message);
 
          connection.start();
 
-         message = (BytesMessage)consumer.receive();
-         byte[] data = new byte[1024];
-         int size = message.readBytes(data);
-         text = new String(data, 0, size, "UTF-8");
-         System.out.println("Received message: " + text);
+         message = (TextMessage)consumer.receive();
+         System.out.println("Received message: " + message.getText());
 
          return true;
       }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-05-03 13:10:21 UTC (rev 9194)
@@ -504,15 +504,23 @@
       Map<String, Object> headers = frame.getHeaders();
       String destination = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
       String txID = (String)headers.remove(Stomp.Headers.TRANSACTION);
-      byte type = Message.BYTES_TYPE;
       long timestamp = System.currentTimeMillis();
 
       ServerMessageImpl message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
-      message.setType(type);
       message.setTimestamp(timestamp);
       message.setAddress(SimpleString.toSimpleString(destination));
       StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
-      message.getBodyBuffer().writeBytes(frame.getContent());
+      if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+      {
+         message.setType(Message.BYTES_TYPE);
+         message.getBodyBuffer().writeBytes(frame.getContent());
+      }
+      else
+      {
+         message.setType(Message.TEXT_TYPE);
+         String text = new String(frame.getContent(), "UTF-8");
+         message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
+      }
 
       StompSession stompSession = null;
       if (txID == null)

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-05-03 13:10:21 UTC (rev 9194)
@@ -19,6 +19,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
@@ -95,10 +96,23 @@
                                                                  : serverMessage.getEndOfBodyPosition();
          int size = bodyPos - buffer.readerIndex();
          byte[] data = new byte[size];
-         buffer.readBytes(data);
-         headers.put(Headers.CONTENT_LENGTH, data.length);
+         if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
+         {
+            headers.put(Headers.CONTENT_LENGTH, data.length);
+            buffer.readBytes(data);
+         }
+         else
+         {
+            SimpleString text = buffer.readNullableSimpleString();
+            if (text != null)
+            {
+               data = text.toString().getBytes("UTF-8");
+            } else
+            {
+               data = new byte[0];
+            }
+         }
          serverMessage.getBodyBuffer().resetReaderIndex();
-
          StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
          StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
 

Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-05-03 08:35:24 UTC (rev 9193)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-05-03 13:10:21 UTC (rev 9194)
@@ -41,6 +41,7 @@
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 
 import junit.framework.Assert;
@@ -188,10 +189,10 @@
 
       sendFrame(frame);
 
-      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      TextMessage message = (TextMessage)consumer.receive(1000);
       Assert.assertNotNull(message);
-      Assert.assertEquals("Hello World", readContent(message));
-
+      Assert.assertEquals("Hello World", message.getText());
+      
       // Make sure that the timestamp is valid - should
       // be very close to the current time.
       long tnow = System.currentTimeMillis();
@@ -224,10 +225,10 @@
       Assert.assertTrue(f.startsWith("RECEIPT"));
       Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
 
-      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      TextMessage message = (TextMessage)consumer.receive(1000);
       Assert.assertNotNull(message);
-      Assert.assertEquals("Hello World", readContent(message));
-
+      Assert.assertEquals("Hello World", message.getText());
+      
       // Make sure that the timestamp is valid - should
       // be very close to the current time.
       long tnow = System.currentTimeMillis();
@@ -292,9 +293,9 @@
 
       sendFrame(frame);
 
-      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      TextMessage message = (TextMessage)consumer.receive(1000);
       Assert.assertNotNull(message);
-      Assert.assertEquals("Hello World", readContent(message));
+      Assert.assertEquals("Hello World", message.getText());
       // differ from StompConnect
       Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
    }
@@ -321,9 +322,9 @@
 
       sendFrame(frame);
 
-      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      TextMessage message = (TextMessage)consumer.receive(1000);
       Assert.assertNotNull(message);
-      Assert.assertEquals("Hello World", readContent(message));
+      Assert.assertEquals("Hello World", message.getText());
       Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
       Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
    }
@@ -355,9 +356,9 @@
 
       sendFrame(frame);
 
-      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      TextMessage message = (TextMessage)consumer.receive(1000);
       Assert.assertNotNull(message);
-      Assert.assertEquals("Hello World", readContent(message));
+      Assert.assertEquals("Hello World", message.getText());
       Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
       Assert.assertEquals("getJMSType", "t345", message.getJMSType());
       Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
@@ -992,9 +993,9 @@
       waitForReceipt();
 
       // only second msg should be received since first msg was rolled back
-      BytesMessage message = (BytesMessage)consumer.receive(1000);
+      TextMessage message = (TextMessage)consumer.receive(1000);
       Assert.assertNotNull(message);
-      Assert.assertEquals("second message", readContent(message));
+      Assert.assertEquals("second message", message.getText());
    }
 
    public void testSubscribeToTopic() throws Exception
@@ -1411,12 +1412,14 @@
 
    public void sendMessage(String msg) throws Exception
    {
-      sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", queue);
+      sendMessage(msg, queue);
    }
 
    public void sendMessage(String msg, Destination destination) throws Exception
    {
-      sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", destination);
+      MessageProducer producer = session.createProducer(destination);
+      TextMessage message = session.createTextMessage(msg);
+      producer.send(message);
    }
 
    public void sendMessage(byte[] data, Destination destination) throws Exception
@@ -1438,13 +1441,6 @@
       producer.send(message);
    }
 
-   public String readContent(BytesMessage message) throws Exception
-   {
-      byte[] data = new byte[1024];
-      int size = message.readBytes(data);
-      return new String(data, 0, size, "UTF-8");
-   }
-
    protected void waitForReceipt() throws Exception
    {
       String frame = receiveFrame(50000);



More information about the hornetq-commits mailing list