[hornetq-commits] JBoss hornetq SVN: r8807 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/core/client/impl and 12 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Jan 19 11:04:33 EST 2010


Author: jmesnil
Date: 2010-01-19 11:04:31 -0500 (Tue, 19 Jan 2010)
New Revision: 8807

Added:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
Removed:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
Modified:
   branches/HORNETQ-129_STOMP_protocol/.classpath
   branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* incomplete prototype
* copied StompTest from StompConnect project and adapt it to use HornetQ server

Modified: branches/HORNETQ-129_STOMP_protocol/.classpath
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/.classpath	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/.classpath	2010-01-19 16:04:31 UTC (rev 8807)
@@ -100,7 +100,7 @@
 	<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
 	<classpathentry kind="lib" path="tests/tmpfiles"/>
 	<classpathentry kind="lib" path="thirdparty/net/java/dev/javacc/lib/javacc.jar"/>
-	<classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar"/>
+	<classpathentry kind="lib" path="thirdparty/org/jboss/netty/lib/netty.jar" sourcepath="/Users/jmesnil/.m2/repository/org/jboss/netty/netty/3.1.5.GA/netty-3.1.5.GA-sources.jar"/>
 	<classpathentry kind="lib" path="thirdparty/log4j/lib/log4j.jar"/>
 	<classpathentry kind="lib" path="thirdparty/org/jboss/naming/lib/jnpserver.jar"/>
 	<classpathentry kind="lib" path="thirdparty/org/jboss/security/lib/jbosssx.jar"/>

Modified: branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/build-hornetq.xml	2010-01-19 16:04:31 UTC (rev 8807)
@@ -1511,7 +1511,7 @@
          <jvmarg value="-Dcom.sun.management.jmxremote"/>
          <jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
          <jvmarg value="-Djava.library.path=${native.bin.dir}"/>
-         <!--<jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>-->
+         <!-- <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/> -->
          <arg line="hornetq-beans.xml"/>
          <classpath path="${src.config.trunk.non-clustered.dir}" />
          <classpath refid="jms.standalone.server.classpath"/>
@@ -1540,23 +1540,20 @@
    </target>
 
    <target name="debugServer" depends="jar">
+      <mkdir dir="logs"/>
       <java classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer" fork="true">
          <jvmarg value="-XX:+UseParallelGC"/>
          <jvmarg value="-Xms512M"/>
          <jvmarg value="-Xmx2048M"/>
          <jvmarg value="-XX:+AggressiveOpts"/>
          <jvmarg value="-XX:+UseFastAccessorMethods"/>
-         <jvmarg value="-Xdebug"/>
-         <jvmarg value="-Xnoagent"/>
-         <jvmarg value="-Djava.compiler=NONE"/>
          <jvmarg value="-Dcom.sun.management.jmxremote"/>
-         <jvmarg value="-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
-         <jvmarg value="-Djava.util.logging.config.file=${src.config.standalone.non-clustered.dir}/logging.properties"/>
+         <jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
          <jvmarg value="-Djava.library.path=${native.bin.dir}"/>
-         <jvmarg value="-Djava.naming.factory.initial=org.jnp.interfaces.NamingContextFactory"/>
-         <jvmarg value="-Djava.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces"/>
+         <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
          <arg line="hornetq-beans.xml"/>
-         <classpath path="${src.config.standalone.non-clustered.dir}" />
+         <classpath path="${src.config.trunk.non-clustered.dir}" />
+         <classpath refid="jms.standalone.server.classpath"/>
       </java>
    </target>
 

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -39,8 +39,10 @@
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
 import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
 import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
 import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
@@ -529,6 +531,8 @@
 
    private volatile boolean stopPingingAfterOne;
 
