Author: timfox
Date: 2010-10-05 05:51:27 -0400 (Tue, 05 Oct 2010)
New Revision: 9746
Removed:
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java
Modified:
trunk/docs/user-manual/en/interoperability.xml
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java
trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-544
Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml 2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/docs/user-manual/en/interoperability.xml 2010-10-05 09:51:27 UTC (rev 9746)
@@ -57,6 +57,18 @@
When a Stomp client subscribes (or unsubscribes) for a destination (using a
<literal>SUBSCRIBE</literal>
or <literal>UNSUBSCRIBE</literal> frame), the destination is
mapped to a HornetQ queue.</para>
</section>
+ <section>
+ <title>STOMP and connection-ttl</title>
+ <para>Well-behaved STOMP clients will always send a DISCONNECT frame before
closing their connections. In this case the server
+ will clear up any server side resources such as sessions and consumers
synchronously. However, if STOMP clients exit without
+ sending a DISCONNECT frame or if they crash, the server will have no way of
knowing immediately whether the client is still alive
+ or not. STOMP connections therefore default to a connection-ttl value of 1 minute
(see chapter on <link linkend="connection-ttl"
+ >connection-ttl</link> for more information). This value can be
overridden using connection-ttl-override.
+ </para>
+ <note><para>Please note that the STOMP protocol does not contain any
heartbeat frame. It is therefore the user's responsibility to make sure
+ data is sent within connection-ttl or the server will assume the client is dead
and clean up server side resources.</para></note>
+ </section>
+
<section>
<title>Stomp and JMS interoperabilty</title>
<section>
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-04
22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-10-05
09:51:27 UTC (rev 9746)
@@ -49,6 +49,13 @@
private boolean valid;
private boolean destroyed = false;
+
+ private StompDecoder decoder = new StompDecoder();
+
+ public StompDecoder getDecoder()
+ {
+ return decoder;
+ }
StompConnection(final Connection transportConnection, final StompProtocolManager
manager)
{
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-10-04 22:20:52 UTC
(rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-10-05 09:51:27 UTC
(rev 9746)
@@ -22,31 +22,40 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.core.logging.Logger;
/**
* Represents all the data in a STOMP frame.
*
* @author <a href="http://hiramchirino.com">chirino</a>
+ * @author Tim Fox
+ *
*/
class StompFrame
{
+ private static final Logger log = Logger.getLogger(StompFrame.class);
+
public static final byte[] NO_DATA = new byte[] {};
+
private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
private final String command;
+
private final Map<String, Object> headers;
+
private final byte[] content;
-
+
private HornetQBuffer buffer = null;
+
private int size;
-
+
public StompFrame(String command, Map<String, Object> headers, byte[] data)
{
this.command = command;
this.headers = headers;
this.content = data;
}
-
+
public StompFrame(String command, Map<String, Object> headers)
{
this.command = command;
@@ -63,7 +72,7 @@
{
return content;
}
-
+
public Map<String, Object> getHeaders()
{
return headers;
@@ -95,7 +104,8 @@
out += new String(content);
return out;
}
-
+
+
public HornetQBuffer toHornetQBuffer() throws Exception
{
if (buffer == null)
Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java 2010-10-04
22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java 2010-10-05
09:51:27 UTC (rev 9746)
@@ -1,209 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *
http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.core.protocol.stomp;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-
-/**
- * Implements marshalling and unmarsalling the <a
href="http://stomp.codehaus.org/">Stomp</a> protocol.
- */
-class StompFrameDecoder
-{
- private static final Logger log = Logger.getLogger(StompFrameDecoder.class);
-
- private static final int MAX_COMMAND_LENGTH = 1024;
-
- private static final int MAX_HEADER_LENGTH = 1024 * 10;
-
- private static final int MAX_HEADERS = 1000;
-
- private static final int MAX_DATA_LENGTH = 1024 * 1024 * 10;
-
- public StompFrame decode(HornetQBuffer buffer)
- {
- try
- {
- String command = null;
-
- // skip white space to next real action line
- while (true) {
- command = StompFrameDecoder.readLine(buffer,
StompFrameDecoder.MAX_COMMAND_LENGTH, "The maximum command length was
exceeded");
- if (command == null) {
- return null;
- }
- else {
- command = command.trim();
- if (command.length() > 0) {
- break;
- }
- }
- }
-
- // Parse the headers
- HashMap<String, Object> headers = new HashMap<String, Object>(25);
- while (true)
- {
- String line = StompFrameDecoder.readLine(buffer,
StompFrameDecoder.MAX_HEADER_LENGTH, "The maximum header length was exceeded");
- if (line == null)
- {
- return null;
- }
-
- if (headers.size() > StompFrameDecoder.MAX_HEADERS)
- {
- throw new StompException("The maximum number of headers was
exceeded", true);
- }
-
- if (line.trim().length() == 0)
- {
- break;
- }
-
- try
- {
- int seperator_index = line.indexOf(Stomp.Headers.SEPARATOR);
- if (seperator_index == -1)
- {
- return null;
- }
- String name = line.substring(0, seperator_index).trim();
- String value = line.substring(seperator_index + 1, line.length()).trim();
- headers.put(name, value);
- }
- catch (Exception e)
- {
- throw new StompException("Unable to parse header line [" + line
+ "]", true);
- }
- }
- // Read in the data part.
- byte[] data = StompFrame.NO_DATA;
- String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
- if (contentLength != null)
- {
-
- // Bless the client, he's telling us how much data to read in.
- int length;
- try
- {
- length = Integer.parseInt(contentLength.trim());
- }
- catch (NumberFormatException e)
- {
- throw new StompException("Specified content-length is not a valid
integer", true);
- }
-
- if (length > StompFrameDecoder.MAX_DATA_LENGTH)
- {
- throw new StompException("The maximum data length was exceeded",
true);
- }
-
- if (buffer.readableBytes() < length)
- {
- return null;
- }
-
- data = new byte[length];
- buffer.readBytes(data);
-
- if (!buffer.readable())
- {
- return null;
- }
- if (buffer.readByte() != 0)
- {
- throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were
read and " +
- "there was no trailing null byte",
true);
- }
- }
- else
- {
- byte[] body = new byte[StompFrameDecoder.MAX_DATA_LENGTH];
- boolean bodyCorrectlyEnded = false;
- int count = 0;
- while (buffer.readable())
- {
- byte b = buffer.readByte();
-
- if (b == (byte)'\0')
- {
- bodyCorrectlyEnded = true;
- break;
- }
- else
- {
- body[count++] = b;
- }
- }
-
- if (!bodyCorrectlyEnded)
- {
- return null;
- }
-
- data = new byte[count];
- System.arraycopy(body, 0, data, 0, count);
- }
-
- return new StompFrame(command, headers, data);
- }
- catch (IOException e)
- {
- log.error("Unable to decode stomp frame", e);
- return null;
- }
- }
-
- private static String readLine(HornetQBuffer in, int maxLength, String errorMessage)
throws IOException
- {
- char[] chars = new char[MAX_HEADER_LENGTH];
-
- if (!in.readable())
- {
- return null;
- }
-
- boolean properString = false;
- int count = 0;
- while (in.readable())
- {
- byte b = in.readByte();
-
- if (b == (byte)'\n')
- {
- properString = true;
- break;
- }
- else
- {
- chars[count++] = (char)b;
- }
- }
- if (properString)
- {
- return new String(chars, 0, count);
- }
- else
- {
- return null;
- }
- }
-}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-04
22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-10-05
09:51:27 UTC (rev 9746)
@@ -59,8 +59,6 @@
private final HornetQServer server;
- private final StompFrameDecoder frameDecoder;
-
private final Executor executor;
private final Map<String, StompSession> transactedSessions = new
HashMap<String, StompSession>();
@@ -105,7 +103,6 @@
public StompProtocolManager(final HornetQServer server, final List<Interceptor>
interceptors)
{
this.server = server;
- this.frameDecoder = new StompFrameDecoder();
this.executor = server.getExecutorFactory().getExecutor();
}
@@ -115,8 +112,9 @@
{
StompConnection conn = new StompConnection(connection, this);
- //Note that STOMP has no heartbeat, so if connection ttl is non zero, data must
continue to be sent or connection will be timed out and closed!
-
+ // Note that STOMP has no heartbeat, so if connection ttl is non zero, data must
continue to be sent or connection
+ // will be timed out and closed!
+
long ttl = server.getConfiguration().getConnectionTTLOverride();
if (ttl != -1)
@@ -127,7 +125,7 @@
{
// Default to 1 minute - which is same as core protocol
return new ConnectionEntry(conn, System.currentTimeMillis(), 1 * 60 * 1000);
- }
+ }
}
public void removeHandler(String name)
@@ -136,121 +134,123 @@
public int isReadyToHandle(HornetQBuffer buffer)
{
- int start = buffer.readerIndex();
+ // This never gets called
- StompFrame frame = frameDecoder.decode(buffer);
-
- if (frame == null)
- {
- return -1;
- }
- else
- {
- return buffer.readerIndex() - start;
- }
+ return -1;
}
public void handleBuffer(final RemotingConnection connection, final HornetQBuffer
buffer)
{
- try
- {
- doHandleBuffer(connection, buffer);
- }
- finally
- {
- server.getStorageManager().clearContext();
- }
- }
+ StompConnection conn = (StompConnection)connection;
+
+ StompDecoder decoder = conn.getDecoder();
- private void doHandleBuffer(final RemotingConnection connection, final HornetQBuffer
buffer)
- {
- StompConnection conn = (StompConnection)connection;
- StompFrame request = null;
- try
+ do
{
- request = frameDecoder.decode(buffer);
- if (log.isTraceEnabled())
+ StompFrame request;
+
+ try
{
- log.trace("received " + request);
+ request = decoder.decode(buffer);
}
+ catch (Exception e)
+ {
+ log.error("Failed to decode", e);
- String command = request.getCommand();
- StompFrame response = null;
-
- if (Stomp.Commands.CONNECT.equals(command))
- {
- response = onConnect(request, conn);
+ return;
}
- else if (Stomp.Commands.DISCONNECT.equals(command))
+
+ if (request == null)
{
- response = onDisconnect(request, conn);
+ return;
}
- else if (Stomp.Commands.SEND.equals(command))
+
+ try
{
- response = onSend(request, conn);
- }
- else if (Stomp.Commands.SUBSCRIBE.equals(command))
- {
- response = onSubscribe(request, conn);
- }
- else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
- {
- response = onUnsubscribe(request, conn);
- }
- else if (Stomp.Commands.ACK.equals(command))
- {
- response = onAck(request, conn);
- }
- else if (Stomp.Commands.BEGIN.equals(command))
- {
- response = onBegin(request, server, conn);
- }
- else if (Stomp.Commands.COMMIT.equals(command))
- {
- response = onCommit(request, conn);
- }
- else if (Stomp.Commands.ABORT.equals(command))
- {
- response = onAbort(request, conn);
- }
- else
- {
- log.error("Unsupported Stomp frame: " + request);
- response = new StompFrame(Stomp.Responses.ERROR,
- new HashMap<String, Object>(),
- ("Unsupported frame: " +
command).getBytes());
- }
+ String command = request.getCommand();
- if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
- {
- if (response == null)
+ StompFrame response = null;
+
+ if (Stomp.Commands.CONNECT.equals(command))
{
- Map<String, Object> h = new HashMap<String, Object>();
- response = new StompFrame(Stomp.Responses.RECEIPT, h);
+ response = onConnect(request, conn);
}
- response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
-
request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
- }
+ else if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ response = onDisconnect(request, conn);
+ }
+ else if (Stomp.Commands.SEND.equals(command))
+ {
+ response = onSend(request, conn);
+ }
+ else if (Stomp.Commands.SUBSCRIBE.equals(command))
+ {
+ response = onSubscribe(request, conn);
+ }
+ else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
+ {
+ response = onUnsubscribe(request, conn);
+ }
+ else if (Stomp.Commands.ACK.equals(command))
+ {
+ response = onAck(request, conn);
+ }
+ else if (Stomp.Commands.BEGIN.equals(command))
+ {
+ response = onBegin(request, server, conn);
+ }
+ else if (Stomp.Commands.COMMIT.equals(command))
+ {
+ response = onCommit(request, conn);
+ }
+ else if (Stomp.Commands.ABORT.equals(command))
+ {
+ response = onAbort(request, conn);
+ }
+ else
+ {
+ log.error("Unsupported Stomp frame: " + request);
+ response = new StompFrame(Stomp.Responses.ERROR,
+ new HashMap<String, Object>(),
+ ("Unsupported frame: " +
command).getBytes());
+ }
- if (response != null)
- {
- sendReply(conn, response);
+ if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ log.info("receipt requested");
+ if (response == null)
+ {
+ Map<String, Object> h = new HashMap<String, Object>();
+ response = new StompFrame(Stomp.Responses.RECEIPT, h);
+ }
+ response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
+
request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
+ }
+
+ if (response != null)
+ {
+ sendReply(conn, response);
+ }
+
+ if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ conn.destroy();
+ }
}
-
- if (Stomp.Commands.DISCONNECT.equals(command))
+ catch (Exception e)
{
- conn.destroy();
+ e.printStackTrace();
+ StompFrame error = createError(e, request);
+ if (error != null)
+ {
+ sendReply(conn, error);
+ }
}
- }
- catch (Exception e)
- {
- e.printStackTrace();
- StompFrame error = createError(e, request);
- if (error != null)
+ finally
{
- sendReply(conn, error);
+ server.getStorageManager().clearContext();
}
- }
+ } while (decoder.hasBytes());
}
// Public --------------------------------------------------------
@@ -466,7 +466,8 @@
StompSession stompSession = sessions.get(connection.getID());
if (stompSession == null)
{
- stompSession = new StompSession(connection, this,
server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
+ stompSession = new StompSession(connection, this, server.getStorageManager()
+
.newContext(server.getExecutorFactory().getExecutor()));
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name,
connection.getLogin(),
@@ -516,7 +517,7 @@
cleanup(connection);
return null;
}
-
+
private StompFrame onSend(StompFrame frame, StompConnection connection) throws
Exception
{
checkConnected(connection);
@@ -554,7 +555,8 @@
{
message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
}
- stompSession.getSession().send(message, true);
+ stompSession.getSession().send(message, true);
+
return null;
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-10-04 22:20:52
UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java 2010-10-05 09:51:27
UTC (rev 9746)
@@ -93,7 +93,7 @@
HornetQBuffer buffer = serverMessage.getBodyBuffer();
int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
- :
serverMessage.getEndOfBodyPosition();
+ :
serverMessage.getEndOfBodyPosition();
int size = bodyPos - buffer.readerIndex();
buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
byte[] data = new byte[size];
@@ -108,7 +108,8 @@
if (text != null)
{
data = text.toString().getBytes("UTF-8");
- } else
+ }
+ else
{
data = new byte[0];
}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-10-04 22:20:52 UTC
(rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-10-05 09:51:27 UTC
(rev 9746)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -34,7 +35,10 @@
class StompUtils
{
// Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(StompUtils.class);
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -53,6 +57,7 @@
{
msg.setDurable(Boolean.parseBoolean(persistent));
}
+
// FIXME should use a proper constant
msg.putObjectProperty("JMSCorrelationID",
headers.remove(Stomp.Headers.Send.CORRELATION_ID));
msg.putObjectProperty("JMSType",
headers.remove(Stomp.Headers.Send.TYPE));
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java 2010-10-04
22:20:52 UTC (rev 9745)
+++
trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java 2010-10-05
09:51:27 UTC (rev 9746)
@@ -38,20 +38,25 @@
*/
public class WebSocketStompFrameEncoder extends OneToOneEncoder
{
-
- private final StompFrameDecoder decoder = new StompFrameDecoder();
-
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws
Exception
{
if (msg instanceof ChannelBuffer)
{
+ // FIXME - this is a silly way to do this - a better way to do this would be to
create a new protocol, with protocol manager etc
+ // and re-use some of the STOMP codec stuff - Tim
+
+
// this is ugly and slow!
// we have to go ChannelBuffer -> HornetQBuffer -> StompFrame -> String
-> WebSocketFrame
// since HornetQ protocol SPI requires to return HornetQBuffer to the transport
HornetQBuffer buffer = new ChannelBufferWrapper((ChannelBuffer)msg);
+
+ StompDecoder decoder = new StompDecoder();
+
StompFrame frame = decoder.decode(buffer);
+
if (frame != null)
{
WebSocketFrame wsFrame = new DefaultWebSocketFrame(frame.asString());
Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-10-04
22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java 2010-10-05
09:51:27 UTC (rev 9746)
@@ -356,8 +356,8 @@
if (protocol == ProtocolType.CORE)
{
// Core protocol uses its own optimised decoder
-
- handlers.put("hornetq-decode", new HornetQFrameDecoder2());
+
+ handlers.put("hornetq-decoder", new HornetQFrameDecoder2());
}
else if (protocol == ProtocolType.STOMP_WS)
{
@@ -367,13 +367,17 @@
handlers.put("hornetq-decoder", new
HornetQFrameDecoder(decoder));
handlers.put("websocket-handler", new
WebSocketServerHandler());
}
+ else if (protocol == ProtocolType.STOMP)
+ {
+ // With STOMP the decoding is handled by the per-connection StompDecoder (see StompProtocolManager.handleBuffer)
+ }
else
{
- handlers.put("hornetq-decoder", new
HornetQFrameDecoder(decoder));
+ handlers.put("hornetq-decoder", new
HornetQFrameDecoder(decoder));
}
handlers.put("handler", new
HornetQServerChannelHandler(channelGroup, handler, new Listener()));
-
+
/**
* STOMP_WS protocol mandates use of named handlers to be able to replace
http codecs
* by websocket codecs after handshake.
Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-10-04 22:20:52
UTC (rev 9745)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-10-05 09:51:27
UTC (rev 9746)
@@ -90,7 +90,7 @@
private JMSServerManager server;
- public void _testSendManyMessages() throws Exception
+ public void testSendManyMessages() throws Exception
{
MessageConsumer consumer = session.createConsumer(queue);
@@ -106,7 +106,7 @@
public void onMessage(Message arg0)
{
- System.out.println("<<< " + (1000 - latch.getCount()));
+ //System.out.println("<<< " + (1000 - latch.getCount()));
latch.countDown();
}
});
@@ -115,12 +115,40 @@
for (int i = 1; i <= count; i++)
{
// Thread.sleep(1);
- System.out.println(">>> " + i);
+ //System.out.println(">>> " + i);
sendFrame(frame);
}
assertTrue(latch.await(60, TimeUnit.SECONDS));
+ }
+
+ public void testPerf() throws Exception
+ {
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ int count = 100000;
+
+ frame = "SEND\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n\n" +
"ABCDJIMTEST<GRV>http://techcrunch.com/2010/09/23/thelikestream-digg-for-facebook-likes/<GRV>0"
+ Stomp.NULL;
+
+ long start = System.currentTimeMillis();
+
+ for (int i = 1; i <= count; i++)
+ {
+ sendFrame(frame);
+
+ if (i % 1000 == 0)
+ {
+ log.info("Sent " + i);
+ }
+ }
+
+ long end = System.currentTimeMillis();
+
+ log.info("That took " + (end-start));
}
public void testConnect() throws Exception
@@ -185,7 +213,7 @@
frame = receiveFrame(10000);
Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame = "\nSEND\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ frame = "SEND\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
sendFrame(frame);
@@ -199,7 +227,38 @@
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
+
+ /*
+ * Some STOMP clients erroneously put a new line \n *after* the terminating NUL char
at the end of the frame.
+ * This means the next frame read might have a \n at the beginning.
+ * This is contrary to the STOMP spec but we deal with it so we can work nicely with
crappy STOMP clients
+ */
+ public void testSendMessageWithLeadingNewLine() throws Exception
+ {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n\n" + Stomp.NULL + "\n";
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SEND\n" + "destination:" + getQueuePrefix() +
getQueueName() + "\n\n" + "Hello World" + Stomp.NULL +
"\n";
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
public void testSendMessageWithReceipt() throws Exception
{