Author: jmesnil
Date: 2010-01-20 08:24:55 -0500 (Wed, 20 Jan 2010)
New Revision: 8810
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* barely enough ugly code to make StompTest.testSendMessage pass this time
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-20
11:02:05 UTC (rev 8809)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-20
13:24:55 UTC (rev 8810)
@@ -249,7 +249,14 @@
{
return type;
}
+
+ public void setType(byte type)
+ {
+ this.type = type;
+ }
+
+
public boolean isDurable()
{
return durable;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java 2010-01-20
11:02:05 UTC (rev 8809)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerMessage.java 2010-01-20
13:24:55 UTC (rev 8810)
@@ -62,4 +62,6 @@
boolean storeIsPaging();
void encodeMessageIDToBuffer();
+
+ void setType(byte type);
}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompDestinationConverter.java 2010-01-20
13:24:55 UTC (rev 8810)
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.jms.client.HornetQTemporaryQueue;
+import org.hornetq.jms.client.HornetQTemporaryTopic;
+import org.hornetq.jms.client.HornetQTopic;
+
+/**
+ * A StompDestinationConverter
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class StompDestinationConverter
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ public static SimpleString convertDestination(String name) throws HornetQException
+ {
+ if (name == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination
is specified!");
+ }
+ else if (name.startsWith("/queue/"))
+ {
+ String queueName = name.substring("/queue/".length(), name.length());
+ return HornetQQueue.createAddressFromName(queueName);
+ }
+ else if (name.startsWith("/topic/"))
+ {
+ String topicName = name.substring("/topic/".length(), name.length());
+ return HornetQTopic.createAddressFromName(topicName);
+ }
+ else if (name.startsWith("/temp-queue/"))
+ {
+ String tempName = name.substring("/temp-queue/".length(),
name.length());
+ return HornetQTemporaryQueue.createAddressFromName(tempName);
+ }
+ else if (name.startsWith("/temp-topic/"))
+ {
+ String tempName = name.substring("/temp-topic/".length(),
name.length());
+ return HornetQTemporaryTopic.createAddressFromName(tempName);
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal
destination name: [" + name +
+ "] --
StompConnect destinations " +
+ "must begine
with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+ }
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20
11:02:05 UTC (rev 8809)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20
13:24:55 UTC (rev 8810)
@@ -40,13 +40,18 @@
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.integration.stomp.Stomp;
+import org.hornetq.integration.stomp.StompDestinationConverter;
import org.hornetq.integration.stomp.StompFrame;
import org.hornetq.integration.stomp.StompMarshaller;
+import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -160,7 +165,7 @@
this.handler = handler;
this.serverHandler = serverHandler;
-
+
this.listener = listener;
sslEnabled =
ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
@@ -308,7 +313,10 @@
if (protocol.equals(TransportConstants.STOMP_PROTOCOL))
{
ChannelPipelineSupport.addStompStack(pipeline, serverHandler);
- pipeline.addLast("handler", new
StompChannelHandler(serverHandler, new StompMarshaller(), channelGroup, new Listener()));
+ pipeline.addLast("handler", new
StompChannelHandler(serverHandler,
+ new
StompMarshaller(),
+ channelGroup,
+ new Listener()));
}
else
{
@@ -512,19 +520,20 @@
private final class HornetQServerChannelHandler extends AbstractServerChannelHandler
{
private PacketDecoder decoder;
+
private BufferHandler handler;
HornetQServerChannelHandler(final ChannelGroup group,
- final PacketDecoder decoder,
- final BufferHandler handler,
- final ConnectionLifeCycleListener listener)
- {
- super(group, listener);
-
- this.decoder = decoder;
- this.handler = handler;
- }
+ final PacketDecoder decoder,
+ final BufferHandler handler,
+ final ConnectionLifeCycleListener listener)
+ {
+ super(group, listener);
+ this.decoder = decoder;
+ this.handler = handler;
+ }
+
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
Exception
{
@@ -532,14 +541,16 @@
handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer),
decoder);
}
-
+
}
-
+
@ChannelPipelineCoverage("one")
public final class StompChannelHandler extends AbstractServerChannelHandler
{
private final StompMarshaller marshaller;
+ private final Map<RemotingConnection, ServerSession> sessions = new
HashMap<RemotingConnection, ServerSession>();
+
private ServerHolder serverHandler;
public StompChannelHandler(ServerHolder serverHolder,
@@ -554,31 +565,82 @@
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
Exception
{
- StompFrame frame = (StompFrame)e.getMessage();
- System.out.println(">>> got frame " + frame);
+ try
+ {
+ StompFrame frame = (StompFrame)e.getMessage();
+ System.out.println(">>> got frame " + frame);
- // need to interact with HornetQ server & session
- HornetQServer server = serverHandler.getServer();
- RemotingConnection connection =
serverHandler.getRemotingConnection(e.getChannel().getId());
+ // need to interact with HornetQ server & session
+ HornetQServer server = serverHandler.getServer();
+ RemotingConnection connection =
serverHandler.getRemotingConnection(e.getChannel().getId());
- String command = frame.getCommand();
+ String command = frame.getCommand();
- StompFrame response = null;
- if (Stomp.Commands.CONNECT.equals(command))
+ StompFrame response = null;
+ if (Stomp.Commands.CONNECT.equals(command))
+ {
+ response = onConnect(frame, server, connection);
+ }
+ else if (Stomp.Commands.SEND.equals(command))
+ {
+ response = onSend(frame, server, connection);
+ }
+ else
+ {
+ log.error("Unsupported Stomp frame: " + frame);
+ }
+ if (response != null)
+ {
+ System.out.println(">>> will reply " + response);
+ byte[] bytes = marshaller.marshal(response);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ System.out.println("ready to send reply: " + buffer);
+ connection.getTransportConnection().write(buffer, true);
+ }
+ }
+ catch (Exception ex)
{
- response = onConnect(frame, server, connection);
+ ex.printStackTrace();
}
- else
+ }
+
+ private StompFrame onSend(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws HornetQException
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+ /*
+ String type = (String)headers.get(Stomp.Headers.Send.TYPE);
+ long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
+ byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
+ boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
+ */
+ byte type = HornetQTextMessage.TYPE;
+ long timestamp = System.currentTimeMillis();
+ boolean durable = false;
+ long expiration = -1;
+ byte priority = 9;
+ SimpleString address = StompDestinationConverter.convertDestination(queue);
+
+ ServerMessage message = new
ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+ message.setType(type);
+ message.setTimestamp(timestamp);
+ message.setAddress(address);
+ String content = new String(frame.getContent());
+ System.out.println(">>> got: " + content);
+
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(content));
+
+ ServerSession session = sessions.get(connection);
+ SessionSendMessage packet = new SessionSendMessage(message, false);
+ session.handleSend(packet);
+ if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
{
- log.error("Unsupported Stomp frame: " + frame);
+ Map<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Response.RECEIPT_ID,
headers.get(Stomp.Headers.RECEIPT_REQUESTED));
+ return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
}
- if (response != null)
+ else
{
- System.out.println(">>> will reply " + response);
- byte[] bytes = marshaller.marshal(response);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- System.out.println("ready to send reply: " + buffer);
- connection.getTransportConnection().write(buffer, true);
+ return null;
}
}
@@ -603,6 +665,7 @@
false,
-1);
ServerSession session = server.getSession(name);
+ sessions.put(connection, session);
System.out.println(">>> created session " + session);
HashMap<String, Object> h = new HashMap<String, Object>();
h.put(Stomp.Headers.Connected.SESSION, name);
@@ -610,12 +673,11 @@
return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
}
}
-
+
@ChannelPipelineCoverage("one")
public abstract class AbstractServerChannelHandler extends HornetQChannelHandler
{
- protected AbstractServerChannelHandler(final ChannelGroup group,
- final ConnectionLifeCycleListener listener)
+ protected AbstractServerChannelHandler(final ChannelGroup group, final
ConnectionLifeCycleListener listener)
{
super(group, listener);
}
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20
11:02:05 UTC (rev 8809)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-20
13:24:55 UTC (rev 8810)
@@ -41,11 +41,10 @@
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQServer;
@@ -61,7 +60,7 @@
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
public class StompTest extends TestCase {
- private static final transient Log log = LogFactory.getLog(StompTest.class);
+ private static final transient Logger log = Logger.getLogger(StompTest.class);
private int port = 61613;
private Socket stompSocket;
private ByteArrayOutputStream inputBuffer;
@@ -102,7 +101,7 @@
Stomp.NULL;
sendFrame(frame);
-
+
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
@@ -113,7 +112,45 @@
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
+
+ public void testSendMessageWithReceipt() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "receipt: 1234\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("RECEIPT"));
+ Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
public void _testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-01-20
11:02:05 UTC (rev 8809)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-01-20
13:24:55 UTC (rev 8810)
@@ -663,6 +663,15 @@
// TODO Auto-generated method stub
return 0;
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.ServerMessage#setType(byte)
+ */
+ public void setType(byte type)
+ {
+ // TODO Auto-generated method stub
+
+ }
public HornetQBuffer getWholeBuffer()
{