+   private final PacketDecoder decoder = new CorePacketDecoder();
+
    public void stopPingingAfterOne()
    {
       stopPingingAfterOne = true;
@@ -1086,13 +1090,13 @@
 
    private class DelegatingBufferHandler extends AbstractBufferHandler
    {
-      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
       {
          RemotingConnection theConn = connection;
 
          if (theConn != null && connectionID == theConn.getID())
          {
-            theConn.bufferReceived(connectionID, buffer);
+            theConn.bufferReceived(connectionID, buffer, decoder );
          }
       }
    }

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java (from rev 8798, trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/PacketDecoder.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * A PacketDecoder
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public interface PacketDecoder
+{
+
+   public abstract Packet decode(final HornetQBuffer in);
+
+}
\ No newline at end of file

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java (from rev 8798, trunk/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/CorePacketDecoder.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,495 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl;
+
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PING;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_EVENT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_WRITE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CREDITS;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_LARGE_MSG;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_START;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
+import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+import org.hornetq.core.remoting.impl.wireformat.PacketsConfirmedMessage;
+import org.hornetq.core.remoting.impl.wireformat.Ping;
+import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
+import org.hornetq.core.remoting.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
+
+/**
+ * A CorePacketDecoder
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class CorePacketDecoder implements PacketDecoder
+{
+   private static final Logger log = Logger.getLogger(CorePacketDecoder.class);
+
+   public Packet decode(final HornetQBuffer in)
+   {
+      final byte packetType = in.readByte();
+
+      Packet packet;
+
+      switch (packetType)
+      {
+         case PING:
+         {
+            packet = new Ping();
+            break;
+         }
+         case DISCONNECT:
+         {
+            packet = new PacketImpl(PacketImpl.DISCONNECT);
+            break;
+         }
+         case EXCEPTION:
+         {
+            packet = new HornetQExceptionMessage();
+            break;
+         }
+         case PACKETS_CONFIRMED:
+         {
+            packet = new PacketsConfirmedMessage();
+            break;
+         }
+         case CREATESESSION:
+         {
+            packet = new CreateSessionMessage();
+            break;
+         }
+         case CREATESESSION_RESP:
+         {
+            packet = new CreateSessionResponseMessage();
+            break;
+         }
+         case REATTACH_SESSION:
+         {
+            packet = new ReattachSessionMessage();
+            break;
+         }
+         case REATTACH_SESSION_RESP:
+         {
+            packet = new ReattachSessionResponseMessage();
+            break;
+         }
+         case SESS_CLOSE:
+         {
+            packet = new SessionCloseMessage();
+            break;
+         }
+         case SESS_CREATECONSUMER:
+         {
+            packet = new SessionCreateConsumerMessage();
+            break;
+         }
+         case SESS_ACKNOWLEDGE:
+         {
+            packet = new SessionAcknowledgeMessage();
+            break;
+         }
+         case SESS_EXPIRED:
+         {
+            packet = new SessionExpiredMessage();
+            break;
+         }
+         case SESS_COMMIT:
+         {
+            packet = new SessionCommitMessage();
+            break;
+         }
+         case SESS_ROLLBACK:
+         {
+            packet = new RollbackMessage();
+            break;
+         }
+         case SESS_QUEUEQUERY:
+         {
+            packet = new SessionQueueQueryMessage();
+            break;
+         }
+         case SESS_QUEUEQUERY_RESP:
+         {
+            packet = new SessionQueueQueryResponseMessage();
+            break;
+         }
+         case CREATE_QUEUE:
+         {
+            packet = new CreateQueueMessage();
+            break;
+         }
+         case DELETE_QUEUE:
+         {
+            packet = new SessionDeleteQueueMessage();
+            break;
+         }
+         case SESS_BINDINGQUERY:
+         {
+            packet = new SessionBindingQueryMessage();
+            break;
+         }
+         case SESS_BINDINGQUERY_RESP:
+         {
+            packet = new SessionBindingQueryResponseMessage();
+            break;
+         }
+         case SESS_XA_START:
+         {
+            packet = new SessionXAStartMessage();
+            break;
+         }
+         case SESS_XA_END:
+         {
+            packet = new SessionXAEndMessage();
+            break;
+         }
+         case SESS_XA_COMMIT:
+         {
+            packet = new SessionXACommitMessage();
+            break;
+         }
+         case SESS_XA_PREPARE:
+         {
+            packet = new SessionXAPrepareMessage();
+            break;
+         }
+         case SESS_XA_RESP:
+         {
+            packet = new SessionXAResponseMessage();
+            break;
+         }
+         case SESS_XA_ROLLBACK:
+         {
+            packet = new SessionXARollbackMessage();
+            break;
+         }
+         case SESS_XA_JOIN:
+         {
+            packet = new SessionXAJoinMessage();
+            break;
+         }
+         case SESS_XA_SUSPEND:
+         {
+            packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
+            break;
+         }
+         case SESS_XA_RESUME:
+         {
+            packet = new SessionXAResumeMessage();
+            break;
+         }
+         case SESS_XA_FORGET:
+         {
+            packet = new SessionXAForgetMessage();
+            break;
+         }
+         case SESS_XA_INDOUBT_XIDS:
+         {
+            packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
+            break;
+         }
+         case SESS_XA_INDOUBT_XIDS_RESP:
+         {
+            packet = new SessionXAGetInDoubtXidsResponseMessage();
+            break;
+         }
+         case SESS_XA_SET_TIMEOUT:
+         {
+            packet = new SessionXASetTimeoutMessage();
+            break;
+         }
+         case SESS_XA_SET_TIMEOUT_RESP:
+         {
+            packet = new SessionXASetTimeoutResponseMessage();
+            break;
+         }
+         case SESS_XA_GET_TIMEOUT:
+         {
+            packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
+            break;
+         }
+         case SESS_XA_GET_TIMEOUT_RESP:
+         {
+            packet = new SessionXAGetTimeoutResponseMessage();
+            break;
+         }
+         case SESS_START:
+         {
+            packet = new PacketImpl(PacketImpl.SESS_START);
+            break;
+         }
+         case SESS_STOP:
+         {
+            packet = new PacketImpl(PacketImpl.SESS_STOP);
+            break;
+         }
+         case SESS_FLOWTOKEN:
+         {
+            packet = new SessionConsumerFlowCreditMessage();
+            break;
+         }
+         case SESS_SEND:
+         {
+            packet = new SessionSendMessage();
+            break;
+         }
+         case SESS_SEND_LARGE:
+         {
+            packet = new SessionSendLargeMessage();
+            break;
+         }
+         case SESS_RECEIVE_MSG:
+         {
+            packet = new SessionReceiveMessage();
+            break;
+         }
+         case SESS_RECEIVE_LARGE_MSG:
+         {
+            packet = new SessionReceiveLargeMessage();
+            break;
+         }
+         case SESS_CONSUMER_CLOSE:
+         {
+            packet = new SessionConsumerCloseMessage();
+            break;
+         }
+         case NULL_RESPONSE:
+         {
+            packet = new NullResponseMessage();
+            break;
+         }
+         case SESS_RECEIVE_CONTINUATION:
+         {
+            packet = new SessionReceiveContinuationMessage();
+            break;
+         }
+         case SESS_SEND_CONTINUATION:
+         {
+            packet = new SessionSendContinuationMessage();
+            break;
+         }
+         case SESS_PRODUCER_REQUEST_CREDITS:
+         {
+            packet = new SessionRequestProducerCreditsMessage();
+            break;
+         }
+         case SESS_PRODUCER_CREDITS:
+         {
+            packet = new SessionProducerCreditsMessage();
+            break;
+         }
+         case CREATE_REPLICATION:
+         {
+            packet = new CreateReplicationSessionMessage();
+            break;
+         }
+         case REPLICATION_APPEND:
+         {
+            packet = new ReplicationAddMessage();
+            break;
+         }
+         case REPLICATION_APPEND_TX:
+         {
+            packet = new ReplicationAddTXMessage();
+            break;
+         }
+         case REPLICATION_DELETE:
+         {
+            packet = new ReplicationDeleteMessage();
+            break;
+         }
+         case REPLICATION_DELETE_TX:
+         {
+            packet = new ReplicationDeleteTXMessage();
+            break;
+         }
+         case REPLICATION_PREPARE:
+         {
+            packet = new ReplicationPrepareMessage();
+            break;
+         }
+         case REPLICATION_COMMIT_ROLLBACK:
+         {
+            packet = new ReplicationCommitMessage();
+            break;
+         }
+         case REPLICATION_RESPONSE:
+         {
+            packet = new ReplicationResponseMessage();
+            break;
+         }
+         case REPLICATION_PAGE_WRITE:
+         {
+            packet = new ReplicationPageWriteMessage();
+            break;
+         }
+         case REPLICATION_PAGE_EVENT:
+         {
+            packet = new ReplicationPageEventMessage();
+            break;
+         }
+         case REPLICATION_LARGE_MESSAGE_BEGIN:
+         {
+            packet = new ReplicationLargeMessageBeingMessage();
+            break;
+         }
+         case REPLICATION_LARGE_MESSAGE_END:
+         {
+            packet = new ReplicationLargemessageEndMessage();
+            break;
+         }
+         case REPLICATION_LARGE_MESSAGE_WRITE:
+         {
+            packet = new ReplicationLargeMessageWriteMessage();
+            break;
+         }
+         case REPLICATION_COMPARE_DATA:
+         {
+            packet = new ReplicationCompareDataMessage();
+            break;
+         }
+         case SESS_FORCE_CONSUMER_DELIVERY:
+         {
+            packet = new SessionForceConsumerDelivery();
+            break;
+         }
+         default:
+         {
+            throw new IllegalArgumentException("Invalid type: " + packetType);
+         }
+      }
+
+      packet.decode(in);
+
+      return packet;
+   }
+
+}

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -1,494 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.remoting.impl;
-
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.CREATE_REPLICATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DELETE_QUEUE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.DISCONNECT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EXCEPTION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.NULL_RESPONSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PING;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REATTACH_SESSION_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_APPEND_TX;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMPARE_DATA;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_BEGIN;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_END;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_LARGE_MESSAGE_WRITE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_EVENT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_WRITE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ACKNOWLEDGE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_BINDINGQUERY_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CLOSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_COMMIT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CONSUMER_CLOSE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_CREATECONSUMER;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_EXPIRED;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FLOWTOKEN;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_FORCE_CONSUMER_DELIVERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_CREDITS;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_PRODUCER_REQUEST_CREDITS;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_QUEUEQUERY_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_CONTINUATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_LARGE_MSG;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_RECEIVE_MSG;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_CONTINUATION;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_SEND_LARGE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_START;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_STOP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_COMMIT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_END;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_FORGET;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_GET_TIMEOUT_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_INDOUBT_XIDS_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_JOIN;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_PREPARE;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_RESUME;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_ROLLBACK;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_START;
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateReplicationSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.HornetQExceptionMessage;
-import org.hornetq.core.remoting.impl.wireformat.NullResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
-import org.hornetq.core.remoting.impl.wireformat.PacketsConfirmedMessage;
-import org.hornetq.core.remoting.impl.wireformat.Ping;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationAddTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationCompareDataMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageBeingMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargeMessageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationLargemessageEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionBindingQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCreateConsumerMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionDeleteQueueMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionExpiredMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.remoting.impl.wireformat.SessionProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionReceiveMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionRequestProducerCreditsMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionSendMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXACommitMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAEndMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAForgetMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetInDoubtXidsResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAGetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAJoinMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAPrepareMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAResumeMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXARollbackMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionXAStartMessage;
-
-/**
- * A PacketDecoder
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- *
- */
-public class PacketDecoder
-{
-   private static final Logger log = Logger.getLogger(PacketDecoder.class);
-
-   public Packet decode(final HornetQBuffer in)
-   {
-      final byte packetType = in.readByte();
-
-      Packet packet;
-
-      switch (packetType)
-      {
-         case PING:
-         {
-            packet = new Ping();
-            break;
-         }
-         case DISCONNECT:
-         {
-            packet = new PacketImpl(PacketImpl.DISCONNECT);
-            break;
-         }
-         case EXCEPTION:
-         {
-            packet = new HornetQExceptionMessage();
-            break;
-         }
-         case PACKETS_CONFIRMED:
-         {
-            packet = new PacketsConfirmedMessage();
-            break;
-         }
-         case CREATESESSION:
-         {
-            packet = new CreateSessionMessage();
-            break;
-         }
-         case CREATESESSION_RESP:
-         {
-            packet = new CreateSessionResponseMessage();
-            break;
-         }
-         case REATTACH_SESSION:
-         {
-            packet = new ReattachSessionMessage();
-            break;
-         }
-         case REATTACH_SESSION_RESP:
-         {
-            packet = new ReattachSessionResponseMessage();
-            break;
-         }
-         case SESS_CLOSE:
-         {
-            packet = new SessionCloseMessage();
-            break;
-         }
-         case SESS_CREATECONSUMER:
-         {
-            packet = new SessionCreateConsumerMessage();
-            break;
-         }
-         case SESS_ACKNOWLEDGE:
-         {
-            packet = new SessionAcknowledgeMessage();
-            break;
-         }
-         case SESS_EXPIRED:
-         {
-            packet = new SessionExpiredMessage();
-            break;
-         }
-         case SESS_COMMIT:
-         {
-            packet = new SessionCommitMessage();
-            break;
-         }
-         case SESS_ROLLBACK:
-         {
-            packet = new RollbackMessage();
-            break;
-         }
-         case SESS_QUEUEQUERY:
-         {
-            packet = new SessionQueueQueryMessage();
-            break;
-         }
-         case SESS_QUEUEQUERY_RESP:
-         {
-            packet = new SessionQueueQueryResponseMessage();
-            break;
-         }
-         case CREATE_QUEUE:
-         {
-            packet = new CreateQueueMessage();
-            break;
-         }
-         case DELETE_QUEUE:
-         {
-            packet = new SessionDeleteQueueMessage();
-            break;
-         }
-         case SESS_BINDINGQUERY:
-         {
-            packet = new SessionBindingQueryMessage();
-            break;
-         }
-         case SESS_BINDINGQUERY_RESP:
-         {
-            packet = new SessionBindingQueryResponseMessage();
-            break;
-         }
-         case SESS_XA_START:
-         {
-            packet = new SessionXAStartMessage();
-            break;
-         }
-         case SESS_XA_END:
-         {
-            packet = new SessionXAEndMessage();
-            break;
-         }
-         case SESS_XA_COMMIT:
-         {
-            packet = new SessionXACommitMessage();
-            break;
-         }
-         case SESS_XA_PREPARE:
-         {
-            packet = new SessionXAPrepareMessage();
-            break;
-         }
-         case SESS_XA_RESP:
-         {
-            packet = new SessionXAResponseMessage();
-            break;
-         }
-         case SESS_XA_ROLLBACK:
-         {
-            packet = new SessionXARollbackMessage();
-            break;
-         }
-         case SESS_XA_JOIN:
-         {
-            packet = new SessionXAJoinMessage();
-            break;
-         }
-         case SESS_XA_SUSPEND:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_XA_SUSPEND);
-            break;
-         }
-         case SESS_XA_RESUME:
-         {
-            packet = new SessionXAResumeMessage();
-            break;
-         }
-         case SESS_XA_FORGET:
-         {
-            packet = new SessionXAForgetMessage();
-            break;
-         }
-         case SESS_XA_INDOUBT_XIDS:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_XA_INDOUBT_XIDS);
-            break;
-         }
-         case SESS_XA_INDOUBT_XIDS_RESP:
-         {
-            packet = new SessionXAGetInDoubtXidsResponseMessage();
-            break;
-         }
-         case SESS_XA_SET_TIMEOUT:
-         {
-            packet = new SessionXASetTimeoutMessage();
-            break;
-         }
-         case SESS_XA_SET_TIMEOUT_RESP:
-         {
-            packet = new SessionXASetTimeoutResponseMessage();
-            break;
-         }
-         case SESS_XA_GET_TIMEOUT:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_XA_GET_TIMEOUT);
-            break;
-         }
-         case SESS_XA_GET_TIMEOUT_RESP:
-         {
-            packet = new SessionXAGetTimeoutResponseMessage();
-            break;
-         }
-         case SESS_START:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_START);
-            break;
-         }
-         case SESS_STOP:
-         {
-            packet = new PacketImpl(PacketImpl.SESS_STOP);
-            break;
-         }
-         case SESS_FLOWTOKEN:
-         {
-            packet = new SessionConsumerFlowCreditMessage();
-            break;
-         }
-         case SESS_SEND:
-         {
-            packet = new SessionSendMessage();
-            break;
-         }
-         case SESS_SEND_LARGE:
-         {
-            packet = new SessionSendLargeMessage();
-            break;
-         }
-         case SESS_RECEIVE_MSG:
-         {
-            packet = new SessionReceiveMessage();
-            break;
-         }
-         case SESS_RECEIVE_LARGE_MSG:
-         {
-            packet = new SessionReceiveLargeMessage();
-            break;
-         }
-         case SESS_CONSUMER_CLOSE:
-         {
-            packet = new SessionConsumerCloseMessage();
-            break;
-         }
-         case NULL_RESPONSE:
-         {
-            packet = new NullResponseMessage();
-            break;
-         }
-         case SESS_RECEIVE_CONTINUATION:
-         {
-            packet = new SessionReceiveContinuationMessage();
-            break;
-         }
-         case SESS_SEND_CONTINUATION:
-         {
-            packet = new SessionSendContinuationMessage();
-            break;
-         }
-         case SESS_PRODUCER_REQUEST_CREDITS:
-         {
-            packet = new SessionRequestProducerCreditsMessage();
-            break;
-         }
-         case SESS_PRODUCER_CREDITS:
-         {
-            packet = new SessionProducerCreditsMessage();
-            break;
-         }
-         case CREATE_REPLICATION:
-         {
-            packet = new CreateReplicationSessionMessage();
-            break;
-         }
-         case REPLICATION_APPEND:
-         {
-            packet = new ReplicationAddMessage();
-            break;
-         }
-         case REPLICATION_APPEND_TX:
-         {
-            packet = new ReplicationAddTXMessage();
-            break;
-         }
-         case REPLICATION_DELETE:
-         {
-            packet = new ReplicationDeleteMessage();
-            break;
-         }
-         case REPLICATION_DELETE_TX:
-         {
-            packet = new ReplicationDeleteTXMessage();
-            break;
-         }
-         case REPLICATION_PREPARE:
-         {
-            packet = new ReplicationPrepareMessage();
-            break;
-         }
-         case REPLICATION_COMMIT_ROLLBACK:
-         {
-            packet = new ReplicationCommitMessage();
-            break;
-         }
-         case REPLICATION_RESPONSE:
-         {
-            packet = new ReplicationResponseMessage();
-            break;
-         }
-         case REPLICATION_PAGE_WRITE:
-         {
-            packet = new ReplicationPageWriteMessage();
-            break;
-         }
-         case REPLICATION_PAGE_EVENT:
-         {
-            packet = new ReplicationPageEventMessage();
-            break;
-         }
-         case REPLICATION_LARGE_MESSAGE_BEGIN:
-         {
-            packet = new ReplicationLargeMessageBeingMessage();
-            break;
-         }
-         case REPLICATION_LARGE_MESSAGE_END:
-         {
-            packet = new ReplicationLargemessageEndMessage();
-            break;
-         }
-         case REPLICATION_LARGE_MESSAGE_WRITE:
-         {
-            packet = new ReplicationLargeMessageWriteMessage();
-            break;
-         }
-         case REPLICATION_COMPARE_DATA:
-         {
-            packet = new ReplicationCompareDataMessage();
-            break;
-         }
-         case SESS_FORCE_CONSUMER_DELIVERY:
-         {
-            packet = new SessionForceConsumerDelivery();
-            break;
-         }
-         default:
-         {
-            throw new IllegalArgumentException("Invalid type: " + packetType);
-         }
-      }
-
-      packet.decode(in);
-
-      return packet;
-   }
-
-}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -28,8 +28,8 @@
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
 import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
 import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.utils.SimpleIDGenerator;
 
