[hornetq-commits] JBoss hornetq SVN: r8892 - in trunk: src/main/org/hornetq/core/protocol/stomp and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Feb 22 05:16:43 EST 2010


Author: jmesnil
Date: 2010-02-22 05:16:42 -0500 (Mon, 22 Feb 2010)
New Revision: 8892

Added:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java
   trunk/tests/src/org/hornetq/tests/stress/stomp/
   trunk/tests/src/org/hornetq/tests/stress/stomp/StompStressTest.java
Removed:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
Modified:
   trunk/docs/user-manual/en/interoperability.xml
   trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
   trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* refactoring to handle correctly frame body with null bytes

Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml	2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/docs/user-manual/en/interoperability.xml	2010-02-22 10:16:42 UTC (rev 8892)
@@ -72,11 +72,14 @@
             to an address.
             When a Stomp client subscribes (or unsubscribes) for a destination (using a <literal>SUBSCRIBE</literal>
             or <literal>UNSUBSCRIBE</literal> frame), the destination is mapped to a HornetQ queue.</para>
-           <section>
-             <title>Using JMS destinations</title>
-             <para>As explained in <xref linkend="jms-core-mapping" />, JMS destinations are also mapped to HornetQ addresses and queues.
-             If you want to use Stomp to send messages to JMS destinations, the Stomp destinations must follow the same convention:</para>
-             <itemizedlist>
+        </section>
+        <section>
+          <title>Stomp and JMS interoperabilty</title>
+          <section>
+            <title>Using JMS destinations</title>
+            <para>As explained in <xref linkend="jms-core-mapping" />, JMS destinations are also mapped to HornetQ addresses and queues.
+              If you want to use Stomp to send messages to JMS destinations, the Stomp destinations must follow the same convention:</para>
+            <itemizedlist>
               <listitem>
                 <para>send or subscribe to a JMS <emphasis>Queue</emphasis> by prepending the queue name by <literal>jms.queue.</literal>.</para>
                 <para>For example, to send a message to the <literal>orders</literal> JMS Queue, the Stomp client must send the frame:</para>
@@ -92,15 +95,34 @@
                 <para>send or subscribe to a JMS <emphasis>Topic</emphasis> by prepending the topic name by <literal>jms.topic.</literal>.</para>
                 <para>For example to subscribe to the <literal>stocks</literal> JMS Topic, the Stomp client must send the frame:</para>
                 <programlisting>
-SUBSCRIBE
-destination:jms.topic.stocks
-
-^@
+  SUBSCRIBE
+  destination:jms.topic.stocks
+  
+  ^@
                 </programlisting>
               </listitem>
              </itemizedlist>
-             
            </section>
+
+           <section>
+             <title>Send and consuming Stomp message from JMS</title>
+             <para>Stomp messages can be sent and consumed from a JMS Destination by using <literal>BytesMessage</literal> where
+                the Stomp message body is stored in the JMS BytesMessage body.</para>
+             <para>If the Stomp message contained a UTF-8 String, the corresponding code to read the string from a JMS BytesMessage is:</para>
+             <programlisting>
+BytesMessage message = (BytesMessage)consumer.receive();
+byte[] data = new byte[1024];
+int size = message.readBytes(data);
+String text = new String(data, 0, size, "UTF-8");
+             </programlisting>
+             <para>Conversely, to send a JMS BytesMessage destined to be consumed by Stomp as a UTF-8 String, the code is:</para>
+             <programlisting>
+String text = ...
+BytesMessage message = session.createBytesMessage();
+message.writeBytes(text.getBytes("UTF-8"));
+producer.send(message);
+             </programlisting>
+          </section>
         </section>
     </section>
     <section>

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -68,7 +68,7 @@
 
    public interface Headers
    {
-      String SEPERATOR = ":";
+      String SEPARATOR = ":";
 
       String RECEIPT_REQUESTED = "receipt";
 

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -17,10 +17,11 @@
  */
 package org.hornetq.core.protocol.stomp;
 
-import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+
 /**
  * Represents all the data in a STOMP frame.
  *
@@ -28,27 +29,29 @@
  */
 class StompFrame
 {
-   private static final byte[] NO_DATA = new byte[] {};
+   public static final byte[] NO_DATA = new byte[] {};
+   private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
 
-   private String command;
+   private final String command;
+   private final Map<String, Object> headers;
+   private final byte[] content;
+   
+   private HornetQBuffer buffer = null;
+   private int size;
 
-   private Map<String, Object> headers;
-
-   private byte[] content = StompFrame.NO_DATA;
-
-   private int size = -1;
-
-   public StompFrame()
-   {
-      this.headers = new HashMap<String, Object>();
-   }
-
    public StompFrame(String command, Map<String, Object> headers, byte[] data)
    {
       this.command = command;
       this.headers = headers;
       this.content = data;
    }
+   
+   public StompFrame(String command, Map<String, Object> headers)
+   {
+      this.command = command;
+      this.headers = headers;
+      this.content = NO_DATA;
+   }
 
    public String getCommand()
    {
@@ -65,22 +68,13 @@
       return headers;
    }
 
-   public int getEncodedSize()
+   public int getEncodedSize() throws Exception
    {
-      if (size == -1)
+      if (buffer == null)
       {
-         StompMarshaller marshaller = new StompMarshaller();
-         try
-         {
-            size = marshaller.marshal(this).length;
-         }
-         catch (IOException e)
-         {
-            return -1;
-         }
+         buffer = toHornetQBuffer();
       }
-
-      return size ;
+      return size;
    }
 
    @Override
@@ -88,5 +82,33 @@
    {
       return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=" + content.length + "]";
    }
+   
+   public HornetQBuffer toHornetQBuffer() throws Exception
+   {
+      if (buffer == null)
+      {
+         buffer = HornetQBuffers.dynamicBuffer(content.length + 512);
 
+         StringBuffer head = new StringBuffer();
+         head.append(command);
+         head.append(Stomp.NEWLINE);
+         // Output the headers.
+         for (Map.Entry<String, Object> header : headers.entrySet())
+         {
+            head.append(header.getKey());
+            head.append(Stomp.Headers.SEPARATOR);
+            head.append(header.getValue());
+            head.append(Stomp.NEWLINE);
+         }
+         // Add a newline to separate the headers from the content.
+         head.append(Stomp.NEWLINE);
+
+         buffer.writeBytes(head.toString().getBytes("UTF-8"));
+         buffer.writeBytes(content);
+         buffer.writeBytes(END_OF_FRAME);
+
+         size = buffer.writerIndex();
+      }
+      return buffer;
+   }
 }

