Author: jmesnil
Date: 2010-01-22 10:32:13 -0500 (Fri, 22 Jan 2010)
New Revision: 8843
Added:
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
trunk/src/main/org/hornetq/core/protocol/stomp/
trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
trunk/tests/src/org/hornetq/tests/integration/stomp/
trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Modified:
trunk/build-hornetq.xml
trunk/src/main/org/hornetq/api/core/Message.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java
trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* bootstrap of the Stomp protocol implementation
Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/build-hornetq.xml 2010-01-22 15:32:13 UTC (rev 8843)
@@ -1540,23 +1540,20 @@
</target>
<target name="debugServer" depends="jar">
+ <mkdir dir="logs"/>
<java
classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer"
fork="true">
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx2048M"/>
<jvmarg value="-XX:+AggressiveOpts"/>
<jvmarg value="-XX:+UseFastAccessorMethods"/>
- <jvmarg value="-Xdebug"/>
- <jvmarg value="-Xnoagent"/>
- <jvmarg value="-Djava.compiler=NONE"/>
<jvmarg value="-Dcom.sun.management.jmxremote"/>
- <jvmarg
value="-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
- <jvmarg
value="-Djava.util.logging.config.file=${src.config.standalone.non-clustered.dir}/logging.properties"/>
+ <jvmarg
value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
<jvmarg value="-Djava.library.path=${native.bin.dir}"/>
- <jvmarg
value="-Djava.naming.factory.initial=org.jnp.interfaces.NamingContextFactory"/>
- <jvmarg
value="-Djava.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces"/>
+ <jvmarg line="-Xdebug
-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
<arg line="hornetq-beans.xml"/>
- <classpath path="${src.config.standalone.non-clustered.dir}" />
+ <classpath path="${src.config.trunk.non-clustered.dir}" />
+ <classpath refid="jms.standalone.server.classpath"/>
</java>
</target>
Modified: trunk/src/main/org/hornetq/api/core/Message.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/Message.java 2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/api/core/Message.java 2010-01-22 15:32:13 UTC (rev 8843)
@@ -70,6 +70,18 @@
public static final SimpleString HDR_LAST_VALUE_NAME = new
SimpleString("_HQ_LVQ_NAME");
+ public static final byte DEFAULT_TYPE = 0;
+
+ public static final byte OBJECT_TYPE = 2;
+
+ public static final byte TEXT_TYPE = 3;
+
+ public static final byte BYTES_TYPE = 4;
+
+ public static final byte MAP_TYPE = 5;
+
+ public static final byte STREAM_TYPE = 6;
+
/**
* Returns the messageID.
* <br>
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-22 14:25:11 UTC
(rev 8842)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-01-22 15:32:13 UTC
(rev 8843)
@@ -252,6 +252,12 @@
return type;
}
+ public void setType(byte type)
+ {
+ this.type = type;
+ }
+
+
public boolean isDurable()
{
return durable;
Modified:
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java 2010-01-22
14:25:11 UTC (rev 8842)
+++
trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -56,6 +56,10 @@
return new ConnectionEntry(conn, 0, 0);
}
+
+ public void removeHandler(String name)
+ {
+ }
public void handleBuffer(final RemotingConnection conn, final HornetQBuffer buffer)
{
@@ -69,10 +73,9 @@
true,
true,
true,
- false);
+ false,
+ new
AardvarkSessionCallback(conn.getTransportConnection()));
- session.setCallback(new
AardvarkSessionCallback(conn.getTransportConnection()));
-
final SimpleString name = new SimpleString("hornetq.aardvark");
session.createQueue(name, name, null, false, false);
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-01-22
14:25:11 UTC (rev 8842)
+++
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -50,13 +50,11 @@
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.core.exception.HornetQXAException;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.protocol.core.impl.CoreProtocolManager;
import org.hornetq.core.protocol.core.impl.PacketImpl;
import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
@@ -71,12 +69,8 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
@@ -100,7 +94,6 @@
import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
-import org.hornetq.spi.core.protocol.SessionCallback;
/**
* A ServerSessionPacketHandler
@@ -110,12 +103,10 @@
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
* @author <a href="mailto:clebert.suconic@jboss.org>Clebert
Suconic</a>
*/
-public class ServerSessionPacketHandler implements ChannelHandler, CloseListener,
FailureListener, SessionCallback
+public class ServerSessionPacketHandler implements ChannelHandler, CloseListener,
FailureListener
{
private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
- private final CoreProtocolManager protocolManager;
-
private final ServerSession session;
private final OperationContext sessionContext;
@@ -127,14 +118,11 @@
private volatile CoreRemotingConnection remotingConnection;
- public ServerSessionPacketHandler(final CoreProtocolManager protocolManager,
- final ServerSession session,
+ public ServerSessionPacketHandler(final ServerSession session,
final OperationContext sessionContext,
final StorageManager storageManager,
final Channel channel)
{
- this.protocolManager = protocolManager;
-
this.session = session;
this.storageManager = storageManager;
@@ -567,46 +555,7 @@
channel.close();
}
}
-
- public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int
deliveryCount)
- {
- Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize,
deliveryCount);
-
- channel.send(packet);
-
- return packet.getPacketSize();
- }
-
- public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean
continues, boolean requiresResponse)
- {
- Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues,
requiresResponse);
-
- channel.send(packet);
-
- return packet.getPacketSize();
- }
-
- public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
- {
- Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
-
- channel.send(packet);
-
- return packet.getPacketSize();
- }
-
- public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
- {
- Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
-
- channel.send(packet);
- }
- public void closed()
- {
- protocolManager.removeHandler(session.getName());
- }
-
public int transferConnection(final CoreRemotingConnection newConnection, final int
lastReceivedCommandID)
{
// We need to disable delivery on all the consumers while the transfer is
occurring- otherwise packets might get
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-01-22
14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -30,6 +30,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.remoting.Connection;
/**
@@ -120,7 +121,7 @@
sessionHandlers.remove(name);
}
- public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+ public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
{
}
Added: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.core.impl;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
+ * A CoreSessionCallback
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public final class CoreSessionCallback implements SessionCallback
+{
+ private final Channel channel;
+
+ private ProtocolManager protocolManager;
+
+ private String name;
+
+ public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel
channel)
+ {
+ this.name = name;
+ this.protocolManager = protocolManager;
+ this.channel = channel;
+ }
+
+ public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int
deliveryCount)
+ {
+ Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize,
deliveryCount);
+
+ channel.send(packet);
+
+ return packet.getPacketSize();
+ }
+
+ public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean
continues, boolean requiresResponse)
+ {
+ Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues,
requiresResponse);
+
+ channel.send(packet);
+
+ return packet.getPacketSize();
+ }
+
+ public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
+ {
+ Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
+
+ channel.send(packet);
+
+ return packet.getPacketSize();
+ }
+
+ public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+ {
+ Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
+
+ channel.send(packet);
+ }
+
+ public void closed()
+ {
+ protocolManager.removeHandler(name);
+ }
+}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2010-01-22
14:25:11 UTC (rev 8842)
+++
trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -163,18 +163,15 @@
request.isAutoCommitSends(),
request.isAutoCommitAcks(),
request.isPreAcknowledge(),
- request.isXA());
+ request.isXA(),
+ new
CoreSessionCallback(request.getName(), protocolManager, channel));
- ServerSessionPacketHandler handler = new
ServerSessionPacketHandler(protocolManager,
- session,
+ ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
server.getStorageManager()
.newContext(server.getExecutorFactory()
.getExecutor()),
server.getStorageManager(),
channel);
-
- session.setCallback(handler);
-
channel.setHandler(handler);
// TODO - where is this removed?
Added: trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java (rev
0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2010-01-22 15:32:13 UTC (rev
8843)
@@ -0,0 +1,124 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+
+/**
+ * The standard verbs and headers used for the <a
href="http://stomp.codehaus.org/">STOMP</a> protocol.
+ *
+ * @version $Revision: 57 $
+ */
+public interface Stomp {
+ String NULL = "\u0000";
+ String NEWLINE = "\n";
+
+ public static interface Commands {
+ String CONNECT = "CONNECT";
+ String SEND = "SEND";
+ String DISCONNECT = "DISCONNECT";
+ String SUBSCRIBE = "SUBSCRIBE";
+ String UNSUBSCRIBE = "UNSUBSCRIBE";
+ String BEGIN_TRANSACTION = "BEGIN";
+ String COMMIT_TRANSACTION = "COMMIT";
+ String ABORT_TRANSACTION = "ABORT";
+ String BEGIN = "BEGIN";
+ String COMMIT = "COMMIT";
+ String ABORT = "ABORT";
+ String ACK = "ACK";
+ }
+
+ public interface Responses {
+ String CONNECTED = "CONNECTED";
+ String ERROR = "ERROR";
+ String MESSAGE = "MESSAGE";
+ String RECEIPT = "RECEIPT";
+ }
+
+ public interface Headers {
+ String SEPERATOR = ":";
+ String RECEIPT_REQUESTED = "receipt";
+ String TRANSACTION = "transaction";
+ String CONTENT_LENGTH = "content-length";
+
+ public interface Response {
+ String RECEIPT_ID = "receipt-id";
+ }
+
+ public interface Send {
+ String DESTINATION = "destination";
+ String CORRELATION_ID = "correlation-id";
+ String REPLY_TO = "reply-to";
+ String EXPIRATION_TIME = "expires";
+ String PRIORITY = "priority";
+ String TYPE = "type";
+ Object PERSISTENT = "persistent";
+ }
+
+ public interface Message {
+ String MESSAGE_ID = "message-id";
+ String DESTINATION = "destination";
+ String CORRELATION_ID = "correlation-id";
+ String EXPIRATION_TIME = "expires";
+ String REPLY_TO = "reply-to";
+ String PRORITY = "priority";
+ String REDELIVERED = "redelivered";
+ String TIMESTAMP = "timestamp";
+ String TYPE = "type";
+ String SUBSCRIPTION = "subscription";
+ }
+
+ public interface Subscribe {
+ String DESTINATION = "destination";
+ String ACK_MODE = "ack";
+ String ID = "id";
+ String SELECTOR = "selector";
+ String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+ String NO_LOCAL = "no-local";
+
+ public interface AckModeValues {
+ String AUTO = "auto";
+ String CLIENT = "client";
+ }
+ }
+
+ public interface Unsubscribe {
+ String DESTINATION = "destination";
+ String ID = "id";
+ }
+
+ public interface Connect {
+ String LOGIN = "login";
+ String PASSCODE = "passcode";
+ String CLIENT_ID = "client-id";
+ String REQUEST_ID = "request-id";
+ }
+
+ public interface Error {
+ String MESSAGE = "message";
+ }
+
+ public interface Connected {
+ String SESSION = "session";
+ String RESPONSE_ID = "response-id";
+ }
+
+ public interface Ack {
+ String MESSAGE_ID = "message-id";
+ }
+ }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * A StompConnection
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class StompConnection implements RemotingConnection
+{
+ private static final Logger log = Logger.getLogger(StompConnection.class);
+
+ private final ProtocolManager manager;
+
+ private final Connection transportConnection;
+
+ StompConnection(final Connection transportConnection, final ProtocolManager manager)
+ {
+ this.transportConnection = transportConnection;
+
+ this.manager = manager;
+ }
+
+ public void addCloseListener(CloseListener listener)
+ {
+ }
+
+ public void addFailureListener(FailureListener listener)
+ {
+ }
+
+ public boolean checkDataReceived()
+ {
+ return true;
+ }
+
+ public HornetQBuffer createBuffer(int size)
+ {
+ return HornetQBuffers.dynamicBuffer(size);
+ }
+
+ public void destroy()
+ {
+ }
+
+ public void disconnect()
+ {
+ }
+
+ public void fail(HornetQException me)
+ {
+ }
+
+ public void flush()
+ {
+ }
+
+ public List<FailureListener> getFailureListeners()
+ {
+ return Collections.EMPTY_LIST;
+ }
+
+ public Object getID()
+ {
+ return transportConnection.getID();
+ }
+
+ public String getRemoteAddress()
+ {
+ return transportConnection.getRemoteAddress();
+ }
+
+ public Connection getTransportConnection()
+ {
+ return transportConnection;
+ }
+
+ public boolean isClient()
+ {
+ return false;
+ }
+
+ public boolean isDestroyed()
+ {
+ return false;
+ }
+
+ public boolean removeCloseListener(CloseListener listener)
+ {
+ return false;
+ }
+
+ public boolean removeFailureListener(FailureListener listener)
+ {
+ return false;
+ }
+
+ public void setFailureListeners(List<FailureListener> listeners)
+ {
+ }
+
+
+ public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+ {
+ manager.handleBuffer(this, buffer);
+ }
+
+ public int isReadyToHandle(HornetQBuffer buffer)
+ {
+ return -1;
+ }
+
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java 2010-01-22 15:32:13
UTC (rev 8843)
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.IOException;
+
+/**
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompException extends IOException {
+ private static final long serialVersionUID = -2869735532997332242L;
+ private final boolean fatal;
+
+ public StompException() {
+ this(null);
+ }
+
+ public StompException(String s) {
+ this(s, false);
+ }
+
+ public StompException(String s, boolean fatal) {
+ this(s, fatal, null);
+ }
+
+ public StompException(String s, boolean fatal, Throwable cause) {
+ super(s);
+ this.fatal = fatal;
+ initCause(cause);
+ }
+
+ public boolean isFatal() {
+ return fatal;
+ }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-01-22 15:32:13 UTC
(rev 8843)
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompFrame
+{
+ private static final byte[] NO_DATA = new byte[] {};
+
+ private String command;
+
+ private Map<String, Object> headers;
+
+ private byte[] content = StompFrame.NO_DATA;
+
+ public StompFrame()
+ {
+ this.headers = new HashMap<String, Object>();
+ }
+
+ public StompFrame(String command, Map<String, Object> headers, byte[] data)
+ {
+ this.command = command;
+ this.headers = headers;
+ this.content = data;
+ }
+
+ public String getCommand()
+ {
+ return command;
+ }
+
+ public byte[] getContent()
+ {
+ return content;
+ }
+
+ public Map<String, Object> getHeaders()
+ {
+ return headers;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StompFrame[command=" + command + ", headers=" + headers
+ ",content-length=" + content.length + "]";
+ }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+
+/**
+ * A StompFrameDelimiter
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class StompFrameDelimiter extends DelimiterBasedFrameDecoder
+{
+
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+
+ public StompFrameDelimiter()
+ {
+ super(MAX_DATA_LENGTH, false, Delimiters.nulDelimiter());
+ }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+/**
+ * Command indicating that an invalid Stomp Frame was received.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompFrameError extends StompFrame {
+ private final StompException exception;
+
+ public StompFrameError(StompException exception) {
+ this.exception = exception;
+ }
+
+ public StompException getException() {
+ return exception;
+ }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -0,0 +1,191 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Implements marshalling and unmarsalling the <a
href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+class StompMarshaller {
+ private static final byte[] NO_DATA = new byte[]{};
+ private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
+ private static final int MAX_COMMAND_LENGTH = 1024;
+ private static final int MAX_HEADER_LENGTH = 1024 * 10;
+ private static final int MAX_HEADERS = 1000;
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+ private int version = 1;
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ public byte[] marshal(StompFrame command) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ marshal(command, dos);
+ dos.close();
+ return baos.toByteArray();
+ }
+
+ public void marshal(StompFrame stomp, DataOutput os) throws IOException {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(stomp.getCommand());
+ buffer.append(Stomp.NEWLINE);
+
+ // Output the headers.
+ for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();)
{
+ Map.Entry entry = (Map.Entry) iter.next();
+ buffer.append(entry.getKey());
+ buffer.append(Stomp.Headers.SEPERATOR);
+ buffer.append(entry.getValue());
+ buffer.append(Stomp.NEWLINE);
+ }
+
+ // Add a newline to seperate the headers from the content.
+ buffer.append(Stomp.NEWLINE);
+
+ os.write(buffer.toString().getBytes("UTF-8"));
+ os.write(stomp.getContent());
+ os.write(END_OF_FRAME);
+ }
+
+ public StompFrame unmarshal(HornetQBuffer in) throws IOException {
+
+ try {
+ String action = null;
+
+ // skip white space to next real action line
+ while (true) {
+ action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command
length was exceeded");
+ if (action == null) {
+ throw new IOException("connection was closed");
+ }
+ else {
+ action = action.trim();
+ if (action.length() > 0) {
+ break;
+ }
+ }
+ }
+
+ // Parse the headers
+ HashMap headers = new HashMap(25);
+ while (true) {
+ String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header
length was exceeded");
+ if (line != null && line.trim().length() > 0) {
+
+ if (headers.size() > MAX_HEADERS) {
+ throw new StompException("The maximum number of headers was
exceeded", true);
+ }
+
+ try {
+ int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+ String name = line.substring(0, seperator_index).trim();
+ String value = line.substring(seperator_index + 1,
line.length()).trim();
+ headers.put(name, value);
+ }
+ catch (Exception e) {
+ throw new StompException("Unable to parser header line
[" + line + "]", true);
+ }
+ }
+ else {
+ break;
+ }
+ }
+
+ // Read in the data part.
+ byte[] data = NO_DATA;
+ String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLength != null) {
+
+ // Bless the client, he's telling us how much data to read in.
+ int length;
+ try {
+ length = Integer.parseInt(contentLength.trim());
+ }
+ catch (NumberFormatException e) {
+ throw new StompException("Specified content-length is not a
valid integer", true);
+ }
+
+ if (length > MAX_DATA_LENGTH) {
+ throw new StompException("The maximum data length was
exceeded", true);
+ }
+
+ data = new byte[length];
+ in.readBytes(data);
+
+ if (in.readByte() != 0) {
+ throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes
were read and " + "there was no trailing null byte", true);
+ }
+ }
+ else {
+
+ // We don't know how much to read.. data ends when we hit a 0
+ byte b;
+ ByteArrayOutputStream baos = null;
+ while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
+
+ if (baos == null) {
+ baos = new ByteArrayOutputStream();
+ }
+ else if (baos.size() > MAX_DATA_LENGTH) {
+ throw new StompException("The maximum data length was
exceeded", true);
+ }
+
+ baos.write(b);
+ }
+
+ if (baos != null) {
+ baos.close();
+ data = baos.toByteArray();
+ }
+ }
+
+ return new StompFrame(action, headers, data);
+ }
+ catch (StompException e) {
+ return new StompFrameError(e);
+ }
+ }
+
+ protected String readLine(HornetQBuffer in, int maxLength, String errorMessage)
throws IOException {
+ byte b;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
+ while ((b = in.readByte()) != '\n') {
+ if (baos.size() > maxLength) {
+ throw new StompException(errorMessage, true);
+ }
+ baos.write(b);
+ }
+ byte[] sequence = baos.toByteArray();
+ return new String(sequence, "UTF-8");
+ }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -0,0 +1,295 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * StompProtocolManager
+ *
+ * A stupid protocol to demonstrate how to implement a new protocol in HornetQ
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class StompProtocolManager implements ProtocolManager
+{
+ private static final Logger log = Logger.getLogger(StompProtocolManager.class);
+
+ private final HornetQServer server;
+
+ private final StompMarshaller marshaller;
+
+ private final Map<RemotingConnection, ServerSession> sessions = new
HashMap<RemotingConnection, ServerSession>();
+
+ public StompProtocolManager(final HornetQServer server, final List<Interceptor>
interceptors)
+ {
+ this.server = server;
+ this.marshaller = new StompMarshaller();
+ }
+
+ public ConnectionEntry createConnectionEntry(final Connection connection)
+ {
+ StompConnection conn = new StompConnection(connection, this);
+
+ return new ConnectionEntry(conn, 0, 0);
+ }
+
+ public void removeHandler(String name)
+ {
+ }
+
+ public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
+ {
+ StompFrame frame = null;
+ try
+ {
+ frame = marshaller.unmarshal(buffer);
+ System.out.println("RECEIVED " + frame);
+
+ String command = frame.getCommand();
+
+ StompFrame response = null;
+ if (Stomp.Commands.CONNECT.equals(command))
+ {
+ response = onConnect(frame, server, connection);
+ }
+ else if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ response = onDisconnect(frame, server, connection);
+ }
+ else if (Stomp.Commands.SEND.equals(command))
+ {
+ response = onSend(frame, server, connection);
+ }
+ else if (Stomp.Commands.SUBSCRIBE.equals(command))
+ {
+ response = onSubscribe(frame, server, connection);
+ }
+ else
+ {
+ log.error("Unsupported Stomp frame: " + frame);
+ response = new StompFrame(Stomp.Responses.ERROR,
+ new HashMap<String, Object>(),
+ ("Unsupported frame: " +
command).getBytes());
+ }
+
+ if (response != null)
+ {
+ send(connection, response);
+ }
+ }
+ catch (StompException ex)
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try
+ {
+ // Let the stomp client know about any protocol errors.
+ PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,
"UTF-8"));
+ ex.printStackTrace(stream);
+ stream.close();
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(Stomp.Headers.Error.MESSAGE, ex.getMessage());
+
+ final String receiptId =
(String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if (receiptId != null)
+ {
+ headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ }
+
+ StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers,
baos.toByteArray());
+ try
+ {
+ send(connection, errorMessage);
+ }
+ catch (IOException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ private StompFrame onSubscribe(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws Exception,
+
StompException,
+
HornetQException
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+ SimpleString queueName =
SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+
+ ServerSession session = checkAndGetSession(connection);
+ long consumerID = server.getStorageManager().generateUniqueID();
+ session.createConsumer(consumerID, queueName, null, false);
+ session.receiveConsumerCredits(consumerID, -1);
+ session.start();
+
+ return null;
+ }
+
+ private ServerSession checkAndGetSession(RemotingConnection connection) throws
StompException
+ {
+ ServerSession session = sessions.get(connection);
+ if (session == null)
+ {
+ throw new StompException("Not connected");
+ }
+ return session;
+ }
+
+ private StompFrame onDisconnect(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws StompException
+ {
+ ServerSession session = checkAndGetSession(connection);
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ throw new StompException(e.getMessage());
+ }
+ sessions.remove(connection);
+ }
+ return null;
+ }
+
+ private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection
connection) throws Exception
+ {
+ ServerSession session = checkAndGetSession(connection);
+
+ Map<String, Object> headers = frame.getHeaders();
+ String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+ /*
+ String type = (String)headers.get(Stomp.Headers.Send.TYPE);
+ long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
+ byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
+ boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
+ */
+ byte type = Message.TEXT_TYPE;
+ if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+ {
+ type = Message.BYTES_TYPE;
+ }
+ long timestamp = System.currentTimeMillis();
+ boolean durable = false;
+ long expiration = -1;
+ byte priority = 9;
+ SimpleString address =
SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+
+ ServerMessageImpl message = new
ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+ message.setType(type);
+ message.setTimestamp(timestamp);
+ message.setAddress(address);
+ byte[] content = frame.getContent();
+ if (type == Message.TEXT_TYPE)
+ {
+
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new
String(content)));
+ }
+ else
+ {
+ message.getBodyBuffer().writeBytes(content);
+ }
+
+ session.send(message);
+ if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ Map<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Response.RECEIPT_ID,
headers.get(Stomp.Headers.RECEIPT_REQUESTED));
+ return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private StompFrame onConnect(StompFrame frame, HornetQServer server, final
RemotingConnection connection) throws Exception
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+ String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+ String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ ServerSession session = server.createSession(name,
+ login,
+ passcode,
+
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ connection,
+ true,
+ true,
+ false,
+ false,
+ new StompSessionCallback(marshaller,
connection));
+ sessions.put(connection, session);
+ System.out.println(">>> created session " + session);
+ HashMap<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Connected.SESSION, name);
+ h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+ return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+ }
+
+ private void send(RemotingConnection connection, StompFrame frame) throws IOException
+ {
+ System.out.println("SENDING >>> " + frame);
+ byte[] bytes = marshaller.marshal(frame);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ System.out.println("ready to send reply: " + buffer);
+ connection.getTransportConnection().write(buffer, true);
+ }
+
+ public int isReadyToHandle(HornetQBuffer buffer)
+ {
+ return -1;
+ }
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
(rev 0)
+++
trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * A StompProtocolManagerFactory
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class StompProtocolManagerFactory implements ProtocolManagerFactory
+{
+
+ public ProtocolManager createProtocolManager(final HornetQServer server, final
List<Interceptor> interceptors)
+ {
+ return new StompProtocolManager(server, interceptors);
+ }
+
+}
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
+ * A StompSessionCallback
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+class StompSessionCallback implements SessionCallback
+{
+ private final RemotingConnection connection;
+
+ private final StompMarshaller marshaller;
+
+ StompSessionCallback(final StompMarshaller marshaller, final RemotingConnection
connection)
+ {
+ this.marshaller = marshaller;
+ this.connection = connection;
+ }
+
+ public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+ {
+ }
+
+ public int sendMessage(ServerMessage serverMessage, long consumerID, int
deliveryCount)
+ {
+ try
+ {
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(Stomp.Headers.Message.DESTINATION,
StompUtils.toStompDestination(serverMessage.getAddress()
+
.toString()));
+ byte[] data = new byte[] {};
+ if (serverMessage.getType() == Message.TEXT_TYPE)
+ {
+ SimpleString text =
serverMessage.getBodyBuffer().readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes();
+ }
+ }
+ StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+ System.out.println("SENDING : " + msg);
+ byte[] bytes = marshaller.marshal(msg);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ connection.getTransportConnection().write(buffer, true);
+
+ return bytes.length;
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return 0;
+ }
+
+ }
+
+ public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean
continues, boolean requiresResponse)
+ {
+ return 0;
+ }
+
+ public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int
deliveryCount)
+ {
+ return 0;
+ }
+
+ public void closed()
+ {
+ }
+}
\ No newline at end of file
Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
(rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-22 15:32:13 UTC
(rev 8843)
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import org.hornetq.api.core.HornetQException;
+
+/**
+ * A StompUtils
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+class StompUtils
+{
+
+ public static String HQ_QUEUE_PREFIX = "jms.queue.";
+
+ public static String STOMP_QUEUE_PREFIX = "/queue/";
+
+ public static String HQ_TEMP_QUEUE_PREFIX = "jms.tempqueue.";
+
+ public static String STOMP_TEMP_QUEUE_PREFIX = "/temp-queue/";
+
+ public static String HQ_TOPIC_PREFIX = "jms.topic.";
+
+ public static String STOMP_TOPIC_PREFIX = "/topic/";
+
+ public static String HQ_TEMP_TOPIC_PREFIX = "jms.temptopic.";
+
+ public static String STOMP_TEMP_TOPIC_PREFIX = "/temp-topic/";
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ public static String toHornetQAddress(String stompDestination) throws
HornetQException
+ {
+ if (stompDestination == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination
is specified!");
+ }
+ else if (stompDestination.startsWith(STOMP_QUEUE_PREFIX))
+ {
+ return convert(stompDestination, STOMP_QUEUE_PREFIX, HQ_QUEUE_PREFIX);
+ }
+ else if (stompDestination.startsWith(STOMP_TOPIC_PREFIX))
+ {
+ return convert(stompDestination, STOMP_TOPIC_PREFIX, HQ_TOPIC_PREFIX);
+ }
+ else if (stompDestination.startsWith(STOMP_TEMP_QUEUE_PREFIX))
+ {
+ return convert(stompDestination, STOMP_TEMP_QUEUE_PREFIX,
HQ_TEMP_QUEUE_PREFIX);
+ }
+ else if (stompDestination.startsWith(STOMP_TEMP_TOPIC_PREFIX))
+ {
+ return convert(stompDestination, STOMP_TEMP_TOPIC_PREFIX,
HQ_TEMP_TOPIC_PREFIX);
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal
destination name: [" + stompDestination +
+ "] --
StompConnect destinations " +
+ "must begin with
one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+ }
+ }
+
+ public static String toStompDestination(String hornetqAddress) throws
HornetQException
+ {
+ if (hornetqAddress == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination
is specified!");
+ }
+ else if (hornetqAddress.startsWith(HQ_QUEUE_PREFIX))
+ {
+ return convert(hornetqAddress, HQ_QUEUE_PREFIX, STOMP_QUEUE_PREFIX);
+ }
+ else if (hornetqAddress.startsWith(HQ_TOPIC_PREFIX))
+ {
+ return convert(hornetqAddress, HQ_TOPIC_PREFIX, STOMP_TOPIC_PREFIX);
+ }
+ else if (hornetqAddress.startsWith(HQ_TEMP_QUEUE_PREFIX))
+ {
+ return convert(hornetqAddress, HQ_TEMP_QUEUE_PREFIX, STOMP_TEMP_QUEUE_PREFIX);
+ }
+ else if (hornetqAddress.startsWith(HQ_TEMP_TOPIC_PREFIX))
+ {
+ return convert(hornetqAddress, HQ_TEMP_TOPIC_PREFIX, STOMP_TEMP_TOPIC_PREFIX);
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address
name: [" + hornetqAddress +
+ "] -- Acceptable
address must comply to JMS semantics");
+ }
+ }
+
+ private static String convert(String str, String oldPrefix, String newPrefix)
+ {
+ String sub = str.substring(oldPrefix.length(), str.length());
+ return new String(newPrefix + sub);
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-22
14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -58,14 +58,11 @@
private boolean paused;
private NotificationService notificationService;
-
- private final ProtocolType protocol;
public InVMAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
final ConnectionLifeCycleListener listener,
- final Executor threadPool,
- final ProtocolType protocol)
+ final Executor threadPool)
{
this.handler = handler;
@@ -74,8 +71,6 @@
id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0,
configuration);
executorFactory = new OrderedExecutorFactory(threadPool);
-
- this.protocol = protocol;
}
public synchronized void start() throws Exception
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-22
14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -17,7 +17,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -37,10 +36,9 @@
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
- final ScheduledExecutorService scheduledThreadPool,
- final ProtocolType protocol)
+ final ScheduledExecutorService scheduledThreadPool)
{
- return new InVMAcceptor(configuration, handler, listener, threadPool, protocol);
+ return new InVMAcceptor(configuration, handler, listener, threadPool);
}
public Set<String> getAllowableProperties()
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-22
14:25:11 UTC (rev 8842)
+++
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -30,9 +30,11 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.aardvark.impl.AardvarkProtocolManagerFactory;
import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
+import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.spi.core.protocol.ConnectionEntry;
import org.hornetq.spi.core.protocol.ProtocolManager;
import org.hornetq.spi.core.protocol.ProtocolType;
@@ -121,6 +123,7 @@
this.scheduledThreadPool = scheduledThreadPool;
this.protocolMap.put(ProtocolType.CORE, new
CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
+ this.protocolMap.put(ProtocolType.STOMP, new
StompProtocolManagerFactory().createProtocolManager(server, interceptors));
this.protocolMap.put(ProtocolType.AARDVARK, new
AardvarkProtocolManagerFactory().createProtocolManager(server, interceptors));
}
@@ -159,9 +162,9 @@
}
}
- //TODO - allow protocol type to be configured from Configuration for each
acceptor
+ String protocolString =
ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
TransportConstants.DEFAULT_PROTOCOL, info.getParams());
- ProtocolType protocol = hackProtocol;
+ ProtocolType protocol = ProtocolType.valueOf(protocolString.toUpperCase());
ProtocolManager manager = protocolMap.get(protocol);
@@ -170,8 +173,7 @@
manager,
this,
threadPool,
- scheduledThreadPool,
- protocol);
+ scheduledThreadPool);
acceptors.add(acceptor);
@@ -200,9 +202,6 @@
started = true;
}
-
- //FIXME - temp hack so we can choose AARDVARK as protocol
- public static ProtocolType hackProtocol = ProtocolType.CORE;
public synchronized void freeze()
{
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-22 14:25:11 UTC (rev
8842)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-22 15:32:13 UTC (rev
8843)
@@ -35,6 +35,7 @@
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.ExecutorFactory;
@@ -80,12 +81,11 @@
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
- boolean xa) throws Exception;
+ boolean xa,
+ final SessionCallback callback) throws Exception;
void removeSession(String name) throws Exception;
- ServerSession getSession(String name);
-
Set<ServerSession> getSessions();
boolean isStarted();
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-22 14:25:11 UTC (rev
8842)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-01-22 15:32:13 UTC (rev
8843)
@@ -111,6 +111,4 @@
void setTransferring(boolean transferring);
void runConnectionFailureRunners();
-
- void setCallback(SessionCallback callback);
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-22 14:25:11
UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-22 15:32:13
UTC (rev 8843)
@@ -106,6 +106,7 @@
import org.hornetq.core.version.Version;
import org.hornetq.spi.core.logging.LogDelegateFactory;
import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
@@ -542,7 +543,8 @@
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
- final boolean xa) throws Exception
+ final boolean xa,
+ final SessionCallback callback) throws Exception
{
if (securityStore != null)
{
@@ -565,7 +567,8 @@
securityStore,
managementService,
this,
-
configuration.getManagementAddress());
+
configuration.getManagementAddress(),
+ callback);
sessions.put(name, session);
@@ -595,11 +598,6 @@
sessions.remove(name);
}
- public ServerSession getSession(final String name)
- {
- return sessions.get(name);
- }
-
public synchronized List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries =
sessions.entrySet();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-22 14:25:11
UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-22 15:32:13
UTC (rev 8843)
@@ -134,7 +134,7 @@
private final RoutingContext routingContext = new RoutingContextImpl(null);
- private SessionCallback callback;
+ private final SessionCallback callback;
// Constructors
---------------------------------------------------------------------------------
@@ -154,7 +154,8 @@
final SecurityStore securityStore,
final ManagementService managementService,
final HornetQServer server,
- final SimpleString managementAddress) throws Exception
+ final SimpleString managementAddress,
+ final SessionCallback callback) throws Exception
{
this.username = username;
@@ -193,6 +194,8 @@
this.managementAddress = managementAddress;
+ this.callback = callback;
+
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
@@ -200,11 +203,6 @@
// ServerSession implementation
----------------------------------------------------------------------------
- public void setCallback(final SessionCallback callback)
- {
- this.callback = callback;
- }
-
public String getUsername()
{
return username;
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-22
14:25:11 UTC (rev 8842)
+++
trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -16,6 +16,7 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import org.hornetq.core.protocol.stomp.StompFrameDelimiter;
import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.BufferDecoder;
import org.jboss.netty.channel.ChannelPipeline;
@@ -54,9 +55,12 @@
//Core protocol uses it's own optimised decoder
pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
+ else if (protocol == ProtocolType.STOMP)
+ {
+ pipeline.addLast("decoder", new StompFrameDelimiter());
+ }
else
{
- //Use the old frame decoder for other protocols
pipeline.addLast("decoder", new HornetQFrameDecoder(decoder));
}
}
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-22
14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -78,7 +78,7 @@
*/
public class NettyAcceptor implements Acceptor
{
- private static final Logger log = Logger.getLogger(NettyAcceptor.class);
+ static final Logger log = Logger.getLogger(NettyAcceptor.class);
private ChannelFactory channelFactory;
@@ -106,6 +106,8 @@
private final boolean useInvm;
+ private final ProtocolType protocol;
+
private final String host;
private final int port;
@@ -134,8 +136,6 @@
private VirtualExecutorService bossExecutor;
- private final ProtocolType protocol;
-
private boolean paused;
public NettyAcceptor(final Map<String, Object> configuration,
@@ -143,8 +143,7 @@
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
- final ScheduledExecutorService scheduledThreadPool,
- final ProtocolType protocol)
+ final ScheduledExecutorService scheduledThreadPool)
{
this.handler = handler;
@@ -152,8 +151,6 @@
this.listener = listener;
- this.protocol = protocol;
-
sslEnabled =
ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
TransportConstants.DEFAULT_SSL_ENABLED,
configuration);
@@ -190,6 +187,11 @@
useInvm =
ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME,
TransportConstants.DEFAULT_USE_INVM,
configuration);
+ String protocolStr =
ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
+
TransportConstants.DEFAULT_PROTOCOL,
+ configuration);
+ protocol = ProtocolType.valueOf(protocolStr);
+
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
TransportConstants.DEFAULT_HOST,
configuration);
Modified:
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-22
14:25:11 UTC (rev 8842)
+++
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -18,7 +18,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import org.hornetq.spi.core.protocol.ProtocolType;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -37,11 +36,9 @@
final BufferDecoder decoder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
- final ScheduledExecutorService scheduledThreadPool,
- final ProtocolType protocol)
+ final ScheduledExecutorService scheduledThreadPool)
{
- return new NettyAcceptor(configuration, handler, decoder, listener, threadPool,
scheduledThreadPool,
- protocol);
+ return new NettyAcceptor(configuration, handler, decoder, listener, threadPool,
scheduledThreadPool);
}
public Set<String> getAllowableProperties()
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-22
14:25:11 UTC (rev 8842)
+++
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -14,7 +14,6 @@
package org.hornetq.integration.transports.netty;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.protocol.ProtocolType;
Modified: trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
===================================================================
---
trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java 2010-01-22
14:25:11 UTC (rev 8842)
+++
trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -16,6 +16,8 @@
import java.util.HashSet;
import java.util.Set;
+import org.hornetq.spi.core.protocol.ProtocolType;
+
/**
* A TransportConstants
*
@@ -46,6 +48,8 @@
public static final String USE_INVM_PROP_NAME = "use-invm";
+ public static final String PROTOCOL_PROP_NAME = "protocol";
+
public static final String HOST_PROP_NAME = "host";
public static final String PORT_PROP_NAME = "port";
@@ -75,10 +79,14 @@
public static final boolean DEFAULT_USE_SERVLET = false;
+ public static final String DEFAULT_PROTOCOL = ProtocolType.CORE.toString();
+
public static final String DEFAULT_HOST = "localhost";
public static final int DEFAULT_PORT = 5445;
+ public static final int DEFAULT_STOMP_PORT = 61613;
+
public static final String DEFAULT_KEYSTORE_PATH = "hornetq.keystore";
public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";
@@ -120,6 +128,7 @@
allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
+ allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.PORT_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.KEYSTORE_PATH_PROP_NAME);
Modified: trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java 2010-01-22 14:25:11 UTC
(rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java 2010-01-22 15:32:13 UTC
(rev 8843)
@@ -19,6 +19,7 @@
import javax.jms.MessageFormatException;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
@@ -42,7 +43,7 @@
private static final Logger log = Logger.getLogger(HornetQBytesMessage.class);
- public static final byte TYPE = 4;
+ public static final byte TYPE = Message.BYTES_TYPE;
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java 2010-01-22 14:25:11 UTC
(rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java 2010-01-22 15:32:13 UTC
(rev 8843)
@@ -22,6 +22,7 @@
import javax.jms.MapMessage;
import javax.jms.MessageFormatException;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.PropertyConversionException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
@@ -45,7 +46,7 @@
{
// Constants -----------------------------------------------------
- public static final byte TYPE = 5;
+ public static final byte TYPE = Message.MAP_TYPE;
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-01-22 14:25:11 UTC (rev
8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2010-01-22 15:32:13 UTC (rev
8843)
@@ -83,7 +83,7 @@
private static final String JMSXGROUPID = "JMSXGroupID";
- public static final byte TYPE = 0;
+ public static final byte TYPE = org.hornetq.api.core.Message.DEFAULT_TYPE;
public static Map<String, Object> coreMaptoJMSMap(final Map<String,
Object> coreMessage)
{
Modified: trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java 2010-01-22 14:25:11
UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java 2010-01-22 15:32:13
UTC (rev 8843)
@@ -22,6 +22,7 @@
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
@@ -44,7 +45,7 @@
{
// Constants -----------------------------------------------------
- public static final byte TYPE = 2;
+ public static final byte TYPE = Message.OBJECT_TYPE;
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java 2010-01-22 14:25:11
UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java 2010-01-22 15:32:13
UTC (rev 8843)
@@ -19,6 +19,7 @@
import javax.jms.StreamMessage;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.client.impl.ClientMessageImpl;
@@ -48,7 +49,7 @@
private static final Logger log = Logger.getLogger(HornetQStreamMessage.class);
- public static final byte TYPE = 6;
+ public static final byte TYPE = Message.STREAM_TYPE;
// Attributes ----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java 2010-01-22 14:25:11 UTC
(rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java 2010-01-22 15:32:13 UTC
(rev 8843)
@@ -17,6 +17,7 @@
import javax.jms.TextMessage;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientSession;
@@ -42,7 +43,7 @@
{
// Constants -----------------------------------------------------
- public static final byte TYPE = 3;
+ public static final byte TYPE = Message.TEXT_TYPE;
public static final Logger log = Logger.getLogger(HornetQTextMessage.class);
Modified: trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2010-01-22 14:25:11
UTC (rev 8842)
+++ trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java 2010-01-22 15:32:13
UTC (rev 8843)
@@ -13,8 +13,8 @@
package org.hornetq.spi.core.protocol;
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.spi.core.remoting.BufferDecoder;
-import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
/**
@@ -24,7 +24,14 @@
*
*
*/
-public interface ProtocolManager extends BufferHandler, BufferDecoder
+public interface ProtocolManager extends BufferDecoder
{
ConnectionEntry createConnectionEntry(Connection connection);
+
+ public void removeHandler(final String name);
+
+ public int isReadyToHandle(HornetQBuffer buffer);
+
+ void handleBuffer(RemotingConnection connection, HornetQBuffer buffer);
+
}
Modified: trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-22 14:25:11
UTC (rev 8842)
+++ trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-22 15:32:13
UTC (rev 8843)
@@ -18,12 +18,10 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
-import org.hornetq.spi.core.protocol.ProtocolType;
-
/**
* A factory for creating acceptors.
* <p/>
- * An Acceptor is an endpoin that a {@link org.hornetq.spi.core.remoting.Connector} will
connect to and is used by the remoting service.
+ * An Acceptor is an endpoint that a {@link org.hornetq.spi.core.remoting.Connector} will
connect to and is used by the remoting service.
*
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
* @author <a href="tim.fox(a)jboss.com">Tim Fox</a>
@@ -47,8 +45,7 @@
BufferDecoder decoder,
ConnectionLifeCycleListener listener,
Executor threadPool,
- ScheduledExecutorService scheduledThreadPool,
- ProtocolType protocol);
+ ScheduledExecutorService scheduledThreadPool);
/**
* Returns the allowable properties for this acceptor.
Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-22
14:25:11 UTC (rev 8842)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -98,7 +98,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -152,7 +152,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -210,7 +210,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -269,7 +269,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -327,7 +327,7 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -381,7 +381,7 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -428,7 +428,7 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool, ProtocolType.CORE);
+ acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
Modified:
trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java 2010-01-22
14:25:11 UTC (rev 8842)
+++
trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -45,8 +45,6 @@
public void testAardvark() throws Exception
{
- RemotingServiceImpl.hackProtocol = ProtocolType.AARDVARK;
-
Configuration config = new ConfigurationImpl();
config.setSecurityEnabled(false);
@@ -56,6 +54,7 @@
params.put(TransportConstants.PORT_PROP_NAME, 9876);
params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
+ params.put(TransportConstants.PROTOCOL_PROP_NAME,
ProtocolType.AARDVARK.toString());
TransportConfiguration tc = new
TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName(),
params);
@@ -95,7 +94,5 @@
socket.close();
server.stop();
-
- RemotingServiceImpl.hackProtocol = ProtocolType.CORE;
}
}
Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
(rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-22 15:32:13
UTC (rev 8843)
@@ -0,0 +1,964 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.spi.core.protocol.ProtocolType;
+
+public class StompTest extends TestCase {
+ private static final transient Logger log = Logger.getLogger(StompTest.class);
+ private int port = 61613;
+ private Socket stompSocket;
+ private ByteArrayOutputStream inputBuffer;
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Session session;
+ private Queue queue;
+ private JMSServerManager server;
+
+ public void testConnect() throws Exception {
+
+ String connect_frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n" + "request-id: 1\n" + "\n" +
Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+ }
+
+ public void testDisconnectAndError() throws Exception {
+
+ String connect_frame = "CONNECT\n" + "login: brianm\n" +
"passcode: wombats\n" + "request-id: 1\n" + "\n" +
Stomp.NULL;
+ sendFrame(connect_frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+
+ connect_frame = "DISCONNECT\n\n" + Stomp.NULL;
+ sendFrame(connect_frame);
+
+ // sending a message will result in an error
+ String frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("ERROR"));
+ }
+
+
+ public void testSendMessage() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() +
"\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ public void testSendMessageWithReceipt() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "receipt: 1234\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("RECEIPT"));
+ Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ public void testSendMessageWithContentLength() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ byte[] data = new byte[] {1, 2, 3, 4};
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "content-length:" + data.length + "\n\n" +
+ new String(data) +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ BytesMessage message = (BytesMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ assertEquals(data.length, message.getBodyLength());
+ assertEquals(data[0], message.readByte());
+ assertEquals(data[1], message.readByte());
+ assertEquals(data[2], message.readByte());
+ assertEquals(data[3], message.readByte());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ public void _testJMSXGroupIdCanBeSet() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "JMSXGroupID: TEST\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ // TODO do we support it?
+ //Assert.assertEquals("TEST", ((TextMessage) message).getGroupID());
+ }
+
+ public void _testSendMessageWithCustomHeadersAndSelector() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue, "foo =
'abc'");
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "foo:abc\n" +
+ "bar:123\n" +
+ "destination:/queue/" + getQueueName() +
"\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("foo", "abc",
message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123",
message.getStringProperty("bar"));
+ }
+
+ public void _testSendMessageWithStandardHeaders() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "correlation-id:c123\n" +
+ "priority:3\n" +
+ "type:t345\n" +
+ "JMSXGroupID:abc\n" +
+ "foo:abc\n" +
+ "bar:123\n" +
+ "destination:/queue/" + getQueueName() +
"\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("JMSCorrelationID", "c123",
message.getJMSCorrelationID());
+ Assert.assertEquals("getJMSType", "t345",
message.getJMSType());
+ Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals("foo", "abc",
message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123",
message.getStringProperty("bar"));
+
+ Assert.assertEquals("JMSXGroupID", "abc",
message.getStringProperty("JMSXGroupID"));
+ // FIXME do we support it?
+ //Assert.assertEquals("GroupID", "abc",
amqMessage.getGroupID());
+ }
+
+ public void testSubscribeWithAutoAck() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void _testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendBytesMessage(new byte[]{1, 2, 3, 4, 5});
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)",
Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ Assert.assertEquals("5", cl_matcher.group(1));
+
+ Assert.assertFalse(Pattern.compile("type:\\s*null",
Pattern.CASE_INSENSITIVE).matcher(frame).find());
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void _testSubscribeWithMessageSentWithProperties() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("Hello World");
+ message.setStringProperty("S", "value");
+ message.setBooleanProperty("n", false);
+ message.setByteProperty("byte", (byte) 9);
+ message.setDoubleProperty("d", 2.0);
+ message.setFloatProperty("f", (float) 6.0);
+ message.setIntProperty("i", 10);
+ message.setLongProperty("l", 121);
+ message.setShortProperty("s", (short) 12);
+ producer.send(message);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("S:") > 0);
+ Assert.assertTrue(frame.indexOf("n:") > 0);
+ Assert.assertTrue(frame.indexOf("byte:") > 0);
+ Assert.assertTrue(frame.indexOf("d:") > 0);
+ Assert.assertTrue(frame.indexOf("f:") > 0);
+ Assert.assertTrue(frame.indexOf("i:") > 0);
+ Assert.assertTrue(frame.indexOf("l:") > 0);
+ Assert.assertTrue(frame.indexOf("s:") > 0);
+ Assert.assertTrue(frame.indexOf("Hello World") > 0);
+
+// System.out.println("out: "+frame);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void _testMessagesAreInOrder() throws Exception {
+ int ctr = 10;
+ String[] data = new String[ctr];
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ for (int i = 0; i < ctr; ++i) {
+ data[i] = getName() + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i) {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i])
>= 0);
+ }
+
+ // sleep a while before publishing another set of messages
+ waitForFrameToTakeEffect();
+
+ for (int i = 0; i < ctr; ++i) {
+ data[i] = getName() + ":second:" + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i) {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i])
>= 0);
+ }
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void _testSubscribeWithAutoAckAndSelector() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "selector: foo = 'zzz'\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage("Ignored message", "foo", "1234");
+ sendMessage("Real message", "foo", "zzz");
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue("Should have received the real message but got: " +
frame, frame.indexOf("Real message") > 0);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ public void _testSubscribeWithClientAck() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "ack:client\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // message should be received since message was not acknowledged
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getJMSRedelivered());
+ }
+
+ public void
_testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws
Exception {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+ }
+
+ public void
_testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws
Exception {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+ }
+
+ protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean
sendDisconnect) throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "ack:client\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ log.info("Reconnecting!");
+
+ if (sendDisconnect) {
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ reconnect();
+ }
+ else {
+ reconnect(1000);
+ }
+
+
+ // message should be received since message was not acknowledged
+ frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() +
"\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // now lets make sure we don't see the message again
+ reconnect();
+
+ frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() +
"\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage("shouldBeNextMessage");
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+ }
+
+ public void _testUnsubscribe() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //send a message to our queue
+ sendMessage("first message");
+
+ //receive message from socket
+ frame = receiveFrame(1000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ //remove suscription
+ frame =
+ "UNSUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ //send a message to our queue
+ sendMessage("second message");
+
+ try {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was
removed");
+ }
+ catch (SocketTimeoutException e) {
+
+ }
+ }
+
+ public void _testTransactionCommit() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+ // check the message is not committed
+ assertNull(consumer.receive(100));
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+ }
+
+ public void _testTransactionRollback() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "transaction: tx1\n" +
+ "\n" +
+ "first message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //rollback first message
+ frame =
+ "ABORT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n"
+
+ "transaction: tx1\n" +
+ "\n" +
+ "second message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // This test case is currently failing
+ waitForFrameToTakeEffect();
+
+ //only second msg should be received since first msg was rolled back
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("second message", message.getText().trim());
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+ protected void setUp() throws Exception {
+ server = createServer();
+ server.start();
+ connectionFactory = createConnectionFactory();
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(getQueueName());
+ connection.start();
+ }
+
+ /**
+ * @return
+ * @throws Exception
+ */
+ private JMSServerManager createServer() throws Exception
+ {
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+ config.setPersistenceEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+ params.put(TransportConstants.PORT_PROP_NAME,
TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new
TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+ config.getAcceptorConfigurations().add(new
TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+
+ JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+ jmsConfig.getQueueConfigurations().add(new QueueConfigurationImpl(getQueueName(),
null, false, getQueueName()));
+ server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+ server.setContext(null);
+ return server;
+ }
+
+ protected void tearDown() throws Exception {
+ connection.close();
+ if (stompSocket != null) {
+ stompSocket.close();
+ }
+ server.stop();
+ }
+
+ protected void reconnect() throws Exception {
+ reconnect(0);
+ }
+ protected void reconnect(long sleep) throws Exception {
+ stompSocket.close();
+
+ if (sleep > 0) {
+ Thread.sleep(sleep);
+ }
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+ }
+
+ protected ConnectionFactory createConnectionFactory() {
+ return new HornetQConnectionFactory(new
TransportConfiguration(InVMConnectorFactory.class.getName()));
+ }
+
+ protected Socket createSocket() throws IOException {
+ return new Socket("127.0.0.1", port);
+ }
+
+ protected String getQueueName() {
+ return "test";
+ }
+
+ public void sendFrame(String data) throws Exception {
+ byte[] bytes = data.getBytes("UTF-8");
+ OutputStream outputStream = stompSocket.getOutputStream();
+ for (int i = 0; i < bytes.length; i++) {
+ outputStream.write(bytes[i]);
+ }
+ outputStream.flush();
+ }
+
+ public String receiveFrame(long timeOut) throws Exception {
+ stompSocket.setSoTimeout((int) timeOut);
+ InputStream is = stompSocket.getInputStream();
+ int c = 0;
+ for (; ;) {
+ c = is.read();
+ if (c < 0) {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0) {
+ c = is.read();
+ if (c != '\n')
+ {
+ byte[] ba = inputBuffer.toByteArray();
+ System.out.println(new String(ba, "UTF-8"));
+ }
+ Assert.assertEquals("Expecting stomp frame to terminate with
\0\n", c, '\n');
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else {
+ inputBuffer.write(c);
+ }
+ }
+ }
+
+ public void sendMessage(String msg) throws Exception {
+ sendMessage(msg, "foo", "xyz");
+ }
+
+ public void sendMessage(String msg, String propertyName, String propertyValue) throws
JMSException {
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage(msg);
+ message.setStringProperty(propertyName, propertyValue);
+ producer.send(message);
+ }
+
+ public void sendBytesMessage(byte[] msg) throws Exception {
+ MessageProducer producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(msg);
+ producer.send(message);
+ }
+
+ protected void waitForFrameToTakeEffect() throws InterruptedException {
+ // bit of a dirty hack :)
+ // another option would be to force some kind of receipt to be returned
+ // from the frame
+ Thread.sleep(2000);
+ }
+}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-22
14:25:11 UTC (rev 8842)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -73,8 +73,7 @@
null,
listener,
Executors.newCachedThreadPool(),
-
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE),
- ProtocolType.CORE);
+
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
Assert.assertTrue(acceptor instanceof NettyAcceptor);
}
Modified:
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-22
14:25:11 UTC (rev 8842)
+++
trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-22
15:32:13 UTC (rev 8843)
@@ -86,8 +86,7 @@
null,
listener,
Executors.newCachedThreadPool(),
-
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE),
- ProtocolType.CORE);
+
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
acceptor.start();
Assert.assertTrue(acceptor.isStarted());