@@ -79,8 +79,6 @@
 
    private final Object failLock = new Object();
 
-   private final PacketDecoder decoder = new PacketDecoder();
-
    private volatile boolean dataReceived;
 
    private final Executor executor;
@@ -123,7 +121,7 @@
       this.interceptors = interceptors;
 
       this.client = client;
-
+      
       this.executor = executor;
    }
 
@@ -351,7 +349,7 @@
 
    private volatile boolean executing;
 
-   public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+   public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
    {
       final Packet packet = decoder.decode(buffer);
 
@@ -391,7 +389,46 @@
      
       dataReceived = true;
    }
+   
+   public void packetReceived(final Object connectionID, final Packet packet)
+   {
+      if (packet.isAsyncExec() && executor != null)
+      {
+         executing = true;
 
+         executor.execute(new Runnable()
+         {
+            public void run()
+            {
+               try
+               {
+                  doBufferReceived(packet);
+               }
+               catch (Throwable t)
+               {
+                  RemotingConnectionImpl.log.error("Unexpected error", t);
+               }
+
+               executing = false;
+            }
+         });
+      }
+      else
+      {
+         //To prevent out of order execution if interleaving sync and async operations on same connection
+         while (executing)
+         {
+            Thread.yield();
+         }
+         
+         // Pings must always be handled out of band so we can send pings back to the client quickly
+         // otherwise they would get in the queue with everything else which might give an intolerable delay
+         doBufferReceived(packet);
+      }
+     
+      dataReceived = true;
+   }
+
    private void doBufferReceived(final Packet packet)
    {
       if (interceptors != null)

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -19,6 +19,8 @@
 import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
@@ -36,6 +38,8 @@
 
    private final BufferHandler handler;
 
+   private final PacketDecoder decoder = new CorePacketDecoder();
+   
    private final ConnectionLifeCycleListener listener;
 
    private final String id;
@@ -128,7 +132,7 @@
                   {
                      copied.readInt(); // read and discard
 
-                     handler.bufferReceived(id, copied);
+                     handler.bufferReceived(id, copied, decoder);
                   }
                }
                catch (Exception e)

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -32,6 +32,7 @@
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
 import org.hornetq.core.remoting.impl.RemotingConnectionImpl;