Copied: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java (from rev 8887, trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java)
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+class StompFrameDecoder
+{
+   private static final Logger log = Logger.getLogger(StompFrameDecoder.class);
+
+   private static final int MAX_COMMAND_LENGTH = 1024;
+
+   private static final int MAX_HEADER_LENGTH = 1024 * 10;
+
+   private static final int MAX_HEADERS = 1000;
+
+   private static final int MAX_DATA_LENGTH = 1024 * 1024 * 10;
+
+   public StompFrame decode(HornetQBuffer buffer)
+   {
+      try
+      {
+         String command = StompFrameDecoder.readLine(buffer, StompFrameDecoder.MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+         if (command == null)
+         {
+            return null;
+         }
+         command = command.trim();
+         if (command.length() == 0)
+         {
+            return null;
+         }
+         
+         // Parse the headers
+         HashMap<String, Object> headers = new HashMap<String, Object>(25);
+         while (true)
+         {
+            String line = StompFrameDecoder.readLine(buffer, StompFrameDecoder.MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+            if (line == null)
+            {
+               return null;
+            }
+
+            if (headers.size() > StompFrameDecoder.MAX_HEADERS)
+            {
+               throw new StompException("The maximum number of headers was exceeded", true);
+            }
+
+            if (line.trim().length() == 0)
+            {
+               break;
+            }
+
+            try
+            {
+               int seperator_index = line.indexOf(Stomp.Headers.SEPARATOR);
+               if (seperator_index == -1)
+               {
+                  return null;
+               }
+               String name = line.substring(0, seperator_index).trim();
+               String value = line.substring(seperator_index + 1, line.length()).trim();
+               headers.put(name, value);
+            }
+            catch (Exception e)
+            {
+               throw new StompException("Unable to parse header line [" + line + "]", true);
+            }
+         }
+         // Read in the data part.
+         byte[] data = StompFrame.NO_DATA;
+         String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
+         if (contentLength != null)
+         {
+
+            // Bless the client, he's telling us how much data to read in.
+            int length;
+            try
+            {
+               length = Integer.parseInt(contentLength.trim());
+            }
+            catch (NumberFormatException e)
+            {
+               throw new StompException("Specified content-length is not a valid integer", true);
+            }
+
+            if (length > StompFrameDecoder.MAX_DATA_LENGTH)
+            {
+               throw new StompException("The maximum data length was exceeded", true);
+            }
+
+            if (buffer.readableBytes() < length)
+            {
+               return null;
+            }
+            
+            data = new byte[length];
+            buffer.readBytes(data);
+
+            if (buffer.readByte() != 0)
+            {
+               throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " +
+                                        "there was no trailing null byte", true);
+            }
+         }
+         else
+         {
+            byte[] body = new byte[StompFrameDecoder.MAX_DATA_LENGTH];
+            boolean bodyCorrectlyEnded = false;
+            int count = 0;
+            while (buffer.readable())
+            {
+               byte b = buffer.readByte();
+
+               if (b == (byte)'\0')
+               {
+                  bodyCorrectlyEnded = true;
+                  break;
+               }
+               else
+               {
+                  body[count++] = b;
+               }
+            }
+
+            if (!bodyCorrectlyEnded)
+            {
+               return null;
+            }
+
+            data = new byte[count];
+            System.arraycopy(body, 0, data, 0, count);
+         }
+
+         return new StompFrame(command, headers, data);
+      }
+      catch (IOException e)
+      {
+         log.error("Unable to decode stomp frame", e);
+         return null;
+      }
+   }
+   
+   private static String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException
+   {
+      char[] chars = new char[MAX_HEADER_LENGTH];
+
+      if (!in.readable())
+      {
+         return null;
+      }
+      
+      boolean properString = false;
+      int count = 0;
+      while (in.readable())
+      {
+         byte b = in.readByte();
+
+         if (b == (byte)'\n')
+         {
+            properString = true;
+            break;
+         }
+         else
+         {
+            chars[count++] = (char)b;
+         }
+      }
+      if (properString)
+      {
+         return new String(chars, 0, count);
+      }
+      else
+      {
+         return null;
+      }
+   }
+}

Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java	2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -1,33 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.stomp;
-
-import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.Delimiters;
-
-/**
- * A StompFrameDelimiter
- *
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- */
-public class StompFrameDelimiter extends DelimiterBasedFrameDecoder
-{
-
-   private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
-
-   public StompFrameDelimiter()
-   {
-      super(MAX_DATA_LENGTH, false, Delimiters.nulDelimiter());
-   }
-}

Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java	2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -1,38 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.core.protocol.stomp;
-
-/**
- * Command indicating that an invalid Stomp Frame was received.
- *
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-class StompFrameError extends StompFrame
-{
-   private final StompException exception;
-
-   public StompFrameError(StompException exception)
-   {
-      this.exception = exception;
-   }
-
-   public StompException getException()
-   {
-      return exception;
-   }
-}

Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -1,235 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.core.protocol.stomp;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.hornetq.api.core.HornetQBuffer;
-
-/**
- * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
- */
-class StompMarshaller
-{
-   public static final byte[] NO_DATA = new byte[] {};
-
-   private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
-
-   private static final int MAX_COMMAND_LENGTH = 1024;
-
-   private static final int MAX_HEADER_LENGTH = 1024 * 10;
-
-   private static final int MAX_HEADERS = 1000;
-
-   private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
-
-   private int version = 1;
-
-   public int getVersion()
-   {
-      return version;
-   }
-
-   public void setVersion(int version)
-   {
-      this.version = version;
-   }
-
-   public byte[] marshal(StompFrame command) throws IOException
-   {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      DataOutputStream dos = new DataOutputStream(baos);
-      marshal(command, dos);
-      dos.close();
-      return baos.toByteArray();
-   }
-
-   public void marshal(StompFrame stomp, DataOutput os) throws IOException
-   {
-      StringBuffer buffer = new StringBuffer();
-      buffer.append(stomp.getCommand());
-      buffer.append(Stomp.NEWLINE);
-
-      // Output the headers.
-      for (Iterator<Map.Entry<String, Object>> iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();)
-      {
-         Map.Entry<String, Object> entry = iter.next();
-         buffer.append(entry.getKey());
-         buffer.append(Stomp.Headers.SEPERATOR);
-         buffer.append(entry.getValue());
-         buffer.append(Stomp.NEWLINE);
-      }
-
-      // Add a newline to seperate the headers from the content.
-      buffer.append(Stomp.NEWLINE);
-
-      os.write(buffer.toString().getBytes("UTF-8"));
-      os.write(stomp.getContent());
-      os.write(END_OF_FRAME);
-   }
-
-   public StompFrame unmarshal(HornetQBuffer in) throws IOException
-   {
-
-      try
-      {
-         String action = null;
-
-         // skip white space to next real action line
-         while (true)
-         {
-            action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
-            if (action == null)
-            {
-               throw new IOException("connection was closed");
-            }
-            else
-            {
-               action = action.trim();
-               if (action.length() > 0)
-               {
-                  break;
-               }
-            }
-         }
-
-         // Parse the headers
-         HashMap<String, Object> headers = new HashMap<String, Object>(25);
-         while (true)
-         {
-            String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
-            if (line != null && line.trim().length() > 0)
-            {
-
-               if (headers.size() > MAX_HEADERS)
-               {
-                  throw new StompException("The maximum number of headers was exceeded", true);
-               }
-
-               try
-               {
-                  int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
-                  String name = line.substring(0, seperator_index).trim();
-                  String value = line.substring(seperator_index + 1, line.length()).trim();
-                  headers.put(name, value);
-               }
-               catch (Exception e)
-               {
-                  throw new StompException("Unable to parser header line [" + line + "]", true);
-               }
-            }
-            else
-            {
-               break;
-            }
-         }
-
-         // Read in the data part.
-         byte[] data = NO_DATA;
-         String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
-         if (contentLength != null)
-         {
-
-            // Bless the client, he's telling us how much data to read in.
-            int length;
-            try
-            {
-               length = Integer.parseInt(contentLength.trim());
-            }
-            catch (NumberFormatException e)
-            {
-               throw new StompException("Specified content-length is not a valid integer", true);
-            }
-
-            if (length > MAX_DATA_LENGTH)
-            {
-               throw new StompException("The maximum data length was exceeded", true);
-            }
-
-            data = new byte[length];
-            in.readBytes(data);
-
-            if (in.readByte() != 0)
-            {
-               throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " +
-                                        "there was no trailing null byte", true);
-            }
-         }
-         else
-         {
-
-            // We don't know how much to read.. data ends when we hit a 0
-            byte b;
-            ByteArrayOutputStream baos = null;
-            while (in.readableBytes() > 0 && (b = in.readByte()) != 0)
-            {
-
-               if (baos == null)
-               {
-                  baos = new ByteArrayOutputStream();
-               }
-               else if (baos.size() > MAX_DATA_LENGTH)
-               {
-                  throw new StompException("The maximum data length was exceeded", true);
-               }
-
-               baos.write(b);
-            }
-
-            if (baos != null)
-            {
-               baos.close();
-               data = baos.toByteArray();
-            }
-         }
-
-         return new StompFrame(action, headers, data);
-      }
-      catch (StompException e)
-      {
-         return new StompFrameError(e);
-      }
-   }
-
-   protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException
-   {
-      char[] chars = new char[MAX_HEADER_LENGTH];
-
-      int count = 0;
-      while (in.readable())
-      {
-         byte b = in.readByte();
-
-         if (b == (byte)'\n')
-         {
-            break;
-         }
-         else
-         {
-            chars[count++] = (char)b;
-         }
-      }
-      return new String(chars, 0, count);
-   }
-}

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -14,7 +14,6 @@
 package org.hornetq.core.protocol.stomp;
 
 import java.io.ByteArrayOutputStream;
