[hornetq-commits] JBoss hornetq SVN: r8843 - in trunk: src/main/org/hornetq/api/core and 19 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jan 22 10:32:14 EST 2010


Author: jmesnil
Date: 2010-01-22 10:32:13 -0500 (Fri, 22 Jan 2010)
New Revision: 8843

Added:
   trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
   trunk/src/main/org/hornetq/core/protocol/stomp/
   trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Modified:
   trunk/build-hornetq.xml
   trunk/src/main/org/hornetq/api/core/Message.java
   trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
   trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
   trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
   trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   trunk/src/main/org/hornetq/core/server/HornetQServer.java
   trunk/src/main/org/hornetq/core/server/ServerSession.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
   trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
   trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
   trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
   trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java
   trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
   trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java
   trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
   trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java
   trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
   trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
   trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
   trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* bootstrap of the Stomp protocol implementation

Modified: trunk/build-hornetq.xml
===================================================================
--- trunk/build-hornetq.xml	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/build-hornetq.xml	2010-01-22 15:32:13 UTC (rev 8843)
@@ -1540,23 +1540,20 @@
    </target>
 
    <target name="debugServer" depends="jar">
+      <mkdir dir="logs"/>
       <java classname="org.hornetq.integration.bootstrap.HornetQBootstrapServer" fork="true">
          <jvmarg value="-XX:+UseParallelGC"/>
          <jvmarg value="-Xms512M"/>
          <jvmarg value="-Xmx2048M"/>
          <jvmarg value="-XX:+AggressiveOpts"/>
          <jvmarg value="-XX:+UseFastAccessorMethods"/>
-         <jvmarg value="-Xdebug"/>
-         <jvmarg value="-Xnoagent"/>
-         <jvmarg value="-Djava.compiler=NONE"/>
          <jvmarg value="-Dcom.sun.management.jmxremote"/>
-         <jvmarg value="-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
-         <jvmarg value="-Djava.util.logging.config.file=${src.config.standalone.non-clustered.dir}/logging.properties"/>
+         <jvmarg value="-Djava.util.logging.config.file=${src.config.trunk.non-clustered.dir}/logging.properties"/>
          <jvmarg value="-Djava.library.path=${native.bin.dir}"/>
-         <jvmarg value="-Djava.naming.factory.initial=org.jnp.interfaces.NamingContextFactory"/>
-         <jvmarg value="-Djava.naming.factory.url.pkgs=org.jboss.naming:org.jnp.interfaces"/>
+         <jvmarg line="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000"/>
          <arg line="hornetq-beans.xml"/>
-         <classpath path="${src.config.standalone.non-clustered.dir}" />
+         <classpath path="${src.config.trunk.non-clustered.dir}" />
+         <classpath refid="jms.standalone.server.classpath"/>
       </java>
    </target>
 

Modified: trunk/src/main/org/hornetq/api/core/Message.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/Message.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/api/core/Message.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -70,6 +70,18 @@
 
    public static final SimpleString HDR_LAST_VALUE_NAME = new SimpleString("_HQ_LVQ_NAME");
 
+   public static final byte DEFAULT_TYPE = 0;
+
+   public static final byte OBJECT_TYPE = 2;
+
+   public static final byte TEXT_TYPE = 3;
+
+   public static final byte BYTES_TYPE = 4;
+
+   public static final byte MAP_TYPE = 5;
+
+   public static final byte STREAM_TYPE = 6;
+   
    /**
     * Returns the messageID.
     * <br>

Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -252,6 +252,12 @@
       return type;
    }
 
+   public void setType(byte type)
+   {
+      this.type = type;
+   }
+
+
    public boolean isDurable()
    {
       return durable;

Modified: trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/protocol/aardvark/impl/AardvarkProtocolManager.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -56,6 +56,10 @@
 
       return new ConnectionEntry(conn, 0, 0);
    }
+   
+   public void removeHandler(String name)
+   {
+   }
 
    public void handleBuffer(final RemotingConnection conn, final HornetQBuffer buffer)
    {                 
@@ -69,10 +73,9 @@
                                                       true,
                                                       true,
                                                       true,
-                                                      false);
+                                                      false,
+                                                      new AardvarkSessionCallback(conn.getTransportConnection()));
          
-         session.setCallback(new AardvarkSessionCallback(conn.getTransportConnection()));
-         
          final SimpleString name = new SimpleString("hornetq.aardvark");
          
          session.createQueue(name, name, null, false, false);

Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -50,13 +50,11 @@
 import javax.transaction.xa.Xid;
 
 import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.exception.HornetQXAException;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.protocol.core.impl.CoreProtocolManager;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
 import org.hornetq.core.protocol.core.impl.wireformat.CreateQueueMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.HornetQExceptionMessage;
@@ -71,12 +69,8 @@
 import org.hornetq.core.protocol.core.impl.wireformat.SessionDeleteQueueMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionExpiredMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionRequestProducerCreditsMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
 import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
@@ -100,7 +94,6 @@
 import org.hornetq.core.server.QueueQueryResult;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.ServerSession;
-import org.hornetq.spi.core.protocol.SessionCallback;
 
 /**
  * A ServerSessionPacketHandler
@@ -110,12 +103,10 @@
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  * @author <a href="mailto:clebert.suconic at jboss.org>Clebert Suconic</a>
  */
-public class ServerSessionPacketHandler implements ChannelHandler, CloseListener, FailureListener, SessionCallback
+public class ServerSessionPacketHandler implements ChannelHandler, CloseListener, FailureListener
 {
    private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
 
-   private final CoreProtocolManager protocolManager;
-   
    private final ServerSession session;
 
    private final OperationContext sessionContext;
@@ -127,14 +118,11 @@
 
    private volatile CoreRemotingConnection remotingConnection;
 
-   public ServerSessionPacketHandler(final CoreProtocolManager protocolManager,
-                                     final ServerSession session,
+   public ServerSessionPacketHandler(final ServerSession session,
                                      final OperationContext sessionContext,
                                      final StorageManager storageManager,
                                      final Channel channel)
    {
-      this.protocolManager = protocolManager;
-      
       this.session = session;
 
       this.storageManager = storageManager;
@@ -567,46 +555,7 @@
          channel.close();
       }
    }
-
-   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
-   {
-      Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize, deliveryCount);
-
-      channel.send(packet);
-
-      return packet.getPacketSize();
-   }
-
-   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
-   {
-      Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues, requiresResponse);
-
-      channel.send(packet);
-
-      return packet.getPacketSize();
-   }
-
-   public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
-   {
-      Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
-
-      channel.send(packet);
-
-      return packet.getPacketSize();
-   }
-
-   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
-   {
-      Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
-
-      channel.send(packet);
-   }
    
-   public void closed()
-   {
-      protocolManager.removeHandler(session.getName());
-   }
-   
    public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID)
    {
       // We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -30,6 +30,7 @@
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
 import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.remoting.Connection;
 
 /**
@@ -120,7 +121,7 @@
       sessionHandlers.remove(name);
    }
 
-   public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+   public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
    {
    }
 

Added: trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/CoreSessionCallback.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.core.impl;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.core.Channel;
+import org.hornetq.core.protocol.core.Packet;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveMessage;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
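+ * A {@link SessionCallback} for the core protocol: each send method wraps its arguments in the matching core packet and sends it on the supplied {@link Channel}; {@link #closed()} removes the handler registered under the given name from the {@link ProtocolManager}.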
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public final class CoreSessionCallback implements SessionCallback
+{
+   private final Channel channel;
+
+   private ProtocolManager protocolManager;
+
+   private String name;
+
+   public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel)
+   {
+      this.name = name;
+      this.protocolManager = protocolManager;
+      this.channel = channel;
+   }
+
+   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+   {
+      Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize, deliveryCount);
+
+      channel.send(packet);
+
+      return packet.getPacketSize();
+   }
+
+   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   {
+      Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues, requiresResponse);
+
+      channel.send(packet);
+
+      return packet.getPacketSize();
+   }
+
+   public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
+   {
+      Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
+
+      channel.send(packet);
+
+      return packet.getPacketSize();
+   }
+
+   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+   {
+      Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
+
+      channel.send(packet);
+   }
+
+   public void closed()
+   {
+      protocolManager.removeHandler(name);
+   }
+}
\ No newline at end of file

Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/HornetQPacketHandler.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -163,18 +163,15 @@
                                                       request.isAutoCommitSends(),
                                                       request.isAutoCommitAcks(),
                                                       request.isPreAcknowledge(),
-                                                      request.isXA());
+                                                      request.isXA(),
+                                                      new CoreSessionCallback(request.getName(), protocolManager, channel));
 
-         ServerSessionPacketHandler handler = new ServerSessionPacketHandler(protocolManager,
-                                                                             session,
+         ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
                                                                              server.getStorageManager()
                                                                                    .newContext(server.getExecutorFactory()
                                                                                                      .getExecutor()),
                                                                              server.getStorageManager(),
                                                                              channel);
-
-         session.setCallback(handler);
-
          channel.setHandler(handler);
 
          // TODO - where is this removed?

Added: trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,124 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+
+/**
+ * The standard verbs and headers used for the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
+ *
+ * @version $Revision: 57 $
+ */
+public interface Stomp {
+    String NULL = "\u0000";
+    String NEWLINE = "\n";
+
+    public static interface Commands {
+        String CONNECT = "CONNECT";
+        String SEND = "SEND";
+        String DISCONNECT = "DISCONNECT";
+        String SUBSCRIBE = "SUBSCRIBE";
+        String UNSUBSCRIBE = "UNSUBSCRIBE";
+        String BEGIN_TRANSACTION = "BEGIN";
+        String COMMIT_TRANSACTION = "COMMIT";
+        String ABORT_TRANSACTION = "ABORT";
+        String BEGIN = "BEGIN";
+        String COMMIT = "COMMIT";
+        String ABORT = "ABORT";
+        String ACK = "ACK";
+    }
+
+    public interface Responses {
+        String CONNECTED = "CONNECTED";
+        String ERROR = "ERROR";
+        String MESSAGE = "MESSAGE";
+        String RECEIPT = "RECEIPT";
+    }
+
+    public interface Headers {
+        String SEPERATOR = ":";
+        String RECEIPT_REQUESTED = "receipt";
+        String TRANSACTION = "transaction";
+        String CONTENT_LENGTH = "content-length";
+
+        public interface Response {
+            String RECEIPT_ID = "receipt-id";
+        }
+
+        public interface Send {
+            String DESTINATION = "destination";
+            String CORRELATION_ID = "correlation-id";
+            String REPLY_TO = "reply-to";
+            String EXPIRATION_TIME = "expires";
+            String PRIORITY = "priority";
+            String TYPE = "type";
+            Object PERSISTENT = "persistent";
+        }
+
+        public interface Message {
+            String MESSAGE_ID = "message-id";
+            String DESTINATION = "destination";
+            String CORRELATION_ID = "correlation-id";
+            String EXPIRATION_TIME = "expires";
+            String REPLY_TO = "reply-to";
+            String PRORITY = "priority";
+            String REDELIVERED = "redelivered";
+            String TIMESTAMP = "timestamp";
+            String TYPE = "type";
+            String SUBSCRIPTION = "subscription";
+        }
+
+        public interface Subscribe {
+            String DESTINATION = "destination";
+            String ACK_MODE = "ack";
+            String ID = "id";
+            String SELECTOR = "selector";
+            String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+            String NO_LOCAL = "no-local";
+
+            public interface AckModeValues {
+                String AUTO = "auto";
+                String CLIENT = "client";
+            }
+        }
+
+        public interface Unsubscribe {
+            String DESTINATION = "destination";
+            String ID = "id";
+        }
+
+        public interface Connect {
+            String LOGIN = "login";
+            String PASSCODE = "passcode";
+            String CLIENT_ID = "client-id";
+            String REQUEST_ID = "request-id";
+        }
+
+        public interface Error {
+            String MESSAGE = "message";
+        }
+
+        public interface Connected {
+            String SESSION = "session";
+            String RESPONSE_ID = "response-id";
+        }
+
+        public interface Ack {
+            String MESSAGE_ID = "message-id";
+        }
+    }
+}

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
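+ * A {@link RemotingConnection} for STOMP clients: received buffers are handed to the {@link ProtocolManager}; listener management, flush, fail, disconnect and destroy are currently empty stubs.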
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class StompConnection implements RemotingConnection
+{
+   private static final Logger log = Logger.getLogger(StompConnection.class);
+
+   private final ProtocolManager manager;
+   
+   private final Connection transportConnection;
+      
+   StompConnection(final Connection transportConnection, final ProtocolManager manager)
+   {
+      this.transportConnection = transportConnection;
+      
+      this.manager = manager;
+   }
+
+   public void addCloseListener(CloseListener listener)
+   {
+   }
+
+   public void addFailureListener(FailureListener listener)
+   {
+   }
+
+   public boolean checkDataReceived()
+   {
+      return true;
+   }
+
+   public HornetQBuffer createBuffer(int size)
+   {
+      return HornetQBuffers.dynamicBuffer(size);
+   }
+
+   public void destroy()
+   {
+   }
+
+   public void disconnect()
+   {
+   }
+
+   public void fail(HornetQException me)
+   {
+   }
+
+   public void flush()
+   {  
+   }
+
+   public List<FailureListener> getFailureListeners()
+   {
+      return Collections.EMPTY_LIST;
+   }
+
+   public Object getID()
+   {
+      return transportConnection.getID();
+   }
+
+   public String getRemoteAddress()
+   {      
+      return transportConnection.getRemoteAddress();
+   }
+
+   public Connection getTransportConnection()
+   {
+      return transportConnection;
+   }
+
+   public boolean isClient()
+   {
+      return false;
+   }
+
+   public boolean isDestroyed()
+   {
+      return false;
+   }
+
+   public boolean removeCloseListener(CloseListener listener)
+   {
+      return false;
+   }
+
+   public boolean removeFailureListener(FailureListener listener)
+   {
+      return false;
+   }
+
+   public void setFailureListeners(List<FailureListener> listeners)
+   {
+   }
+
+   
+   public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+   {
+      manager.handleBuffer(this, buffer);
+   }
+
+   public int isReadyToHandle(HornetQBuffer buffer)
+   {
+      return -1;
+   }
+
+}

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.IOException;
+
+/**
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompException extends IOException {
+    private static final long serialVersionUID = -2869735532997332242L;
+    private final boolean fatal;
+
+    public StompException() {
+        this(null);
+    }
+
+    public StompException(String s) {
+        this(s, false);
+    }
+
+    public StompException(String s, boolean fatal) {
+        this(s, fatal, null);
+    }
+
+    public StompException(String s, boolean fatal, Throwable cause) {
+        super(s);
+        this.fatal = fatal;
+        initCause(cause);
+    }
+
+    public boolean isFatal() {
+        return fatal;
+    }
+}

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompFrame
+{
+   private static final byte[] NO_DATA = new byte[] {};
+
+   private String command;
+
+   private Map<String, Object> headers;
+
+   private byte[] content = StompFrame.NO_DATA;
+
+   public StompFrame()
+   {
+      this.headers = new HashMap<String, Object>();
+   }
+
+   public StompFrame(String command, Map<String, Object> headers, byte[] data)
+   {
+      this.command = command;
+      this.headers = headers;
+      this.content = data;
+   }
+
+   public String getCommand()
+   {
+      return command;
+   }
+
+   public byte[] getContent()
+   {
+      return content;
+   }
+
+   public Map<String, Object> getHeaders()
+   {
+      return headers;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "StompFrame[command=" + command + ", headers=" + headers + ",content-length=" + content.length + "]";
+   }
+}

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.Delimiters;
+
+/**
+ * Frame decoder that cuts the inbound byte stream into Stomp frames, using
+ * the NUL delimiter supplied by {@link Delimiters#nulDelimiter()}.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+public class StompFrameDelimiter extends DelimiterBasedFrameDecoder
+{
+   /** Largest frame, in bytes, that the decoder is configured to accept (100 MiB). */
+   private static final int MAX_FRAME_BYTES = 100 * 1024 * 1024;
+
+   /**
+    * Configures the parent decoder with the maximum frame size and the NUL
+    * delimiter. The <code>false</code> argument is the parent's
+    * strip-delimiter flag, i.e. the delimiter is left in the decoded frame.
+    */
+   public StompFrameDelimiter()
+   {
+      super(StompFrameDelimiter.MAX_FRAME_BYTES, false, Delimiters.nulDelimiter());
+   }
+}

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+/**
+ * Frame standing in for an invalid Stomp frame that was received; it carries
+ * the {@link StompException} describing what was wrong with it.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompFrameError extends StompFrame {
+    /** The failure this frame reports. */
+    private final StompException cause;
+
+    /**
+     * @param exception the failure to report through this frame
+     */
+    public StompFrameError(StompException exception) {
+        super();
+        cause = exception;
+    }
+
+    /**
+     * @return the exception given at construction time
+     */
+    public StompException getException() {
+        return cause;
+    }
+}

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,191 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+class StompMarshaller {
+    private static final byte[] NO_DATA = new byte[]{};
+    private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
+    private static final int MAX_COMMAND_LENGTH = 1024;
+    private static final int MAX_HEADER_LENGTH = 1024 * 10;
+    private static final int MAX_HEADERS = 1000;
+    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+    private int version = 1;
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    public byte[] marshal(StompFrame command) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        marshal(command, dos);
+        dos.close();
+        return baos.toByteArray();
+    }
+
+    public void marshal(StompFrame stomp, DataOutput os) throws IOException {
+        StringBuffer buffer = new StringBuffer();
+        buffer.append(stomp.getCommand());
+        buffer.append(Stomp.NEWLINE);
+
+        // Output the headers.
+        for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Map.Entry) iter.next();
+            buffer.append(entry.getKey());
+            buffer.append(Stomp.Headers.SEPERATOR);
+            buffer.append(entry.getValue());
+            buffer.append(Stomp.NEWLINE);
+        }
+
+        // Add a newline to seperate the headers from the content.
+        buffer.append(Stomp.NEWLINE);
+
+        os.write(buffer.toString().getBytes("UTF-8"));
+        os.write(stomp.getContent());
+        os.write(END_OF_FRAME);
+    }
+
+    public StompFrame unmarshal(HornetQBuffer in) throws IOException {
+
+        try {
+            String action = null;
+
+            // skip white space to next real action line
+            while (true) {
+                action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+                if (action == null) {
+                    throw new IOException("connection was closed");
+                }
+                else {
+                    action = action.trim();
+                    if (action.length() > 0) {
+                        break;
+                    }
+                }
+            }
+
+            // Parse the headers
+            HashMap headers = new HashMap(25);
+            while (true) {
+                String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+                if (line != null && line.trim().length() > 0) {
+
+                    if (headers.size() > MAX_HEADERS) {
+                        throw new StompException("The maximum number of headers was exceeded", true);
+                    }
+
+                    try {
+                        int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+                        String name = line.substring(0, seperator_index).trim();
+                        String value = line.substring(seperator_index + 1, line.length()).trim();
+                        headers.put(name, value);
+                    }
+                    catch (Exception e) {
+                        throw new StompException("Unable to parser header line [" + line + "]", true);
+                    }
+                }
+                else {
+                    break;
+                }
+            }
+
+            // Read in the data part.
+            byte[] data = NO_DATA;
+            String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
+            if (contentLength != null) {
+
+                // Bless the client, he's telling us how much data to read in.
+                int length;
+                try {
+                    length = Integer.parseInt(contentLength.trim());
+                }
+                catch (NumberFormatException e) {
+                    throw new StompException("Specified content-length is not a valid integer", true);
+                }
+
+                if (length > MAX_DATA_LENGTH) {
+                    throw new StompException("The maximum data length was exceeded", true);
+                }
+
+                data = new byte[length];
+                in.readBytes(data);
+
+                if (in.readByte() != 0) {
+                    throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
+                }
+            }
+            else {
+
+                // We don't know how much to read.. data ends when we hit a 0
+                byte b;
+                ByteArrayOutputStream baos = null;
+                while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
+
+                    if (baos == null) {
+                        baos = new ByteArrayOutputStream();
+                    }
+                    else if (baos.size() > MAX_DATA_LENGTH) {
+                        throw new StompException("The maximum data length was exceeded", true);
+                    }
+
+                    baos.write(b);
+                }
+
+                if (baos != null) {
+                    baos.close();
+                    data = baos.toByteArray();
+                }
+            }
+
+            return new StompFrame(action, headers, data);
+        }
+        catch (StompException e) {
+            return new StompFrameError(e);
+        }
+    }
+
+    protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException {
+        byte b;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
+        while ((b = in.readByte()) != '\n') {
+            if (baos.size() > maxLength) {
+                throw new StompException(errorMessage, true);
+            }
+            baos.write(b);
+        }
+        byte[] sequence = baos.toByteArray();
+        return new String(sequence, "UTF-8");
+    }
+}

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,295 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * StompProtocolManager
+ * 
+ * A stupid protocol to demonstrate how to implement a new protocol in HornetQ
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class StompProtocolManager implements ProtocolManager
+{
+   private static final Logger log = Logger.getLogger(StompProtocolManager.class);
+
+   private final HornetQServer server;
+
+   private final StompMarshaller marshaller;
+
+   private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
+
+   public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
+   {
+      this.server = server;
+      this.marshaller = new StompMarshaller();
+   }
+
+   public ConnectionEntry createConnectionEntry(final Connection connection)
+   {
+      StompConnection conn = new StompConnection(connection, this);
+
+      return new ConnectionEntry(conn, 0, 0);
+   }
+
+   public void removeHandler(String name)
+   {
+   }
+
+   public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
+   {
+      StompFrame frame = null;
+      try
+      {
+         frame = marshaller.unmarshal(buffer);
+         System.out.println("RECEIVED " + frame);
+
+         String command = frame.getCommand();
+
+         StompFrame response = null;
+         if (Stomp.Commands.CONNECT.equals(command))
+         {
+            response = onConnect(frame, server, connection);
+         }
+         else if (Stomp.Commands.DISCONNECT.equals(command))
+         {
+            response = onDisconnect(frame, server, connection);
+         }
+         else if (Stomp.Commands.SEND.equals(command))
+         {
+            response = onSend(frame, server, connection);
+         }
+         else if (Stomp.Commands.SUBSCRIBE.equals(command))
+         {
+            response = onSubscribe(frame, server, connection);
+         }
+         else
+         {
+            log.error("Unsupported Stomp frame: " + frame);
+            response = new StompFrame(Stomp.Responses.ERROR,
+                                      new HashMap<String, Object>(),
+                                      ("Unsupported frame: " + command).getBytes());
+         }
+
+         if (response != null)
+         {
+            send(connection, response);
+         }
+      }
+      catch (StompException ex)
+      {
+         ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         try
+         {
+            // Let the stomp client know about any protocol errors.
+            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+            ex.printStackTrace(stream);
+            stream.close();
+         }
+         catch (UnsupportedEncodingException e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+
+         Map<String, Object> headers = new HashMap<String, Object>();
+         headers.put(Stomp.Headers.Error.MESSAGE, ex.getMessage());
+
+         final String receiptId = (String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+         if (receiptId != null)
+         {
+            headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+         }
+
+         StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+         try
+         {
+            send(connection, errorMessage);
+         }
+         catch (IOException e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+      }
+      catch (Exception ex)
+      {
+         ex.printStackTrace();
+      }
+   }
+
+   private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception,
+                                                                                                        StompException,
+                                                                                                        HornetQException
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+      SimpleString queueName = SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+
+      ServerSession session = checkAndGetSession(connection);
+      long consumerID = server.getStorageManager().generateUniqueID();
+      session.createConsumer(consumerID, queueName, null, false);
+      session.receiveConsumerCredits(consumerID, -1);
+      session.start();
+
+      return null;
+   }
+
+   private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
+   {
+      ServerSession session = sessions.get(connection);
+      if (session == null)
+      {
+         throw new StompException("Not connected");
+      }
+      return session;
+   }
+
+   private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
+   {
+      ServerSession session = checkAndGetSession(connection);
+      if (session != null)
+      {
+         try
+         {
+            session.close();
+         }
+         catch (Exception e)
+         {
+            throw new StompException(e.getMessage());
+         }
+         sessions.remove(connection);
+      }
+      return null;
+   }
+
+   private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
+   {
+      ServerSession session = checkAndGetSession(connection);
+
+      Map<String, Object> headers = frame.getHeaders();
+      String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+      /*
+      String type = (String)headers.get(Stomp.Headers.Send.TYPE);
+      long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
+      byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
+      boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
+      */
+      byte type = Message.TEXT_TYPE;
+      if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+      {
+         type = Message.BYTES_TYPE;
+      }
+      long timestamp = System.currentTimeMillis();
+      boolean durable = false;
+      long expiration = -1;
+      byte priority = 9;
+      SimpleString address = SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+
+      ServerMessageImpl message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+      message.setType(type);
+      message.setTimestamp(timestamp);
+      message.setAddress(address);
+      byte[] content = frame.getContent();
+      if (type == Message.TEXT_TYPE)
+      {
+         message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new String(content)));
+      }
+      else
+      {
+         message.getBodyBuffer().writeBytes(content);
+      }
+
+      session.send(message);
+      if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+      {
+         Map<String, Object> h = new HashMap<String, Object>();
+         h.put(Stomp.Headers.Response.RECEIPT_ID, headers.get(Stomp.Headers.RECEIPT_REQUESTED));
+         return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   private StompFrame onConnect(StompFrame frame, HornetQServer server, final RemotingConnection connection) throws Exception
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+      String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+      String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+      String name = UUIDGenerator.getInstance().generateStringUUID();
+      ServerSession session = server.createSession(name,
+                                                   login,
+                                                   passcode,
+                                                   HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                   connection,
+                                                   true,
+                                                   true,
+                                                   false,
+                                                   false,
+                                                   new StompSessionCallback(marshaller, connection));
+      sessions.put(connection, session);
+      System.out.println(">>> created session " + session);
+      HashMap<String, Object> h = new HashMap<String, Object>();
+      h.put(Stomp.Headers.Connected.SESSION, name);
+      h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+      return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+   }
+
+   private void send(RemotingConnection connection, StompFrame frame) throws IOException
+   {
+      System.out.println("SENDING >>> " + frame);
+      byte[] bytes = marshaller.marshal(frame);
+      HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+      System.out.println("ready to send reply: " + buffer);
+      connection.getTransportConnection().write(buffer, true);
+   }
+
+   public int isReadyToHandle(HornetQBuffer buffer)
+   {
+      return -1;
+   }
+}

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * Factory producing {@link StompProtocolManager} instances.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class StompProtocolManagerFactory implements ProtocolManagerFactory
+{
+
+   /**
+    * @param server the server the protocol manager will operate on
+    * @param interceptors the interceptors handed over to the protocol manager
+    * @return a new {@link StompProtocolManager}
+    */
+   public ProtocolManager createProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
+   {
+      final ProtocolManager stompManager = new StompProtocolManager(server, interceptors);
+
+      return stompManager;
+   }
+
+}

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
+ * Session callback that forwards the messages handed over by the server
+ * session to a Stomp client as MESSAGE frames written on the transport
+ * connection. Producer credits, large messages and the close notification
+ * are ignored.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+class StompSessionCallback implements SessionCallback
+{
+   private final RemotingConnection connection;
+
+   private final StompMarshaller marshaller;
+
+   /**
+    * @param marshaller used to encode the outgoing frames
+    * @param connection the connection the frames are written to
+    */
+   StompSessionCallback(final StompMarshaller marshaller, final RemotingConnection connection)
+   {
+      this.marshaller = marshaller;
+      this.connection = connection;
+   }
+
+   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+   {
+   }
+
+   /**
+    * Wraps the message in a MESSAGE frame and writes it to the client.
+    * Only the body of text messages is copied into the frame; a message of
+    * any other type is sent with an empty body.
+    *
+    * @return the number of bytes written, or 0 if the message could not be sent
+    */
+   public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
+   {
+      try
+      {
+         Map<String, Object> headers = new HashMap<String, Object>();
+         headers.put(Stomp.Headers.Message.DESTINATION, StompUtils.toStompDestination(serverMessage.getAddress()
+                                                                                                       .toString()));
+         byte[] data = new byte[] {};
+         if (serverMessage.getType() == Message.TEXT_TYPE)
+         {
+            SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
+            if (text != null)
+            {
+               // encode explicitly as UTF-8 rather than with the platform default charset
+               data = text.toString().getBytes("UTF-8");
+            }
+         }
+         StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+         byte[] bytes = marshaller.marshal(msg);
+         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+         connection.getTransportConnection().write(buffer, true);
+
+         return bytes.length;
+
+      }
+      catch (Exception e)
+      {
+         // best effort: the failure is reported and the delivery counts for 0 bytes
+         e.printStackTrace();
+         return 0;
+      }
+
+   }
+
+   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   {
+      return 0;
+   }
+
+   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+   {
+      return 0;
+   }
+
+   public void closed()
+   {
+   }
+}
\ No newline at end of file

Added: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import org.hornetq.api.core.HornetQException;
+
+/**
+ * Converts destination names between their Stomp form (for example
+ * <code>/queue/foo</code>) and their HornetQ address form (for example
+ * <code>jms.queue.foo</code>) by swapping the destination-kind prefix.
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+class StompUtils
+{
+
+   // Constants -----------------------------------------------------
+
+   public static final String HQ_QUEUE_PREFIX = "jms.queue.";
+
+   public static final String STOMP_QUEUE_PREFIX = "/queue/";
+
+   public static final String HQ_TEMP_QUEUE_PREFIX = "jms.tempqueue.";
+
+   public static final String STOMP_TEMP_QUEUE_PREFIX = "/temp-queue/";
+
+   public static final String HQ_TOPIC_PREFIX = "jms.topic.";
+
+   public static final String STOMP_TOPIC_PREFIX = "/topic/";
+
+   public static final String HQ_TEMP_TOPIC_PREFIX = "jms.temptopic.";
+
+   public static final String STOMP_TEMP_TOPIC_PREFIX = "/temp-topic/";
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   /**
+    * Converts a Stomp destination into the corresponding HornetQ address.
+    *
+    * @param stompDestination a destination starting with one of
+    *        /queue/ /topic/ /temp-queue/ /temp-topic/
+    * @return the address with the Stomp prefix replaced by the HornetQ one
+    * @throws HornetQException if the destination is <code>null</code> or
+    *         does not start with a known prefix
+    */
+   public static String toHornetQAddress(String stompDestination) throws HornetQException
+   {
+      if (stompDestination == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+      }
+      else if (stompDestination.startsWith(STOMP_QUEUE_PREFIX))
+      {
+         return convert(stompDestination, STOMP_QUEUE_PREFIX, HQ_QUEUE_PREFIX);
+      }
+      else if (stompDestination.startsWith(STOMP_TOPIC_PREFIX))
+      {
+         return convert(stompDestination, STOMP_TOPIC_PREFIX, HQ_TOPIC_PREFIX);
+      }
+      else if (stompDestination.startsWith(STOMP_TEMP_QUEUE_PREFIX))
+      {
+         return convert(stompDestination, STOMP_TEMP_QUEUE_PREFIX, HQ_TEMP_QUEUE_PREFIX);
+      }
+      else if (stompDestination.startsWith(STOMP_TEMP_TOPIC_PREFIX))
+      {
+         return convert(stompDestination, STOMP_TEMP_TOPIC_PREFIX, HQ_TEMP_TOPIC_PREFIX);
+      }
+      else
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + stompDestination +
+                                                                    "] -- StompConnect destinations " +
+                                                                    "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+      }
+   }
+
+   /**
+    * Converts a HornetQ address into the corresponding Stomp destination.
+    *
+    * @param hornetqAddress an address starting with one of the
+    *        <code>jms.*</code> prefixes declared by this class
+    * @return the address with the HornetQ prefix replaced by the Stomp one
+    * @throws HornetQException if the address is <code>null</code> or does
+    *         not start with a known prefix
+    */
+   public static String toStompDestination(String hornetqAddress) throws HornetQException
+   {
+      if (hornetqAddress == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+      }
+      else if (hornetqAddress.startsWith(HQ_QUEUE_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_QUEUE_PREFIX, STOMP_QUEUE_PREFIX);
+      }
+      else if (hornetqAddress.startsWith(HQ_TOPIC_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_TOPIC_PREFIX, STOMP_TOPIC_PREFIX);
+      }
+      else if (hornetqAddress.startsWith(HQ_TEMP_QUEUE_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_TEMP_QUEUE_PREFIX, STOMP_TEMP_QUEUE_PREFIX);
+      }
+      else if (hornetqAddress.startsWith(HQ_TEMP_TOPIC_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_TEMP_TOPIC_PREFIX, STOMP_TEMP_TOPIC_PREFIX);
+      }
+      else
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + hornetqAddress +
+                                                                    "] -- Acceptable address must comply to JMS semantics");
+      }
+   }
+
+   /**
+    * Replaces the leading <code>oldPrefix</code> of <code>str</code> with
+    * <code>newPrefix</code>; the caller has checked that the prefix is there.
+    */
+   private static String convert(String str, String oldPrefix, String newPrefix)
+   {
+      return newPrefix + str.substring(oldPrefix.length());
+   }
+
+   // Constructors --------------------------------------------------
+
+   private StompUtils()
+   {
+      // static helpers only
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -58,14 +58,11 @@
    private boolean paused;
 
    private NotificationService notificationService;
-   
-   private final ProtocolType protocol;
 
    public InVMAcceptor(final Map<String, Object> configuration,
                        final BufferHandler handler,                       
                        final ConnectionLifeCycleListener listener,
-                       final Executor threadPool,
-                       final ProtocolType protocol)
+                       final Executor threadPool)
    {
       this.handler = handler;
       
@@ -74,8 +71,6 @@
       id = ConfigurationHelper.getIntProperty(TransportConstants.SERVER_ID_PROP_NAME, 0, configuration);
 
       executorFactory = new OrderedExecutorFactory(threadPool);
-      
-      this.protocol = protocol;
    }
 
    public synchronized void start() throws Exception

Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -17,7 +17,6 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
 import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -37,10 +36,9 @@
                                   final BufferDecoder decoder,
                                   final ConnectionLifeCycleListener listener,
                                   final Executor threadPool,
-                                  final ScheduledExecutorService scheduledThreadPool,
-                                  final ProtocolType protocol)
+                                  final ScheduledExecutorService scheduledThreadPool)
    {
-      return new InVMAcceptor(configuration, handler, listener, threadPool, protocol);
+      return new InVMAcceptor(configuration, handler, listener, threadPool);
    }
 
    public Set<String> getAllowableProperties()

Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -30,9 +30,11 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.aardvark.impl.AardvarkProtocolManagerFactory;
 import org.hornetq.core.protocol.core.impl.CoreProtocolManagerFactory;
+import org.hornetq.core.protocol.stomp.StompProtocolManagerFactory;
 import org.hornetq.core.remoting.server.RemotingService;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.transports.netty.TransportConstants;
 import org.hornetq.spi.core.protocol.ConnectionEntry;
 import org.hornetq.spi.core.protocol.ProtocolManager;
 import org.hornetq.spi.core.protocol.ProtocolType;
@@ -121,6 +123,7 @@
       this.scheduledThreadPool = scheduledThreadPool;
       
       this.protocolMap.put(ProtocolType.CORE, new CoreProtocolManagerFactory().createProtocolManager(server, interceptors));
+      this.protocolMap.put(ProtocolType.STOMP, new StompProtocolManagerFactory().createProtocolManager(server, interceptors));
       this.protocolMap.put(ProtocolType.AARDVARK, new AardvarkProtocolManagerFactory().createProtocolManager(server, interceptors));
    }
 
@@ -159,9 +162,9 @@
                }
             }
             
-            //TODO - allow protocol type to be configured from Configuration for each acceptor
+            String protocolString = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.DEFAULT_PROTOCOL, info.getParams());
 
-            ProtocolType protocol = hackProtocol;
+            ProtocolType protocol = ProtocolType.valueOf(protocolString.toUpperCase());
             
             ProtocolManager manager = protocolMap.get(protocol);
             
@@ -170,8 +173,7 @@
                                                        manager,
                                                        this,
                                                        threadPool,
-                                                       scheduledThreadPool,
-                                                       protocol);
+                                                       scheduledThreadPool);
 
             acceptors.add(acceptor);
 
@@ -200,9 +202,6 @@
 
       started = true;
    }
-   
-   //FIXME - temp hack so we can choose AARDVARK as protocol
-   public static ProtocolType hackProtocol = ProtocolType.CORE;
 
    public synchronized void freeze()
    {

Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -35,6 +35,7 @@
 import org.hornetq.core.transaction.ResourceManager;
 import org.hornetq.core.version.Version;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
 import org.hornetq.spi.core.security.HornetQSecurityManager;
 import org.hornetq.utils.ExecutorFactory;
 
@@ -80,12 +81,11 @@
                                boolean autoCommitSends,
                                boolean autoCommitAcks,
                                boolean preAcknowledge,
-                               boolean xa) throws Exception;
+                               boolean xa,
+                               final SessionCallback callback) throws Exception;
 
    void removeSession(String name) throws Exception;
 
-   ServerSession getSession(String name);
-
    Set<ServerSession> getSessions();
 
    boolean isStarted();

Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -111,6 +111,4 @@
    void setTransferring(boolean transferring);
    
    void runConnectionFailureRunners();
-   
-   void setCallback(SessionCallback callback);
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -106,6 +106,7 @@
 import org.hornetq.core.version.Version;
 import org.hornetq.spi.core.logging.LogDelegateFactory;
 import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
 import org.hornetq.spi.core.security.HornetQSecurityManager;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
@@ -542,7 +543,8 @@
                                       final boolean autoCommitSends,
                                       final boolean autoCommitAcks,
                                       final boolean preAcknowledge,
-                                      final boolean xa) throws Exception
+                                      final boolean xa,
+                                      final SessionCallback callback) throws Exception
    {
       if (securityStore != null)
       {
@@ -565,7 +567,8 @@
                                                               securityStore,
                                                               managementService,
                                                               this,
-                                                              configuration.getManagementAddress());
+                                                              configuration.getManagementAddress(),
+                                                              callback);
 
       sessions.put(name, session);
 
@@ -595,11 +598,6 @@
       sessions.remove(name);
    }
 
-   public ServerSession getSession(final String name)
-   {
-      return sessions.get(name);
-   }
-
    public synchronized List<ServerSession> getSessions(final String connectionID)
    {
       Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -134,7 +134,7 @@
 
    private final RoutingContext routingContext = new RoutingContextImpl(null);
 
-   private SessionCallback callback;
+   private final SessionCallback callback;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -154,7 +154,8 @@
                             final SecurityStore securityStore,
                             final ManagementService managementService,
                             final HornetQServer server,
-                            final SimpleString managementAddress) throws Exception
+                            final SimpleString managementAddress,
+                            final SessionCallback callback) throws Exception
    {
       this.username = username;
 
@@ -193,6 +194,8 @@
 
       this.managementAddress = managementAddress;
 
+      this.callback = callback;
+      
       remotingConnection.addFailureListener(this);
 
       remotingConnection.addCloseListener(this);
@@ -200,11 +203,6 @@
 
    // ServerSession implementation ----------------------------------------------------------------------------
 
-   public void setCallback(final SessionCallback callback)
-   {
-      this.callback = callback;
-   }
-
    public String getUsername()
    {
       return username;

Modified: trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -16,6 +16,7 @@
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
+import org.hornetq.core.protocol.stomp.StompFrameDelimiter;
 import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.spi.core.remoting.BufferDecoder;
 import org.jboss.netty.channel.ChannelPipeline;
@@ -54,9 +55,12 @@
          //Core protocol uses it's own optimised decoder
          pipeline.addLast("decoder", new HornetQFrameDecoder2());
       }
+      else if (protocol == ProtocolType.STOMP)
+      {
+         pipeline.addLast("decoder", new StompFrameDelimiter());
+      }
       else
       {
-         //Use the old frame decoder for other protocols
          pipeline.addLast("decoder", new HornetQFrameDecoder(decoder));
       }
    }

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -78,7 +78,7 @@
  */
 public class NettyAcceptor implements Acceptor
 {
-   private static final Logger log = Logger.getLogger(NettyAcceptor.class);
+   static final Logger log = Logger.getLogger(NettyAcceptor.class);
 
    private ChannelFactory channelFactory;
 
@@ -106,6 +106,8 @@
 
    private final boolean useInvm;
 
+   private final ProtocolType protocol;
+
    private final String host;
 
    private final int port;
@@ -134,8 +136,6 @@
 
    private VirtualExecutorService bossExecutor;
    
-   private final ProtocolType protocol;
-   
    private boolean paused;
 
    public NettyAcceptor(final Map<String, Object> configuration,
@@ -143,8 +143,7 @@
                         final BufferDecoder decoder,
                         final ConnectionLifeCycleListener listener,
                         final Executor threadPool,
-                        final ScheduledExecutorService scheduledThreadPool,
-                        final ProtocolType protocol)
+                        final ScheduledExecutorService scheduledThreadPool)
    {
       this.handler = handler;
       
@@ -152,8 +151,6 @@
 
       this.listener = listener;
       
-      this.protocol = protocol;
-      
       sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
                                                           TransportConstants.DEFAULT_SSL_ENABLED,
                                                           configuration);
@@ -190,6 +187,11 @@
       useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME,
                                                        TransportConstants.DEFAULT_USE_INVM,
                                                        configuration);
+      String protocolStr = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
+                                                       TransportConstants.DEFAULT_PROTOCOL,
+                                                       configuration);
+      protocol = ProtocolType.valueOf(protocolStr.toUpperCase()); // valueOf is case-sensitive; upper-case first, as RemotingServiceImpl does for the same property
+      
       host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
                                                    TransportConstants.DEFAULT_HOST,
                                                    configuration);

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -18,7 +18,6 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.hornetq.spi.core.protocol.ProtocolType;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
 import org.hornetq.spi.core.remoting.BufferDecoder;
@@ -37,11 +36,9 @@
                                   final BufferDecoder decoder,
                                   final ConnectionLifeCycleListener listener,
                                   final Executor threadPool,
-                                  final ScheduledExecutorService scheduledThreadPool,
-                                  final ProtocolType protocol)
+                                  final ScheduledExecutorService scheduledThreadPool)
    {
-      return new NettyAcceptor(configuration, handler, decoder, listener, threadPool, scheduledThreadPool,
-                               protocol);
+      return new NettyAcceptor(configuration, handler, decoder, listener, threadPool, scheduledThreadPool);
    }
 
    public Set<String> getAllowableProperties()

Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -14,7 +14,6 @@
 package org.hornetq.integration.transports.netty;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQException;
 import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.spi.core.protocol.ProtocolType;

Modified: trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/integration/transports/netty/TransportConstants.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -16,6 +16,8 @@
 import java.util.HashSet;
 import java.util.Set;
 
+import org.hornetq.spi.core.protocol.ProtocolType;
+
 /**
  * A TransportConstants
  * 
@@ -46,6 +48,8 @@
 
    public static final String USE_INVM_PROP_NAME = "use-invm";
 
+   public static final String PROTOCOL_PROP_NAME = "protocol";
+
    public static final String HOST_PROP_NAME = "host";
 
    public static final String PORT_PROP_NAME = "port";
@@ -75,10 +79,14 @@
 
    public static final boolean DEFAULT_USE_SERVLET = false;
 
+   public static final String DEFAULT_PROTOCOL = ProtocolType.CORE.toString();
+
    public static final String DEFAULT_HOST = "localhost";
 
    public static final int DEFAULT_PORT = 5445;
 
+   public static final int DEFAULT_STOMP_PORT = 61613;
+
    public static final String DEFAULT_KEYSTORE_PATH = "hornetq.keystore";
 
    public static final String DEFAULT_KEYSTORE_PASSWORD = "secureexample";
@@ -120,6 +128,7 @@
       allowableAcceptorKeys.add(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
+      allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.HOST_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.PORT_PROP_NAME);
       allowableAcceptorKeys.add(TransportConstants.KEYSTORE_PATH_PROP_NAME);

Modified: trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQBytesMessage.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -19,6 +19,7 @@
 import javax.jms.MessageFormatException;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.logging.Logger;
@@ -42,7 +43,7 @@
 
    private static final Logger log = Logger.getLogger(HornetQBytesMessage.class);
 
-   public static final byte TYPE = 4;
+   public static final byte TYPE = Message.BYTES_TYPE;
 
    // Attributes ----------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMapMessage.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -22,6 +22,7 @@
 import javax.jms.MapMessage;
 import javax.jms.MessageFormatException;
 
+import org.hornetq.api.core.Message;
 import org.hornetq.api.core.PropertyConversionException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientMessage;
@@ -45,7 +46,7 @@
 {
    // Constants -----------------------------------------------------
 
-   public static final byte TYPE = 5;
+   public static final byte TYPE = Message.MAP_TYPE;
 
    // Attributes ----------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -83,7 +83,7 @@
 
    private static final String JMSXGROUPID = "JMSXGroupID";
 
-   public static final byte TYPE = 0;
+   public static final byte TYPE = org.hornetq.api.core.Message.DEFAULT_TYPE;
 
    public static Map<String, Object> coreMaptoJMSMap(final Map<String, Object> coreMessage)
    {

Modified: trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQObjectMessage.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -22,6 +22,7 @@
 import javax.jms.JMSException;
 import javax.jms.ObjectMessage;
 
+import org.hornetq.api.core.Message;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 
@@ -44,7 +45,7 @@
 {
    // Constants -----------------------------------------------------
 
-   public static final byte TYPE = 2;
+   public static final byte TYPE = Message.OBJECT_TYPE;
 
    // Attributes ----------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQStreamMessage.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -19,6 +19,7 @@
 import javax.jms.StreamMessage;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
 import org.hornetq.core.client.impl.ClientMessageImpl;
@@ -48,7 +49,7 @@
 
    private static final Logger log = Logger.getLogger(HornetQStreamMessage.class);
 
-   public static final byte TYPE = 6;
+   public static final byte TYPE = Message.STREAM_TYPE;
 
    // Attributes ----------------------------------------------------
 

Modified: trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/jms/client/HornetQTextMessage.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -17,6 +17,7 @@
 import javax.jms.TextMessage;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientSession;
@@ -42,7 +43,7 @@
 {
    // Constants -----------------------------------------------------
 
-   public static final byte TYPE = 3;
+   public static final byte TYPE = Message.TEXT_TYPE;
 
    public static final Logger log = Logger.getLogger(HornetQTextMessage.class);
 

Modified: trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/spi/core/protocol/ProtocolManager.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -13,8 +13,8 @@
 
 package org.hornetq.spi.core.protocol;
 
+import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.spi.core.remoting.BufferDecoder;
-import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 
 /**
@@ -24,7 +24,14 @@
  *
  *
  */
-public interface ProtocolManager extends BufferHandler, BufferDecoder
+public interface ProtocolManager extends BufferDecoder
 {
    ConnectionEntry createConnectionEntry(Connection connection);
+   
+   public void removeHandler(final String name);
+
+   public int isReadyToHandle(HornetQBuffer buffer);
+   
+   void handleBuffer(RemotingConnection connection, HornetQBuffer buffer);
+
 }

Modified: trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -18,12 +18,10 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
-import org.hornetq.spi.core.protocol.ProtocolType;
-
 /**
  * A factory for creating acceptors.
  * <p/>
- * An Acceptor is an endpoin that a {@link org.hornetq.spi.core.remoting.Connector} will connect to and is used by the remoting service.
+ * An Acceptor is an endpoint that a {@link org.hornetq.spi.core.remoting.Connector} will connect to and is used by the remoting service.
  *
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
@@ -47,8 +45,7 @@
                            BufferDecoder decoder,
                            ConnectionLifeCycleListener listener,
                            Executor threadPool,
-                           ScheduledExecutorService scheduledThreadPool,
-                           ProtocolType protocol);
+                           ScheduledExecutorService scheduledThreadPool);
 
    /**
     * Returns the allowable properties for this acceptor.

Modified: trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -98,7 +98,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -152,7 +152,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -210,7 +210,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -269,7 +269,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -327,7 +327,7 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -381,7 +381,7 @@
       conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -428,7 +428,7 @@
       conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool, ProtocolType.CORE);
+      acceptor = new NettyAcceptor(conf, acceptorHandler, null, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);

Modified: trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/tests/src/org/hornetq/tests/integration/remoting/AardvarkProtocolTest.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -45,8 +45,6 @@
 
    public void testAardvark() throws Exception
    {
-      RemotingServiceImpl.hackProtocol = ProtocolType.AARDVARK;
-      
       Configuration config = new ConfigurationImpl();
       
       config.setSecurityEnabled(false);
@@ -56,6 +54,7 @@
       
       params.put(TransportConstants.PORT_PROP_NAME, 9876);
       params.put(TransportConstants.HOST_PROP_NAME, "127.0.0.1");
+      params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.AARDVARK.toString());
       
       TransportConfiguration tc = new TransportConfiguration(NettyAcceptorFactory.class.getCanonicalName(),
                                                              params);
@@ -95,7 +94,5 @@
       socket.close();
       
       server.stop();
-      
-      RemotingServiceImpl.hackProtocol = ProtocolType.CORE;
    }
 }

Added: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -0,0 +1,964 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.tests.integration.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.HornetQServers;
+import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.TransportConstants;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.JMSConfiguration;
+import org.hornetq.jms.server.config.impl.JMSConfigurationImpl;
+import org.hornetq.jms.server.config.impl.QueueConfigurationImpl;
+import org.hornetq.jms.server.impl.JMSServerManagerImpl;
+import org.hornetq.spi.core.protocol.ProtocolType;
+
+public class StompTest extends TestCase {
+    private static final transient Logger log = Logger.getLogger(StompTest.class);
+    private int port = 61613;
+    private Socket stompSocket;
+    private ByteArrayOutputStream inputBuffer;
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private Session session;
+    private Queue queue;
+    private JMSServerManager server;
+
+    /**
+     * Sends a CONNECT frame carrying a request-id header and checks that the
+     * reply starts with CONNECTED and contains "response-id:1".
+     */
+    public void testConnect() throws Exception {
+        String connectFrame = "CONNECT\nlogin: brianm\npasscode: wombats\nrequest-id: 1\n\n" + Stomp.NULL;
+        sendFrame(connectFrame);
+
+        String response = receiveFrame(10000);
+        Assert.assertTrue(response.startsWith("CONNECTED"));
+        Assert.assertTrue(response.contains("response-id:1"));
+    }
+    
+    public void testDisconnectAndError() throws Exception {
+
+       String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
+       sendFrame(connect_frame);
+
+       String f = receiveFrame(10000);
+       Assert.assertTrue(f.startsWith("CONNECTED"));
+       Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+       
+       connect_frame = "DISCONNECT\n\n" + Stomp.NULL;
+       sendFrame(connect_frame);
+       
+       // sending a message will result in an error
+       String frame =
+          "SEND\n" +
+                  "destination:/queue/" + getQueueName() + "\n\n" +
+                  "Hello World" +
+                  Stomp.NULL;
+       sendFrame(frame);
+
+       f = receiveFrame(10000);
+       Assert.assertTrue(f.startsWith("ERROR"));
+   }
+
+
+    /**
+     * Sends a text body over STOMP and verifies that a JMS consumer on the
+     * test queue receives it as a TextMessage with a recent timestamp.
+     */
+    public void testSendMessage() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+
+        String connected = receiveFrame(10000);
+        Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+        String send = "SEND\ndestination:/queue/" + getQueueName() + "\n\nHello World" + Stomp.NULL;
+        sendFrame(send);
+
+        TextMessage received = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(received);
+        Assert.assertEquals("Hello World", received.getText());
+
+        // The JMS timestamp must be less than one second away from "now".
+        long now = System.currentTimeMillis();
+        long stamped = received.getJMSTimestamp();
+        Assert.assertTrue(Math.abs(now - stamped) < 1000);
+    }
+    
+    /**
+     * Sends a text body with a "receipt" header and verifies both the RECEIPT
+     * frame (containing "receipt-id:1234") and the delivery to a JMS consumer.
+     */
+    public void testSendMessageWithReceipt() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+
+        String connected = receiveFrame(10000);
+        Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+        String send = "SEND\ndestination:/queue/" + getQueueName() + "\nreceipt: 1234\n\nHello World" + Stomp.NULL;
+        sendFrame(send);
+
+        String receipt = receiveFrame(10000);
+        Assert.assertTrue(receipt.startsWith("RECEIPT"));
+        Assert.assertTrue(receipt.contains("receipt-id:1234"));
+
+        TextMessage received = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(received);
+        Assert.assertEquals("Hello World", received.getText());
+
+        // The JMS timestamp must be less than one second away from "now".
+        long now = System.currentTimeMillis();
+        long stamped = received.getJMSTimestamp();
+        Assert.assertTrue(Math.abs(now - stamped) < 1000);
+    }
+    
+    /**
+     * Sends a four-byte body with a content-length header and verifies that a
+     * JMS consumer receives it as a BytesMessage holding the same bytes.
+     */
+    public void testSendMessageWithContentLength() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        byte[] data = new byte[] {1, 2, 3, 4};
+
+        // Decode with the same charset sendFrame() encodes with (UTF-8) so the
+        // bytes on the wire do not depend on the platform default charset.
+        frame =
+                "SEND\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "content-length:" + data.length + "\n\n" +
+                        new String(data, "UTF-8") +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+
+        BytesMessage message = (BytesMessage) consumer.receive(1000);
+        Assert.assertNotNull(message);
+        Assert.assertEquals(data.length, message.getBodyLength());
+        for (int i = 0; i < data.length; i++) {
+            Assert.assertEquals(data[i], message.readByte());
+        }
+
+        // Make sure that the timestamp is valid - should
+        // be very close to the current time.
+        long tnow = System.currentTimeMillis();
+        long tmsg = message.getJMSTimestamp();
+        Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+    }
+
+    /**
+     * Sends a message carrying a JMSXGroupID header and checks that a JMS
+     * consumer receives it. The group-id assertion itself is still a TODO.
+     * The leading underscore keeps the method out of JUnit 3's test* discovery.
+     */
+    public void _testJMSXGroupIdCanBeSet() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+
+        String connected = receiveFrame(10000);
+        Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+        String send = "SEND\ndestination:/queue/" + getQueueName() + "\nJMSXGroupID: TEST\n\nHello World" + Stomp.NULL;
+        sendFrame(send);
+
+        TextMessage received = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(received);
+        // TODO do we support it?
+        //Assert.assertEquals("TEST", ((TextMessage) message).getGroupID());
+    }
+
+    /**
+     * Sends a message with custom "foo" and "bar" headers and checks that a
+     * JMS consumer created with the selector foo = 'abc' receives it with
+     * both headers readable as string properties.
+     */
+    public void _testSendMessageWithCustomHeadersAndSelector() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
+
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+
+        String connected = receiveFrame(10000);
+        Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+        String send = "SEND\nfoo:abc\nbar:123\ndestination:/queue/" + getQueueName() + "\n\nHello World" + Stomp.NULL;
+        sendFrame(send);
+
+        TextMessage received = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(received);
+        Assert.assertEquals("Hello World", received.getText());
+        Assert.assertEquals("foo", "abc", received.getStringProperty("foo"));
+        Assert.assertEquals("bar", "123", received.getStringProperty("bar"));
+    }
+
+    /**
+     * Sends a message with correlation-id, priority, type, JMSXGroupID and two
+     * custom headers, then checks the matching JMS header and property values
+     * on the message received by a JMS consumer.
+     */
+    public void _testSendMessageWithStandardHeaders() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+
+        String connected = receiveFrame(10000);
+        Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+        String headers = "correlation-id:c123\npriority:3\ntype:t345\nJMSXGroupID:abc\nfoo:abc\nbar:123\n";
+        String send = "SEND\n" + headers + "destination:/queue/" + getQueueName() + "\n\nHello World" + Stomp.NULL;
+        sendFrame(send);
+
+        TextMessage received = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(received);
+        Assert.assertEquals("Hello World", received.getText());
+        Assert.assertEquals("JMSCorrelationID", "c123", received.getJMSCorrelationID());
+        Assert.assertEquals("getJMSType", "t345", received.getJMSType());
+        Assert.assertEquals("getJMSPriority", 3, received.getJMSPriority());
+        Assert.assertEquals("foo", "abc", received.getStringProperty("foo"));
+        Assert.assertEquals("bar", "123", received.getStringProperty("bar"));
+
+        Assert.assertEquals("JMSXGroupID", "abc", received.getStringProperty("JMSXGroupID"));
+        // FIXME do we support it?
+        //Assert.assertEquals("GroupID", "abc", amqMessage.getGroupID());
+    }
+
+    /**
+     * Subscribes to the test queue with ack:auto, publishes one JMS message
+     * whose body is the test name, and checks the MESSAGE frame received on
+     * the STOMP socket before disconnecting.
+     */
+    public void testSubscribeWithAutoAck() throws Exception {
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+
+        String connected = receiveFrame(100000);
+        Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+        sendFrame("SUBSCRIBE\ndestination:/queue/" + getQueueName() + "\nack:auto\n\n" + Stomp.NULL);
+
+        sendMessage(getName());
+
+        String delivered = receiveFrame(10000);
+        Assert.assertTrue(delivered.startsWith("MESSAGE"));
+        Assert.assertTrue(delivered.indexOf("destination:") > 0);
+        Assert.assertTrue(delivered.indexOf(getName()) > 0);
+
+        sendFrame("DISCONNECT\n\n\n" + Stomp.NULL);
+    }
+
+    /**
+     * Subscribes with ack:auto, publishes a five-byte JMS BytesMessage and
+     * checks that the MESSAGE frame advertises a content-length of 5 and does
+     * not carry a "type: null" header.
+     */
+    public void _testSubscribeWithAutoAckAndBytesMessage() throws Exception {
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+
+        String connected = receiveFrame(100000);
+        Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+        sendFrame("SUBSCRIBE\ndestination:/queue/" + getQueueName() + "\nack:auto\n\n" + Stomp.NULL);
+
+        sendBytesMessage(new byte[]{1, 2, 3, 4, 5});
+
+        String delivered = receiveFrame(10000);
+        Assert.assertTrue(delivered.startsWith("MESSAGE"));
+
+        Matcher lengthMatcher =
+                Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE).matcher(delivered);
+        Assert.assertTrue(lengthMatcher.find());
+        Assert.assertEquals("5", lengthMatcher.group(1));
+
+        Matcher nullTypeMatcher = Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(delivered);
+        Assert.assertFalse(nullTypeMatcher.find());
+
+        sendFrame("DISCONNECT\n\n\n" + Stomp.NULL);
+    }
+
+    public void _testSubscribeWithMessageSentWithProperties() throws Exception {
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(100000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "ack:auto\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage message = session.createTextMessage("Hello World");
+        message.setStringProperty("S", "value");
+        message.setBooleanProperty("n", false);
+        message.setByteProperty("byte", (byte) 9);
+        message.setDoubleProperty("d", 2.0);
+        message.setFloatProperty("f", (float) 6.0);
+        message.setIntProperty("i", 10);
+        message.setLongProperty("l", 121);
+        message.setShortProperty("s", (short) 12);
+        producer.send(message);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+        Assert.assertTrue(frame.indexOf("S:") > 0);
+        Assert.assertTrue(frame.indexOf("n:") > 0);
+        Assert.assertTrue(frame.indexOf("byte:") > 0);
+        Assert.assertTrue(frame.indexOf("d:") > 0);
+        Assert.assertTrue(frame.indexOf("f:") > 0);
+        Assert.assertTrue(frame.indexOf("i:") > 0);
+        Assert.assertTrue(frame.indexOf("l:") > 0);
+        Assert.assertTrue(frame.indexOf("s:") > 0);
+        Assert.assertTrue(frame.indexOf("Hello World") > 0);
+
+//        System.out.println("out: "+frame);
+
+        frame =
+                "DISCONNECT\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+    }
+
+    /**
+     * Subscribes with ack:auto and publishes two batches of ten JMS messages,
+     * checking after each batch that the MESSAGE frames arrive in the order
+     * the messages were sent.
+     */
+    public void _testMessagesAreInOrder() throws Exception {
+        final int count = 10;
+        String[] bodies = new String[count];
+
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+
+        String connected = receiveFrame(100000);
+        Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+        sendFrame("SUBSCRIBE\ndestination:/queue/" + getQueueName() + "\nack:auto\n\n" + Stomp.NULL);
+
+        // first batch
+        for (int i = 0; i < count; i++) {
+            bodies[i] = getName() + i;
+            sendMessage(bodies[i]);
+        }
+        for (String expected : bodies) {
+            String delivered = receiveFrame(1000);
+            Assert.assertTrue("Message not in order", delivered.contains(expected));
+        }
+
+        // sleep a while before publishing another set of messages
+        waitForFrameToTakeEffect();
+
+        // second batch
+        for (int i = 0; i < count; i++) {
+            bodies[i] = getName() + ":second:" + i;
+            sendMessage(bodies[i]);
+        }
+        for (String expected : bodies) {
+            String delivered = receiveFrame(1000);
+            Assert.assertTrue("Message not in order", delivered.contains(expected));
+        }
+
+        sendFrame("DISCONNECT\n\n\n" + Stomp.NULL);
+    }
+
+    public void _testSubscribeWithAutoAckAndSelector() throws Exception {
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(100000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "selector: foo = 'zzz'\n" +
+                        "ack:auto\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        sendMessage("Ignored message", "foo", "1234");
+        sendMessage("Real message", "foo", "zzz");
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+        Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
+
+        frame =
+                "DISCONNECT\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+    }
+
+    /**
+     * Subscribes with ack:client, receives a message without acknowledging
+     * it, disconnects, and checks that a JMS consumer then receives a message
+     * flagged as redelivered.
+     */
+    public void _testSubscribeWithClientAck() throws Exception {
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+
+        String connected = receiveFrame(10000);
+        Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+        sendFrame("SUBSCRIBE\ndestination:/queue/" + getQueueName() + "\nack:client\n\n" + Stomp.NULL);
+        sendMessage(getName());
+
+        String delivered = receiveFrame(10000);
+        Assert.assertTrue(delivered.startsWith("MESSAGE"));
+
+        sendFrame("DISCONNECT\n\n\n" + Stomp.NULL);
+
+        // message should be received since message was not acknowledged
+        MessageConsumer consumer = session.createConsumer(queue);
+        TextMessage redelivered = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(redelivered);
+        Assert.assertTrue(redelivered.getJMSRedelivered());
+    }
+
+    /**
+     * Runs the client-ack-then-auto-ack scenario without sending a DISCONNECT
+     * frame before the first reconnect.
+     */
+    public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithNoDisconnectFrame() throws Exception {
+        final boolean sendDisconnect = false;
+        assertSubscribeWithClientAckThenConsumeWithAutoAck(sendDisconnect);
+    }
+
+    /**
+     * Runs the client-ack-then-auto-ack scenario with an explicit DISCONNECT
+     * frame sent before the first reconnect.
+     */
+    public void _testSubscribeWithClientAckThenConsumingAgainWithAutoAckWithExplicitDisconnect() throws Exception {
+        final boolean sendDisconnect = true;
+        assertSubscribeWithClientAckThenConsumeWithAutoAck(sendDisconnect);
+    }
+
+    protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
+
+        String frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n" +
+                        "ack:client\n\n" +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+        sendMessage(getName());
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+        log.info("Reconnecting!");
+        
+        if (sendDisconnect) {
+            frame =
+                    "DISCONNECT\n" +
+                            "\n\n" +
+                            Stomp.NULL;
+            sendFrame(frame);
+            reconnect();
+        }
+        else {
+            reconnect(1000);
+        }
+
+
+        // message should be received since message was not acknowledged
+        frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n\n" +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+
+        frame =
+                "DISCONNECT\n" +
+                        "\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        // now lets make sure we don't see the message again
+        reconnect();
+
+        frame =
+                "CONNECT\n" +
+                        "login: brianm\n" +
+                        "passcode: wombats\n\n" +
+                        Stomp.NULL;
+        sendFrame(frame);
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+        frame =
+                "SUBSCRIBE\n" +
+                        "destination:/queue/" + getQueueName() + "\n\n" +
+                        Stomp.NULL;
+
+        sendFrame(frame);
+        sendMessage("shouldBeNextMessage");
+
+        frame = receiveFrame(10000);
+        Assert.assertTrue(frame.startsWith("MESSAGE"));
+        Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+    }
+
+    /**
+     * Subscribes, receives one message, unsubscribes, then publishes a second
+     * message and expects the read on the STOMP socket to time out instead of
+     * delivering another frame.
+     */
+    public void _testUnsubscribe() throws Exception {
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+        String reply = receiveFrame(100000);
+        Assert.assertTrue(reply.startsWith("CONNECTED"));
+
+        sendFrame("SUBSCRIBE\ndestination:/queue/" + getQueueName() + "\nack:auto\n\n" + Stomp.NULL);
+
+        // send a message to our queue
+        sendMessage("first message");
+
+        // receive message from socket
+        reply = receiveFrame(1000);
+        Assert.assertTrue(reply.startsWith("MESSAGE"));
+
+        // remove subscription
+        sendFrame("UNSUBSCRIBE\ndestination:/queue/" + getQueueName() + "\n\n\n" + Stomp.NULL);
+
+        waitForFrameToTakeEffect();
+
+        // send a message to our queue
+        sendMessage("second message");
+
+        try {
+            reply = receiveFrame(1000);
+            log.info("Received frame: " + reply);
+            Assert.fail("No message should have been received since subscription was removed");
+        }
+        catch (SocketTimeoutException expected) {
+            // expected: nothing is delivered once the subscription is gone
+        }
+    }
+
+    /**
+     * Sends a message inside STOMP transaction tx1 and checks that a JMS
+     * consumer sees nothing before COMMIT and receives a message after it.
+     */
+    public void _testTransactionCommit() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+
+        String connected = receiveFrame(1000);
+        Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+        sendFrame("BEGIN\ntransaction: tx1\n\n\n" + Stomp.NULL);
+
+        String send =
+                "SEND\ndestination:/queue/" + getQueueName() + "\ntransaction: tx1\n\n\nHello World" + Stomp.NULL;
+        sendFrame(send);
+
+        waitForFrameToTakeEffect();
+        // check the message is not committed
+        assertNull(consumer.receive(100));
+
+        sendFrame("COMMIT\ntransaction: tx1\n\n\n" + Stomp.NULL);
+
+        waitForFrameToTakeEffect();
+
+        TextMessage committed = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull("Should have received a message", committed);
+    }
+
+    /**
+     * Sends "first message" in transaction tx1 and aborts it, then sends
+     * "second message" in a new tx1 and commits it; the JMS consumer is
+     * expected to receive only the second message.
+     */
+    public void _testTransactionRollback() throws Exception {
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        final String begin = "BEGIN\ntransaction: tx1\n\n\n" + Stomp.NULL;
+
+        sendFrame("CONNECT\nlogin: brianm\npasscode: wombats\n\n" + Stomp.NULL);
+
+        String connected = receiveFrame(1000);
+        Assert.assertTrue(connected.startsWith("CONNECTED"));
+
+        sendFrame(begin);
+        sendFrame("SEND\ndestination:/queue/" + getQueueName() + "\ntransaction: tx1\n\nfirst message" + Stomp.NULL);
+
+        // rollback first message
+        sendFrame("ABORT\ntransaction: tx1\n\n\n" + Stomp.NULL);
+
+        sendFrame(begin);
+        sendFrame("SEND\ndestination:/queue/" + getQueueName() + "\ntransaction: tx1\n\nsecond message" + Stomp.NULL);
+        sendFrame("COMMIT\ntransaction: tx1\n\n\n" + Stomp.NULL);
+
+        // This test case is currently failing
+        waitForFrameToTakeEffect();
+
+        // only second msg should be received since first msg was rolled back
+        TextMessage received = (TextMessage) consumer.receive(1000);
+        Assert.assertNotNull(received);
+        Assert.assertEquals("second message", received.getText().trim());
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+    /**
+     * Starts the embedded server, opens the STOMP socket and creates the
+     * in-VM JMS connection, session and queue used by the tests.
+     * <p>
+     * JUnit 3 does not invoke {@link #tearDown()} when {@code setUp()} throws,
+     * so anything opened here is released again if a later step fails;
+     * otherwise the server would keep running and keep its port bound.
+     */
+    protected void setUp() throws Exception {
+        server = createServer();
+        server.start();
+        try {
+            connectionFactory = createConnectionFactory();
+
+            stompSocket = createSocket();
+            inputBuffer = new ByteArrayOutputStream();
+
+            connection = connectionFactory.createConnection();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            queue = session.createQueue(getQueueName());
+            connection.start();
+        }
+        catch (Exception e) {
+            // Best-effort cleanup, then report the original failure.
+            try {
+                if (connection != null) {
+                    connection.close();
+                }
+            }
+            finally {
+                try {
+                    if (stompSocket != null) {
+                        stompSocket.close();
+                    }
+                }
+                finally {
+                    server.stop();
+                }
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Builds a JMS server manager around a HornetQ server with security and
+     * persistence disabled, a Netty acceptor configured for the STOMP
+     * protocol, an in-VM acceptor, and one JMS queue named after
+     * {@link #getQueueName()}. The server is not started here.
+     * <p>
+     * The STOMP acceptor listens on the {@code port} field, the same value
+     * {@link #createSocket()} connects to, so the two cannot drift apart.
+     *
+     * @return the configured JMS server manager
+     * @throws Exception if the server or its JMS manager cannot be created
+     */
+   private JMSServerManager createServer() throws Exception
+   {
+      Configuration config = new ConfigurationImpl();
+      config.setSecurityEnabled(false);
+      config.setPersistenceEnabled(false);
+
+      Map<String, Object> params = new HashMap<String, Object>();
+      params.put(TransportConstants.PROTOCOL_PROP_NAME, ProtocolType.STOMP.toString());
+      params.put(TransportConstants.PORT_PROP_NAME, port);
+      TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
+      config.getAcceptorConfigurations().add(stompTransport);
+      config.getAcceptorConfigurations().add(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
+      HornetQServer hornetQServer = HornetQServers.newHornetQServer(config);
+
+      JMSConfiguration jmsConfig = new JMSConfigurationImpl();
+      jmsConfig.getQueueConfigurations().add(new QueueConfigurationImpl(getQueueName(), null, false, getQueueName()));
+      // Use a local instead of writing the 'server' field as a side effect;
+      // the caller decides where to store the result.
+      JMSServerManager jmsServer = new JMSServerManagerImpl(hornetQServer, jmsConfig);
+      jmsServer.setContext(null);
+      return jmsServer;
+   }
+
+   /**
+    * Closes the JMS connection and the STOMP socket and stops the server.
+    * Each step runs even if the previous one throws, so a failing close
+    * cannot leave the server running.
+    */
+   protected void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+        finally {
+            try {
+                if (stompSocket != null) {
+                    stompSocket.close();
+                }
+            }
+            finally {
+                if (server != null) {
+                    server.stop();
+                }
+            }
+        }
+    }
+
+    protected void reconnect() throws Exception {
+        reconnect(0);
+    }
+    /**
+     * Closes the current STOMP socket, optionally pauses, then opens a new
+     * socket and starts a fresh input buffer.
+     *
+     * @param sleep milliseconds to wait between closing and reopening; no
+     *        pause is made when the value is zero or negative
+     */
+    protected void reconnect(long sleep) throws Exception {
+        stompSocket.close();
+
+        boolean pauseRequested = sleep > 0;
+        if (pauseRequested) {
+            Thread.sleep(sleep);
+        }
+
+        stompSocket = createSocket();
+        inputBuffer = new ByteArrayOutputStream();
+    }
+
+    /**
+     * Creates the JMS connection factory used by the tests, backed by the
+     * in-VM connector.
+     */
+    protected ConnectionFactory createConnectionFactory() {
+        TransportConfiguration inVmConnector = new TransportConfiguration(InVMConnectorFactory.class.getName());
+        return new HornetQConnectionFactory(inVmConnector);
+    }
+
+    protected Socket createSocket() throws IOException {
+        return new Socket("127.0.0.1", port);
+    }
+
+    protected String getQueueName() {
+        return "test";
+    }
+
+    /**
+     * Writes the given frame to the STOMP socket, UTF-8 encoded, one byte at
+     * a time, then flushes the stream.
+     *
+     * @param data the complete frame text, including its terminator
+     */
+    public void sendFrame(String data) throws Exception {
+        byte[] encoded = data.getBytes("UTF-8");
+        OutputStream out = stompSocket.getOutputStream();
+        for (byte b : encoded) {
+            out.write(b);
+        }
+        out.flush();
+    }
+
+    public String receiveFrame(long timeOut) throws Exception {
+        stompSocket.setSoTimeout((int) timeOut);
+        InputStream is = stompSocket.getInputStream();
+        int c = 0;
+        for (; ;) {
+            c = is.read();
+            if (c < 0) {
+                throw new IOException("socket closed.");
+            }
+            else if (c == 0) {
+                c = is.read();
+                if (c != '\n')
+                {
+                   byte[] ba = inputBuffer.toByteArray();
+                   System.out.println(new String(ba, "UTF-8"));
+                }
+                Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
+                byte[] ba = inputBuffer.toByteArray();
+                inputBuffer.reset();
+                return new String(ba, "UTF-8");
+            }
+            else {
+                inputBuffer.write(c);
+            }
+        }
+    }
+
+    /**
+     * Publishes a JMS text message to the test queue with the string
+     * property foo set to "xyz".
+     *
+     * @param msg the message body
+     */
+    public void sendMessage(String msg) throws Exception {
+        final String propertyName = "foo";
+        final String propertyValue = "xyz";
+        sendMessage(msg, propertyName, propertyValue);
+    }
+
+    /**
+     * Publishes a JMS text message carrying one string property to the test
+     * queue, using a producer created on the shared session.
+     *
+     * @param msg the message body
+     * @param propertyName name of the string property to set
+     * @param propertyValue value of the string property
+     */
+    public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
+        MessageProducer queueProducer = session.createProducer(queue);
+        TextMessage outgoing = session.createTextMessage(msg);
+        outgoing.setStringProperty(propertyName, propertyValue);
+        queueProducer.send(outgoing);
+    }
+
+    /**
+     * Publishes a JMS bytes message with the given body to the test queue,
+     * using a producer created on the shared session.
+     *
+     * @param msg the bytes to write into the message body
+     */
+    public void sendBytesMessage(byte[] msg) throws Exception {
+        MessageProducer queueProducer = session.createProducer(queue);
+        BytesMessage outgoing = session.createBytesMessage();
+        outgoing.writeBytes(msg);
+        queueProducer.send(outgoing);
+    }
+
+    protected void waitForFrameToTakeEffect() throws InterruptedException {
+        // bit of a dirty hack :)
+        // another option would be to force some kind of receipt to be returned
+        // from the frame
+        Thread.sleep(2000);
+    }
+}

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -73,8 +73,7 @@
                                                  null,
                                                  listener,
                                                  Executors.newCachedThreadPool(),
-                                                 Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE),
-                                                 ProtocolType.CORE);
+                                                 Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
 
       Assert.assertTrue(acceptor instanceof NettyAcceptor);
    }

Modified: trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-22 14:25:11 UTC (rev 8842)
+++ trunk/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-22 15:32:13 UTC (rev 8843)
@@ -86,8 +86,7 @@
                                                  null,
                                                  listener,
                                                  Executors.newCachedThreadPool(),
-                                                 Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE),
-                                                 ProtocolType.CORE);
+                                                 Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
 
       acceptor.start();
       Assert.assertTrue(acceptor.isStarted());



More information about the hornetq-commits mailing list