@@ -41,6 +42,7 @@
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.impl.HornetQPacketHandler;
 import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.stomp.StompPacketDecoder;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
 import org.hornetq.spi.core.remoting.BufferHandler;
@@ -184,7 +186,7 @@
          a.start();
       }
 
-      //This thread checks connections that need to be closed, and also flushes confirmations
+      // This thread checks connections that need to be closed, and also flushes confirmations
       failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL);
 
       failureCheckAndFlushThread.start();
@@ -423,13 +425,13 @@
 
    private final class DelegatingBufferHandler extends AbstractBufferHandler
    {
-      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
       {
          ConnectionEntry conn = connections.get(connectionID);
 
          if (conn != null)
          {
-            conn.connection.bufferReceived(connectionID, buffer);
+            conn.connection.bufferReceived(connectionID, buffer, decoder);
          }
       }
    }
@@ -495,9 +497,9 @@
             for (ConnectionEntry entry : connections.values())
             {
                RemotingConnection conn = entry.connection;
-               
+
                boolean flush = true;
-               
+
                if (entry.ttl != -1)
                {
                   if (now >= entry.lastCheck + entry.ttl)
@@ -505,7 +507,7 @@
                      if (!conn.checkDataReceived())
                      {
                         idsToRemove.add(conn.getID());
-                        
+
                         flush = false;
                      }
                      else
@@ -514,12 +516,12 @@
                      }
                   }
                }