-import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
@@ -26,7 +25,6 @@
 import java.util.concurrent.Executor;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
 import org.hornetq.api.core.Message;
@@ -61,7 +59,7 @@
 
    private final HornetQServer server;
 
-   private final StompMarshaller marshaller;
+   private final StompFrameDecoder frameDecoder;
 
    private final Executor executor;
 
@@ -106,7 +104,7 @@
    public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
    {
       this.server = server;
-      this.marshaller = new StompMarshaller();
+      this.frameDecoder = new StompFrameDecoder();
       this.executor = server.getExecutorFactory().getExecutor();
    }
 
@@ -125,9 +123,21 @@
 
    public int isReadyToHandle(HornetQBuffer buffer)
    {
-      return -1;
+      int start = buffer.readerIndex();
+
+      StompFrame frame = frameDecoder.decode(buffer);
+
+      if (frame == null)
+      {
+         return -1;
+      } 
+      else
+      {
+         return buffer.readerIndex() - start;
+      }
    }
 
+
    public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
    {
       executor.execute(new Runnable()
@@ -146,21 +156,21 @@
       });
    }
    
-   private void doHandleBuffer(RemotingConnection connection, HornetQBuffer buffer)
+   private void doHandleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
    {
       StompConnection conn = (StompConnection)connection;
       StompFrame request = null;
       try
       {
-         request = marshaller.unmarshal(buffer);
+         request = frameDecoder.decode(buffer);
          if (log.isTraceEnabled())
          {
             log.trace("received " + request);
          }
          
          String command = request.getCommand();
-
          StompFrame response = null;
+
          if (Stomp.Commands.CONNECT.equals(command))
          {
             response = onConnect(request, conn);
@@ -199,7 +209,6 @@
          }
          else
          {
-
             log.error("Unsupported Stomp frame: " + request);
             response = new StompFrame(Stomp.Responses.ERROR,
                                       new HashMap<String, Object>(),
@@ -211,7 +220,7 @@
             if (response == null)
             {
                Map<String, Object> h = new HashMap<String, Object>();
-               response = new StompFrame(Stomp.Responses.RECEIPT, h, StompMarshaller.NO_DATA);
+               response = new StompFrame(Stomp.Responses.RECEIPT, h);
             }
             response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
                                       request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
@@ -325,7 +334,7 @@
       boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
       if (!unsubscribed)
       {
-         throw new StompException("Cannot unsubscribe as a subscription exists for id: " + subscriptionID);
+         throw new StompException("Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
       }
       return null;
    }
@@ -473,11 +482,7 @@
       Map<String, Object> headers = frame.getHeaders();
       String destination = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
       String txID = (String)headers.remove(Stomp.Headers.TRANSACTION);
-      byte type = Message.TEXT_TYPE;
-      if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
-      {
-         type = Message.BYTES_TYPE;
-      }
+      byte type = Message.BYTES_TYPE;
       long timestamp = System.currentTimeMillis();
 
       ServerMessageImpl message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
@@ -485,15 +490,7 @@
       message.setTimestamp(timestamp);
       message.setAddress(SimpleString.toSimpleString(destination));
       StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
-      byte[] content = frame.getContent();
-      if (type == Message.TEXT_TYPE)
-      {
-         message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new String(content)));
-      }
-      else
-      {
-         message.getBodyBuffer().writeBytes(content);
-      }
+      message.getBodyBuffer().writeBytes(frame.getContent());
 
       StompSession stompSession = null;
       if (txID == null)
@@ -533,7 +530,7 @@
       {
          h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
       }
-      return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
+      return new StompFrame(Stomp.Responses.CONNECTED, h);
    }
 
    public void send(final StompConnection connection, final StompFrame frame)
@@ -619,16 +616,16 @@
 
          try
          {
-            byte[] bytes = marshaller.marshal(frame);
-            HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+            HornetQBuffer buffer = frame.toHornetQBuffer();
             connection.getTransportConnection().write(buffer, true);
          }
-         catch (IOException e)
+         catch (Exception e)
          {
             log.error("Unable to send frame " + frame, e);
          }
       }
    }
 
