[hornetq-commits] JBoss hornetq SVN: r8892 - in trunk: src/main/org/hornetq/core/protocol/stomp and 4 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Feb 22 05:16:43 EST 2010
Author: jmesnil
Date: 2010-02-22 05:16:42 -0500 (Mon, 22 Feb 2010)
New Revision: 8892
Added:
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java
trunk/tests/src/org/hornetq/tests/stress/stomp/
trunk/tests/src/org/hornetq/tests/stress/stomp/StompStressTest.java
Removed:
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
Modified:
trunk/docs/user-manual/en/interoperability.xml
trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* refactoring to handle correctly frame body with null bytes
Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml 2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/docs/user-manual/en/interoperability.xml 2010-02-22 10:16:42 UTC (rev 8892)
@@ -72,11 +72,14 @@
to an address.
When a Stomp client subscribes (or unsubscribes) for a destination (using a <literal>SUBSCRIBE</literal>
or <literal>UNSUBSCRIBE</literal> frame), the destination is mapped to a HornetQ queue.</para>
- <section>
- <title>Using JMS destinations</title>
- <para>As explained in <xref linkend="jms-core-mapping" />, JMS destinations are also mapped to HornetQ addresses and queues.
- If you want to use Stomp to send messages to JMS destinations, the Stomp destinations must follow the same convention:</para>
- <itemizedlist>
+ </section>
+ <section>
+ <title>Stomp and JMS interoperabilty</title>
+ <section>
+ <title>Using JMS destinations</title>
+ <para>As explained in <xref linkend="jms-core-mapping" />, JMS destinations are also mapped to HornetQ addresses and queues.
+ If you want to use Stomp to send messages to JMS destinations, the Stomp destinations must follow the same convention:</para>
+ <itemizedlist>
<listitem>
<para>send or subscribe to a JMS <emphasis>Queue</emphasis> by prepending the queue name by <literal>jms.queue.</literal>.</para>
<para>For example, to send a message to the <literal>orders</literal> JMS Queue, the Stomp client must send the frame:</para>
@@ -92,15 +95,34 @@
<para>send or subscribe to a JMS <emphasis>Topic</emphasis> by prepending the topic name by <literal>jms.topic.</literal>.</para>
<para>For example to subscribe to the <literal>stocks</literal> JMS Topic, the Stomp client must send the frame:</para>
<programlisting>
-SUBSCRIBE
-destination:jms.topic.stocks
-
-^@
+ SUBSCRIBE
+ destination:jms.topic.stocks
+
+ ^@
</programlisting>
</listitem>
</itemizedlist>
-
</section>
+
+ <section>
+ <title>Send and consuming Stomp message from JMS</title>
+ <para>Stomp messages can be sent and consumed from a JMS Destination by using <literal>BytesMessage</literal> where
+ the Stomp message body is stored in the JMS BytesMessage body.</para>
+ <para>If the Stomp message contained a UTF-8 String, the corresponding code to read the string from a JMS BytesMessage is:</para>
+ <programlisting>
+BytesMessage message = (BytesMessage)consumer.receive();
+byte[] data = new byte[1024];
+int size = message.readBytes(data);
+String text = new String(data, 0, size, "UTF-8");
+ </programlisting>
+ <para>Conversely, to send a JMS BytesMessage destined to be consumed by Stomp as a UTF-8 String, the code is:</para>
+ <programlisting>
+String text = ...
+BytesMessage message = session.createBytesMessage();
+message.writeBytes(text.getBytes("UTF-8"));
+producer.send(message);
+ </programlisting>
+ </section>
</section>
</section>
<section>
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -68,7 +68,7 @@
public interface Headers
{
- String SEPERATOR = ":";
+ String SEPARATOR = ":";
String RECEIPT_REQUESTED = "receipt";
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -17,10 +17,11 @@
*/
package org.hornetq.core.protocol.stomp;
-import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+
/**
* Represents all the data in a STOMP frame.
*
@@ -28,27 +29,29 @@
*/
class StompFrame
{
- private static final byte[] NO_DATA = new byte[] {};
+ public static final byte[] NO_DATA = new byte[] {};
+ private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
- private String command;
+ private final String command;
+ private final Map<String, Object> headers;
+ private final byte[] content;
+
+ private HornetQBuffer buffer = null;
+ private int size;
- private Map<String, Object> headers;
-
- private byte[] content = StompFrame.NO_DATA;
-
- private int size = -1;
-
- public StompFrame()
- {
- this.headers = new HashMap<String, Object>();
- }
-
public StompFrame(String command, Map<String, Object> headers, byte[] data)
{
this.command = command;
this.headers = headers;
this.content = data;
}
+
+ public StompFrame(String command, Map<String, Object> headers)
+ {
+ this.command = command;
+ this.headers = headers;
+ this.content = NO_DATA;
+ }
public String getCommand()
{
@@ -65,22 +68,13 @@
return headers;
}
- public int getEncodedSize()
+ public int getEncodedSize() throws Exception
{
- if (size == -1)
+ if (buffer == null)
{
- StompMarshaller marshaller = new StompMarshaller();
- try
- {
- size = marshaller.marshal(this).length;
- }
- catch (IOException e)
- {
- return -1;
- }
+ buffer = toHornetQBuffer();
}
-
- return size ;
+ return size;
}
@Override
@@ -88,5 +82,33 @@
{
return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=" + content.length + "]";
}
+
+ public HornetQBuffer toHornetQBuffer() throws Exception
+ {
+ if (buffer == null)
+ {
+ buffer = HornetQBuffers.dynamicBuffer(content.length + 512);
+ StringBuffer head = new StringBuffer();
+ head.append(command);
+ head.append(Stomp.NEWLINE);
+ // Output the headers.
+ for (Map.Entry<String, Object> header : headers.entrySet())
+ {
+ head.append(header.getKey());
+ head.append(Stomp.Headers.SEPARATOR);
+ head.append(header.getValue());
+ head.append(Stomp.NEWLINE);
+ }
+ // Add a newline to separate the headers from the content.
+ head.append(Stomp.NEWLINE);
+
+ buffer.writeBytes(head.toString().getBytes("UTF-8"));
+ buffer.writeBytes(content);
+ buffer.writeBytes(END_OF_FRAME);
+
+ size = buffer.writerIndex();
+ }
+ return buffer;
+ }
}
Copied: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java (from rev 8887, trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java)
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -0,0 +1,200 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
+
+/**
+ * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+class StompFrameDecoder
+{
+ private static final Logger log = Logger.getLogger(StompFrameDecoder.class);
+
+ private static final int MAX_COMMAND_LENGTH = 1024;
+
+ private static final int MAX_HEADER_LENGTH = 1024 * 10;
+
+ private static final int MAX_HEADERS = 1000;
+
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 10;
+
+ public StompFrame decode(HornetQBuffer buffer)
+ {
+ try
+ {
+ String command = StompFrameDecoder.readLine(buffer, StompFrameDecoder.MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+ if (command == null)
+ {
+ return null;
+ }
+ command = command.trim();
+ if (command.length() == 0)
+ {
+ return null;
+ }
+
+ // Parse the headers
+ HashMap<String, Object> headers = new HashMap<String, Object>(25);
+ while (true)
+ {
+ String line = StompFrameDecoder.readLine(buffer, StompFrameDecoder.MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+ if (line == null)
+ {
+ return null;
+ }
+
+ if (headers.size() > StompFrameDecoder.MAX_HEADERS)
+ {
+ throw new StompException("The maximum number of headers was exceeded", true);
+ }
+
+ if (line.trim().length() == 0)
+ {
+ break;
+ }
+
+ try
+ {
+ int seperator_index = line.indexOf(Stomp.Headers.SEPARATOR);
+ if (seperator_index == -1)
+ {
+ return null;
+ }
+ String name = line.substring(0, seperator_index).trim();
+ String value = line.substring(seperator_index + 1, line.length()).trim();
+ headers.put(name, value);
+ }
+ catch (Exception e)
+ {
+ throw new StompException("Unable to parse header line [" + line + "]", true);
+ }
+ }
+ // Read in the data part.
+ byte[] data = StompFrame.NO_DATA;
+ String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLength != null)
+ {
+
+ // Bless the client, he's telling us how much data to read in.
+ int length;
+ try
+ {
+ length = Integer.parseInt(contentLength.trim());
+ }
+ catch (NumberFormatException e)
+ {
+ throw new StompException("Specified content-length is not a valid integer", true);
+ }
+
+ if (length > StompFrameDecoder.MAX_DATA_LENGTH)
+ {
+ throw new StompException("The maximum data length was exceeded", true);
+ }
+
+ if (buffer.readableBytes() < length)
+ {
+ return null;
+ }
+
+ data = new byte[length];
+ buffer.readBytes(data);
+
+ if (buffer.readByte() != 0)
+ {
+ throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " +
+ "there was no trailing null byte", true);
+ }
+ }
+ else
+ {
+ byte[] body = new byte[StompFrameDecoder.MAX_DATA_LENGTH];
+ boolean bodyCorrectlyEnded = false;
+ int count = 0;
+ while (buffer.readable())
+ {
+ byte b = buffer.readByte();
+
+ if (b == (byte)'\0')
+ {
+ bodyCorrectlyEnded = true;
+ break;
+ }
+ else
+ {
+ body[count++] = b;
+ }
+ }
+
+ if (!bodyCorrectlyEnded)
+ {
+ return null;
+ }
+
+ data = new byte[count];
+ System.arraycopy(body, 0, data, 0, count);
+ }
+
+ return new StompFrame(command, headers, data);
+ }
+ catch (IOException e)
+ {
+ log.error("Unable to decode stomp frame", e);
+ return null;
+ }
+ }
+
+ private static String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException
+ {
+ char[] chars = new char[MAX_HEADER_LENGTH];
+
+ if (!in.readable())
+ {
+ return null;
+ }
+
+ boolean properString = false;
+ int count = 0;
+ while (in.readable())
+ {
+ byte b = in.readByte();
+
+ if (b == (byte)'\n')
+ {
+ properString = true;
+ break;
+ }
+ else
+ {
+ chars[count++] = (char)b;
+ }
+ }
+ if (properString)
+ {
+ return new String(chars, 0, count);
+ }
+ else
+ {
+ return null;
+ }
+ }
+}
Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java 2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -1,33 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.protocol.stomp;
-
-import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.Delimiters;
-
-/**
- * A StompFrameDelimiter
- *
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- */
-public class StompFrameDelimiter extends DelimiterBasedFrameDecoder
-{
-
- private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
-
- public StompFrameDelimiter()
- {
- super(MAX_DATA_LENGTH, false, Delimiters.nulDelimiter());
- }
-}
Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java 2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -1,38 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.core.protocol.stomp;
-
-/**
- * Command indicating that an invalid Stomp Frame was received.
- *
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-class StompFrameError extends StompFrame
-{
- private final StompException exception;
-
- public StompFrameError(StompException exception)
- {
- this.exception = exception;
- }
-
- public StompException getException()
- {
- return exception;
- }
-}
Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -1,235 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.core.protocol.stomp;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.hornetq.api.core.HornetQBuffer;
-
-/**
- * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
- */
-class StompMarshaller
-{
- public static final byte[] NO_DATA = new byte[] {};
-
- private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
-
- private static final int MAX_COMMAND_LENGTH = 1024;
-
- private static final int MAX_HEADER_LENGTH = 1024 * 10;
-
- private static final int MAX_HEADERS = 1000;
-
- private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
-
- private int version = 1;
-
- public int getVersion()
- {
- return version;
- }
-
- public void setVersion(int version)
- {
- this.version = version;
- }
-
- public byte[] marshal(StompFrame command) throws IOException
- {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- marshal(command, dos);
- dos.close();
- return baos.toByteArray();
- }
-
- public void marshal(StompFrame stomp, DataOutput os) throws IOException
- {
- StringBuffer buffer = new StringBuffer();
- buffer.append(stomp.getCommand());
- buffer.append(Stomp.NEWLINE);
-
- // Output the headers.
- for (Iterator<Map.Entry<String, Object>> iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();)
- {
- Map.Entry<String, Object> entry = iter.next();
- buffer.append(entry.getKey());
- buffer.append(Stomp.Headers.SEPERATOR);
- buffer.append(entry.getValue());
- buffer.append(Stomp.NEWLINE);
- }
-
- // Add a newline to seperate the headers from the content.
- buffer.append(Stomp.NEWLINE);
-
- os.write(buffer.toString().getBytes("UTF-8"));
- os.write(stomp.getContent());
- os.write(END_OF_FRAME);
- }
-
- public StompFrame unmarshal(HornetQBuffer in) throws IOException
- {
-
- try
- {
- String action = null;
-
- // skip white space to next real action line
- while (true)
- {
- action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
- if (action == null)
- {
- throw new IOException("connection was closed");
- }
- else
- {
- action = action.trim();
- if (action.length() > 0)
- {
- break;
- }
- }
- }
-
- // Parse the headers
- HashMap<String, Object> headers = new HashMap<String, Object>(25);
- while (true)
- {
- String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
- if (line != null && line.trim().length() > 0)
- {
-
- if (headers.size() > MAX_HEADERS)
- {
- throw new StompException("The maximum number of headers was exceeded", true);
- }
-
- try
- {
- int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
- String name = line.substring(0, seperator_index).trim();
- String value = line.substring(seperator_index + 1, line.length()).trim();
- headers.put(name, value);
- }
- catch (Exception e)
- {
- throw new StompException("Unable to parser header line [" + line + "]", true);
- }
- }
- else
- {
- break;
- }
- }
-
- // Read in the data part.
- byte[] data = NO_DATA;
- String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
- if (contentLength != null)
- {
-
- // Bless the client, he's telling us how much data to read in.
- int length;
- try
- {
- length = Integer.parseInt(contentLength.trim());
- }
- catch (NumberFormatException e)
- {
- throw new StompException("Specified content-length is not a valid integer", true);
- }
-
- if (length > MAX_DATA_LENGTH)
- {
- throw new StompException("The maximum data length was exceeded", true);
- }
-
- data = new byte[length];
- in.readBytes(data);
-
- if (in.readByte() != 0)
- {
- throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " +
- "there was no trailing null byte", true);
- }
- }
- else
- {
-
- // We don't know how much to read.. data ends when we hit a 0
- byte b;
- ByteArrayOutputStream baos = null;
- while (in.readableBytes() > 0 && (b = in.readByte()) != 0)
- {
-
- if (baos == null)
- {
- baos = new ByteArrayOutputStream();
- }
- else if (baos.size() > MAX_DATA_LENGTH)
- {
- throw new StompException("The maximum data length was exceeded", true);
- }
-
- baos.write(b);
- }
-
- if (baos != null)
- {
- baos.close();
- data = baos.toByteArray();
- }
- }
-
- return new StompFrame(action, headers, data);
- }
- catch (StompException e)
- {
- return new StompFrameError(e);
- }
- }
-
- protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException
- {
- char[] chars = new char[MAX_HEADER_LENGTH];
-
- int count = 0;
- while (in.readable())
- {
- byte b = in.readByte();
-
- if (b == (byte)'\n')
- {
- break;
- }
- else
- {
- chars[count++] = (char)b;
- }
- }
- return new String(chars, 0, count);
- }
-}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -14,7 +14,6 @@
package org.hornetq.core.protocol.stomp;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
@@ -26,7 +25,6 @@
import java.util.concurrent.Executor;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Interceptor;
import org.hornetq.api.core.Message;
@@ -61,7 +59,7 @@
private final HornetQServer server;
- private final StompMarshaller marshaller;
+ private final StompFrameDecoder frameDecoder;
private final Executor executor;
@@ -106,7 +104,7 @@
public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
{
this.server = server;
- this.marshaller = new StompMarshaller();
+ this.frameDecoder = new StompFrameDecoder();
this.executor = server.getExecutorFactory().getExecutor();
}
@@ -125,9 +123,21 @@
public int isReadyToHandle(HornetQBuffer buffer)
{
- return -1;
+ int start = buffer.readerIndex();
+
+ StompFrame frame = frameDecoder.decode(buffer);
+
+ if (frame == null)
+ {
+ return -1;
+ }
+ else
+ {
+ return buffer.readerIndex() - start;
+ }
}
+
public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
{
executor.execute(new Runnable()
@@ -146,21 +156,21 @@
});
}
- private void doHandleBuffer(RemotingConnection connection, HornetQBuffer buffer)
+ private void doHandleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
{
StompConnection conn = (StompConnection)connection;
StompFrame request = null;
try
{
- request = marshaller.unmarshal(buffer);
+ request = frameDecoder.decode(buffer);
if (log.isTraceEnabled())
{
log.trace("received " + request);
}
String command = request.getCommand();
-
StompFrame response = null;
+
if (Stomp.Commands.CONNECT.equals(command))
{
response = onConnect(request, conn);
@@ -199,7 +209,6 @@
}
else
{
-
log.error("Unsupported Stomp frame: " + request);
response = new StompFrame(Stomp.Responses.ERROR,
new HashMap<String, Object>(),
@@ -211,7 +220,7 @@
if (response == null)
{
Map<String, Object> h = new HashMap<String, Object>();
- response = new StompFrame(Stomp.Responses.RECEIPT, h, StompMarshaller.NO_DATA);
+ response = new StompFrame(Stomp.Responses.RECEIPT, h);
}
response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
@@ -325,7 +334,7 @@
boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
if (!unsubscribed)
{
- throw new StompException("Cannot unsubscribe as a subscription exists for id: " + subscriptionID);
+ throw new StompException("Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
}
return null;
}
@@ -473,11 +482,7 @@
Map<String, Object> headers = frame.getHeaders();
String destination = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
String txID = (String)headers.remove(Stomp.Headers.TRANSACTION);
- byte type = Message.TEXT_TYPE;
- if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
- {
- type = Message.BYTES_TYPE;
- }
+ byte type = Message.BYTES_TYPE;
long timestamp = System.currentTimeMillis();
ServerMessageImpl message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
@@ -485,15 +490,7 @@
message.setTimestamp(timestamp);
message.setAddress(SimpleString.toSimpleString(destination));
StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
- byte[] content = frame.getContent();
- if (type == Message.TEXT_TYPE)
- {
- message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new String(content)));
- }
- else
- {
- message.getBodyBuffer().writeBytes(content);
- }
+ message.getBodyBuffer().writeBytes(frame.getContent());
StompSession stompSession = null;
if (txID == null)
@@ -533,7 +530,7 @@
{
h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
}
- return new StompFrame(Stomp.Responses.CONNECTED, h, StompMarshaller.NO_DATA);
+ return new StompFrame(Stomp.Responses.CONNECTED, h);
}
public void send(final StompConnection connection, final StompFrame frame)
@@ -619,16 +616,16 @@
try
{
- byte[] bytes = marshaller.marshal(frame);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ HornetQBuffer buffer = frame.toHornetQBuffer();
connection.getTransportConnection().write(buffer, true);
}
- catch (IOException e)
+ catch (Exception e)
{
log.error("Unable to send frame " + frame, e);
}
}
}
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -18,7 +18,6 @@
import java.util.Map.Entry;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.OperationContext;
@@ -28,6 +27,7 @@
import org.hornetq.core.server.ServerSession;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
+import org.hornetq.utils.DataConstants;
import org.hornetq.utils.UUIDGenerator;
/**
@@ -85,32 +85,20 @@
{
headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
}
- byte[] data = new byte[] {};
- serverMessage.getBodyBuffer().markReaderIndex();
- if (serverMessage.getType() == Message.TEXT_TYPE)
- {
- SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes("UTF-8");
- }
- }
- else
- {
- HornetQBuffer buffer = serverMessage.getBodyBuffer();
- buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE);
- int size = serverMessage.getEndOfBodyPosition() - buffer.readerIndex();
- data = new byte[size];
- buffer.readBytes(data);
- headers.put(Headers.CONTENT_LENGTH, data.length);
- }
+ HornetQBuffer buffer = serverMessage.getBodyBuffer();
+ buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+ int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex() : serverMessage.getEndOfBodyPosition();
+ int size = bodyPos - buffer.readerIndex();
+ byte[] data = new byte[size];
+ buffer.readBytes(data);
+ headers.put(Headers.CONTENT_LENGTH, data.length);
serverMessage.getBodyBuffer().resetReaderIndex();
-
+
StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
manager.send(connection, frame);
- int size = frame.getEncodedSize();
+ int length = frame.getEncodedSize();
if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
{
@@ -121,7 +109,7 @@
{
messagesToAck.put(serverMessage.getMessageID(), consumerID);
}
- return size;
+ return length;
}
catch (Exception e)
@@ -183,7 +171,7 @@
// Already exists
if (query.getConsumerCount() > 0)
{
- throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has subscriber(s)");
+ throw new IllegalStateException("Cannot create a subscriber on the durable subscription since it already has a subscriber: " + queue);
}
}
} else
@@ -194,7 +182,7 @@
}
session.createConsumer(consumerID, queue, SimpleString.toSimpleString(selector), false);
session.receiveConsumerCredits(consumerID, -1);
- StompSubscription subscription = new StompSubscription(subscriptionID, destination, ack);
+ StompSubscription subscription = new StompSubscription(subscriptionID, ack);
subscriptions.put(consumerID, subscription);
// FIXME not very smart: since we can't start the consumer, we start the session
// every time to start the new consumer (and all previous consumers...)
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -28,18 +28,15 @@
private final String subID;
- private final String destination;
-
private final String ack;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public StompSubscription(String subID, String destination, String ack)
+ public StompSubscription(String subID, String ack)
{
this.subID = subID;
- this.destination = destination;
this.ack = ack;
}
@@ -50,11 +47,6 @@
return ack;
}
- public String getDestination()
- {
- return destination;
- }
-
public String getID()
{
return subID;
Modified: trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -16,7 +16,6 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
-import org.hornetq.core.protocol.stomp.StompFrameDelimiter;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.BufferDecoder;
import org.jboss.netty.channel.ChannelPipeline;
@@ -55,10 +54,6 @@
//Core protocol uses it's own optimised decoder
pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
- else if (protocol == ProtocolType.STOMP)
- {
- pipeline.addLast("decoder", new StompFrameDelimiter());
- }
else
{
pipeline.addLast("decoder", new HornetQFrameDecoder(decoder));
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-02-22 09:33:27 UTC (rev 8891)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -25,6 +25,8 @@
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -33,12 +35,12 @@
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
-import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-import javax.jms.TextMessage;
import javax.jms.Topic;
import junit.framework.Assert;
@@ -76,6 +78,45 @@
private Topic topic;
private JMSServerManager server;
+ public void _testSendManyMessages() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+ int count = 1000;
+ final CountDownLatch latch = new CountDownLatch(count);
+ consumer.setMessageListener(new MessageListener()
+ {
+
+ public void onMessage(Message arg0)
+ {
+ System.out.println("<<< " + (1000 - latch.getCount()));
+ latch.countDown();
+ }
+ });
+
+ frame =
+ "SEND\n" +
+ "destination:" + getQueuePrefix() + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ for (int i=1; i <= count; i++) {
+ // Thread.sleep(1);
+ System.out.println(">>> " + i);
+ sendFrame(frame);
+ }
+
+ assertTrue(latch.await(60, TimeUnit.SECONDS));
+
+ }
+
public void testConnect() throws Exception {
String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
@@ -137,9 +178,9 @@
sendFrame(frame);
- TextMessage message = (TextMessage) consumer.receive(1000);
+ BytesMessage message = (BytesMessage) consumer.receive(1000);
Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("Hello World", readContent(message));
// Make sure that the timestamp is valid - should
// be very close to the current time.
@@ -175,9 +216,9 @@
Assert.assertTrue(f.startsWith("RECEIPT"));
Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
- TextMessage message = (TextMessage) consumer.receive(1000);
+ BytesMessage message = (BytesMessage) consumer.receive(1000);
Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("Hello World", readContent(message));
// Make sure that the timestamp is valid - should
// be very close to the current time.
@@ -200,16 +241,17 @@
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
- byte[] data = new byte[] {1, 2, 3, 4};
-
+ byte[] data = new byte[] {1, 0, 0, 4};
+
frame =
"SEND\n" +
"destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "content-length:" + data.length + "\n\n" +
- new String(data) +
- Stomp.NULL;
-
- sendFrame(frame);
+ "content-length:" + data.length + "\n\n";
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ baos.write(frame.getBytes("UTF-8"));
+ baos.write(data);
+ baos.write('\0');
+ sendFrame(baos.toByteArray());
BytesMessage message = (BytesMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -218,12 +260,6 @@
assertEquals(data[1], message.readByte());
assertEquals(data[2], message.readByte());
assertEquals(data[3], message.readByte());
-
- // Make sure that the timestamp is valid - should
- // be very close to the current time.
- long tnow = System.currentTimeMillis();
- long tmsg = message.getJMSTimestamp();
- Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
public void testJMSXGroupIdCanBeSet() throws Exception {
@@ -249,10 +285,11 @@
sendFrame(frame);
- TextMessage message = (TextMessage) consumer.receive(1000);
+ BytesMessage message = (BytesMessage) consumer.receive(1000);
Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", readContent(message));
// differ from StompConnect
- Assert.assertEquals("TEST", ((TextMessage) message).getStringProperty("JMSXGroupID"));
+ Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
}
public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
@@ -279,9 +316,9 @@
sendFrame(frame);
- TextMessage message = (TextMessage) consumer.receive(1000);
+ BytesMessage message = (BytesMessage) consumer.receive(1000);
Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("Hello World", readContent(message));
Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
}
@@ -315,9 +352,9 @@
sendFrame(frame);
- TextMessage message = (TextMessage) consumer.receive(1000);
+ BytesMessage message = (BytesMessage) consumer.receive(1000);
Assert.assertNotNull(message);
- Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("Hello World", readContent(message));
Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
Assert.assertEquals("getJMSType", "t345", message.getJMSType());
Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
@@ -364,7 +401,7 @@
// message should not be received as it was auto-acked
MessageConsumer consumer = session.createConsumer(queue);
- TextMessage message = (TextMessage) consumer.receive(1000);
+ Message message = consumer.receive(1000);
Assert.assertNull(message);
}
@@ -389,7 +426,7 @@
sendFrame(frame);
byte[] payload = new byte[]{1, 2, 3, 4, 5};
- sendBytesMessage(payload);
+ sendMessage(payload, queue);
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("MESSAGE"));
@@ -429,7 +466,7 @@
sendFrame(frame);
MessageProducer producer = session.createProducer(queue);
- TextMessage message = session.createTextMessage("Hello World");
+ BytesMessage message = session.createBytesMessage();
message.setStringProperty("S", "value");
message.setBooleanProperty("n", false);
message.setByteProperty("byte", (byte) 9);
@@ -438,6 +475,7 @@
message.setIntProperty("i", 10);
message.setLongProperty("l", 121);
message.setShortProperty("s", (short) 12);
+ message.writeBytes("Hello World".getBytes("UTF-8"));
producer.send(message);
frame = receiveFrame(10000);
@@ -532,12 +570,6 @@
"\n\n" +
Stomp.NULL;
sendFrame(frame);
-
- // message should not be received as it was auto-acked
- MessageConsumer consumer = session.createConsumer(queue);
- TextMessage message = (TextMessage) consumer.receive(1000);
- Assert.assertNull(message);
-
}
public void testMessagesAreInOrder() throws Exception {
@@ -667,7 +699,7 @@
// message should not be received since message was acknowledged by the client
MessageConsumer consumer = session.createConsumer(queue);
- TextMessage message = (TextMessage) consumer.receive(1000);
+ Message message = consumer.receive(1000);
Assert.assertNull(message);
}
@@ -703,7 +735,7 @@
// message should be received since message was not acknowledged
MessageConsumer consumer = session.createConsumer(queue);
- TextMessage message = (TextMessage) consumer.receive(1000);
+ Message message = consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertTrue(message.getJMSRedelivered());
}
@@ -957,7 +989,7 @@
sendFrame(frame);
waitForReceipt();
- TextMessage message = (TextMessage) consumer.receive(1000);
+ Message message = consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
}
@@ -998,7 +1030,7 @@
Stomp.NULL;
sendFrame(frame);
- TextMessage message = (TextMessage) consumer.receive(1000);
+ Message message = consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
// 2nd tx with same tx ID
@@ -1025,7 +1057,7 @@
Stomp.NULL;
sendFrame(frame);
- message = (TextMessage) consumer.receive(1000);
+ message = consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
}
@@ -1123,9 +1155,9 @@
waitForReceipt();
//only second msg should be received since first msg was rolled back
- TextMessage message = (TextMessage) consumer.receive(1000);
+ BytesMessage message = (BytesMessage) consumer.receive(1000);
Assert.assertNotNull(message);
- Assert.assertEquals("second message", message.getText().trim());
+ Assert.assertEquals("second message", readContent(message));
}
public void testSubscribeToTopic() throws Exception {
@@ -1203,25 +1235,21 @@
String subscribeFrame =
"SUBSCRIBE\n" +
"destination:" + getTopicPrefix() + getTopicName() + "\n" +
- "receipt: 12\n" +
"durable-subscription-name: " + getName() + "\n" +
"\n\n" +
Stomp.NULL;
sendFrame(subscribeFrame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ waitForFrameToTakeEffect();
String disconnectFrame =
"DISCONNECT\n" +
"\n\n" +
Stomp.NULL;
sendFrame(disconnectFrame);
- stompSocket.close();
+ waitForFrameToTakeEffect();
// send the message when the durable subscriber is disconnected
sendMessage(getName(), topic);
-
reconnect(1000);
sendFrame(connectFame);
@@ -1229,9 +1257,6 @@
Assert.assertTrue(frame.startsWith("CONNECTED"));
sendFrame(subscribeFrame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
// we must have received the message
frame = receiveFrame(10000);
@@ -1522,6 +1547,14 @@
outputStream.flush();
}
+ public void sendFrame(byte[] data) throws Exception {
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < data.length; i++) {
+ outputStream.write(data[i]);
+ }
+ outputStream.flush();
+ }
+
public String receiveFrame(long timeOut) throws Exception {
stompSocket.setSoTimeout((int) timeOut);
InputStream is = stompSocket.getInputStream();
@@ -1550,31 +1583,36 @@
}
public void sendMessage(String msg) throws Exception {
- sendMessage(msg, "foo", "xyz", queue);
+ sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", queue);
}
public void sendMessage(String msg, Destination destination) throws Exception {
- sendMessage(msg, "foo", "xyz", destination);
+ sendMessage(msg.getBytes("UTF-8"), "foo", "xyz", destination);
}
- public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
- sendMessage(msg, propertyName, propertyValue, queue);
- }
+ public void sendMessage(byte[] data, Destination destination) throws Exception {
+ sendMessage(data, "foo", "xyz", destination);
+ }
- public void sendMessage(String msg, String propertyName, String propertyValue, Destination destination) throws JMSException {
- MessageProducer producer = session.createProducer(destination);
- TextMessage message = session.createTextMessage(msg);
- message.setStringProperty(propertyName, propertyValue);
- producer.send(message);
+ public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception {
+ sendMessage(msg.getBytes("UTF-8"), propertyName, propertyValue, queue);
}
- public void sendBytesMessage(byte[] msg) throws Exception {
- MessageProducer producer = session.createProducer(queue);
+ public void sendMessage(byte[] data, String propertyName, String propertyValue, Destination destination) throws Exception {
+ MessageProducer producer = session.createProducer(destination);
BytesMessage message = session.createBytesMessage();
- message.writeBytes(msg);
+ message.setStringProperty(propertyName, propertyValue);
+ message.writeBytes(data);
producer.send(message);
}
-
+
+ public String readContent(BytesMessage message) throws Exception
+ {
+ byte[] data = new byte[1024];
+ int size = message.readBytes(data);
+ return new String(data, 0, size, "UTF-8");
+ }
+
protected void waitForReceipt() throws Exception {
String frame = receiveFrame(50000);
assertNotNull(frame);
Added: trunk/tests/src/org/hornetq/tests/stress/stomp/StompStressTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/stress/stomp/StompStressTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/stress/stomp/StompStressTest.java 2010-02-22 10:16:42 UTC (rev 8892)
@@ -0,0 +1,182 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.tests.stress.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.CoreQueueConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.spi.core.protocol.ProtocolType;
+import org.hornetq.tests.util.UnitTestCase;
+
+public class StompStressTest extends UnitTestCase
+{
+ private static final transient Logger log = Logger.getLogger(StompStressTest.class);
+
+ private static final int COUNT = 100;
+
+ private int port = 61613;
+
+ private Socket stompSocket;
+
+ private ByteArrayOutputStream inputBuffer;
+
+ private String destination = "stomp.stress.queue";
+
+ private HornetQServer server;
+
+ public void testSendAndReceiveMessage() throws Exception
+ {
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SEND\n" + "destination:" + destination + "\n\n";
+
+ for (int i = 0; i < COUNT; i++)
+ {
+ sendFrame(frame + "count=" + i + Stomp.NULL);
+ }
+
+ frame = "SUBSCRIBE\n" + "destination:" + destination + "\n" + "ack:auto\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ for (int i = 0; i < COUNT; i++)
+ {
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf("count=" + i) > 0);
+ }
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ server = createServer();
+ server.start();
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+ }
+
+ private HornetQServer createServer() throws Exception
+ {
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+ config.setPersistenceEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+ params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+ config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ config.getQueueConfigurations().add(new CoreQueueConfiguration(destination, destination, null, false));
+ return HornetQServers.newHornetQServer(config);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ if (stompSocket != null)
+ {
+ stompSocket.close();
+ }
+ server.stop();
+
+ super.tearDown();
+ }
+
+ protected Socket createSocket() throws IOException
+ {
+ return new Socket("127.0.0.1", port);
+ }
+
+ public void sendFrame(String data) throws Exception
+ {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++)
+ {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ public void sendFrame(byte[] data) throws Exception
+ {
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < data.length; i++)
+ {
+ outputStream.write(data[i]);
+ }
+ outputStream.flush();
+ }
+
+ public String receiveFrame(long timeOut) throws Exception
+ {
+ stompSocket.setSoTimeout((int)timeOut);
+ InputStream is = stompSocket.getInputStream();
+ int c = 0;
+ for (;;)
+ {
+ c = is.read();
+ if (c < 0)
+ {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0)
+ {
+ c = is.read();
+ if (c != '\n')
+ {
+ byte[] ba = inputBuffer.toByteArray();
+ System.out.println(new String(ba, "UTF-8"));
+ }
+ Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else
+ {
+ inputBuffer.write(c);
+ }
+ }
+ }
+}
More information about the hornetq-commits
mailing list