-               
+
                if (flush)
                {
-                  //We flush any confirmations on the connection - this prevents idle bridges for example
-                  //sitting there with many unacked messages
-                                    
+                  // We flush any confirmations on the connection - this prevents idle bridges for example
+                  // sitting there with many unacked messages
+
                   conn.flushConfirmations();
                }
             }

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import java.util.Map;
+
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
+import org.hornetq.utils.UUIDGenerator;
+import org.hornetq.utils.VersionLoader;
+
+/**
+ * A ProtocolConverter
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class ProtocolConverter
+{
+
+   public Packet toPacket(StompFrame frame)
+   {
+      String command = frame.getCommand();
+      Map<String, Object> headers = frame.getHeaders();
+      if (Stomp.Commands.CONNECT.equals(command))
+      {
+         String login = (String)headers.get("login");
+         String password = (String)headers.get("passcode");
+
+         String name = UUIDGenerator.getInstance().generateStringUUID();
+         long sessionChannelID = 12;
+         return new CreateSessionMessage(name,
+                                         sessionChannelID,
+                                         VersionLoader.getVersion().getIncrementingVersion(),
+                                         login,
+                                         password,
+                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                         false,
+                                         true,
+                                         true,
+                                         false,
+                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE);
+      }
+      if (Stomp.Commands.DISCONNECT.equals(command))
+      {
+         return new SessionCloseMessage();
+      }
+      else
+      {
+         throw new RuntimeException("frame not supported: " + frame);
+      }
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolException.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+import java.io.IOException;
+
+/**
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class ProtocolException extends IOException {
+    private static final long serialVersionUID = -2869735532997332242L;
+    private final boolean fatal;
+
+    public ProtocolException() {
+        this(null);
+    }
+
+    public ProtocolException(String s) {
+        this(s, false);
+    }
+
+    public ProtocolException(String s, boolean fatal) {
+        this(s, fatal, null);
+    }
+
+    public ProtocolException(String s, boolean fatal, Throwable cause) {
+        super(s);
+        this.fatal = fatal;
+        initCause(cause);
+    }
+
+    public boolean isFatal() {
+        return fatal;
+    }
+}

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/Stomp.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,124 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+
+/**
+ * The standard verbs and headers used for the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
+ *
+ * @version $Revision: 57 $
+ */
+public interface Stomp {
+    String NULL = "\u0000";
+    String NEWLINE = "\n";
+
+    public static interface Commands {
+        String CONNECT = "CONNECT";
+        String SEND = "SEND";
+        String DISCONNECT = "DISCONNECT";
+        String SUBSCRIBE = "SUB";
+        String UNSUBSCRIBE = "UNSUB";
+        String BEGIN_TRANSACTION = "BEGIN";
+        String COMMIT_TRANSACTION = "COMMIT";
+        String ABORT_TRANSACTION = "ABORT";
+        String BEGIN = "BEGIN";
+        String COMMIT = "COMMIT";
+        String ABORT = "ABORT";
+        String ACK = "ACK";
+    }
+
+    public interface Responses {
+        String CONNECTED = "CONNECTED";
+        String ERROR = "ERROR";
+        String MESSAGE = "MESSAGE";
+        String RECEIPT = "RECEIPT";
+    }
+
+    public interface Headers {
+        String SEPERATOR = ":";
+        String RECEIPT_REQUESTED = "receipt";
+        String TRANSACTION = "transaction";
+        String CONTENT_LENGTH = "content-length";
+
+        public interface Response {
+            String RECEIPT_ID = "receipt-id";
+        }
+
+        public interface Send {
+            String DESTINATION = "destination";
+            String CORRELATION_ID = "correlation-id";
+            String REPLY_TO = "reply-to";
+            String EXPIRATION_TIME = "expires";
+            String PRIORITY = "priority";
+            String TYPE = "type";
+            Object PERSISTENT = "persistent";
+        }
+
+        public interface Message {
+            String MESSAGE_ID = "message-id";
+            String DESTINATION = "destination";
+            String CORRELATION_ID = "correlation-id";
+            String EXPIRATION_TIME = "expires";
+            String REPLY_TO = "reply-to";
+            String PRORITY = "priority";
+            String REDELIVERED = "redelivered";
+            String TIMESTAMP = "timestamp";
+            String TYPE = "type";
+            String SUBSCRIPTION = "subscription";
+        }
+
+        public interface Subscribe {
+            String DESTINATION = "destination";
+            String ACK_MODE = "ack";
+            String ID = "id";
+            String SELECTOR = "selector";
+            String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+            String NO_LOCAL = "no-local";
+
+            public interface AckModeValues {
+                String AUTO = "auto";
+                String CLIENT = "client";
+            }
+        }
+
+        public interface Unsubscribe {
+            String DESTINATION = "destination";
+            String ID = "id";
+        }
+
+        public interface Connect {
+            String LOGIN = "login";
+            String PASSCODE = "passcode";
+            String CLIENT_ID = "client-id";
+            String REQUEST_ID = "request-id";
+        }
+
+        public interface Error {
+            String MESSAGE = "message";
+        }
+
+        public interface Connected {
+            String SESSION = "session";
+            String RESPONSE_ID = "response-id";
+        }
+
+        public interface Ack {
+            String MESSAGE_ID = "message-id";
+        }
+    }
+}

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrame.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompFrame
+{
+   private static final byte[] NO_DATA = new byte[] {};
+
+   private String command;
+
+   private Map<String, Object> headers;
+
+   private byte[] content = StompFrame.NO_DATA;
+
+   public StompFrame()
+   {
+      this.headers = new HashMap<String, Object>();
+   }
+
+   public StompFrame(String command, Map<String, Object> headers, byte[] data)
+   {
+      this.command = command;
+      this.headers = headers;
+      this.content = data;
+   }
+
+   public String getCommand()
+   {
+      return command;
+   }
+
+   public byte[] getContent()
+   {
+      return content;
+   }
+
+   public Map<String, Object> getHeaders()
+   {
+      return headers;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "StompFrame[command=" + command + ", headers=" + headers + ",content-length=" + content.length + "]";
+   }
+}

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameDelimiter.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+
+/**
+ * A StompFrameDelimiter
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class StompFrameDelimiter extends DelimiterBasedFrameDecoder
+{
+
+   private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+
+   public StompFrameDelimiter()
+   {
+      super(MAX_DATA_LENGTH, true, Delimiters.nulDelimiter());
+   }
+}

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompFrameError.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+/**
+ * Command indicating that an invalid Stomp Frame was received.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+public class StompFrameError extends StompFrame {
+    private final ProtocolException exception;
+
+    public StompFrameError(ProtocolException exception) {
+        this.exception = exception;
+    }
+
+    public ProtocolException getException() {
+        return exception;
+    }
+}

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompMarshaller.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,191 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+public class StompMarshaller {
+    private static final byte[] NO_DATA = new byte[]{};
+    private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
+    private static final int MAX_COMMAND_LENGTH = 1024;
+    private static final int MAX_HEADER_LENGTH = 1024 * 10;
+    private static final int MAX_HEADERS = 1000;
+    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+    private int version = 1;
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    public byte[] marshal(StompFrame command) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        marshal(command, dos);
+        dos.close();
+        return baos.toByteArray();
+    }
+
+    public void marshal(StompFrame stomp, DataOutput os) throws IOException {
+        StringBuffer buffer = new StringBuffer();
+        buffer.append(stomp.getCommand());
+        buffer.append(Stomp.NEWLINE);
+
+        // Output the headers.
+        for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Map.Entry) iter.next();
+            buffer.append(entry.getKey());
+            buffer.append(Stomp.Headers.SEPERATOR);
+            buffer.append(entry.getValue());
+            buffer.append(Stomp.NEWLINE);
+        }
+
+        // Add a newline to seperate the headers from the content.
+        buffer.append(Stomp.NEWLINE);
+
+        os.write(buffer.toString().getBytes("UTF-8"));
+        os.write(stomp.getContent());
+        os.write(END_OF_FRAME);
+    }
+
+    public StompFrame unmarshal(HornetQBuffer in) throws IOException {
+
+        try {
+            String action = null;
+
+            // skip white space to next real action line
+            while (true) {
+                action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+                if (action == null) {
+                    throw new IOException("connection was closed");
+                }
+                else {
+                    action = action.trim();
+                    if (action.length() > 0) {
+                        break;
+                    }
+                }
+            }
+
+            // Parse the headers
+            HashMap headers = new HashMap(25);
+            while (true) {
+                String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+                if (line != null && line.trim().length() > 0) {
+
+                    if (headers.size() > MAX_HEADERS) {
+                        throw new ProtocolException("The maximum number of headers was exceeded", true);
+                    }
+
+                    try {
+                        int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+                        String name = line.substring(0, seperator_index).trim();
+                        String value = line.substring(seperator_index + 1, line.length()).trim();
+                        headers.put(name, value);
+                    }
+                    catch (Exception e) {
+                        throw new ProtocolException("Unable to parser header line [" + line + "]", true);
+                    }
+                }
+                else {
+                    break;
+                }
+            }
+
+            // Read in the data part.
+            byte[] data = NO_DATA;
+            String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
+            if (contentLength != null) {
+
+                // Bless the client, he's telling us how much data to read in.
+                int length;
+                try {
+                    length = Integer.parseInt(contentLength.trim());
+                }
+                catch (NumberFormatException e) {
+                    throw new ProtocolException("Specified content-length is not a valid integer", true);
+                }
+
+                if (length > MAX_DATA_LENGTH) {
+                    throw new ProtocolException("The maximum data length was exceeded", true);
+                }
+
+                data = new byte[length];
+                in.readBytes(data);
+
+                if (in.readByte() != 0) {
+                    throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
+                }
+            }
+            else {
+
+                // We don't know how much to read.. data ends when we hit a 0
+                byte b;
+                ByteArrayOutputStream baos = null;
+                while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
+
+                    if (baos == null) {
+                        baos = new ByteArrayOutputStream();
+                    }
+                    else if (baos.size() > MAX_DATA_LENGTH) {
+                        throw new ProtocolException("The maximum data length was exceeded", true);
+                    }
+
+                    baos.write(b);
+                }
+
+                if (baos != null) {
+                    baos.close();
+                    data = baos.toByteArray();
+                }
+            }
+
+            return new StompFrame(action, headers, data);
+        }
+        catch (ProtocolException e) {
+            return new StompFrameError(e);
+        }
+    }
+
+    protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException {
+        byte b;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
+        while ((b = in.readByte()) != '\n') {
+            if (baos.size() > maxLength) {
+                throw new ProtocolException(errorMessage, true);
+            }
+            baos.write(b);
+        }
+        byte[] sequence = baos.toByteArray();
+        return new String(sequence, "UTF-8");
+    }
+}

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.stomp;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.PacketDecoder;
+
+/**
+ * A StompPacketDecoder
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class StompPacketDecoder implements PacketDecoder
+{
+   private final StompMarshaller marshaller = new StompMarshaller();
+
+   private final ProtocolConverter converter = new ProtocolConverter();
+
+   // PacketDecoder implementation ----------------------------------
+
+   public Packet decode(HornetQBuffer in)
+   {
+      StompFrame frame;
+      try
+      {
+         frame = marshaller.unmarshal(in);
+         System.out.println(">>> " + frame);
+         Packet packet = converter.toPacket(frame);
+         packet.setChannelID(1);
+         System.out.println(">>> " + packet);
+
+         return packet;
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         return null;
+      }
+   }
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -16,6 +16,7 @@
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
+import org.hornetq.integration.stomp.StompFrameDelimiter;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.handler.ssl.SslHandler;
@@ -44,9 +45,15 @@
 
    // Public --------------------------------------------------------
 
-   public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
+   public static void addStompCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
    {
       assert pipeline != null;
+      pipeline.addLast("delimiter", new StompFrameDelimiter());
+   }
+
+   public static void addHornetQCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
+   {
+      assert pipeline != null;
       pipeline.addLast("decoder", new HornetQFrameDecoder2());
    }
 

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -15,6 +15,7 @@
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -31,23 +32,27 @@
  * @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
  * @version $Rev$, $Date$
  */