+
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -18,7 +18,6 @@
 import java.util.Map.Entry;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.message.impl.MessageImpl;
 import org.hornetq.core.persistence.OperationContext;
@@ -28,6 +27,7 @@
 import org.hornetq.core.server.ServerSession;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.UUIDGenerator;
 
 /**
@@ -85,32 +85,20 @@
          {
             headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
          }
-         byte[] data = new byte[] {};
-         serverMessage.getBodyBuffer().markReaderIndex();
-         if (serverMessage.getType() == Message.TEXT_TYPE)
-         {
-            SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
-            if (text != null)
-            {
-               data = text.toString().getBytes("UTF-8");
-            }
-         }
-         else
-         {
-            HornetQBuffer buffer = serverMessage.getBodyBuffer();
-            buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE);
-            int size = serverMessage.getEndOfBodyPosition() - buffer.readerIndex();
-            data = new byte[size];
-            buffer.readBytes(data);
-            headers.put(Headers.CONTENT_LENGTH, data.length);
-         }
+         HornetQBuffer buffer = serverMessage.getBodyBuffer();
+         buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+         int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();
+         int size = bodyPos - buffer.readerIndex();
+         byte[] data = new byte[size];
+         buffer.readBytes(data);
+         headers.put(Headers.CONTENT_LENGTH, data.length);
          serverMessage.getBodyBuffer().resetReaderIndex();
-
+         
          StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
          StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
 
          manager.send(connection, frame);
-         int size =  frame.getEncodedSize();
+         int length =  frame.getEncodedSize();
          
          if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
          {
@@ -121,7 +109,7 @@
          {
             messagesToAck.put(serverMessage.getMessageID(), consumerID);
          }
-         return size;
+         return length;
 
       }
       catch (Exception e)
@@ -183,7 +171,7 @@
                // Already exists
                if (query.getConsumerCount() > 0)
                {
-                  throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+                  throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has a subscriber: " + queue);
                }
             }
          } else
@@ -194,7 +182,7 @@
       }
       session.createConsumer(consumerID, queue, SimpleString.toSimpleString(selector), false);
       session.receiveConsumerCredits(consumerID, -1);
-      StompSubscription subscription = new StompSubscription(subscriptionID, destination, ack);
+      StompSubscription subscription = new StompSubscription(subscriptionID, ack);
       subscriptions.put(consumerID, subscription);
       // FIXME not very smart: since we can't start the consumer, we start the session
       // every time to start the new consumer (and all previous consumers...)

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java	2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -28,18 +28,15 @@
 
    private final String subID;
 
-   private final String destination;
-
    private final String ack;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public StompSubscription(String subID, String destination, String ack)
+   public StompSubscription(String subID, String ack)
    {
       this.subID = subID;
-      this.destination = destination;
       this.ack = ack;
    }
 
@@ -50,11 +47,6 @@
       return ack;
    }
 
-   public String getDestination()
-   {
-      return destination;
-   }
-
    public String getID()
    {
       return subID;

Modified: trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -16,7 +16,6 @@
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
-import org.hornetq.core.protocol.stomp.StompFrameDelimiter;
 import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.jboss.netty.channel.ChannelPipeline;
@@ -55,10 +54,6 @@
          //Core protocol uses it's own optimised decoder
          pipeline.addLast("decoder", new HornetQFrameDecoder2());
       }
-      else if (protocol == ProtocolType.STOMP)
-      {
-         pipeline.addLast("decoder", new StompFrameDelimiter());
-      }
       else
       {
          pipeline.addLast("decoder", new HornetQFrameDecoder(decoder));

Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -25,6 +25,8 @@
 import java.net.SocketTimeoutException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -33,12 +35,12 @@
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
-import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-import javax.jms.TextMessage;
 import javax.jms.Topic;
 
 import junit.framework.Assert;
@@ -76,6 +78,45 @@
     private Topic topic;
     private JMSServerManager server;
 
+    public void _testSendManyMessages() throws Exception {
+       MessageConsumer consumer = session.createConsumer(queue);
+ 
+       String frame =
+               "CONNECT\n" +
+                       "login: brianm\n" +
+                       "passcode: wombats\n\n" +
+                       Stomp.NULL;
+       sendFrame(frame);
+       frame = receiveFrame(10000);
+       
+       Assert.assertTrue(frame.startsWith("CONNECTED"));    
+       int count = 1000;
+       final CountDownLatch latch = new CountDownLatch(count);
+       consumer.setMessageListener(new MessageListener()
+      {
+         
+         public void onMessage(Message arg0)
+         {
+            System.out.println("<<< " + (1000 - latch.getCount()));
+            latch.countDown();
+         }
+      });
+       
+       frame =
+               "SEND\n" +
+                       "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
+                       "Hello World" +
+                       Stomp.NULL;    
+       for (int i=1; i <= count; i++) {
+          // Thread.sleep(1);
+          System.out.println(">>> " + i);
+          sendFrame(frame);
+       }
+       
+       assertTrue(latch.await(60, TimeUnit.SECONDS));
+        
+    }
+    
     public void testConnect() throws Exception {
 
        String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
@@ -137,9 +178,9 @@
 
         sendFrame(frame);
         
-        TextMessage message = (TextMessage) consumer.receive(1000);
+        BytesMessage message = (BytesMessage) consumer.receive(1000);
         Assert.assertNotNull(message);
-        Assert.assertEquals("Hello World", message.getText());
+        Assert.assertEquals("Hello World", readContent(message));
 
         // Make sure that the timestamp is valid - should
         // be very close to the current time.
@@ -175,9 +216,9 @@
        Assert.assertTrue(f.startsWith("RECEIPT"));
        Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
 
-       TextMessage message = (TextMessage) consumer.receive(1000);
+       BytesMessage message = (BytesMessage) consumer.receive(1000);
        Assert.assertNotNull(message);
-       Assert.assertEquals("Hello World", message.getText());
+       Assert.assertEquals("Hello World", readContent(message));
 
        // Make sure that the timestamp is valid - should
        // be very close to the current time.
@@ -200,16 +241,17 @@
        frame = receiveFrame(10000);
        Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-       byte[] data = new byte[] {1, 2, 3, 4};
-        
+       byte[] data = new byte[] {1, 0, 0, 4};
+       
        frame =
                "SEND\n" +
                        "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-                       "content-length:" + data.length + "\n\n" +
-                       new String(data) +
-                       Stomp.NULL;
-
-       sendFrame(frame);
+                       "content-length:" + data.length + "\n\n";
+       ByteArrayOutputStream baos = new ByteArrayOutputStream();
+       baos.write(frame.getBytes("UTF-8"));
+       baos.write(data);
+       baos.write('\0');
+       sendFrame(baos.toByteArray());
        
        BytesMessage message = (BytesMessage) consumer.receive(1000);
        Assert.assertNotNull(message);
@@ -218,12 +260,6 @@
        assertEquals(data[1], message.readByte());
        assertEquals(data[2], message.readByte());
        assertEquals(data[3], message.readByte());
-
-       // Make sure that the timestamp is valid - should
-       // be very close to the current time.
-       long tnow = System.currentTimeMillis();
-       long tmsg = message.getJMSTimestamp();
-       Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
    }
 
     public void testJMSXGroupIdCanBeSet() throws Exception {
@@ -249,10 +285,11 @@
 
         sendFrame(frame);
 
-        TextMessage message = (TextMessage) consumer.receive(1000);
+        BytesMessage message = (BytesMessage) consumer.receive(1000);
         Assert.assertNotNull(message);
+        Assert.assertEquals("Hello World", readContent(message));
         // differ from StompConnect
-        Assert.assertEquals("TEST", ((TextMessage) message).getStringProperty("JMSXGroupID"));
+        Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
     }
 
     public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
@@ -279,9 +316,9 @@
 
         sendFrame(frame);
 
-        TextMessage message = (TextMessage) consumer.receive(1000);
+        BytesMessage message = (BytesMessage) consumer.receive(1000);
         Assert.assertNotNull(message);
-        Assert.assertEquals("Hello World", message.getText());
+        Assert.assertEquals("Hello World", readContent(message));
         Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
         Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
     }
@@ -315,9 +352,9 @@
 
         sendFrame(frame);
 
-        TextMessage message = (TextMessage) consumer.receive(1000);
+        BytesMessage message = (BytesMessage) consumer.receive(1000);
         Assert.assertNotNull(message);
-        Assert.assertEquals("Hello World", message.getText());
+        Assert.assertEquals("Hello World", readContent(message));
         Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
         Assert.assertEquals("getJMSType", "t345", message.getJMSType());
         Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
@@ -364,7 +401,7 @@
         
         // message should not be received as it was auto-acked
         MessageConsumer consumer = session.createConsumer(queue);
-        TextMessage message = (TextMessage) consumer.receive(1000);
+        Message message = consumer.receive(1000);
         Assert.assertNull(message);
 
     }
@@ -389,7 +426,7 @@
         sendFrame(frame);
 
         byte[] payload = new byte[]{1, 2, 3, 4, 5}; 
-        sendBytesMessage(payload);
+        sendMessage(payload, queue);
 
         frame = receiveFrame(10000);
         Assert.assertTrue(frame.startsWith("MESSAGE"));
@@ -429,7 +466,7 @@
         sendFrame(frame);
 
         MessageProducer producer = session.createProducer(queue);
-        TextMessage message = session.createTextMessage("Hello World");
+        BytesMessage message = session.createBytesMessage();
         message.setStringProperty("S", "value");
         message.setBooleanProperty("n", false);
         message.setByteProperty("byte", (byte) 9);
@@ -438,6 +475,7 @@
         message.setIntProperty("i", 10);
         message.setLongProperty("l", 121);
         message.setShortProperty("s", (short) 12);
+        message.writeBytes("Hello World".getBytes("UTF-8"));
         producer.send(message);
 
         frame = receiveFrame(10000);
@@ -532,12 +570,6 @@
                        "\n\n" +
                        Stomp.NULL;
        sendFrame(frame);
-       
-       // message should not be received as it was auto-acked
-       MessageConsumer consumer = session.createConsumer(queue);
-       TextMessage message = (TextMessage) consumer.receive(1000);
-       Assert.assertNull(message);
-
    }
     
     public void testMessagesAreInOrder() throws Exception {
@@ -667,7 +699,7 @@
 
        // message should not be received since message was acknowledged by the client
        MessageConsumer consumer = session.createConsumer(queue);
-       TextMessage message = (TextMessage) consumer.receive(1000);
+       Message message = consumer.receive(1000);
        Assert.assertNull(message);
    }
     
@@ -703,7 +735,7 @@
 
         // message should be received since message was not acknowledged
         MessageConsumer consumer = session.createConsumer(queue);
-        TextMessage message = (TextMessage) consumer.receive(1000);
+        Message message = consumer.receive(1000);
         Assert.assertNotNull(message);
         Assert.assertTrue(message.getJMSRedelivered());
     }
@@ -957,7 +989,7 @@
         sendFrame(frame);
         waitForReceipt();
 
-        TextMessage message = (TextMessage) consumer.receive(1000);
+        Message message = consumer.receive(1000);
         Assert.assertNotNull("Should have received a message", message);
     }
     
@@ -998,7 +1030,7 @@
                        Stomp.NULL;
        sendFrame(frame);
 
-       TextMessage message = (TextMessage) consumer.receive(1000);
+       Message message = consumer.receive(1000);
        Assert.assertNotNull("Should have received a message", message);
 
        // 2nd tx with same tx ID
@@ -1025,7 +1057,7 @@
                        Stomp.NULL;
        sendFrame(frame);
 
-       message = (TextMessage) consumer.receive(1000);
+       message = consumer.receive(1000);
        Assert.assertNotNull("Should have received a message", message);
 }
     
@@ -1123,9 +1155,9 @@
         waitForReceipt();
 
         //only second msg should be received since first msg was rolled back
-        TextMessage message = (TextMessage) consumer.receive(1000);
+        BytesMessage message = (BytesMessage) consumer.receive(1000);
         Assert.assertNotNull(message);
-        Assert.assertEquals("second message", message.getText().trim());
+        Assert.assertEquals("second message", readContent(message));
     }
     
     public void testSubscribeToTopic() throws Exception {
@@ -1203,25 +1235,21 @@
        String subscribeFrame =
                "SUBSCRIBE\n" +
                        "destination:" + getTopicPrefix() + getTopicName() + "\n" +
-                       "receipt: 12\n" +
                        "durable-subscription-name: " + getName() + "\n" + 
                        "\n\n" +
                        Stomp.NULL;
        sendFrame(subscribeFrame);
-       // wait for SUBSCRIBE's receipt
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("RECEIPT"));
+       waitForFrameToTakeEffect();
 
        String disconnectFrame =
           "DISCONNECT\n" +
                   "\n\n" +
                   Stomp.NULL;
        sendFrame(disconnectFrame);
-       stompSocket.close();
+       waitForFrameToTakeEffect();
        
        // send the message when the durable subscriber is disconnected
        sendMessage(getName(), topic);
-  
 
        reconnect(1000);
        sendFrame(connectFame);
@@ -1229,9 +1257,6 @@
        Assert.assertTrue(frame.startsWith("CONNECTED"));
        
        sendFrame(subscribeFrame);
-       // wait for SUBSCRIBE's receipt
-       frame = receiveFrame(10000);
-       Assert.assertTrue(frame.startsWith("RECEIPT"));
 
        // we must have received the message 
        frame = receiveFrame(10000);
@@ -1522,6 +1547,14 @@
         outputStream.flush();
     }
 
+    public void sendFrame(byte[] data) throws Exception {
+        OutputStream outputStream = stompSocket.getOutputStream();
+        for (int i = 0; i < data.length; i++) {
+            outputStream.write(data[i]);
+        }
+        outputStream.flush();
+    }
+    
     public String receiveFrame(long timeOut) throws Exception {
         stompSocket.setSoTimeout((int) timeOut);
         InputStream is = stompSocket.getInputStream();
@@ -1550,31 +1583,36 @@
     }
 
     public void sendMessage(String msg) throws Exception {
-       sendMessage(msg, "foo", "xyz", queue);
+       sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", queue);
    }
 
     public void sendMessage(String msg, Destination destination) throws Exception {
-        sendMessage(msg, "foo", "xyz", destination);
+        sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", destination);
     }
 
-    public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
-       sendMessage(msg, propertyName, propertyValue, queue);
-    }
+    public void sendMessage(byte[] data, Destination destination) throws Exception {
+       sendMessage(data, "foo", "xyz", destination);
+   }
     
-    public void sendMessage(String msg, String propertyName, String propertyValue, Destination destination) throws JMSException {
-        MessageProducer producer = session.createProducer(destination);
-        TextMessage message = session.createTextMessage(msg);
-        message.setStringProperty(propertyName, propertyValue);
-        producer.send(message);
+    public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception {
+       sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
     }
 
-    public void sendBytesMessage(byte[] msg) throws Exception {
-        MessageProducer producer = session.createProducer(queue);
+    public void sendMessage(byte[] data, String propertyName, String propertyValue, Destination destination) throws Exception {
+        MessageProducer producer = session.createProducer(destination);
         BytesMessage message = session.createBytesMessage();
-        message.writeBytes(msg);
+        message.setStringProperty(propertyName, propertyValue);
+        message.writeBytes(data);
         producer.send(message);
     }
-
+    
+    public String readContent(BytesMessage message) throws Exception
+    {
+       byte[] data = new byte[1024];
+       int size = message.readBytes(data);
+       return new String(data, 0, size, "UTF-8");
+    }
+    
     protected void waitForReceipt() throws Exception {
        String frame = receiveFrame(50000);
        assertNotNull(frame);

Added: trunk/tests/src/org/hornetq/tests/stress/stomp/StompStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/stomp/StompStressTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/stomp/StompStressTest.java	2010-02-22 10:16:42 UTC (rev 8892)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.stress.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.util.UnitTestCase;
+
+public class StompStressTest extends UnitTestCase
+{
+   private static final transient Logger log = Logger.getLogger(StompStressTest.class);
+
+   private static final int COUNT = 100;
+
+   private int port = 61613;
+
+   private Socket stompSocket;
+
+   private ByteArrayOutputStream inputBuffer;
+
+   private String destination = "stomp.stress.queue";
+
+   private HornetQServer server;
+
+   public void testSendAndReceiveMessage() throws Exception
+   {
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      frame = "SEND\n" + "destination:" + destination + "\n\n";
+
+      for (int i = 0; i < COUNT; i++)
+      {
+         sendFrame(frame + "count=" + i + Stomp.NULL);
+      }
+
+      frame = "SUBSCRIBE\n" + "destination:" + destination + "\n" + "ack:auto\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      for (int i = 0; i < COUNT; i++)
+      {
+         frame = receiveFrame(10000);
+         Assert.assertTrue(frame.startsWith("MESSAGE"));
+         Assert.assertTrue(frame.indexOf("destination:") > 0);
+         Assert.assertTrue(frame.indexOf("count=" + i) > 0);
+      }
+
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+   }
+
+   // Implementation methods
+   // -------------------------------------------------------------------------
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      server = createServer();
+      server.start();
+
+      stompSocket = createSocket();
+      inputBuffer = new ByteArrayOutputStream();
+   }
+
+   private HornetQServer createServer() throws Exception
+   {
+      Configuration config = new ConfigurationImpl();
+      config.setSecurityEnabled(false);
+      config.setPersistenceEnabled(false);
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+      TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+      config.getAcceptorConfigurations().add(stompTransport);
+      config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      config.getQueueConfigurations().add(new CoreQueueConfiguration(destination, destination, null, false));
+      return HornetQServers.newHornetQServer(config);
+   }
+
+   protected void tearDown() throws Exception
+   {
+      if (stompSocket != null)
+      {
+         stompSocket.close();
+      }
+      server.stop();
+
+      super.tearDown();
+   }
+
+   protected Socket createSocket() throws IOException
+   {
+      return new Socket("127.0.0.1", port);
+   }
+
+   public void sendFrame(String data) throws Exception
+   {
+      byte[] bytes = data.getBytes("UTF-8");
+      OutputStream outputStream = stompSocket.getOutputStream();
+      for (int i = 0; i < bytes.length; i++)
+      {
+         outputStream.write(bytes[i]);
+      }
+      outputStream.flush();
+   }
+
+   public void sendFrame(byte[] data) throws Exception
+   {
+      OutputStream outputStream = stompSocket.getOutputStream();
+      for (int i = 0; i < data.length; i++)
+      {
+         outputStream.write(data[i]);
+      }
+      outputStream.flush();
+   }
+
+   public String receiveFrame(long timeOut) throws Exception
+   {
+      stompSocket.setSoTimeout((int)timeOut);
+      InputStream is = stompSocket.getInputStream();
+      int c = 0;
+      for (;;)
+      {
+         c = is.read();
+         if (c < 0)
+         {
+            throw new IOException("socket closed.");
+         }
+         else if (c == 0)
+         {
+            c = is.read();
+            if (c != '\n')
+            {
+               byte[] ba = inputBuffer.toByteArray();
+               System.out.println(new String(ba, "UTF-8"));
+            }
+            Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+            byte[] ba = inputBuffer.toByteArray();
+            inputBuffer.reset();
+            return new String(ba, "UTF-8");
+         }
+         else
+         {
+            inputBuffer.write(c);
+         }
+      }
+   }
+}



More information about the hornetq-commits mailing list