Author: jmesnil
Date: 2010-01-20 09:53:17 -0500 (Wed, 20 Jan 2010)
New Revision: 8812
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java
Removed:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* added code to have a complete Stomp CONNECT + SEND + DISCONNECT use case
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompException.java 2010-01-20
14:53:17 UTC (rev 8812)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+/**
+ * Checked exception raised while processing a STOMP frame, e.g. "Not connected".
+ * NettyAcceptor catches it and replies to the client with a STOMP ERROR frame.
+ * @author jmesnil
+ *
+ *
+ */
+public class StompException extends Exception
+{
+
+ /**
+ * @param string the detail message, handed unchanged to {@link Exception#Exception(String)}
+ */
+ public StompException(String string)
+ {
+ super(string);
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java 2010-01-20
14:38:53 UTC (rev 8811)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java 2010-01-20
14:53:17 UTC (rev 8812)
@@ -28,6 +28,6 @@
public StompFrameDelimiter()
{
- super(MAX_DATA_LENGTH, true, Delimiters.nulDelimiter());
+ super(MAX_DATA_LENGTH, false, Delimiters.nulDelimiter()); // NOTE(review): the flag flipped here is presumably Netty's stripDelimiter, so the NUL terminator now stays in the decoded frame — confirm against the superclass
}
}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20
14:38:53 UTC (rev 8811)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20
14:53:17 UTC (rev 8812)
@@ -13,6 +13,9 @@
package org.hornetq.integration.transports.netty;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
@@ -49,8 +52,10 @@
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.integration.stomp.Stomp;
import org.hornetq.integration.stomp.StompDestinationConverter;
+import org.hornetq.integration.stomp.StompException;
import org.hornetq.integration.stomp.StompFrame;
import org.hornetq.integration.stomp.StompMarshaller;
+import org.hornetq.jms.client.HornetQBytesMessage;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -565,15 +570,16 @@
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
Exception
{
+ StompFrame frame = (StompFrame)e.getMessage();
+ System.out.println(">>> got frame " + frame);
+
+ // need to interact with HornetQ server & session
+ HornetQServer server = serverHandler.getServer();
+ RemotingConnection connection =
serverHandler.getRemotingConnection(e.getChannel().getId());
+
try
{
- StompFrame frame = (StompFrame)e.getMessage();
- System.out.println(">>> got frame " + frame);
- // need to interact with HornetQ server & session
- HornetQServer server = serverHandler.getServer();
- RemotingConnection connection =
serverHandler.getRemotingConnection(e.getChannel().getId());
-
String command = frame.getCommand();
StompFrame response = null;
@@ -581,6 +587,10 @@
{
response = onConnect(frame, server, connection);
}
+ if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ response = onDisconnect(frame, server, connection);
+ }
else if (Stomp.Commands.SEND.equals(command))
{
response = onSend(frame, server, connection);
@@ -598,14 +608,68 @@
connection.getTransportConnection().write(buffer, true);
}
}
+ catch (StompException ex)
+ {
+ // Let the stomp client know about any protocol errors.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,
"UTF-8"));
+ ex.printStackTrace(stream);
+ stream.append(Stomp.NULL + Stomp.NEWLINE);
+ stream.close();
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+
+ final String receiptId = (String)
frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if (receiptId != null) {
+ headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ }
+
+ StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers,
baos.toByteArray());
+ byte[] bytes = marshaller.marshal(errorMessage);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ System.out.println("ready to send reply: " + buffer);
+ connection.getTransportConnection().write(buffer, true);
+
+ }
catch (Exception ex)
{
ex.printStackTrace();
}
}
- private StompFrame onSend(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws HornetQException
+ private void checkConnected(RemotingConnection connection) throws StompException // guard: rejects a frame arriving on a connection that has no session
{
+ ServerSession session = sessions.get(connection); // sessions is looked up by RemotingConnection; presumably filled by onConnect — confirm
+ if (session == null)
+ {
+ throw new StompException("Not connected"); // caught in messageReceived, which answers the client with a STOMP ERROR frame
+ }
+ }
+ private StompFrame onDisconnect(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws StompException // handles DISCONNECT: closes and forgets the session of this connection; frame and server are not used
+ {
+ checkConnected(connection); // throws StompException("Not connected") when no session is mapped to this connection
+
+ ServerSession session = sessions.get(connection);
+ if (session != null) // NOTE(review): looks always true here, checkConnected() has already rejected a missing session
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ throw new StompException(e.getMessage()); // NOTE(review): only the message survives; the original cause and stack trace are dropped
+ }
+ sessions.remove(connection); // reached only when close() succeeded; a failed close leaves the mapping in place
+ }
+ return null; // no response frame is produced for DISCONNECT; presumably the caller skips the reply when null — confirm
+ }
+
+ private StompFrame onSend(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws HornetQException, StompException
+ {
+ checkConnected(connection);
+
Map<String, Object> headers = frame.getHeaders();
String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
/*
@@ -615,6 +679,10 @@
boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
*/
byte type = HornetQTextMessage.TYPE;
+ if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+ {
+ type = HornetQBytesMessage.TYPE;
+ }
long timestamp = System.currentTimeMillis();
boolean durable = false;
long expiration = -1;
@@ -625,9 +693,15 @@
message.setType(type);
message.setTimestamp(timestamp);
message.setAddress(address);
- String content = new String(frame.getContent());
- System.out.println(">>> got: " + content);
-
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(content));
+ byte[] content = frame.getContent();
+ if (type == HornetQTextMessage.TYPE)
+ {
+
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new
String(content)));
+ }
+ else
+ {
+ message.getBodyBuffer().writeBytes(content);
+ }
ServerSession session = sessions.get(connection);
SessionSendMessage packet = new SessionSendMessage(message, false);
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20
14:38:53 UTC (rev 8811)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20
14:53:17 UTC (rev 8812)
@@ -79,7 +79,32 @@
Assert.assertTrue(f.startsWith("CONNECTED"));
Assert.assertTrue(f.indexOf("response-id:1") >= 0);
}
+
+ public void testDisconnectAndError() throws Exception { // CONNECT, DISCONNECT, then SEND: the late SEND must be answered with an ERROR frame
+ String connect_frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n" + "request-id: 1\n" + "\n" +
Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0); // the request-id sent with CONNECT is expected back as response-id
+
+ connect_frame = "DISCONNECT\n\n" + Stomp.NULL; // variable reused: this now holds the DISCONNECT frame, not a CONNECT
+ sendFrame(connect_frame);
+
+ // the connection no longer has a session after DISCONNECT, so this SEND must be rejected
+ String frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ f = receiveFrame(10000); // DISCONNECT itself produces no reply, so the next frame read is the answer to SEND
+ Assert.assertTrue(f.startsWith("ERROR"));
+ }
+
+
public void testSendMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
@@ -150,7 +175,47 @@
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
+
+ public void testSendMessageWithContentLength() throws Exception { // a SEND carrying a content-length header must arrive as a JMS BytesMessage
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ byte[] data = new byte[] {1, 2, 3, 4}; // binary payload, compared byte by byte after delivery
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "content-length:" + data.length + "\n\n" +
+ new String(data) + // NOTE(review): platform default charset; harmless for these 7-bit values, but arbitrary bytes may not round-trip
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ BytesMessage message = (BytesMessage) consumer.receive(1000); // the cast fails the test if the server built a text message instead
+ Assert.assertNotNull(message);
+ assertEquals(data.length, message.getBodyLength());
+ assertEquals(data[0], message.readByte());
+ assertEquals(data[1], message.readByte());
+ assertEquals(data[2], message.readByte());
+ assertEquals(data[3], message.readByte());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
public void _testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
Deleted:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java 2010-01-20
14:38:53 UTC (rev 8811)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java 2010-01-20
14:53:17 UTC (rev 8812)
@@ -1,90 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.tests.integration.stomp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.HornetQServers;
-import org.hornetq.integration.transports.netty.NettyAcceptor;
-import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
-import org.hornetq.integration.transports.netty.TransportConstants;
-
-import junit.framework.TestCase;
-
-/**
- * A StompTest2: placeholder test that only starts and stops a server with a STOMP acceptor.
- *
- * @author jmesnil
- *
- *
- */
-public class StompTest2 extends TestCase
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private HornetQServer server;
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- public void testFoo() throws Exception
- {
- Thread.sleep(10); // no assertions: the test passes as long as setUp() and tearDown() succeed
- }
-
- // Package protected ---------------------------------------------
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
-
- Configuration config = new ConfigurationImpl();
- config.setSecurityEnabled(false);
-
- Map<String, Object> params = new HashMap<String, Object>();
- params.put(TransportConstants.PROTOCOL_PROP_NAME,
TransportConstants.STOMP_PROTOCOL);
- params.put(TransportConstants.PORT_PROP_NAME,
TransportConstants.DEFAULT_STOMP_PORT);
- TransportConfiguration stompTransport = new
TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
- config.getAcceptorConfigurations().add(stompTransport); // Netty acceptor set to the STOMP protocol on the default STOMP port
-
- server = HornetQServers.newHornetQServer(config);
- server.start();
- }
-
- @Override
- protected void tearDown() throws Exception
- {
- server.stop(); // stops the server started in setUp() before the base-class teardown runs
-
- super.tearDown();
- }
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}