-class HornetQChannelHandler extends SimpleChannelHandler
+public class HornetQChannelHandler extends SimpleChannelHandler
 {
    private static final Logger log = Logger.getLogger(HornetQChannelHandler.class);
 
    private final ChannelGroup group;
 
+   private final PacketDecoder decoder;
+
    private final BufferHandler handler;
 
    private final ConnectionLifeCycleListener listener;
 
    volatile boolean active;
 
-   HornetQChannelHandler(final ChannelGroup group,
-                         final BufferHandler handler,
-                         final ConnectionLifeCycleListener listener)
+   public HornetQChannelHandler(final ChannelGroup group,
+                                final PacketDecoder decoder,
+                                final BufferHandler handler,
+                                final ConnectionLifeCycleListener listener)
    {
       this.group = group;
+      this.decoder = decoder;
       this.handler = handler;
       this.listener = listener;
    }
@@ -64,7 +69,7 @@
    {
       ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
 
-      handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer));
+      handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
    }
 
    @Override

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -31,9 +31,12 @@
 import org.hornetq.api.core.TransportConfiguration;
 import org.hornetq.api.core.management.NotificationType;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.integration.stomp.StompPacketDecoder;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
@@ -102,6 +105,7 @@
 
    private final boolean useInvm;
 
+   private final String protocol;
    private final String host;
 
    private final int port;
@@ -176,6 +180,9 @@
       useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME,
                                                        TransportConstants.DEFAULT_USE_INVM,
                                                        configuration);
+      protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
+                                                TransportConstants.DEFAULT_PROTOCOL,
+                                                configuration);
       host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
                                                    TransportConstants.DEFAULT_HOST,
                                                    configuration);
@@ -279,9 +286,18 @@
                pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
                pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
             }
-
-            ChannelPipelineSupport.addCodecFilter(pipeline, handler);
-            pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
+            PacketDecoder decoder;
+            if (protocol.equals(TransportConstants.STOMP_PROTOCOL))
+            {
+               ChannelPipelineSupport.addStompCodecFilter(pipeline, handler);
+               decoder = new StompPacketDecoder();
+            } else
+            {
+               ChannelPipelineSupport.addHornetQCodecFilter(pipeline, handler);
+               decoder = new CorePacketDecoder();
+            }
+            
+            pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, decoder, handler, new Listener()));
             return pipeline;
          }
       };
@@ -475,10 +491,11 @@
    private final class HornetQServerChannelHandler extends HornetQChannelHandler
    {
       HornetQServerChannelHandler(final ChannelGroup group,
+                                  final PacketDecoder decoder,
                                   final BufferHandler handler,
                                   final ConnectionLifeCycleListener listener)
       {
-         super(group, handler, listener);
+         super(group, decoder, handler, listener);
       }
 
       @Override

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -14,7 +14,6 @@
 package org.hornetq.integration.transports.netty;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.spi.core.remoting.Connection;

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -30,6 +30,8 @@
 
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.impl.CorePacketDecoder;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
@@ -93,6 +95,8 @@
    private ChannelGroup channelGroup;
 
    private final BufferHandler handler;
+   
+   private final PacketDecoder decoder = new CorePacketDecoder();
 
    private final ConnectionLifeCycleListener listener;
 
@@ -310,7 +314,7 @@
                pipeline.addLast("httpResponseDecoder", new HttpResponseDecoder());
                pipeline.addLast("httphandler", new HttpHandler());
             }
-            ChannelPipelineSupport.addCodecFilter(pipeline, handler);
+            ChannelPipelineSupport.addHornetQCodecFilter(pipeline, handler);
             pipeline.addLast("handler", new HornetQClientChannelHandler(channelGroup, handler, new Listener()));
             return pipeline;
          }
@@ -441,7 +445,7 @@
                                   final BufferHandler handler,
                                   final ConnectionLifeCycleListener listener)
       {
-         super(group, handler, listener);
+         super(group, decoder, handler, listener);
       }
    }
 

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/TransportConstants.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -46,6 +46,8 @@
 
    public static final String USE_INVM_PROP_NAME = "use-invm";
 
+   public static final String PROTOCOL_PROP_NAME = "protocol";
+
    public static final String HOST_PROP_NAME = "host";
 
    public static final String PORT_PROP_NAME = "port";
@@ -75,10 +77,18 @@
 
    public static final boolean DEFAULT_USE_SERVLET = false;
 
+   public static final String HORNETQ_PROTOCOL = "hornetq";
+
+   public static final String STOMP_PROTOCOL = "stomp";
+
+   public static final String DEFAULT_PROTOCOL = HORNETQ_PROTOCOL;
+
    public static final String DEFAULT_HOST = "localhost";
 
    public static final int DEFAULT_PORT = 5445;
 
+   public static final int DEFAULT_STOMP_PORT = 61613;
+
    public static final String DEFAULT_KEYSTORE_PATH = "hornetq.keystore";
 
    public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";
@@ -120,6 +130,7 @@
       allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
