Author: jmesnil
Date: 2010-01-19 11:04:31 -0500 (Tue, 19 Jan 2010)
New Revision: 8807
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
Modified:
branches/HORNETQ-129_STOMP_protocol/.classpath
branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* incomplete prototype
* copied StompTest from the StompConnect project and adapted it to use the HornetQ server
Modified: branches/HORNETQ-129_STOMP_protocol/.classpath
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/.classpath 2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/.classpath 2010-01-19 16:04:31 UTC (rev 8807)
@@ -100,7 +100,7 @@
<classpathentry kind="con"
path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="tests/tmpfiles"/>
<classpathentry kind="lib"
path="thirdparty/net/java/dev/javacc/lib/javacc.jar"/>
- <classpathentry kind="lib"
path="thirdparty/org/jboss/netty/lib/netty.jar"/>
+ <classpathentry kind="lib"
path="thirdparty/org/jboss/netty/lib/netty.jar"
sourcepath="/Users/jmesnil/.m2/repository/org/jboss/netty/netty/3.1.5.GA/netty-3.1.5.GA-sources.jar"/>
<classpathentry kind="lib"
path="thirdparty/log4j/lib/log4j.jar"/>
<classpathentry kind="lib"
path="thirdparty/org/jboss/naming/lib/jnpserver.jar"/>
<classpathentry kind="lib"
path="thirdparty/org/jboss/security/lib/jbosssx.jar"/>
Modified: branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml 2010-01-19 15:48:47 UTC (rev
8806)
+++ branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml 2010-01-19 16:04:31 UTC (rev
8807)
@@ -1511,7 +1511,7 @@
<jvmarg value="-Dcom.sun.management.jmxremote"/>
<jvmarg
value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
<jvmarg value="-Djava.library.path=${native.bin.dir}"/>
- <!--<jvmarg line="-Xdebug
-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>-->
+ <!-- <jvmarg line="-Xdebug
-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/> -->
<arg line="hornetq-beans.xml"/>
<classpath path="${src.config.trunk.non-clustered.dir}" />
<classpath refid="jms.standalone.server.classpath"/>
@@ -1540,23 +1540,20 @@
</target>
<target name="debugServer" depends="jar">
+ <mkdir dir="logs"/>
<java
classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer"
fork="true">
<jvmarg value="-XX:+UseParallelGC"/>
<jvmarg value="-Xms512M"/>
<jvmarg value="-Xmx2048M"/>
<jvmarg value="-XX:+AggressiveOpts"/>
<jvmarg value="-XX:+UseFastAccessorMethods"/>
- <jvmarg value="-Xdebug"/>
- <jvmarg value="-Xnoagent"/>
- <jvmarg value="-Djava.compiler=NONE"/>
<jvmarg value="-Dcom.sun.management.jmxremote"/>
- <jvmarg
value="-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
- <jvmarg
value="-Djava.util.logging.config.file=${src.config.standalone.non-clustered.dir}/logging.properties"/>
+ <jvmarg
value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
<jvmarg value="-Djava.library.path=${native.bin.dir}"/>
- <jvmarg
value="-Djava.naming.factory.initial=org.jnp.interfaces.NamingContextFactory"/>
- <jvmarg
value="-Djava.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces"/>
+ <jvmarg line="-Xdebug
-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
<arg line="hornetq-beans.xml"/>
- <classpath path="${src.config.standalone.non-clustered.dir}" />
+ <classpath path="${src.config.trunk.non-clustered.dir}" />
+ <classpath refid="jms.standalone.server.classpath"/>
</java>
</target>
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -39,8 +39,10 @@
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
@@ -529,6 +531,8 @@
private volatile boolean stopPingingAfterOne;
+ private final PacketDecoder decoder = new CorePacketDecoder();
+
public void stopPingingAfterOne()
{
stopPingingAfterOne = true;
@@ -1086,13 +1090,13 @@
private class DelegatingBufferHandler extends AbstractBufferHandler
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer,
final PacketDecoder decoder)
{
RemotingConnection theConn = connection;
if (theConn != null && connectionID == theConn.getID())
{
- theConn.bufferReceived(connectionID, buffer);
+ theConn.bufferReceived(connectionID, buffer, decoder );
}
}
}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java
(from rev 8798, trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Turns the contents of a {@link HornetQBuffer} into a {@link Packet}.
+ * Implementations decide how the bytes in the buffer are interpreted.
+ *
+ * @author jmesnil
+ *
+ */
+public interface PacketDecoder
+{
+
+ public abstract Packet decode(final HornetQBuffer in); // returns the packet decoded from 'in'
+
+}
\ No newline at end of file
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java
(from rev 8798, trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,495 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl;
+
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PING;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_EVENT;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_WRITE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CREDITS;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_LARGE_MSG;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_START;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
+import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.remoting.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
+
+/**
+ * {@link PacketDecoder} implementation: the first byte read from the buffer
+ * selects the packet class to instantiate, and that instance is then asked
+ * to decode itself from the same buffer.
+ * An unrecognised type byte raises {@link IllegalArgumentException}.
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ */
+public class CorePacketDecoder implements PacketDecoder
+{
+ private static final Logger log = Logger.getLogger(CorePacketDecoder.class); // NOTE(review): not referenced anywhere in this class
+
+ public Packet decode(final HornetQBuffer in)
+ {
+ final byte packetType = in.readByte(); // leading byte identifies the packet type
+
+ Packet packet;
+
+ switch (packetType)
+ {
+ case PING:
+ {
+ packet = new Ping();
+ break;
+ }
+ case DISCONNECT:
+ {
+ packet = new PacketImpl(PacketImpl.DISCONNECT); // no dedicated class: generic PacketImpl tagged with the type
+ break;
+ }
+ case EXCEPTION:
+ {
+ packet = new HornetQExceptionMessage();
+ break;
+ }
+ case PACKETS_CONFIRMED:
+ {
+ packet = new PacketsConfirmedMessage();
+ break;
+ }
+ case CREATESESSION:
+ {
+ packet = new CreateSessionMessage();
+ break;
+ }
+ case CREATESESSION_RESP:
+ {
+ packet = new CreateSessionResponseMessage();
+ break;
+ }
+ case REATTACH_SESSION:
+ {
+ packet = new ReattachSessionMessage();
+ break;
+ }
+ case REATTACH_SESSION_RESP:
+ {
+ packet = new ReattachSessionResponseMessage();
+ break;
+ }
+ case SESS_CLOSE:
+ {
+ packet = new SessionCloseMessage();
+ break;
+ }
+ case SESS_CREATECONSUMER:
+ {
+ packet = new SessionCreateConsumerMessage();
+ break;
+ }
+ case SESS_ACKNOWLEDGE:
+ {
+ packet = new SessionAcknowledgeMessage();
+ break;
+ }
+ case SESS_EXPIRED:
+ {
+ packet = new SessionExpiredMessage();
+ break;
+ }
+ case SESS_COMMIT:
+ {
+ packet = new SessionCommitMessage();
+ break;
+ }
+ case SESS_ROLLBACK:
+ {
+ packet = new RollbackMessage();
+ break;
+ }
+ case SESS_QUEUEQUERY:
+ {
+ packet = new SessionQueueQueryMessage();
+ break;
+ }
+ case SESS_QUEUEQUERY_RESP:
+ {
+ packet = new SessionQueueQueryResponseMessage();
+ break;
+ }
+ case CREATE_QUEUE:
+ {
+ packet = new CreateQueueMessage();
+ break;
+ }
+ case DELETE_QUEUE:
+ {
+ packet = new SessionDeleteQueueMessage();
+ break;
+ }
+ case SESS_BINDINGQUERY:
+ {
+ packet = new SessionBindingQueryMessage();
+ break;
+ }
+ case SESS_BINDINGQUERY_RESP:
+ {
+ packet = new SessionBindingQueryResponseMessage();
+ break;
+ }
+ case SESS_XA_START:
+ {
+ packet = new SessionXAStartMessage();
+ break;
+ }
+ case SESS_XA_END:
+ {
+ packet = new SessionXAEndMessage();
+ break;
+ }
+ case SESS_XA_COMMIT:
+ {
+ packet = new SessionXACommitMessage();
+ break;
+ }
+ case SESS_XA_PREPARE:
+ {
+ packet = new SessionXAPrepareMessage();
+ break;
+ }
+ case SESS_XA_RESP:
+ {
+ packet = new SessionXAResponseMessage();
+ break;
+ }
+ case SESS_XA_ROLLBACK:
+ {
+ packet = new SessionXARollbackMessage();
+ break;
+ }
+ case SESS_XA_JOIN:
+ {
+ packet = new SessionXAJoinMessage();
+ break;
+ }
+ case SESS_XA_SUSPEND:
+ {
+ packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
+ break;
+ }
+ case SESS_XA_RESUME:
+ {
+ packet = new SessionXAResumeMessage();
+ break;
+ }
+ case SESS_XA_FORGET:
+ {
+ packet = new SessionXAForgetMessage();
+ break;
+ }
+ case SESS_XA_INDOUBT_XIDS:
+ {
+ packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
+ break;
+ }
+ case SESS_XA_INDOUBT_XIDS_RESP:
+ {
+ packet = new SessionXAGetInDoubtXidsResponseMessage();
+ break;
+ }
+ case SESS_XA_SET_TIMEOUT:
+ {
+ packet = new SessionXASetTimeoutMessage();
+ break;
+ }
+ case SESS_XA_SET_TIMEOUT_RESP:
+ {
+ packet = new SessionXASetTimeoutResponseMessage();
+ break;
+ }
+ case SESS_XA_GET_TIMEOUT:
+ {
+ packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
+ break;
+ }
+ case SESS_XA_GET_TIMEOUT_RESP:
+ {
+ packet = new SessionXAGetTimeoutResponseMessage();
+ break;
+ }
+ case SESS_START:
+ {
+ packet = new PacketImpl(PacketImpl.SESS_START);
+ break;
+ }
+ case SESS_STOP:
+ {
+ packet = new PacketImpl(PacketImpl.SESS_STOP);
+ break;
+ }
+ case SESS_FLOWTOKEN:
+ {
+ packet = new SessionConsumerFlowCreditMessage();
+ break;
+ }
+ case SESS_SEND:
+ {
+ packet = new SessionSendMessage();
+ break;
+ }
+ case SESS_SEND_LARGE:
+ {
+ packet = new SessionSendLargeMessage();
+ break;
+ }
+ case SESS_RECEIVE_MSG:
+ {
+ packet = new SessionReceiveMessage();
+ break;
+ }
+ case SESS_RECEIVE_LARGE_MSG:
+ {
+ packet = new SessionReceiveLargeMessage();
+ break;
+ }
+ case SESS_CONSUMER_CLOSE:
+ {
+ packet = new SessionConsumerCloseMessage();
+ break;
+ }
+ case NULL_RESPONSE:
+ {
+ packet = new NullResponseMessage();
+ break;
+ }
+ case SESS_RECEIVE_CONTINUATION:
+ {
+ packet = new SessionReceiveContinuationMessage();
+ break;
+ }
+ case SESS_SEND_CONTINUATION:
+ {
+ packet = new SessionSendContinuationMessage();
+ break;
+ }
+ case SESS_PRODUCER_REQUEST_CREDITS:
+ {
+ packet = new SessionRequestProducerCreditsMessage();
+ break;
+ }
+ case SESS_PRODUCER_CREDITS:
+ {
+ packet = new SessionProducerCreditsMessage();
+ break;
+ }
+ case CREATE_REPLICATION:
+ {
+ packet = new CreateReplicationSessionMessage();
+ break;
+ }
+ case REPLICATION_APPEND:
+ {
+ packet = new ReplicationAddMessage();
+ break;
+ }
+ case REPLICATION_APPEND_TX:
+ {
+ packet = new ReplicationAddTXMessage();
+ break;
+ }
+ case REPLICATION_DELETE:
+ {
+ packet = new ReplicationDeleteMessage();
+ break;
+ }
+ case REPLICATION_DELETE_TX:
+ {
+ packet = new ReplicationDeleteTXMessage();
+ break;
+ }
+ case REPLICATION_PREPARE:
+ {
+ packet = new ReplicationPrepareMessage();
+ break;
+ }
+ case REPLICATION_COMMIT_ROLLBACK:
+ {
+ packet = new ReplicationCommitMessage();
+ break;
+ }
+ case REPLICATION_RESPONSE:
+ {
+ packet = new ReplicationResponseMessage();
+ break;
+ }
+ case REPLICATION_PAGE_WRITE:
+ {
+ packet = new ReplicationPageWriteMessage();
+ break;
+ }
+ case REPLICATION_PAGE_EVENT:
+ {
+ packet = new ReplicationPageEventMessage();
+ break;
+ }
+ case REPLICATION_LARGE_MESSAGE_BEGIN:
+ {
+ packet = new ReplicationLargeMessageBeingMessage();
+ break;
+ }
+ case REPLICATION_LARGE_MESSAGE_END:
+ {
+ packet = new ReplicationLargemessageEndMessage();
+ break;
+ }
+ case REPLICATION_LARGE_MESSAGE_WRITE:
+ {
+ packet = new ReplicationLargeMessageWriteMessage();
+ break;
+ }
+ case REPLICATION_COMPARE_DATA:
+ {
+ packet = new ReplicationCompareDataMessage();
+ break;
+ }
+ case SESS_FORCE_CONSUMER_DELIVERY:
+ {
+ packet = new SessionForceConsumerDelivery();
+ break;
+ }
+ default:
+ {
+ throw new IllegalArgumentException("Invalid type: " + packetType);
+ }
+ }
+
+ packet.decode(in); // delegate the remaining decoding to the packet itself
+
+ return packet;
+ }
+
+}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -1,494 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.remoting.impl;
-
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PING;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_EVENT;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_WRITE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CREDITS;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_LARGE_MSG;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_START;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
-import static
org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.PacketsConfirmedMessage;
-import org.hornetq.core.remoting.impl.wireformat.Ping;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.remoting.impl.wireformat.SessionProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
-
-/**
- * A PacketDecoder
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- *
- */
-public class PacketDecoder
-{
- private static final Logger log = Logger.getLogger(PacketDecoder.class);
-
- public Packet decode(final HornetQBuffer in)
- {
- final byte packetType = in.readByte();
-
- Packet packet;
-
- switch (packetType)
- {
- case PING:
- {
- packet = new Ping();
- break;
- }
- case DISCONNECT:
- {
- packet = new PacketImpl(PacketImpl.DISCONNECT);
- break;
- }
- case EXCEPTION:
- {
- packet = new HornetQExceptionMessage();
- break;
- }
- case PACKETS_CONFIRMED:
- {
- packet = new PacketsConfirmedMessage();
- break;
- }
- case CREATESESSION:
- {
- packet = new CreateSessionMessage();
- break;
- }
- case CREATESESSION_RESP:
- {
- packet = new CreateSessionResponseMessage();
- break;
- }
- case REATTACH_SESSION:
- {
- packet = new ReattachSessionMessage();
- break;
- }
- case REATTACH_SESSION_RESP:
- {
- packet = new ReattachSessionResponseMessage();
- break;
- }
- case SESS_CLOSE:
- {
- packet = new SessionCloseMessage();
- break;
- }
- case SESS_CREATECONSUMER:
- {
- packet = new SessionCreateConsumerMessage();
- break;
- }
- case SESS_ACKNOWLEDGE:
- {
- packet = new SessionAcknowledgeMessage();
- break;
- }
- case SESS_EXPIRED:
- {
- packet = new SessionExpiredMessage();
- break;
- }
- case SESS_COMMIT:
- {
- packet = new SessionCommitMessage();
- break;
- }
- case SESS_ROLLBACK:
- {
- packet = new RollbackMessage();
- break;
- }
- case SESS_QUEUEQUERY:
- {
- packet = new SessionQueueQueryMessage();
- break;
- }
- case SESS_QUEUEQUERY_RESP:
- {
- packet = new SessionQueueQueryResponseMessage();
- break;
- }
- case CREATE_QUEUE:
- {
- packet = new CreateQueueMessage();
- break;
- }
- case DELETE_QUEUE:
- {
- packet = new SessionDeleteQueueMessage();
- break;
- }
- case SESS_BINDINGQUERY:
- {
- packet = new SessionBindingQueryMessage();
- break;
- }
- case SESS_BINDINGQUERY_RESP:
- {
- packet = new SessionBindingQueryResponseMessage();
- break;
- }
- case SESS_XA_START:
- {
- packet = new SessionXAStartMessage();
- break;
- }
- case SESS_XA_END:
- {
- packet = new SessionXAEndMessage();
- break;
- }
- case SESS_XA_COMMIT:
- {
- packet = new SessionXACommitMessage();
- break;
- }
- case SESS_XA_PREPARE:
- {
- packet = new SessionXAPrepareMessage();
- break;
- }
- case SESS_XA_RESP:
- {
- packet = new SessionXAResponseMessage();
- break;
- }
- case SESS_XA_ROLLBACK:
- {
- packet = new SessionXARollbackMessage();
- break;
- }
- case SESS_XA_JOIN:
- {
- packet = new SessionXAJoinMessage();
- break;
- }
- case SESS_XA_SUSPEND:
- {
- packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
- break;
- }
- case SESS_XA_RESUME:
- {
- packet = new SessionXAResumeMessage();
- break;
- }
- case SESS_XA_FORGET:
- {
- packet = new SessionXAForgetMessage();
- break;
- }
- case SESS_XA_INDOUBT_XIDS:
- {
- packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
- break;
- }
- case SESS_XA_INDOUBT_XIDS_RESP:
- {
- packet = new SessionXAGetInDoubtXidsResponseMessage();
- break;
- }
- case SESS_XA_SET_TIMEOUT:
- {
- packet = new SessionXASetTimeoutMessage();
- break;
- }
- case SESS_XA_SET_TIMEOUT_RESP:
- {
- packet = new SessionXASetTimeoutResponseMessage();
- break;
- }
- case SESS_XA_GET_TIMEOUT:
- {
- packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
- break;
- }
- case SESS_XA_GET_TIMEOUT_RESP:
- {
- packet = new SessionXAGetTimeoutResponseMessage();
- break;
- }
- case SESS_START:
- {
- packet = new PacketImpl(PacketImpl.SESS_START);
- break;
- }
- case SESS_STOP:
- {
- packet = new PacketImpl(PacketImpl.SESS_STOP);
- break;
- }
- case SESS_FLOWTOKEN:
- {
- packet = new SessionConsumerFlowCreditMessage();
- break;
- }
- case SESS_SEND:
- {
- packet = new SessionSendMessage();
- break;
- }
- case SESS_SEND_LARGE:
- {
- packet = new SessionSendLargeMessage();
- break;
- }
- case SESS_RECEIVE_MSG:
- {
- packet = new SessionReceiveMessage();
- break;
- }
- case SESS_RECEIVE_LARGE_MSG:
- {
- packet = new SessionReceiveLargeMessage();
- break;
- }
- case SESS_CONSUMER_CLOSE:
- {
- packet = new SessionConsumerCloseMessage();
- break;
- }
- case NULL_RESPONSE:
- {
- packet = new NullResponseMessage();
- break;
- }
- case SESS_RECEIVE_CONTINUATION:
- {
- packet = new SessionReceiveContinuationMessage();
- break;
- }
- case SESS_SEND_CONTINUATION:
- {
- packet = new SessionSendContinuationMessage();
- break;
- }
- case SESS_PRODUCER_REQUEST_CREDITS:
- {
- packet = new SessionRequestProducerCreditsMessage();
- break;
- }
- case SESS_PRODUCER_CREDITS:
- {
- packet = new SessionProducerCreditsMessage();
- break;
- }
- case CREATE_REPLICATION:
- {
- packet = new CreateReplicationSessionMessage();
- break;
- }
- case REPLICATION_APPEND:
- {
- packet = new ReplicationAddMessage();
- break;
- }
- case REPLICATION_APPEND_TX:
- {
- packet = new ReplicationAddTXMessage();
- break;
- }
- case REPLICATION_DELETE:
- {
- packet = new ReplicationDeleteMessage();
- break;
- }
- case REPLICATION_DELETE_TX:
- {
- packet = new ReplicationDeleteTXMessage();
- break;
- }
- case REPLICATION_PREPARE:
- {
- packet = new ReplicationPrepareMessage();
- break;
- }
- case REPLICATION_COMMIT_ROLLBACK:
- {
- packet = new ReplicationCommitMessage();
- break;
- }
- case REPLICATION_RESPONSE:
- {
- packet = new ReplicationResponseMessage();
- break;
- }
- case REPLICATION_PAGE_WRITE:
- {
- packet = new ReplicationPageWriteMessage();
- break;
- }
- case REPLICATION_PAGE_EVENT:
- {
- packet = new ReplicationPageEventMessage();
- break;
- }
- case REPLICATION_LARGE_MESSAGE_BEGIN:
- {
- packet = new ReplicationLargeMessageBeingMessage();
- break;
- }
- case REPLICATION_LARGE_MESSAGE_END:
- {
- packet = new ReplicationLargemessageEndMessage();
- break;
- }
- case REPLICATION_LARGE_MESSAGE_WRITE:
- {
- packet = new ReplicationLargeMessageWriteMessage();
- break;
- }
- case REPLICATION_COMPARE_DATA:
- {
- packet = new ReplicationCompareDataMessage();
- break;
- }
- case SESS_FORCE_CONSUMER_DELIVERY:
- {
- packet = new SessionForceConsumerDelivery();
- break;
- }
- default:
- {
- throw new IllegalArgumentException("Invalid type: " + packetType);
- }
- }
-
- packet.decode(in);
-
- return packet;
- }
-
-}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -28,8 +28,8 @@
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.utils.SimpleIDGenerator;
@@ -79,8 +79,6 @@
private final Object failLock = new Object();
- private final PacketDecoder decoder = new PacketDecoder();
-
private volatile boolean dataReceived;
private final Executor executor;
@@ -123,7 +121,7 @@
this.interceptors = interceptors;
this.client = client;
-
+
this.executor = executor;
}
@@ -351,7 +349,7 @@
private volatile boolean executing;
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer,
final PacketDecoder decoder)
{
final Packet packet = decoder.decode(buffer);
@@ -391,7 +389,46 @@
dataReceived = true;
}
+
+ /**
+  * Dispatches an already-decoded packet received on this connection.
+  * <p>
+  * When the packet reports {@code isAsyncExec()} and an executor is configured, the packet is
+  * handed to the executor; otherwise it is processed on the calling thread, after waiting for
+  * any asynchronous processing flagged by {@code executing} to finish.
+  * In both cases {@code dataReceived} is set once the packet has been dispatched.
+  *
+  * @param connectionID identifier of the connection the packet arrived on (not used by this method)
+  * @param packet the decoded packet to process
+  */
+ public void packetReceived(final Object connectionID, final Packet packet)
+ {
+    if (packet.isAsyncExec() && executor != null)
+    {
+       executing = true;
+
+       executor.execute(new Runnable()
+       {
+          public void run()
+          {
+             try
+             {
+                doBufferReceived(packet);
+             }
+             catch (Throwable t)
+             {
+                RemotingConnectionImpl.log.error("Unexpected error", t);
+             }
+             finally
+             {
+                // Cleared in finally so the flag is reset even if the logging call above throws;
+                // otherwise the synchronous path below would spin forever waiting on it.
+                executing = false;
+             }
+          }
+       });
+    }
+    else
+    {
+       // To prevent out of order execution if interleaving sync and async operations on same connection
+       while (executing)
+       {
+          Thread.yield();
+       }
+
+       // Pings must always be handled out of band so we can send pings back to the client quickly
+       // otherwise they would get in the queue with everything else which might give an intolerable delay
+       doBufferReceived(packet);
+    }
+
+    dataReceived = true;
+ }
+
private void doBufferReceived(final Packet packet)
{
if (interceptors != null)
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -19,6 +19,8 @@
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -36,6 +38,8 @@
private final BufferHandler handler;
+ private final PacketDecoder decoder = new CorePacketDecoder();
+
private final ConnectionLifeCycleListener listener;
private final String id;
@@ -128,7 +132,7 @@
{
copied.readInt(); // read and discard
- handler.bufferReceived(id, copied);
+ handler.bufferReceived(id, copied, decoder);
}
}
catch (Exception e)
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -32,6 +32,7 @@
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
@@ -41,6 +42,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.HornetQPacketHandler;
import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.stomp.StompPacketDecoder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -184,7 +186,7 @@
a.start();
}
- //This thread checks connections that need to be closed, and also flushes confirmations
+ // This thread checks connections that need to be closed, and also flushes confirmations
failureCheckAndFlushThread = new
FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
failureCheckAndFlushThread.start();
@@ -423,13 +425,13 @@
private final class DelegatingBufferHandler extends AbstractBufferHandler
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer,
final PacketDecoder decoder)
{
ConnectionEntry conn = connections.get(connectionID);
if (conn != null)
{
- conn.connection.bufferReceived(connectionID, buffer);
+ conn.connection.bufferReceived(connectionID, buffer, decoder);
}
}
}
@@ -495,9 +497,9 @@
for (ConnectionEntry entry : connections.values())
{
RemotingConnection conn = entry.connection;
-
+
boolean flush = true;
-
+
if (entry.ttl != -1)
{
if (now >= entry.lastCheck + entry.ttl)
@@ -505,7 +507,7 @@
if (!conn.checkDataReceived())
{
idsToRemove.add(conn.getID());
-
+
flush = false;
}
else
@@ -514,12 +516,12 @@
}
}
}
-
+
if (flush)
{
- //We flush any confirmations on the connection - this prevents idle bridges for example
- //sitting there with many unacked messages
-
+ // We flush any confirmations on the connection - this prevents idle bridges for example
+ // sitting there with many unacked messages
+
conn.flushConfirmations();
}
}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import java.util.Map;
+
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.utils.UUIDGenerator;
+import org.hornetq.utils.VersionLoader;
+
+/**
+ * Converts STOMP frames into HornetQ core packets.
+ * <p>
+ * Only the CONNECT and DISCONNECT commands are translated; every other command is rejected
+ * with a {@link RuntimeException}.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class ProtocolConverter
+{
+
+   /**
+    * Translates a STOMP frame into the equivalent core packet.
+    * <ul>
+    * <li>CONNECT becomes a {@link CreateSessionMessage} built from the frame's login and
+    * passcode headers and a freshly generated session name;</li>
+    * <li>DISCONNECT becomes a {@link SessionCloseMessage}.</li>
+    * </ul>
+    *
+    * @param frame the STOMP frame to convert
+    * @return the core packet corresponding to the frame's command
+    * @throws RuntimeException if the frame's command is neither CONNECT nor DISCONNECT
+    */
+   public Packet toPacket(StompFrame frame)
+   {
+      String command = frame.getCommand();
+      Map<String, Object> headers = frame.getHeaders();
+
+      if (Stomp.Commands.CONNECT.equals(command))
+      {
+         // Use the shared header-name constants rather than repeating the literals here.
+         String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+         String password = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+
+         String name = UUIDGenerator.getInstance().generateStringUUID();
+         // NOTE(review): the session channel id is hard-coded; presumably a placeholder - confirm.
+         long sessionChannelID = 12;
+
+         return new CreateSessionMessage(name,
+                                         sessionChannelID,
+                                         VersionLoader.getVersion().getIncrementingVersion(),
+                                         login,
+                                         password,
+                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                         false,
+                                         true,
+                                         true,
+                                         false,
+                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE);
+      }
+      else if (Stomp.Commands.DISCONNECT.equals(command))
+      {
+         return new SessionCloseMessage();
+      }
+      else
+      {
+         throw new RuntimeException("frame not supported: " + frame);
+      }
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+import java.io.IOException;
+
+/**
+ * An {@link IOException} raised for protocol errors, carrying a flag that tells
+ * whether the error is fatal.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class ProtocolException extends IOException {
+    private static final long serialVersionUID = -2869735532997332242L;
+
+    private final boolean fatal;
+
+    /** Creates a non-fatal exception with no message and no cause. */
+    public ProtocolException() {
+        this(null, false, null);
+    }
+
+    /**
+     * Creates a non-fatal exception with no cause.
+     *
+     * @param s the detail message
+     */
+    public ProtocolException(String s) {
+        this(s, false, null);
+    }
+
+    /**
+     * Creates an exception with no cause.
+     *
+     * @param s the detail message
+     * @param fatal whether the error is fatal
+     */
+    public ProtocolException(String s, boolean fatal) {
+        this(s, fatal, null);
+    }
+
+    /**
+     * Creates an exception with the given message, fatal flag and cause.
+     *
+     * @param s the detail message
+     * @param fatal whether the error is fatal
+     * @param cause the underlying cause, may be {@code null}
+     */
+    public ProtocolException(String s, boolean fatal, Throwable cause) {
+        super(s);
+        // The cause is always initialised here, including when it is null.
+        initCause(cause);
+        this.fatal = fatal;
+    }
+
+    /**
+     * @return the fatal flag given at construction ({@code false} for the shorter constructors)
+     */
+    public boolean isFatal() {
+        return fatal;
+    }
+}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,124 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+
+/**
+ * The standard verbs and headers used for the
+ * <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
+ * <p>
+ * This is a pure constant holder: nested interfaces group the command verbs, the
+ * response verbs and the header names used by each kind of frame.
+ *
+ * @version $Revision: 57 $
+ */
+public interface Stomp {
+    // The NUL character; presumably the STOMP frame terminator - confirm against the marshaller.
+    String NULL = "\u0000";
+    // Line separator used between the lines of a frame.
+    String NEWLINE = "\n";
+
+    // Verbs sent by a client.
+    public static interface Commands {
+        String CONNECT = "CONNECT";
+        String SEND = "SEND";
+        String DISCONNECT = "DISCONNECT";
+        // NOTE(review): "SUB" and "UNSUB" are prefixes, not the full STOMP verbs SUBSCRIBE and
+        // UNSUBSCRIBE - presumably the consumer matches by prefix; confirm before using equals().
+        String SUBSCRIBE = "SUB";
+        String UNSUBSCRIBE = "UNSUB";
+        // The *_TRANSACTION constants carry the same values as BEGIN, COMMIT and ABORT below.
+        String BEGIN_TRANSACTION = "BEGIN";
+        String COMMIT_TRANSACTION = "COMMIT";
+        String ABORT_TRANSACTION = "ABORT";
+        String BEGIN = "BEGIN";
+        String COMMIT = "COMMIT";
+        String ABORT = "ABORT";
+        String ACK = "ACK";
+    }
+
+    // Verbs sent by the server.
+    public interface Responses {
+        String CONNECTED = "CONNECTED";
+        String ERROR = "ERROR";
+        String MESSAGE = "MESSAGE";
+        String RECEIPT = "RECEIPT";
+    }
+
+    // Header names, grouped by the frame they belong to.
+    public interface Headers {
+        // Separator between a header name and its value (the constant name is misspelled but public).
+        String SEPERATOR = ":";
+        String RECEIPT_REQUESTED = "receipt";
+        String TRANSACTION = "transaction";
+        String CONTENT_LENGTH = "content-length";
+
+        public interface Response {
+            String RECEIPT_ID = "receipt-id";
+        }
+
+        // Headers of a SEND frame.
+        public interface Send {
+            String DESTINATION = "destination";
+            String CORRELATION_ID = "correlation-id";
+            String REPLY_TO = "reply-to";
+            String EXPIRATION_TIME = "expires";
+            String PRIORITY = "priority";
+            String TYPE = "type";
+            // NOTE(review): declared as Object while every sibling constant is a String - confirm intent.
+            Object PERSISTENT = "persistent";
+        }
+
+        // Headers of a MESSAGE frame.
+        public interface Message {
+            String MESSAGE_ID = "message-id";
+            String DESTINATION = "destination";
+            String CORRELATION_ID = "correlation-id";
+            String EXPIRATION_TIME = "expires";
+            String REPLY_TO = "reply-to";
+            // The constant name is misspelled (Send uses PRIORITY) but is public; the value is "priority".
+            String PRORITY = "priority";
+            String REDELIVERED = "redelivered";
+            String TIMESTAMP = "timestamp";
+            String TYPE = "type";
+            String SUBSCRIPTION = "subscription";
+        }
+
+        // Headers of a subscribe frame.
+        public interface Subscribe {
+            String DESTINATION = "destination";
+            String ACK_MODE = "ack";
+            String ID = "id";
+            String SELECTOR = "selector";
+            String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+            String NO_LOCAL = "no-local";
+
+            // Values accepted for the ACK_MODE header.
+            public interface AckModeValues {
+                String AUTO = "auto";
+                String CLIENT = "client";
+            }
+        }
+
+        // Headers of an unsubscribe frame.
+        public interface Unsubscribe {
+            String DESTINATION = "destination";
+            String ID = "id";
+        }
+
+        // Headers of a CONNECT frame.
+        public interface Connect {
+            String LOGIN = "login";
+            String PASSCODE = "passcode";
+            String CLIENT_ID = "client-id";
+            String REQUEST_ID = "request-id";
+        }
+
+        // Headers of an ERROR frame.
+        public interface Error {
+            String MESSAGE = "message";
+        }
+
+        // Headers of a CONNECTED frame.
+        public interface Connected {
+            String SESSION = "session";
+            String RESPONSE_ID = "response-id";
+        }
+
+        // Headers of an ACK frame.
+        public interface Ack {
+            String MESSAGE_ID = "message-id";
+        }
+    }
+}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompFrame
+{
+ private static final byte[] NO_DATA = new byte[] {};
+
+ private String command;
+
+ private Map<String, Object> headers;
+
+ private byte[] content = StompFrame.NO_DATA;
+
+ public StompFrame()
+ {
+ this.headers = new HashMap<String, Object>();
+ }
+
+ public StompFrame(String command, Map<String, Object> headers, byte[] data)
+ {
+ this.command = command;
+ this.headers = headers;
+ this.content = data;
+ }
+
+ public String getCommand()
+ {
+ return command;
+ }
+
+ public byte[] getContent()
+ {
+ return content;
+ }
+
+ public Map<String, Object> getHeaders()
+ {
+ return headers;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StompFrame[command=" + command + ", headers=" + headers
+ ",content-length=" + content.length + "]";
+ }
+}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+
+/**
+ * A StompFrameDelimiter
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class StompFrameDelimiter extends DelimiterBasedFrameDecoder
+{
+
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+
+ public StompFrameDelimiter()
+ {
+ super(MAX_DATA_LENGTH, true, Delimiters.nulDelimiter());
+ }
+}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+/**
+ * Command indicating that an invalid Stomp Frame was received.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompFrameError extends StompFrame {
+ private final ProtocolException exception;
+
+ public StompFrameError(ProtocolException exception) {
+ this.exception = exception;
+ }
+
+ public ProtocolException getException() {
+ return exception;
+ }
+}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,191 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Implements marshalling and unmarsalling the <a
href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+public class StompMarshaller {
+ private static final byte[] NO_DATA = new byte[]{};
+ private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
+ private static final int MAX_COMMAND_LENGTH = 1024;
+ private static final int MAX_HEADER_LENGTH = 1024 * 10;
+ private static final int MAX_HEADERS = 1000;
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+ private int version = 1;
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ public byte[] marshal(StompFrame command) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ marshal(command, dos);
+ dos.close();
+ return baos.toByteArray();
+ }
+
+ public void marshal(StompFrame stomp, DataOutput os) throws IOException {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(stomp.getCommand());
+ buffer.append(Stomp.NEWLINE);
+
+ // Output the headers.
+ for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();)
{
+ Map.Entry entry = (Map.Entry) iter.next();
+ buffer.append(entry.getKey());
+ buffer.append(Stomp.Headers.SEPERATOR);
+ buffer.append(entry.getValue());
+ buffer.append(Stomp.NEWLINE);
+ }
+
+ // Add a newline to seperate the headers from the content.
+ buffer.append(Stomp.NEWLINE);
+
+ os.write(buffer.toString().getBytes("UTF-8"));
+ os.write(stomp.getContent());
+ os.write(END_OF_FRAME);
+ }
+
+ public StompFrame unmarshal(HornetQBuffer in) throws IOException {
+
+ try {
+ String action = null;
+
+ // skip white space to next real action line
+ while (true) {
+ action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command
length was exceeded");
+ if (action == null) {
+ throw new IOException("connection was closed");
+ }
+ else {
+ action = action.trim();
+ if (action.length() > 0) {
+ break;
+ }
+ }
+ }
+
+ // Parse the headers
+ HashMap headers = new HashMap(25);
+ while (true) {
+ String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header
length was exceeded");
+ if (line != null && line.trim().length() > 0) {
+
+ if (headers.size() > MAX_HEADERS) {
+ throw new ProtocolException("The maximum number of headers
was exceeded", true);
+ }
+
+ try {
+ int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+ String name = line.substring(0, seperator_index).trim();
+ String value = line.substring(seperator_index + 1,
line.length()).trim();
+ headers.put(name, value);
+ }
+ catch (Exception e) {
+ throw new ProtocolException("Unable to parser header line
[" + line + "]", true);
+ }
+ }
+ else {
+ break;
+ }
+ }
+
+ // Read in the data part.
+ byte[] data = NO_DATA;
+ String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLength != null) {
+
+ // Bless the client, he's telling us how much data to read in.
+ int length;
+ try {
+ length = Integer.parseInt(contentLength.trim());
+ }
+ catch (NumberFormatException e) {
+ throw new ProtocolException("Specified content-length is not a
valid integer", true);
+ }
+
+ if (length > MAX_DATA_LENGTH) {
+ throw new ProtocolException("The maximum data length was
exceeded", true);
+ }
+
+ data = new byte[length];
+ in.readBytes(data);
+
+ if (in.readByte() != 0) {
+ throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + "
bytes were read and " + "there was no trailing null byte", true);
+ }
+ }
+ else {
+
+ // We don't know how much to read.. data ends when we hit a 0
+ byte b;
+ ByteArrayOutputStream baos = null;
+ while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
+
+ if (baos == null) {
+ baos = new ByteArrayOutputStream();
+ }
+ else if (baos.size() > MAX_DATA_LENGTH) {
+ throw new ProtocolException("The maximum data length was
exceeded", true);
+ }
+
+ baos.write(b);
+ }
+
+ if (baos != null) {
+ baos.close();
+ data = baos.toByteArray();
+ }
+ }
+
+ return new StompFrame(action, headers, data);
+ }
+ catch (ProtocolException e) {
+ return new StompFrameError(e);
+ }
+ }
+
+ protected String readLine(HornetQBuffer in, int maxLength, String errorMessage)
throws IOException {
+ byte b;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
+ while ((b = in.readByte()) != '\n') {
+ if (baos.size() > maxLength) {
+ throw new ProtocolException(errorMessage, true);
+ }
+ baos.write(b);
+ }
+ byte[] sequence = baos.toByteArray();
+ return new String(sequence, "UTF-8");
+ }
+}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
+
+/**
+ * A StompPacketDecoder
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class StompPacketDecoder implements PacketDecoder
+{
+ private final StompMarshaller marshaller = new StompMarshaller();
+
+ private final ProtocolConverter converter = new ProtocolConverter();
+
+ // PacketDecoder implementation ----------------------------------
+
+ public Packet decode(HornetQBuffer in)
+ {
+ StompFrame frame;
+ try
+ {
+ frame = marshaller.unmarshal(in);
+ System.out.println(">>> " + frame);
+ Packet packet = converter.toPacket(frame);
+ packet.setChannelID(1);
+ System.out.println(">>> " + packet);
+
+ return packet;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return null;
+ }
+ }
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -16,6 +16,7 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
+import org.hornetq.integration.stomp.StompFrameDelimiter;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -44,9 +45,15 @@
// Public --------------------------------------------------------
- public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler
handler)
+ public static void addStompCodecFilter(final ChannelPipeline pipeline, final
BufferHandler handler)
{
assert pipeline != null;
+ pipeline.addLast("delimiter", new StompFrameDelimiter());
+ }
+
+ public static void addHornetQCodecFilter(final ChannelPipeline pipeline, final
BufferHandler handler)
+ {
+ assert pipeline != null;
pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -15,6 +15,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -31,23 +32,27 @@
* @author <a href="mailto:tlee@redhat.com">Trustin Lee</a>
* @version $Rev$, $Date$
*/
-class HornetQChannelHandler extends SimpleChannelHandler
+public class HornetQChannelHandler extends SimpleChannelHandler
{
private static final Logger log = Logger.getLogger(HornetQChannelHandler.class);
private final ChannelGroup group;
+ private final PacketDecoder decoder;
+
private final BufferHandler handler;
private final ConnectionLifeCycleListener listener;
volatile boolean active;
- HornetQChannelHandler(final ChannelGroup group,
- final BufferHandler handler,
- final ConnectionLifeCycleListener listener)
+ public HornetQChannelHandler(final ChannelGroup group,
+ final PacketDecoder decoder,
+ final BufferHandler handler,
+ final ConnectionLifeCycleListener listener)
{
this.group = group;
+ this.decoder = decoder;
this.handler = handler;
this.listener = listener;
}
@@ -64,7 +69,7 @@
{
ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
- handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+ handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer),
decoder);
}
@Override
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -31,9 +31,12 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.integration.stomp.StompPacketDecoder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -102,6 +105,7 @@
private final boolean useInvm;
+ private final String protocol;
private final String host;
private final int port;
@@ -176,6 +180,9 @@
useInvm =
ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME,
TransportConstants.DEFAULT_USE_INVM,
configuration);
+ protocol =
ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
+ TransportConstants.DEFAULT_PROTOCOL,
+ configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
TransportConstants.DEFAULT_HOST,
configuration);
@@ -279,9 +286,18 @@
pipeline.addLast("httpResponseEncoder", new
HttpResponseEncoder());
pipeline.addLast("httphandler", new
HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
}
-
- ChannelPipelineSupport.addCodecFilter(pipeline, handler);
- pipeline.addLast("handler", new
HornetQServerChannelHandler(channelGroup, handler, new Listener()));
+ PacketDecoder decoder;
+ if (protocol.equals(TransportConstants.STOMP_PROTOCOL))
+ {
+ ChannelPipelineSupport.addStompCodecFilter(pipeline, handler);
+ decoder = new StompPacketDecoder();
+ } else
+ {
+ ChannelPipelineSupport.addHornetQCodecFilter(pipeline, handler);
+ decoder = new CorePacketDecoder();
+ }
+
+ pipeline.addLast("handler", new
HornetQServerChannelHandler(channelGroup, decoder, handler, new Listener()));
return pipeline;
}
};
@@ -475,10 +491,11 @@
private final class HornetQServerChannelHandler extends HornetQChannelHandler
{
HornetQServerChannelHandler(final ChannelGroup group,
+ final PacketDecoder decoder,
final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
- super(group, handler, listener);
+ super(group, decoder, handler, listener);
}
@Override
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -14,7 +14,6 @@
package org.hornetq.integration.transports.netty;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.spi.core.remoting.Connection;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -30,6 +30,8 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -93,6 +95,8 @@
private ChannelGroup channelGroup;
private final BufferHandler handler;
+
+ private final PacketDecoder decoder = new CorePacketDecoder();
private final ConnectionLifeCycleListener listener;
@@ -310,7 +314,7 @@
pipeline.addLast("httpResponseDecoder", new
HttpResponseDecoder());
pipeline.addLast("httphandler", new HttpHandler());
}
- ChannelPipelineSupport.addCodecFilter(pipeline, handler);
+ ChannelPipelineSupport.addHornetQCodecFilter(pipeline, handler);
pipeline.addLast("handler", new
HornetQClientChannelHandler(channelGroup, handler, new Listener()));
return pipeline;
}
@@ -441,7 +445,7 @@
final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
- super(group, handler, listener);
+ super(group, decoder, handler, listener);
}
}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -46,6 +46,8 @@
public static final String USE_INVM_PROP_NAME = "use-invm";
+ public static final String PROTOCOL_PROP_NAME = "protocol";
+
public static final String HOST_PROP_NAME = "host";
public static final String PORT_PROP_NAME = "port";
@@ -75,10 +77,18 @@
public static final boolean DEFAULT_USE_SERVLET = false;
+ public static final String HORNETQ_PROTOCOL = "hornetq";
+
+ public static final String STOMP_PROTOCOL = "stomp";
+
+ public static final String DEFAULT_PROTOCOL = HORNETQ_PROTOCOL;
+
public static final String DEFAULT_HOST = "localhost";
public static final int DEFAULT_PORT = 5445;
+ public static final int DEFAULT_STOMP_PORT = 61613;
+
public static final String DEFAULT_KEYSTORE_PATH = "hornetq.keystore";
public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";
@@ -120,6 +130,7 @@
allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
+ allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.PORT_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.KEYSTORE_PATH_PROP_NAME);
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -13,6 +13,7 @@
package org.hornetq.spi.core.remoting;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.remoting.PacketDecoder;
/**
* A BufferHandler that will handle buffers received by an acceptor.
@@ -29,7 +30,7 @@
* @param connectionID the connection the buffer was received on
* @param buffer the buffer to decode
*/
- void bufferReceived(Object connectionID, HornetQBuffer buffer);
+ void bufferReceived(Object connectionID, HornetQBuffer buffer, PacketDecoder
decoder);
/**
* called by the remoting connection prior to {@link
org.hornetq.spi.core.remoting.BufferHandler#bufferReceived(Object,
org.hornetq.api.core.HornetQBuffer)}.
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -25,6 +25,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyConnector;
@@ -474,7 +475,7 @@
this.latch = latch;
}
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer,
final PacketDecoder decoder)
{
int i = buffer.readInt();
messages.add(i);
@@ -496,7 +497,7 @@
this.latch = latch;
}
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer,
final PacketDecoder decoder)
{
int i = buffer.readInt();
@@ -529,7 +530,7 @@
return 0;
}
- public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer buffer,
final PacketDecoder decoder)
{
int i = buffer.readInt();
messages.add(i);
Added:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,841 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.stomp.Stomp;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+
+public class StompTest extends TestCase {
+ // Shared logger for progress messages emitted by the tests.
+ // ("transient" has no effect on a static field, so it is not used here.)
+ private static final Log log = LogFactory.getLog(StompTest.class);
+ // TCP port the raw Stomp client socket connects to.
+ // NOTE(review): presumably must match TransportConstants.DEFAULT_STOMP_PORT used by createServer() -- confirm.
+ private int port = 61613;
+ // Raw client socket used to exchange Stomp frames with the server.
+ private Socket stompSocket;
+ // Collects the bytes of the frame currently being read by receiveFrame().
+ private ByteArrayOutputStream inputBuffer;
+ // JMS-side objects used to produce and consume messages next to the Stomp client.
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Session session;
+ private Queue queue;
+ // Server under test; created and started in setUp(), stopped in tearDown().
+ private JMSServerManager server;
+
+ /**
+ * Sends a CONNECT frame that carries a request-id header and checks that the
+ * reply starts with CONNECTED and contains "response-id:1".
+ */
+ public void testConnect() throws Exception {
+
+ String connectFrame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
+ sendFrame(connectFrame);
+
+ String f = receiveFrame(10000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+ Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+ }
+
+ /**
+ * Sends "Hello World" with a Stomp SEND frame and verifies that a JMS consumer
+ * on the test queue receives it as a text message whose JMS timestamp is
+ * within one second of the current time.
+ */
+ public void testSendMessage() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+ }
+
+ /**
+ * Sends a message whose SEND frame carries a JMSXGroupID header and checks
+ * that a JMS consumer receives it. The leading underscore keeps JUnit 3 from
+ * running this method; the group-id assertion itself is still commented out.
+ */
+ public void _testJMSXGroupIdCanBeSet() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "JMSXGroupID: TEST\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ // TODO do we support it?
+ //Assert.assertEquals("TEST", ((TextMessage) message).getGroupID());
+ }
+
+ /**
+ * Sends a message with custom "foo" and "bar" headers over Stomp and verifies
+ * that a JMS consumer using the selector foo = 'abc' receives it with both
+ * headers readable as string properties.
+ */
+ public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "foo:abc\n" +
+ "bar:123\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+ }
+
+ /**
+ * Sends a message with correlation-id, priority, type, JMSXGroupID and custom
+ * headers over Stomp and checks the corresponding JMS headers and properties
+ * on the received message. The leading underscore keeps JUnit 3 from running
+ * this method.
+ */
+ public void _testSendMessageWithStandardHeaders() throws Exception {
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SEND\n" +
+ "correlation-id:c123\n" +
+ "priority:3\n" +
+ "type:t345\n" +
+ "JMSXGroupID:abc\n" +
+ "foo:abc\n" +
+ "bar:123\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+ Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+ Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+ Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+
+ Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+ // FIXME do we support it?
+ //Assert.assertEquals("GroupID", "abc", amqMessage.getGroupID());
+ }
+
+ /**
+ * Subscribes to the test queue with ack:auto, sends one JMS message and
+ * verifies that a MESSAGE frame arrives on the Stomp socket.
+ */
+ public void testSubscribeWithAutoAck() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ /**
+ * Subscribes with ack:auto, sends a five-byte JMS bytes message and verifies
+ * that the MESSAGE frame reports a content-length of 5 and carries no
+ * "type: null" header.
+ */
+ public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendBytesMessage(new byte[]{1, 2, 3, 4, 5});
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame);
+ Assert.assertTrue(cl_matcher.find());
+ Assert.assertEquals("5", cl_matcher.group(1));
+
+ Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ /**
+ * Subscribes with ack:auto, sends a JMS text message carrying one property of
+ * each primitive kind plus a string property, and verifies that a MESSAGE
+ * frame is delivered to the Stomp client.
+ */
+ public void testSubscribeWithMessageSentWithProperties() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("Hello World");
+ message.setStringProperty("s", "value");
+ message.setBooleanProperty("n", false);
+ message.setByteProperty("byte", (byte) 9);
+ message.setDoubleProperty("d", 2.0);
+ message.setFloatProperty("f", (float) 6.0);
+ message.setIntProperty("i", 10);
+ message.setLongProperty("l", 121);
+ // Distinct key: reusing "s" would overwrite the string property set above.
+ message.setShortProperty("sh", (short) 12);
+ producer.send(message);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+// System.out.println("out: "+frame);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ /**
+ * Subscribes with ack:auto and sends two batches of ten JMS messages,
+ * verifying after each batch that the MESSAGE frames arrive in the order the
+ * messages were sent.
+ */
+ public void testMessagesAreInOrder() throws Exception {
+ int ctr = 10;
+ String[] data = new String[ctr];
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ for (int i = 0; i < ctr; ++i) {
+ data[i] = getName() + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i) {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+ }
+
+ // sleep a while before publishing another set of messages
+ waitForFrameToTakeEffect();
+
+ for (int i = 0; i < ctr; ++i) {
+ data[i] = getName() + ":second:" + i;
+ sendMessage(data[i]);
+ }
+
+ for (int i = 0; i < ctr; ++i) {
+ frame = receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+ }
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ /**
+ * Subscribes with the selector foo = 'zzz', sends one non-matching and one
+ * matching JMS message, and verifies that the first MESSAGE frame received is
+ * the matching one.
+ */
+ public void testSubscribeWithAutoAckAndSelector() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "selector: foo = 'zzz'\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ sendMessage("Ignored message", "foo", "1234");
+ sendMessage("Real message", "foo", "zzz");
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ /**
+ * Subscribes with ack:client, receives a message without acknowledging it,
+ * disconnects, and verifies that a JMS consumer then gets the same message
+ * flagged as redelivered.
+ */
+ public void testSubscribeWithClientAck() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:client\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage(getName());
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // message should be received since message was not acknowledged
+ MessageConsumer consumer = session.createConsumer(queue);
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertTrue(message.getJMSRedelivered());
+ }
+
+ /**
+ * Runs the client-ack-then-auto-ack scenario where the first connection is
+ * dropped without sending a DISCONNECT frame.
+ */
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+ }
+
+ /**
+ * Runs the client-ack-then-auto-ack scenario where the first connection is
+ * ended with an explicit DISCONNECT frame.
+ */
+ public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
+ assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+ }
+
+ /**
+ * Scenario shared by the two client-ack/auto-ack tests: a message received
+ * under ack:client but never acknowledged must be delivered again to a second
+ * connection that subscribes without an ack header, and must not show up a
+ * third time once that second connection has consumed it.
+ *
+ * @param sendDisconnect true to end the first connection with a DISCONNECT
+ * frame before reconnecting; false to close the socket and wait one second
+ * before reconnecting
+ */
+ protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:client\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage(getName());
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ log.info("Reconnecting!");
+
+ if (sendDisconnect) {
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ reconnect();
+ }
+ else {
+ reconnect(1000);
+ }
+
+
+ // message should be received since message was not acknowledged
+ frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ frame =
+ "DISCONNECT\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // now lets make sure we don't see the message again
+ reconnect();
+
+ frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n\n" +
+ Stomp.NULL;
+
+ sendFrame(frame);
+ sendMessage("shouldBeNextMessage");
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+ }
+
+ /**
+ * Subscribes, receives one message, unsubscribes, and verifies that a message
+ * sent afterwards is not delivered: the read on the Stomp socket is expected
+ * to time out.
+ */
+ public void testUnsubscribe() throws Exception {
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame =
+ "SUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //send a message to our queue
+ sendMessage("first message");
+
+ //receive message from socket
+ frame = receiveFrame(1000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+ //remove subscription
+ frame =
+ "UNSUBSCRIBE\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ //send a message to our queue
+ sendMessage("second message");
+
+ try {
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.fail("No message should have been received since subscription was removed");
+ }
+ catch (SocketTimeoutException e) {
+ // expected: nothing may arrive on the socket once the subscription is gone
+ }
+ }
+
+ /**
+ * Sends a message inside Stomp transaction tx1, commits it, and verifies that
+ * a JMS consumer on the test queue receives a message.
+ */
+ public void testTransactionCommit() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ "Hello World" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ waitForFrameToTakeEffect();
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull("Should have received a message", message);
+ }
+
+ /**
+ * Sends "first message" in transaction tx1 and aborts it, then sends
+ * "second message" in a new tx1 and commits it. The JMS consumer is expected
+ * to receive only "second message".
+ */
+ public void testTransactionRollback() throws Exception {
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ String frame =
+ "CONNECT\n" +
+ "login: brianm\n" +
+ "passcode: wombats\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ String f = receiveFrame(1000);
+ Assert.assertTrue(f.startsWith("CONNECTED"));
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "first message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ //rollback first message
+ frame =
+ "ABORT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "BEGIN\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "SEND\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "transaction: tx1\n" +
+ "\n" +
+ "second message" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ frame =
+ "COMMIT\n" +
+ "transaction: tx1\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+
+ // This test case is currently failing
+ waitForFrameToTakeEffect();
+
+ //only second msg should be received since first msg was rolled back
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("second message", message.getText().trim());
+ }
+
+ // Implementation methods
+ //-------------------------------------------------------------------------
+ /**
+ * Creates and starts the server, opens the raw Stomp socket, and creates a
+ * started JMS connection with an auto-acknowledge session and the test queue.
+ * JUnit 3 does not call tearDown() when setUp() throws, so whatever was
+ * already acquired is released here before the failure is rethrown.
+ */
+ protected void setUp() throws Exception {
+ server = createServer();
+ server.start();
+ try {
+ connectionFactory = createConnectionFactory();
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+
+ connection = connectionFactory.createConnection();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ queue = session.createQueue(getQueueName());
+ connection.start();
+ }
+ catch (Exception e) {
+ // Best-effort cleanup; the original failure is the one worth reporting.
+ try {
+ if (stompSocket != null) {
+ stompSocket.close();
+ }
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ catch (Exception ignored) {
+ // secondary failure while cleaning up: still stop the server below
+ }
+ finally {
+ server.stop();
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Builds a JMS server manager with security disabled, a Netty acceptor
+ * configured for the Stomp protocol on the default Stomp port, an in-VM
+ * acceptor, and a queue configuration for the test queue. The server is not
+ * started here.
+ *
+ * @return the configured JMS server manager
+ * @throws Exception if building the server fails
+ */
+ private JMSServerManager createServer() throws Exception
+ {
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.STOMP_PROTOCOL);
+ params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+ config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+ HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+
+ JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+ jmsConfig.getQueueConfigurations().add(new QueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+ // Use a local instead of writing the field: the caller stores the returned value.
+ JMSServerManager jmsServer = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+ jmsServer.setContext(null);
+ return jmsServer;
+ }
+
+ /**
+ * Closes the JMS connection and the Stomp socket, then stops the server.
+ * Each step is attempted even when an earlier one throws, and resources that
+ * were never created are skipped, so the server is not left running.
+ */
+ protected void tearDown() throws Exception {
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+ finally {
+ try {
+ if (stompSocket != null) {
+ stompSocket.close();
+ }
+ }
+ finally {
+ if (server != null) {
+ server.stop();
+ }
+ }
+ }
+ }
+
+ /** Closes the Stomp socket and opens a new one without pausing in between. */
+ protected void reconnect() throws Exception {
+ reconnect(0);
+ }
+
+ /**
+ * Closes the Stomp socket, optionally pauses, then opens a new socket and
+ * starts over with an empty receive buffer.
+ *
+ * @param sleep milliseconds to wait between closing and reopening; no pause
+ * is made unless the value is greater than zero
+ */
+ protected void reconnect(long sleep) throws Exception {
+ stompSocket.close();
+
+ final boolean pauseRequested = sleep > 0;
+ if (pauseRequested) {
+ Thread.sleep(sleep);
+ }
+
+ stompSocket = createSocket();
+ inputBuffer = new ByteArrayOutputStream();
+ }
+
+ /**
+ * Creates the JMS connection factory used by the test, backed by the in-VM
+ * connector.
+ */
+ protected ConnectionFactory createConnectionFactory() {
+ return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ }
+
+ /**
+ * Opens a TCP socket to 127.0.0.1 on the configured Stomp port.
+ *
+ * @throws IOException if the connection cannot be established
+ */
+ protected Socket createSocket() throws IOException {
+ return new Socket("127.0.0.1", port);
+ }
+
+ /** Returns the name of the queue used by every test, without the "/queue/" prefix. */
+ protected String getQueueName() {
+ return "test";
+ }
+
+ /**
+ * Writes the given frame to the Stomp socket as UTF-8, one byte per write
+ * call, and flushes the stream afterwards.
+ *
+ * @param data the complete frame text, including its terminator
+ */
+ public void sendFrame(String data) throws Exception {
+ final byte[] payload = data.getBytes("UTF-8");
+ final OutputStream out = stompSocket.getOutputStream();
+ for (byte b : payload) {
+ out.write(b);
+ }
+ out.flush();
+ }
+
+ /**
+ * Reads one frame from the Stomp socket. Bytes are collected until a NUL byte
+ * is read; the byte following the NUL must be a newline.
+ *
+ * @param timeOut socket read timeout in milliseconds
+ * @return the frame content before the NUL terminator, decoded as UTF-8
+ * @throws IOException if the stream ends before a complete frame was read
+ * @throws java.net.SocketTimeoutException if a read exceeds the timeout
+ */
+ public String receiveFrame(long timeOut) throws Exception {
+ stompSocket.setSoTimeout((int) timeOut);
+ InputStream is = stompSocket.getInputStream();
+ int c = 0;
+ for (; ;) {
+ c = is.read();
+ if (c < 0) {
+ throw new IOException("socket closed.");
+ }
+ else if (c == 0) {
+ c = is.read();
+ // expected value first, so a failure message reads the right way round
+ Assert.assertEquals("Expecting stomp frame to terminate with \0\n", '\n', c);
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+ else {
+ inputBuffer.write(c);
+ }
+ }
+ }
+
+ /**
+ * Sends a JMS text message to the test queue with the string property
+ * foo=xyz.
+ *
+ * @param msg the message text
+ */
+ public void sendMessage(String msg) throws Exception {
+ sendMessage(msg, "foo", "xyz");
+ }
+
+ /**
+ * Sends a JMS text message to the test queue with one string property set.
+ * The producer is closed once the message has been sent.
+ *
+ * @param msg the message text
+ * @param propertyName name of the string property to set
+ * @param propertyValue value of the string property
+ */
+ public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
+ MessageProducer producer = session.createProducer(queue);
+ try {
+ TextMessage message = session.createTextMessage(msg);
+ message.setStringProperty(propertyName, propertyValue);
+ producer.send(message);
+ }
+ finally {
+ producer.close();
+ }
+ }
+
+ /**
+ * Sends a JMS bytes message with the given content to the test queue.
+ * The producer is closed once the message has been sent.
+ *
+ * @param msg the bytes to write into the message body
+ */
+ public void sendBytesMessage(byte[] msg) throws Exception {
+ MessageProducer producer = session.createProducer(queue);
+ try {
+ BytesMessage message = session.createBytesMessage();
+ message.writeBytes(msg);
+ producer.send(message);
+ }
+ finally {
+ producer.close();
+ }
+ }
+
+ /**
+ * Pauses for two seconds after a frame was sent for which the test reads no
+ * reply. This is a stop-gap: requesting a receipt for the frame would be the
+ * alternative to sleeping.
+ */
+ protected void waitForFrameToTakeEffect() throws InterruptedException {
+ final long settleMillis = 2000;
+ Thread.sleep(settleMillis);
+ }
+}
Added:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptor;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+
+import junit.framework.TestCase;
+
+/**
+ * A StompTest
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class StompTest2 extends TestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Server started in setUp() with a Stomp-enabled Netty acceptor.
+ private HornetQServer server;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ /**
+ * Placeholder test: it only sleeps briefly, so what it exercises is the
+ * server start in setUp() and the server stop in tearDown().
+ */
+ public void testFoo() throws Exception
+ {
+ Thread.sleep(10);
+ }
+
+ // Package protected ---------------------------------------------
+
+ /**
+ * Starts a server with security disabled and a single Netty acceptor
+ * configured for the Stomp protocol on the default Stomp port.
+ */
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ Configuration config = new ConfigurationImpl();
+ config.setSecurityEnabled(false);
+
+ Map<String, Object> params = new HashMap<String, Object>();
+ params.put(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.STOMP_PROTOCOL);
+ params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+ TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+ config.getAcceptorConfigurations().add(stompTransport);
+
+ server = HornetQServers.newHornetQServer(config);
+ server.start();
+ }
+
+ /**
+ * Stops the server if it was created; super.tearDown() runs even when
+ * stopping the server fails.
+ */
+ @Override
+ protected void tearDown() throws Exception
+ {
+ try
+ {
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ finally
+ {
+ super.tearDown();
+ }
+ }
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
@@ -47,7 +48,7 @@
BufferHandler handler = new AbstractBufferHandler()
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer
buffer)
+ public void bufferReceived(Object connectionID, HornetQBuffer buffer,
PacketDecoder decoder)
{
}
};
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.TransportConstants;
@@ -60,7 +61,7 @@
BufferHandler handler = new AbstractBufferHandler()
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer
buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer
buffer, final PacketDecoder decoder)
{
}
};
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-19
15:48:47 UTC (rev 8806)
+++
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java 2010-01-19
16:04:31 UTC (rev 8807)
@@ -21,6 +21,7 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.impl.AbstractBufferHandler;
import org.hornetq.integration.transports.netty.NettyConnector;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -51,7 +52,7 @@
{
BufferHandler handler = new AbstractBufferHandler()
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer
buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer
buffer, final PacketDecoder decoder)
{
}
};
@@ -88,7 +89,7 @@
{
BufferHandler handler = new AbstractBufferHandler()
{
- public void bufferReceived(final Object connectionID, final HornetQBuffer
buffer)
+ public void bufferReceived(final Object connectionID, final HornetQBuffer
buffer, final PacketDecoder decoder)
{
}
};