+      allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.HOST_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.PORT_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.KEYSTORE_PATH_PROP_NAME);

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/BufferHandler.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -13,6 +13,7 @@
 package org.hornetq.spi.core.remoting;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.remoting.PacketDecoder;
 
 /**
  * A BufferHandler that will handle buffers received by an acceptor.
@@ -29,7 +30,7 @@
     * @param connectionID the connection the buffer was received on
     * @param buffer       the buffer to decode
     */
-   void bufferReceived(Object connectionID, HornetQBuffer buffer);
+   void bufferReceived(Object connectionID, HornetQBuffer buffer, PacketDecoder decoder);
 
    /**
     * called by the remoting connection prior to {@link org.hornetq.spi.core.remoting.BufferHandler#bufferReceived(Object, org.hornetq.api.core.HornetQBuffer)}.

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -25,6 +25,7 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.PacketDecoder;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyAcceptor;
 import org.hornetq.integration.transports.netty.NettyConnector;
@@ -474,7 +475,7 @@
          this.latch = latch;
       }
 
-      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
       {
          int i = buffer.readInt();
          messages.add(i);
@@ -496,7 +497,7 @@
          this.latch = latch;
       }
 
-      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
       {
          int i = buffer.readInt();
 
@@ -529,7 +530,7 @@
          return 0;
       }
 
-      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+      public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
       {
          int i = buffer.readInt();
          messages.add(i);

Added: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,841 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.stomp.Stomp;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+
+public class StompTest extends TestCase {
+    private static final transient Log log = LogFactory.getLog(StompTest.class);
+    private int port = 61613;
+    private Socket stompSocket;
+    private ByteArrayOutputStream inputBuffer;
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private Session session;
+    private Queue queue;
+   private JMSServerManager server;
+
+    public void testConnect() throws Exception {
+
+        String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
+        sendFrame(connect_frame);
+
+        String f = receiveFrame(10000);
+        Assert.assertTrue(f.startsWith("CONNECTED"));
+        Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+    }
+
+    public void testSendMessage() throws Exception {
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SEND\n" +
+                        "destination:/queue/" + getQueueName() + "\n\n" +
+                        "Hello World" +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+
+        TextMessage message = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(message);
+        Assert.assertEquals("Hello World", message.getText());
+
+        // Make sure that the timestamp is valid - should
+        // be very close to the current time.
+        long tnow = System.currentTimeMillis();
+        long tmsg = message.getJMSTimestamp();
+        Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+    }
+
+    public void _testJMSXGroupIdCanBeSet() throws Exception {
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SEND\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "JMSXGroupID: TEST\n\n" +
+                        "Hello World" +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+
+        TextMessage message = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(message);
+        // TODO do we support it?
+        //Assert.assertEquals("TEST", ((TextMessage) message).getGroupID());
+    }
+
+    public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
+
+        MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SEND\n" +
+                        "foo:abc\n" +
+                        "bar:123\n" +
+                        "destination:/queue/" + getQueueName() + "\n\n" +
+                        "Hello World" +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+
+        TextMessage message = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(message);
+        Assert.assertEquals("Hello World", message.getText());
+        Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+        Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+    }
+
+    public void _testSendMessageWithStandardHeaders() throws Exception {
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SEND\n" +
+                        "correlation-id:c123\n" +
+                        "priority:3\n" +
+                        "type:t345\n" +
+                        "JMSXGroupID:abc\n" +
+                        "foo:abc\n" +
+                        "bar:123\n" +
+                        "destination:/queue/" + getQueueName() + "\n\n" +
+                        "Hello World" +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+
+        TextMessage message = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(message);
+        Assert.assertEquals("Hello World", message.getText());
+        Assert.assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
+        Assert.assertEquals("getJMSType", "t345", message.getJMSType());
+        Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
+        Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
+        Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+
+        Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+        // FIXME do we support it?
+        //Assert.assertEquals("GroupID", "abc", amqMessage.getGroupID());
+    }
+
+    public void testSubscribeWithAutoAck() throws Exception {
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(100000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "ack:auto\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        sendMessage(getName());
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+        frame =
+                "DISCONNECT\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+    }
+
+    public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(100000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "ack:auto\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        sendBytesMessage(new byte[]{1, 2, 3, 4, 5});
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+        Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+        Matcher cl_matcher = cl.matcher(frame);
+        Assert.assertTrue(cl_matcher.find());
+        Assert.assertEquals("5", cl_matcher.group(1));
+
+        Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
+
+        frame =
+                "DISCONNECT\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+    }
+
+    public void testSubscribeWithMessageSentWithProperties() throws Exception {
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(100000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "ack:auto\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage message = session.createTextMessage("Hello World");
+        message.setStringProperty("s", "value");
+        message.setBooleanProperty("n", false);
+        message.setByteProperty("byte", (byte) 9);
+        message.setDoubleProperty("d", 2.0);
+        message.setFloatProperty("f", (float) 6.0);
+        message.setIntProperty("i", 10);
+        message.setLongProperty("l", 121);
+        message.setShortProperty("s", (short) 12);
+        producer.send(message);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+//        System.out.println("out: "+frame);
+
+        frame =
+                "DISCONNECT\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+    }
+
+    public void testMessagesAreInOrder() throws Exception {
+        int ctr = 10;
+        String[] data = new String[ctr];
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(100000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "ack:auto\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        for (int i = 0; i < ctr; ++i) {
+            data[i] = getName() + i;
+            sendMessage(data[i]);
+        }
+
+        for (int i = 0; i < ctr; ++i) {
+            frame = receiveFrame(1000);
+            Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+        }
+
+        // sleep a while before publishing another set of messages
+        waitForFrameToTakeEffect();
+
+        for (int i = 0; i < ctr; ++i) {
+            data[i] = getName() + ":second:" + i;
+            sendMessage(data[i]);
+        }
+
+        for (int i = 0; i < ctr; ++i) {
+            frame = receiveFrame(1000);
+            Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+        }
+
+        frame =
+                "DISCONNECT\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+    }
+
+    public void testSubscribeWithAutoAckAndSelector() throws Exception {
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(100000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "selector: foo = 'zzz'\n" +
+                        "ack:auto\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        sendMessage("Ignored message", "foo", "1234");
+        sendMessage("Real message", "foo", "zzz");
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+        Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
+
+        frame =
+                "DISCONNECT\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+    }
+
+    public void testSubscribeWithClientAck() throws Exception {
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "ack:client\n\n" +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+        sendMessage(getName());
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+        frame =
+                "DISCONNECT\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        // message should be received since message was not acknowledged
+        MessageConsumer consumer = session.createConsumer(queue);
+        TextMessage message = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(message);
+        Assert.assertTrue(message.getJMSRedelivered());
+    }
+
+    public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
+        assertSubscribeWithClientAckThenConsumeWithAutoAck(false);
+    }
+
+    public void testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
+        assertSubscribeWithClientAckThenConsumeWithAutoAck(true);
+    }
+
+    protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "ack:client\n\n" +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+        sendMessage(getName());
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+        log.info("Reconnecting!");
+        
+        if (sendDisconnect) {
+            frame =
+                    "DISCONNECT\n" +
+                            "\n\n" +
+                            Stomp.NULL;
+            sendFrame(frame);
+            reconnect();
+        }
+        else {
+            reconnect(1000);
+        }
+
+
+        // message should be received since message was not acknowledged
+        frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n\n" +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+        frame =
+                "DISCONNECT\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        // now lets make sure we don't see the message again
+        reconnect();
+
+        frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n\n" +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+        sendMessage("shouldBeNextMessage");
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+        Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+    }
+
+    public void testUnsubscribe() throws Exception {
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+        frame = receiveFrame(100000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "ack:auto\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        //send a message to our queue
+        sendMessage("first message");
+
+        //receive message from socket
+        frame = receiveFrame(1000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+        //remove suscription
+        frame =
+                "UNSUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        waitForFrameToTakeEffect();
+
+        //send a message to our queue
+        sendMessage("second message");
+
+        try {
+            frame = receiveFrame(1000);
+            log.info("Received frame: " + frame);
+            Assert.fail("No message should have been received since subscription was removed");
+        }
+        catch (SocketTimeoutException e) {
+
+        }
+    }
+
+    public void testTransactionCommit() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        String f = receiveFrame(1000);
+        Assert.assertTrue(f.startsWith("CONNECTED"));
+
+        frame =
+                "BEGIN\n" +
+                        "transaction: tx1\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame =
+                "SEND\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "transaction: tx1\n" +
+                        "\n\n" +
+                        "Hello World" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame =
+                "COMMIT\n" +
+                        "transaction: tx1\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        waitForFrameToTakeEffect();
+
+        TextMessage message = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull("Should have received a message", message);
+    }
+
+    public void testTransactionRollback() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        String f = receiveFrame(1000);
+        Assert.assertTrue(f.startsWith("CONNECTED"));
+
+        frame =
+                "BEGIN\n" +
+                        "transaction: tx1\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame =
+                "SEND\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "transaction: tx1\n" +
+                        "\n" +
+                        "first message" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        //rollback first message
+        frame =
+                "ABORT\n" +
+                        "transaction: tx1\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame =
+                "BEGIN\n" +
+                        "transaction: tx1\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame =
+                "SEND\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "transaction: tx1\n" +
+                        "\n" +
+                        "second message" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame =
+                "COMMIT\n" +
+                        "transaction: tx1\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        // This test case is currently failing
+        waitForFrameToTakeEffect();
+
+        //only second msg should be received since first msg was rolled back
+        TextMessage message = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(message);
+        Assert.assertEquals("second message", message.getText().trim());
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+    protected void setUp() throws Exception {
+       server = createServer();
+       server.start();
+        connectionFactory = createConnectionFactory();
+
+        stompSocket = createSocket();
+        inputBuffer = new ByteArrayOutputStream();
+
+        connection = connectionFactory.createConnection();
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        queue = session.createQueue(getQueueName());
+        connection.start();
+    }
+
+    /**
+    * @return
+    * @throws Exception 
+    */
+   private JMSServerManager createServer() throws Exception
+   {
+      Configuration config = new ConfigurationImpl();
+      config.setSecurityEnabled(false);
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.STOMP_PROTOCOL);
+      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+      TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+      config.getAcceptorConfigurations().add(stompTransport);
+      config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+       HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+       
+       JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+       jmsConfig.getQueueConfigurations().add(new QueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+       server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+       server.setContext(null);
+       return server;
+   }
+
+   protected void tearDown() throws Exception {
+        connection.close();
+        if (stompSocket != null) {
+            stompSocket.close();
+        }
+        server.stop();
+    }
+
+    protected void reconnect() throws Exception {
+        reconnect(0);
+    }
+    protected void reconnect(long sleep) throws Exception {
+        stompSocket.close();
+
+        if (sleep > 0) {
+            Thread.sleep(sleep);
+        }
+
+        stompSocket = createSocket();
+        inputBuffer = new ByteArrayOutputStream();
+    }
+
+    protected ConnectionFactory createConnectionFactory() {
+       return new HornetQConnectionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+    }
+
+    protected Socket createSocket() throws IOException {
+        return new Socket("127.0.0.1", port);
+    }
+
+    protected String getQueueName() {
+        return "test";
+    }
+
+    public void sendFrame(String data) throws Exception {
+        byte[] bytes = data.getBytes("UTF-8");
+        OutputStream outputStream = stompSocket.getOutputStream();
+        for (int i = 0; i < bytes.length; i++) {
+            outputStream.write(bytes[i]);
+        }
+        outputStream.flush();
+    }
+
+    public String receiveFrame(long timeOut) throws Exception {
+        stompSocket.setSoTimeout((int) timeOut);
+        InputStream is = stompSocket.getInputStream();
+        int c = 0;
+        for (; ;) {
+            c = is.read();
+            System.out.println(c);
+            if (c < 0) {
+                throw new IOException("socket closed.");
+            }
+            else if (c == 0) {
+                c = is.read();
+                Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+                byte[] ba = inputBuffer.toByteArray();
+                inputBuffer.reset();
+                return new String(ba, "UTF-8");
+            }
+            else {
+                inputBuffer.write(c);
+            }
+        }
+    }
+
+    public void sendMessage(String msg) throws Exception {
+        sendMessage(msg, "foo", "xyz");
+    }
+
+    public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage message = session.createTextMessage(msg);
+        message.setStringProperty(propertyName, propertyValue);
+        producer.send(message);
+    }
+
+    public void sendBytesMessage(byte[] msg) throws Exception {
+        MessageProducer producer = session.createProducer(queue);
+        BytesMessage message = session.createBytesMessage();
+        message.writeBytes(msg);
+        producer.send(message);
+    }
+
+    protected void waitForFrameToTakeEffect() throws InterruptedException {
+        // bit of a dirty hack :)
+        // another option would be to force some kind of receipt to be returned
+        // from the frame
+        Thread.sleep(2000);
+    }
+}

Added: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest2.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptor;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+
+import junit.framework.TestCase;
+
+/**
+ * A StompTest
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class StompTest2 extends TestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private HornetQServer server;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testFoo() throws Exception
+   {
+      Thread.sleep(10);
+   }
+
+   // Package protected ---------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+
+      Configuration config = new ConfigurationImpl();
+      config.setSecurityEnabled(false);
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.STOMP_PROTOCOL);
+      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
+      TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+      config.getAcceptorConfigurations().add(stompTransport);
+
+      server = HornetQServers.newHornetQServer(config);
+      server.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      server.stop();
+
+      super.tearDown();
+   }
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.PacketDecoder;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyAcceptor;
 import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
@@ -47,7 +48,7 @@
       BufferHandler handler = new AbstractBufferHandler()
       {
 
-         public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+         public void bufferReceived(Object connectionID, HornetQBuffer buffer, PacketDecoder decoder)
          {
          }
       };

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -22,6 +22,7 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.remoting.PacketDecoder;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyAcceptor;
 import org.hornetq.integration.transports.netty.TransportConstants;
@@ -60,7 +61,7 @@
       BufferHandler handler = new AbstractBufferHandler()
       {
 
-         public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+         public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
          {
          }
       };

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2010-01-19 15:48:47 UTC (rev 8806)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java	2010-01-19 16:04:31 UTC (rev 8807)
@@ -21,6 +21,7 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.remoting.PacketDecoder;
 import org.hornetq.core.remoting.impl.AbstractBufferHandler;
 import org.hornetq.integration.transports.netty.NettyConnector;
 import org.hornetq.spi.core.remoting.BufferHandler;
@@ -51,7 +52,7 @@
    {
       BufferHandler handler = new AbstractBufferHandler()
       {
-         public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+         public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
          {
          }
       };
@@ -88,7 +89,7 @@
    {
       BufferHandler handler = new AbstractBufferHandler()
       {
-         public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
+         public void bufferReceived(final Object connectionID, final HornetQBuffer buffer, final PacketDecoder decoder)
          {
          }
       };



More information about the hornetq-commits mailing list