[hornetq-commits] JBoss hornetq SVN: r11293 - in branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v10 and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Sun Sep 4 20:34:30 EDT 2011


Author: gaohoward
Date: 2011-09-04 20:34:29 -0400 (Sun, 04 Sep 2011)
New Revision: 11293

Added:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
Log:
curr work


Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java	2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java	2011-09-05 00:34:29 UTC (rev 11293)
@@ -1,3 +1,15 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
 package org.hornetq.core.protocol.stomp;
 
 import java.util.ArrayList;
@@ -35,6 +47,11 @@
       this.body = body;
    }
 
+   public StompFrame getFrame()
+   {
+      return null;
+   }
+
    private class Header
    {
       public String key;

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java	2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java	2011-09-05 00:34:29 UTC (rev 11293)
@@ -40,12 +40,6 @@
 
       String UNSUBSCRIBE = "UNSUBSCRIBE";
 
-      String BEGIN_TRANSACTION = "BEGIN";
-
-      String COMMIT_TRANSACTION = "COMMIT";
-
-      String ABORT_TRANSACTION = "ABORT";
-
       String BEGIN = "BEGIN";
 
       String COMMIT = "COMMIT";
@@ -53,6 +47,11 @@
       String ABORT = "ABORT";
 
       String ACK = "ACK";
+
+      //1.1
+      String NACK = "NACK";
+      
+      String STOMP = "STOMP";
    }
 
    public interface Responses
@@ -76,6 +75,10 @@
 
       String CONTENT_LENGTH = "content-length";
 
+      String ACCEPT_VERSION = "accept-version";
+
+      String CONTENT_TYPE = "content-type";
+
       public interface Response
       {
          String RECEIPT_ID = "receipt-id";
@@ -140,6 +143,8 @@
             String AUTO = "auto";
 
             String CLIENT = "client";
+            
+            String CLIENT_INDIVIDUAL = "client-individual";
          }
       }
 
@@ -167,7 +172,11 @@
 
       public interface Error
       {
+         //1.0 only
          String MESSAGE = "message";
+         
+         //1.1
+         String VERSION = "version";
       }
 
       public interface Connected
@@ -175,11 +184,19 @@
          String SESSION = "session";
 
          String RESPONSE_ID = "response-id";
+
+         //1.1
+         String VERSION = "version";
+
+         String SERVER = "server";
       }
 
       public interface Ack
       {
          String MESSAGE_ID = "message-id";
+         
+         //1.1
+         String SUBSCRIPTION = "subscription";
       }
    }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-05 00:34:29 UTC (rev 11293)
@@ -27,6 +27,8 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.CloseListener;
 import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
 import org.hornetq.spi.core.protocol.RemotingConnection;
 import org.hornetq.spi.core.remoting.Connection;
 
@@ -39,8 +41,9 @@
  */
 public class StompConnection implements RemotingConnection
 {
-
    private static final Logger log = Logger.getLogger(StompConnection.class);
+   
+   protected static final String CONNECTION_ID_PROP = "__HQ_CID";
 
    private final StompProtocolManager manager;
 
@@ -68,7 +71,11 @@
    
    private volatile boolean dataReceived;
    
-   private StompVersions version = StompVersions.V1_0;
+   private StompVersions version;
+   
+   private VersionedStompFrameHandler frameHandler;
+   
+   private boolean initialized;
 
    public StompDecoder getDecoder()
    {
@@ -199,10 +206,6 @@
       manager.cleanup(this);
    }
 
-   public void disconnect()
-   {
-   }
-
    public void fail(final HornetQException me)
    {
       synchronized (failLock)
@@ -358,8 +361,10 @@
     * accept-version value takes form of "v1,v2,v3..."
     * we need to return the highest supported version
     */
-   public void negotiateVersion(String acceptVersion) throws HornetQStompException
+   public void negotiateVersion(StompFrame frame) throws HornetQStompException
    {
+      String acceptVersion = frame.getHeader(Stomp.Headers.ACCEPT_VERSION);
+      
       if (acceptVersion == null)
       {
          this.version = StompVersions.V1_0;
@@ -391,6 +396,9 @@
             throw error;
          }
       }
+      
+      this.frameHandler = VersionedStompFrameHandler.getHandler(this, this.version);
+      this.initialized = true;
    }
 
    //reject if the host doesn't match
@@ -411,4 +419,251 @@
          throw error;
       }
    }
+
+   public void handleFrame(StompFrame request)
+   {
+      StompFrame reply = null;
+      try
+      {
+         if (!initialized)
+         {
+            if (!Stomp.Commands.CONNECT.equals(request.getCommand()))
+            {
+               throw new HornetQStompException("Connection hasn't been established.");
+            }
+            //decide version
+            negotiateVersion(request);
+         }
+         reply = frameHandler.handleFrame(request);
+      }
+      catch (HornetQStompException e)
+      {
+         reply = e.getFrame();
+      }
+      
+      if (reply != null)
+      {
+         sendFrame(reply);
+      }
+   }
+
+   public void sendFrame(StompFrame frame)
+   {
+      manager.sendReply(this, frame);
+   }
+
+   public boolean validateUser(String login, String passcode)
+   {
+      this.valid = manager.validateUser(login, passcode);
+      if (valid)
+      {
+         this.login = login;
+         this.passcode = passcode;
+      }
+      return valid;
+   }
+
+   public ServerMessageImpl createServerMessage()
+   {
+      return manager.createServerMessage();
+   }
+
+   public StompSession getSession(String txID) throws HornetQStompException
+   {
+      StompSession session = null;
+      try
+      {
+         if (txID == null)
+         {
+            session = manager.getSession(this);
+         }
+         else
+         {
+            session = manager.getTransactedSession(this, txID);
+         }
+      }
+      catch (Exception e)
+      {
+         throw new HornetQStompException("Exception getting session", e);
+      }
+      
+      return session;
+   }
+
+   public void validate() throws HornetQStompException
+   {
+      if (!this.valid)
+      {
+         throw new HornetQStompException("Connection is not valid.");
+      }
+   }
+
+   public void sendServerMessage(ServerMessageImpl message, String txID) throws HornetQStompException
+   {
+      StompSession stompSession = getSession(txID);
+
+      if (stompSession.isNoLocal())
+      {
+         message.putStringProperty(CONNECTION_ID_PROP, getID().toString());
+      }
+      try
+      {
+         stompSession.getSession().send(message, true);
+      }
+      catch (Exception e)
+      {
+         throw new HornetQStompException("Error sending message " + message, e);
+      }
+   }
+
+   @Override
+   public void disconnect()
+   {
+      destroy();
+   }
+
+   public void beginTransaction(String txID) throws HornetQStompException
+   {
+      try
+      {
+         manager.beginTransaction(this, txID);
+      }
+      catch (HornetQStompException e)
+      {
+         throw e;
+      }
+      catch (Exception e)
+      {
+         throw new HornetQStompException("Error beginning a transaction: " + txID, e);
+      }
+   }
+
+   public void commitTransaction(String txID) throws HornetQStompException
+   {
+      try
+      {
+         manager.commitTransaction(this, txID);
+      }
+      catch (HornetQStompException e)
+      {
+         throw e;
+      }
+      catch (Exception e)
+      {
+         throw new HornetQStompException("Error committing " + txID, e);
+      }
+   }
+
+   public void abortTransaction(String txID) throws HornetQStompException
+   {
+      try
+      {
+         manager.abortTransaction(this, txID);
+      }
+      catch (HornetQStompException e)
+      {
+         throw e;
+      }
+      catch (Exception e)
+      {
+         throw new HornetQStompException("Error aborting " + txID, e);
+      }
+   }
+
+   public void subscribe(String destination, String selector, String ack,
+         String id, String durableSubscriptionName, boolean noLocal) throws HornetQStompException
+   {
+      if (noLocal)
+      {
+         String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'";
+         if (selector == null)
+         {
+            selector = noLocalFilter;
+         }
+         else
+         {
+            selector += " AND " + noLocalFilter;
+         }
+      }
+      if (ack == null)
+      {
+         ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
+      }
+
+      String subscriptionID = null;
+      if (id != null)
+      {
+         subscriptionID = id;
+      }
+      else
+      {
+         if (destination == null)
+         {
+            throw new HornetQStompException("Client must set destination or id header to a SUBSCRIBE command");
+         }
+         subscriptionID = "subscription/" + destination;
+      }
+      
+      try
+      {
+         manager.createSubscription(this, subscriptionID, durableSubscriptionName, destination, selector, ack, noLocal);
+      }
+      catch (HornetQStompException e)
+      {
+         throw e;
+      }
+      catch (Exception e)
+      {
+         throw new HornetQStompException("Error creating subscription " + subscriptionID, e);
+      }
+   }
+
+   public void unsubscribe(String subscriptionID) throws HornetQStompException
+   {
+      try
+      {
+         manager.unsubscribe(this, subscriptionID);
+      }
+      catch (HornetQStompException e)
+      {
+         throw e;
+      }
+      catch (Exception e)
+      {
+         throw new HornetQStompException("Error unsubscripting " + subscriptionID, e);
+      }
+   }
+
+   public void acknowledge(String messageID, String subscriptionID) throws HornetQStompException
+   {
+      try
+      {
+         manager.acknowledge(this, messageID, subscriptionID);
+      }
+      catch (HornetQStompException e)
+      {
+         throw e;
+      }
+      catch (Exception e)
+      {
+         throw new HornetQStompException("Error acknowledging message " + messageID, e);
+      }
+   }
+
+   public String getVersion()
+   {
+      return String.valueOf(version);
+   }
+
+   public String getHornetQServerName()
+   {
+      //hard coded, review later.
+      return "HornetQ/2.2.5 HornetQ Messaging Engine";
+   }
+
+   public StompFrame createStompMessage(ServerMessage serverMessage,
+         StompSubscription subscription, int deliveryCount)
+   {
+      return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount);
+   }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-05 00:34:29 UTC (rev 11293)
@@ -17,6 +17,10 @@
  */
 package org.hornetq.core.protocol.stomp;
 
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -31,7 +35,7 @@
  * @author Tim Fox
  * 
  */
-class StompFrame
+public class StompFrame
 {
    private static final Logger log = Logger.getLogger(StompFrame.class);
 
@@ -39,45 +43,30 @@
 
    private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
 
-   private final String command;
+   private String command;
 
-   private final Map<String, Object> headers;
+   private Map<String, String> headers;
+   
+   //stomp 1.1 talks about repetitive headers.
+   private List<Header> allHeaders = new ArrayList<Header>();
 
-   private final byte[] content;
+   private String body;
 
    private HornetQBuffer buffer = null;
 
    private int size;
    
-   public StompFrame(String command, Map<String, Object> headers, byte[] data)
+   public StompFrame(String command)
    {
       this.command = command;
-      this.headers = headers;
-      this.content = data;
+      this.headers = new LinkedHashMap<String, String>();
    }
 
-   public StompFrame(String command, Map<String, Object> headers)
-   {
-      this.command = command;
-      this.headers = headers;
-      this.content = NO_DATA;
-   }
-
    public String getCommand()
    {
       return command;
    }
 
-   public byte[] getContent()
-   {
-      return content;
-   }
-   
-   public Map<String, Object> getHeaders()
-   {
-      return headers;
-   }
-
    public int getEncodedSize() throws Exception
    {
       if (buffer == null)
@@ -90,18 +79,18 @@
    @Override
    public String toString()
    {
-      return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=" + content.length + "]";
+      return "StompFrame[command=" + command + ", headers=" + headers + ", content-length=";
    }
 
    public String asString()
    {
       String out = command + '\n';
-      for (Entry<String, Object> header : headers.entrySet())
+      for (Entry<String, String> header : headers.entrySet())
       {
          out += header.getKey() + ": " + header.getValue() + '\n';
       }
       out += '\n';
-      out += new String(content);
+      out += body;
       return out;
    }
 
@@ -116,7 +105,7 @@
          head.append(command);
          head.append(Stomp.NEWLINE);
          // Output the headers.
-         for (Map.Entry<String, Object> header : headers.entrySet())
+         for (Map.Entry<String, String> header : headers.entrySet())
          {
             head.append(header.getKey());
             head.append(Stomp.Headers.SEPARATOR);
@@ -134,4 +123,60 @@
       }
       return buffer;
    }
+
+   public String getHeader(String key)
+   {
+      return headers.get(key);
+   }
+
+   public void addHeader(String key, String val)
+   {
+      if (!headers.containsKey(key))
+      {
+         headers.put(key, val);
+      }
+      allHeaders.add(new Header(key, val));
+   }
+   
+   public Map<String, String> getHeadersMap()
+   {
+      return headers;
+   }
+   
+   private class Header
+   {
+      public String key;
+      public String val;
+      
+      public Header(String key, String val)
+      {
+         this.key = key;
+         this.val = val;
+      }
+   }
+
+   public void setBody(String body)
+   {
+      this.body = body;
+   }
+
+   public boolean hasHeader(String key)
+   {
+      return headers.containsKey(key);
+   }
+
+   public String getBody()
+   {
+      return body;
+   }
+   
+   //Since 1.1, there is a content-type header that needs to take care of
+   public byte[] getBodyAsBytes() throws UnsupportedEncodingException
+   {
+      if (body != null)
+      {
+         return body.getBytes("UTF-8");
+      }
+      return new byte[0];
+   }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-09-05 00:34:29 UTC (rev 11293)
@@ -13,10 +13,6 @@
 
 package org.hornetq.core.protocol.stomp;
 
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -27,8 +23,6 @@
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.Interceptor;
-import org.hornetq.api.core.Message;
-import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
@@ -53,9 +47,6 @@
 
    private static final Logger log = Logger.getLogger(StompProtocolManager.class);
 
-   // TODO use same value than HornetQConnection
-   private static final String CONNECTION_ID_PROP = "__HQ_CID";
-
    // Attributes ----------------------------------------------------
 
    private final HornetQServer server;
@@ -69,36 +60,6 @@
 
    // Static --------------------------------------------------------
 
-   private static StompFrame createError(Exception e, StompFrame request)
-   {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream();
-      try
-      {
-         // Let the stomp client know about any protocol errors.
-         PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
-         e.printStackTrace(stream);
-         stream.close();
-
-         Map<String, Object> headers = new HashMap<String, Object>();
-         headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
-
-         final String receiptId = (String)request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
-         if (receiptId != null)
-         {
-            headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
-         }
-
-         byte[] payload = baos.toByteArray();
-         headers.put(Stomp.Headers.CONTENT_LENGTH, payload.length);
-         return new StompFrame(Stomp.Responses.ERROR, headers, payload);
-      }
-      catch (UnsupportedEncodingException ex)
-      {
-         log.warn("Unable to create ERROR frame from the exception", ex);
-         return null;
-      }
-   }
-
    // Constructors --------------------------------------------------
 
    public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
@@ -143,19 +104,15 @@
 
    public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
    {
-      long start = System.nanoTime();
       StompConnection conn = (StompConnection)connection;
       
       conn.setDataReceived();
       
       StompDecoder decoder = conn.getDecoder();
-      
-     // log.info("in handle");
 
       do
       {
          StompFrame request;
-         
          try
          {
             request = decoder.decode(buffer);
@@ -163,7 +120,6 @@
          catch (Exception e)
          {
             log.error("Failed to decode", e);
-
             return;
          }
          
@@ -174,93 +130,13 @@
 
          try
          {
-            String command = request.getCommand();
-
-            StompFrame response = null;
-
-            if (Stomp.Commands.CONNECT.equals(command))
-            {
-               response = onConnect(request, conn);
-            }
-            else if (Stomp.Commands.DISCONNECT.equals(command))
-            {
-               response = onDisconnect(request, conn);
-            }
-            else if (Stomp.Commands.SEND.equals(command))
-            {
-               response = onSend(request, conn);
-            }
-            else if (Stomp.Commands.SUBSCRIBE.equals(command))
-            {
-               response = onSubscribe(request, conn);
-            }
-            else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
-            {
-               response = onUnsubscribe(request, conn);
-            }
-            else if (Stomp.Commands.ACK.equals(command))
-            {
-               response = onAck(request, conn);
-            }
-            else if (Stomp.Commands.BEGIN.equals(command))
-            {
-               response = onBegin(request, server, conn);
-            }
-            else if (Stomp.Commands.COMMIT.equals(command))
-            {
-               response = onCommit(request, conn);
-            }
-            else if (Stomp.Commands.ABORT.equals(command))
-            {
-               response = onAbort(request, conn);
-            }
-            else
-            {
-               log.error("Unsupported Stomp frame: " + request);
-               response = new StompFrame(Stomp.Responses.ERROR,
-                                         new HashMap<String, Object>(),
-                                         ("Unsupported frame: " + command).getBytes());
-            }
-
-            if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
-            {
-               if (response == null)
-               {
-                  Map<String, Object> h = new HashMap<String, Object>();
-                  response = new StompFrame(Stomp.Responses.RECEIPT, h);
-               }
-               response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
-                                         request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
-            }
-
-            if (response != null)
-            {
-               sendReply(conn, response);
-            }
-
-            if (Stomp.Commands.DISCONNECT.equals(command))
-            {
-               conn.destroy();
-            }
+            conn.handleFrame(request);
          }
-         catch (Exception e)
-         {
-            e.printStackTrace();
-            StompFrame error = createError(e, request);
-            if (error != null)
-            {
-               sendReply(conn, error);
-            }
-         }
          finally
          {
             server.getStorageManager().clearContext();
          }
       } while (decoder.hasBytes());
-      
-      long end = System.nanoTime();
-      
-     // log.info("handle took " + (end-start));
    }
 
    // Public --------------------------------------------------------
@@ -297,182 +173,8 @@
 
    // Private -------------------------------------------------------
 
-   private StompFrame onSubscribe(StompFrame frame, StompConnection connection) throws Exception
+   public StompSession getSession(StompConnection connection) throws Exception
    {
-      Map<String, Object> headers = frame.getHeaders();
-      String destination = (String)headers.get(Stomp.Headers.Subscribe.DESTINATION);
-      String selector = (String)headers.get(Stomp.Headers.Subscribe.SELECTOR);
-      String ack = (String)headers.get(Stomp.Headers.Subscribe.ACK_MODE);
-      String id = (String)headers.get(Stomp.Headers.Subscribe.ID);
-      String durableSubscriptionName = (String)headers.get(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
-      boolean noLocal = false;
-      if (headers.containsKey(Stomp.Headers.Subscribe.NO_LOCAL))
-      {
-         noLocal = Boolean.parseBoolean((String)headers.get(Stomp.Headers.Subscribe.NO_LOCAL));
-      }
-      if (noLocal)
-      {
-         String noLocalFilter = CONNECTION_ID_PROP + " <> '" + connection.getID().toString() + "'";
-         if (selector == null)
-         {
-            selector = noLocalFilter;
-         }
-         else
-         {
-            selector += " AND " + noLocalFilter;
-         }
-      }
-      if (ack == null)
-      {
-         ack = Stomp.Headers.Subscribe.AckModeValues.AUTO;
-      }
-      String subscriptionID = null;
-      if (id != null)
-      {
-         subscriptionID = id;
-      }
-      else
-      {
-         if (destination == null)
-         {
-            throw new StompException("Client must set destination or id header to a SUBSCRIBE command");
-         }
-         subscriptionID = "subscription/" + destination;
-      }
-      StompSession stompSession = getSession(connection);
-      stompSession.setNoLocal(noLocal);
-      if (stompSession.containsSubscription(subscriptionID))
-      {
-         throw new StompException("There already is a subscription for: " + subscriptionID +
-                                  ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
-      }
-      long consumerID = server.getStorageManager().generateUniqueID();
-      String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
-      stompSession.addSubscription(consumerID,
-                                   subscriptionID,
-                                   clientID,
-                                   durableSubscriptionName,
-                                   destination,
-                                   selector,
-                                   ack);
-
-      return null;
-   }
-
-   private StompFrame onUnsubscribe(StompFrame frame, StompConnection connection) throws Exception
-   {
-      Map<String, Object> headers = frame.getHeaders();
-      String destination = (String)headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
-      String id = (String)headers.get(Stomp.Headers.Unsubscribe.ID);
-
-      String subscriptionID = null;
-      if (id != null)
-      {
-         subscriptionID = id;
-      }
-      else
-      {
-         if (destination == null)
-         {
-            throw new StompException("Must specify the subscription's id or the destination you are unsubscribing from");
-         }
-         subscriptionID = "subscription/" + destination;
-      }
-
-      StompSession stompSession = getSession(connection);
-      boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
-      if (!unsubscribed)
-      {
-         throw new StompException("Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
-      }
-      return null;
-   }
-
-   private StompFrame onAck(StompFrame frame, StompConnection connection) throws Exception
-   {
-      Map<String, Object> headers = frame.getHeaders();
-      String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
-      String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
-      StompSession stompSession = null;
-      if (txID != null)
-      {
-         log.warn("Transactional acknowledgement is not supported");
-      }
-      stompSession = getSession(connection);
-      stompSession.acknowledge(messageID);
-
-      return null;
-   }
-
-   private StompFrame onBegin(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
-   {
-      Map<String, Object> headers = frame.getHeaders();
-      String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
-      if (txID == null)
-      {
-         throw new StompException("transaction header is mandatory to BEGIN a transaction");
-      }
-      if (transactedSessions.containsKey(txID))
-      {
-         throw new StompException("Transaction already started: " + txID);
-      }
-      // create the transacted session
-      getTransactedSession(connection, txID);
-
-      return null;
-   }
-
-   private StompFrame onCommit(StompFrame frame, StompConnection connection) throws Exception
-   {
-      Map<String, Object> headers = frame.getHeaders();
-      String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
-      if (txID == null)
-      {
-         throw new StompException("transaction header is mandatory to COMMIT a transaction");
-      }
-
-      StompSession session = getTransactedSession(connection, txID);
-      if (session == null)
-      {
-         throw new StompException("No transaction started: " + txID);
-      }
-      transactedSessions.remove(txID);
-      session.getSession().commit();
-
-      return null;
-   }
-
-   private StompFrame onAbort(StompFrame frame, StompConnection connection) throws Exception
-   {
-      Map<String, Object> headers = frame.getHeaders();
-      String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
-      if (txID == null)
-      {
-         throw new StompException("transaction header is mandatory to ABORT a transaction");
-      }
-
-      StompSession session = getTransactedSession(connection, txID);
-
-      if (session == null)
-      {
-         throw new StompException("No transaction started: " + txID);
-      }
-      transactedSessions.remove(txID);
-      session.getSession().rollback(false);
-
-      return null;
-   }
-
-   private void checkConnected(StompConnection connection) throws StompException
-   {
-      if (!connection.isValid())
-      {
-         throw new StompException("Not connected");
-      }
-   }
-
-   private StompSession getSession(StompConnection connection) throws Exception
-   {
       StompSession stompSession = sessions.get(connection.getID());
       if (stompSession == null)
       {
@@ -497,7 +199,7 @@
       return stompSession;
    }
 
-   private StompSession getTransactedSession(StompConnection connection, String txID) throws Exception
+   public StompSession getTransactedSession(StompConnection connection, String txID) throws Exception
    {
       StompSession stompSession = transactedSessions.get(txID);
       if (stompSession == null)
@@ -522,89 +224,6 @@
       return stompSession;
    }
 
-   private StompFrame onDisconnect(StompFrame frame, StompConnection connection) throws Exception
-   {
-      cleanup(connection);
-      return null;
-   }
-   
-   private StompFrame onSend(StompFrame frame, StompConnection connection) throws Exception
-   {
-      checkConnected(connection);
-      Map<String, Object> headers = frame.getHeaders();
-      String destination = (String)headers.remove(Stomp.Headers.Send.DESTINATION);
-      String txID = (String)headers.remove(Stomp.Headers.TRANSACTION);
-      long timestamp = System.currentTimeMillis();
-
-      ServerMessageImpl message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
-      message.setTimestamp(timestamp);
-      message.setAddress(SimpleString.toSimpleString(destination));
-      StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
-      if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
-      {
-         message.setType(Message.BYTES_TYPE);
-         message.getBodyBuffer().writeBytes(frame.getContent());
-      }
-      else
-      {
-         message.setType(Message.TEXT_TYPE);
-         String text = new String(frame.getContent(), "UTF-8");
-         message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
-      }
-
-      StompSession stompSession = null;
-      if (txID == null)
-      {
-         stompSession = getSession(connection);
-      }
-      else
-      {
-         stompSession = getTransactedSession(connection, txID);
-      }
-      if (stompSession.isNoLocal())
-      {
-         message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
-      }
-      stompSession.getSession().send(message, true);           
-      
-      return null;
-   }
-
-   private StompFrame onConnect(StompFrame frame, final StompConnection connection) throws Exception
-   {
-      Map<String, Object> headers = frame.getHeaders();
-      String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
-      String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
-      String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
-      String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
-      //since 1.1
-      String acceptVersion = (String)headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
-      String host = (String)headers.get(Stomp.Headers.Connect.HOST);
-
-      HornetQSecurityManager sm = server.getSecurityManager();
-      
-      // The sm will be null case security is not enabled...
-      if (sm != null)
-      {
-         sm.validateUser(login, passcode);
-      }
-
-      connection.negotiateVersion(acceptVersion);
-      connection.setHost(host);
-      connection.setLogin(login);
-      connection.setPasscode(passcode);
-      connection.setClientID(clientID);
-      connection.setValid(true);
-
-      HashMap<String, Object> h = new HashMap<String, Object>();
-      h.put(Stomp.Headers.Connected.SESSION, connection.getID());
-      if (requestID != null)
-      {
-         h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
-      }
-      return new StompFrame(Stomp.Responses.CONNECTED, h);
-   }
-
    public void cleanup(final StompConnection connection)
    {
       connection.setValid(false);
@@ -652,15 +271,18 @@
       });
    }
 
-   private void sendReply(final StompConnection connection, final StompFrame frame)
+   public void sendReply(final StompConnection connection, final StompFrame frame)
    {
       server.getStorageManager().afterCompleteOperations(new IOAsyncTask()
       {
          public void onError(final int errorCode, final String errorMessage)
          {
             log.warn("Error processing IOCallback code = " + errorCode + " message = " + errorMessage);
+            
+            HornetQStompException e = new HornetQStompException("Error sending reply",
+                  new HornetQException(errorCode, errorMessage));
 
-            StompFrame error = createError(new HornetQException(errorCode, errorMessage), frame);
+            StompFrame error = e.getFrame();
             send(connection, error);
          }
 
@@ -681,5 +303,95 @@
       return "hornetq";
    }
 
+   public boolean validateUser(String login, String passcode)
+   {
+      boolean validated = true;
+      
+      HornetQSecurityManager sm = server.getSecurityManager();
+      
+      // The sm will be null case security is not enabled...
+      if (sm != null)
+      {
+         validated = sm.validateUser(login, passcode);
+      }
+      
+      return validated;
+   }
+
+   public ServerMessageImpl createServerMessage()
+   {
+      return new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+   }
+
+   public void commitTransaction(StompConnection connection, String txID) throws Exception
+   {
+      StompSession session = getTransactedSession(connection, txID);
+      if (session == null)
+      {
+         throw new HornetQStompException("No transaction started: " + txID);
+      }
+      transactedSessions.remove(txID);
+      session.getSession().commit();
+   }
+
+   public void abortTransaction(StompConnection connection, String txID) throws Exception
+   {
+      StompSession session = getTransactedSession(connection, txID);
+      if (session == null)
+      {
+         throw new HornetQStompException("No transaction started: " + txID);
+      }
+      transactedSessions.remove(txID);
+      session.getSession().rollback(false);
+   }
    // Inner classes -------------------------------------------------
+
+   public void createSubscription(StompConnection connection,
+         String subscriptionID, String durableSubscriptionName,
+         String destination, String selector, String ack, boolean noLocal) throws Exception
+   {
+      StompSession stompSession = getSession(connection);
+      stompSession.setNoLocal(noLocal);
+      if (stompSession.containsSubscription(subscriptionID))
+      {
+         throw new HornetQStompException("There already is a subscription for: " + subscriptionID +
+                                  ". Either use unique subscription IDs or do not create multiple subscriptions for the same destination");
+      }
+      long consumerID = server.getStorageManager().generateUniqueID();
+      String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
+      stompSession.addSubscription(consumerID,
+                                   subscriptionID,
+                                   clientID,
+                                   durableSubscriptionName,
+                                   destination,
+                                   selector,
+                                   ack);
+   }
+
+   public void unsubscribe(StompConnection connection,
+         String subscriptionID) throws Exception
+   {
+      StompSession stompSession = getSession(connection);
+      boolean unsubscribed = stompSession.unsubscribe(subscriptionID);
+      if (!unsubscribed)
+      {
+         throw new HornetQStompException("Cannot unsubscribe as no subscription exists for id: " + subscriptionID);
+      }
+   }
+
+   public void acknowledge(StompConnection connection, String messageID, String subscriptionID) throws Exception
+   {
+      StompSession stompSession = getSession(connection);
+      stompSession.acknowledge(messageID, subscriptionID);
+   }
+
+   public void beginTransaction(StompConnection connection, String txID) throws Exception
+   {
+      if (transactedSessions.containsKey(txID))
+      {
+         throw new HornetQStompException("Transaction already started: " + txID);
+      }
+      // create the transacted session
+      getTransactedSession(connection, txID);
+   }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-05 00:34:29 UTC (rev 11293)
@@ -12,7 +12,6 @@
  */
 package org.hornetq.core.protocol.stomp;
 
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,7 +38,7 @@
  *
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  */
-class StompSession implements SessionCallback
+public class StompSession implements SessionCallback
 {
    private static final Logger log = Logger.getLogger(StompSession.class);
 
@@ -84,41 +83,9 @@
       try
       {
          StompSubscription subscription = subscriptions.get(consumerID);
-
-         Map<String, Object> headers = new HashMap<String, Object>();
-         headers.put(Stomp.Headers.Message.DESTINATION, serverMessage.getAddress().toString());
-         if (subscription.getID() != null)
-         {
-            headers.put(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
-         }
-         HornetQBuffer buffer = serverMessage.getBodyBuffer();
-
-         int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
-                                                                 : serverMessage.getEndOfBodyPosition();
-         int size = bodyPos - buffer.readerIndex();
-         buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
-         byte[] data = new byte[size];
-         if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
-         {
-            headers.put(Headers.CONTENT_LENGTH, data.length);
-            buffer.readBytes(data);
-         }
-         else
-         {
-            SimpleString text = buffer.readNullableSimpleString();
-            if (text != null)
-            {
-               data = text.toString().getBytes("UTF-8");
-            }
-            else
-            {
-               data = new byte[0];
-            }
-         }
-         serverMessage.getBodyBuffer().resetReaderIndex();
-         StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
-         StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
-
+         
+         StompFrame frame = connection.createStompMessage(serverMessage, subscription, deliveryCount);
+         
          if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
          {
             session.acknowledge(consumerID, serverMessage.getMessageID());
@@ -168,10 +135,19 @@
       connection.getTransportConnection().removeReadyListener(listener);
    }
 
-   public void acknowledge(String messageID) throws Exception
+   public void acknowledge(String messageID, String subscriptionID) throws Exception
    {
       long id = Long.parseLong(messageID);
       long consumerID = messagesToAck.remove(id);
+      StompSubscription sub = subscriptions.get(consumerID);
+
+      if (subscriptionID != null)
+      {
+         if (!sub.getID().equals(subscriptionID))
+         {
+            throw new HornetQStompException("subscription id " + subscriptionID + " does not match " + sub.getID());
+         }
+      }
       session.acknowledge(consumerID, id);
       session.commit();
    }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java	2011-09-02 11:52:55 UTC (rev 11292)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompUtils.java	2011-09-05 00:34:29 UTC (rev 11293)
@@ -32,7 +32,7 @@
  *
  *
  */
-class StompUtils
+public class StompUtils
 {
    // Constants -----------------------------------------------------
    private static final String DEFAULT_MESSAGE_PRIORITY= "4";
@@ -46,8 +46,6 @@
 
    public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception
    {
-      Map<String, Object> headers = new HashMap<String, Object>(frame.getHeaders());
-
       String priority = (String)headers.remove(Stomp.Headers.Send.PRIORITY);
       if (priority != null)
       {
@@ -93,27 +91,26 @@
 
    public static void copyStandardHeadersFromMessageToFrame(MessageInternal message, StompFrame command, int deliveryCount) throws Exception
    {
-      final Map<String, Object> headers = command.getHeaders();
-      headers.put(Stomp.Headers.Message.DESTINATION, message.getAddress().toString());
-      headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getMessageID());
+      command.addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(message.getMessageID()));
+      command.addHeader(Stomp.Headers.Message.DESTINATION, message.getAddress().toString());
 
       if (message.getObjectProperty("JMSCorrelationID") != null)
       {
-         headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty("JMSCorrelationID"));
+         command.addHeader(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty("JMSCorrelationID").toString());
       }
-      headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getExpiration());
-      headers.put(Stomp.Headers.Message.REDELIVERED, deliveryCount > 1);
-      headers.put(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
+      command.addHeader(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getExpiration());
+      command.addHeader(Stomp.Headers.Message.REDELIVERED, String.valueOf(deliveryCount > 1));
+      command.addHeader(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
       if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null)
       {
-         headers.put(Stomp.Headers.Message.REPLY_TO,
+         command.addHeader(Stomp.Headers.Message.REPLY_TO,
                      message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME));
       }
-      headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getTimestamp());
+      command.addHeader(Stomp.Headers.Message.TIMESTAMP, "" + message.getTimestamp());
 
       if (message.getObjectProperty("JMSType") != null)
       {
-         headers.put(Stomp.Headers.Message.TYPE, message.getObjectProperty("JMSType"));
+         command.addHeader(Stomp.Headers.Message.TYPE, message.getObjectProperty("JMSType").toString());
       }
 
       // now lets add all the message headers
@@ -127,7 +124,7 @@
             continue;
          }
 
-         headers.put(name.toString(), message.getObjectProperty(name));
+         command.addHeader(name.toString(), message.getObjectProperty(name).toString());
       }
    }
    // Constructors --------------------------------------------------

Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	                        (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	2011-09-05 00:34:29 UTC (rev 11293)
@@ -0,0 +1,131 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.UnsupportedEncodingException;
+
+import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
+import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
+import org.hornetq.core.server.ServerMessage;
+
+/**
+ *
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ */
+public abstract class VersionedStompFrameHandler
+{   
+   protected StompConnection connection;
+   
+   public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version)
+   {
+      if (version == StompVersions.V1_0)
+      {
+         return new StompFrameHandlerV10(connection);
+      }
+      if (version == StompVersions.V1_1)
+      {
+         return new StompFrameHandlerV11(connection);
+      }
+      return null;
+   }
+
+   public StompFrame handleFrame(StompFrame request)
+   {
+      StompFrame response = null;
+      
+      if (Stomp.Commands.SEND.equals(request.getCommand()))
+      {
+         response = onSend(request);
+      }
+      else if (Stomp.Commands.ACK.equals(request.getCommand()))
+      {
+         response = onAck(request);
+      }
+      else if (Stomp.Commands.NACK.equals(request.getCommand()))
+      {
+         response = onNack(request);
+      }
+      else if (Stomp.Commands.BEGIN.equals(request.getCommand()))
+      {
+         response = onBegin(request);
+      }
+      else if (Stomp.Commands.COMMIT.equals(request.getCommand()))
+      {
+         response = onCommit(request);
+      }
+      else if (Stomp.Commands.ABORT.equals(request.getCommand()))
+      {
+         response = onAbort(request);
+      }
+      else if (Stomp.Commands.SUBSCRIBE.equals(request.getCommand()))
+      {
+         response = onSubscribe(request);
+      }
+      else if (Stomp.Commands.UNSUBSCRIBE.equals(request.getCommand()))
+      {
+         response = onUnsubscribe(request);
+      }
+      else if (Stomp.Commands.CONNECT.equals(request.getCommand()))
+      {
+         response = onConnect(request);
+      }
+      else if (Stomp.Commands.STOMP.equals(request.getCommand()))
+      {
+         response = onStomp(request);
+      }
+      else if (Stomp.Commands.DISCONNECT.equals(request.getCommand()))
+      {
+         response = onDisconnect(request);
+      }
+      else
+      {
+         response = onUnknown(request.getCommand());
+      }
+      
+      if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED) && (response == null))
+      {
+         response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+      }
+      
+      return response;
+   }
+
+   public abstract StompFrame onConnect(StompFrame frame);
+   public abstract StompFrame onDisconnect(StompFrame frame);
+   public abstract StompFrame onSend(StompFrame frame);
+   public abstract StompFrame onAck(StompFrame request);
+   public abstract StompFrame onBegin(StompFrame frame);
+   public abstract StompFrame onCommit(StompFrame request);
+   public abstract StompFrame onAbort(StompFrame request);
+   public abstract StompFrame onSubscribe(StompFrame request);
+   public abstract StompFrame onUnsubscribe(StompFrame request);
+   public abstract StompFrame onStomp(StompFrame request);
+   public abstract StompFrame onNack(StompFrame request);
+   
+   public StompFrame onUnknown(String command)
+   {
+      StompFrame response = new HornetQStompException("Unsupported command " + command).getFrame();
+      return response;
+   }
+   
+   public StompFrame handleReceipt(String receiptID)
+   {
+      StompFrame receipt = new StompFrame(Stomp.Responses.RECEIPT);
+      receipt.addHeader(Stomp.Headers.Response.RECEIPT_ID, receiptID);
+      
+      return receipt;
+   }
+
+   public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
+         StompSubscription subscription, int deliveryCount) throws Exception;
+}

Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java	                        (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java	2011-09-05 00:34:29 UTC (rev 11293)
@@ -0,0 +1,348 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp.v10;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompFrame;
+import org.hornetq.core.protocol.stomp.StompSubscription;
+import org.hornetq.core.protocol.stomp.StompUtils;
+import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.utils.DataConstants;
+
+/**
+*
+* @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+*/
+public class StompFrameHandlerV10 extends VersionedStompFrameHandler
+{
+   private static final Logger log = Logger.getLogger(StompFrameHandlerV10.class);
+   
+   public StompFrameHandlerV10(StompConnection connection)
+   {
+      this.connection = connection;
+   }
+
+   @Override
+   public StompFrame onConnect(StompFrame frame)
+   {
+      StompFrame response = null;
+      Map<String, String> headers = frame.getHeadersMap();
+      String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+      String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+      String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
+      String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+      if (connection.validateUser(login, passcode))
+      {
+         connection.setClientID(clientID);
+         connection.setValid(true);
+         
+         response = new StompFrame(Stomp.Responses.CONNECTED);
+         
+         response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
+         
+         if (requestID != null)
+         {
+            response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+         }
+      }
+      else
+      {
+         //not valid
+         response = new StompFrame(Stomp.Responses.ERROR);
+         response.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
+         response.setBody("The login account is not valid.");
+         
+         connection.sendFrame(response);
+         connection.destroy();
+         
+         return null;
+      }
+      return response;
+   }
+
+   @Override
+   public StompFrame onDisconnect(StompFrame frame)
+   {
+      connection.destroy();
+      return null;
+   }
+
+   @Override
+   public StompFrame onSend(StompFrame frame)
+   {
+      StompFrame response = null;
+      try
+      {
+         connection.validate();
+         String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
+         String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+
+         long timestamp = System.currentTimeMillis();
+
+         ServerMessageImpl message = connection.createServerMessage();
+         message.setTimestamp(timestamp);
+         message.setAddress(SimpleString.toSimpleString(destination));
+         StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
+         if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
+         {
+            message.setType(Message.BYTES_TYPE);
+            message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
+         }
+         else
+         {
+            message.setType(Message.TEXT_TYPE);
+            String text = frame.getBody();
+            message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
+         }
+
+         connection.sendServerMessage(message, txID);
+      }
+      catch (HornetQStompException e)
+      {
+         response = e.getFrame();
+      }
+      catch (Exception e)
+      {
+         response = new HornetQStompException("Error handling send", e).getFrame();
+      }
+
+      return response;
+   }
+
+   @Override
+   public StompFrame onBegin(StompFrame frame)
+   {
+      StompFrame response = null;
+      String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+      if (txID == null)
+      {
+         response = new HornetQStompException("Need a transaction id to begin").getFrame();
+      }
+      else
+      {
+         try
+         {
+            connection.beginTransaction(txID);
+         }
+         catch (HornetQStompException e)
+         {
+            response = e.getFrame();
+         }
+      }
+      return response;
+   }
+
+   @Override
+   public StompFrame onCommit(StompFrame request)
+   {
+      StompFrame response = null;
+      
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+      if (txID == null)
+      {
+         response = new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
+         return response;
+      }
+
+      try
+      {
+         connection.commitTransaction(txID);
+      }
+      catch (HornetQStompException e)
+      {
+         response = e.getFrame();
+      }
+      return response;
+   }
+
+   @Override
+   public StompFrame onAbort(StompFrame request)
+   {
+      StompFrame response = null;
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+      if (txID == null)
+      {
+         response = new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
+         return response;
+      }
+      
+      try
+      {
+         connection.abortTransaction(txID);
+      }
+      catch (HornetQStompException e)
+      {
+         response = e.getFrame();
+      }
+      
+      return response;
+   }
+
+   @Override
+   public StompFrame onSubscribe(StompFrame request)
+   {
+      StompFrame response = null;
+      String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
+      
+      String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
+      String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
+      String id = request.getHeader(Stomp.Headers.Subscribe.ID);
+      String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
+      boolean noLocal = false;
+      
+      if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
+      {
+         noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
+      }
+      
+      try
+      {
+         connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
+      }
+      catch (HornetQStompException e)
+      {
+         response = e.getFrame();
+      }
+      
+      return response;
+   }
+
+   @Override
+   public StompFrame onUnsubscribe(StompFrame request)
+   {
+      StompFrame response = null;
+      String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION);
+      String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
+
+      String subscriptionID = null;
+      if (id != null)
+      {
+         subscriptionID = id;
+      }
+      else
+      {
+         if (destination == null)
+         {
+            response = new HornetQStompException("Must specify the subscription's id or " +
+            		"the destination you are unsubscribing from").getFrame();
+            return response;
+         }
+         subscriptionID = "subscription/" + destination;
+      }
+      
+      try
+      {
+         connection.unsubscribe(subscriptionID);
+      }
+      catch (HornetQStompException e)
+      {
+         return e.getFrame();
+      }
+      return response;
+   }
+
+   @Override
+   public StompFrame onAck(StompFrame request)
+   {
+      StompFrame response = null;
+      
+      String messageID = request.getHeader(Stomp.Headers.Ack.MESSAGE_ID);
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+      if (txID != null)
+      {
+         log.warn("Transactional acknowledgement is not supported");
+      }
+      
+      try
+      {
+         connection.acknowledge(messageID, null);
+      }
+      catch (HornetQStompException e)
+      {
+         response = e.getFrame();
+      }
+
+      return response;
+   }
+
+   @Override
+   public StompFrame onStomp(StompFrame request)
+   {
+      return onUnknown(request.getCommand());
+   }
+
+   @Override
+   public StompFrame onNack(StompFrame request)
+   {
+      return onUnknown(request.getCommand());
+   }
+
+   @Override
+   public StompFrame createMessageFrame(ServerMessage serverMessage,
+         StompSubscription subscription, int deliveryCount) throws Exception
+   {
+      StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
+      
+      if (subscription.getID() != null)
+      {
+         frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
+      }
+      HornetQBuffer buffer = serverMessage.getBodyBuffer();
+
+      int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
+                                                              : serverMessage.getEndOfBodyPosition();
+      int size = bodyPos - buffer.readerIndex();
+      buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+      byte[] data = new byte[size];
+      if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
+      {
+         frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
+         buffer.readBytes(data);
+      }
+      else
+      {
+         SimpleString text = buffer.readNullableSimpleString();
+         if (text != null)
+         {
+            data = text.toString().getBytes("UTF-8");
+         }
+         else
+         {
+            data = new byte[0];
+         }
+      }
+      serverMessage.getBodyBuffer().resetReaderIndex();
+
+      StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+      
+      return frame;
+
+   }
+
+}

Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	                        (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-05 00:34:29 UTC (rev 11293)
@@ -0,0 +1,362 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp.v11;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.Stomp;
+import org.hornetq.core.protocol.stomp.StompConnection;
+import org.hornetq.core.protocol.stomp.StompFrame;
+import org.hornetq.core.protocol.stomp.StompSubscription;
+import org.hornetq.core.protocol.stomp.StompUtils;
+import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
+import org.hornetq.core.protocol.stomp.Stomp.Headers;
+import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.utils.DataConstants;
+
+public class StompFrameHandlerV11 extends VersionedStompFrameHandler
+{
+   private static final Logger log = Logger.getLogger(StompFrameHandlerV11.class);
+
+   public StompFrameHandlerV11(StompConnection connection)
+   {
+      this.connection = connection;
+   }
+
+   @Override
+   public StompFrame onConnect(StompFrame frame)
+   {
+      StompFrame response = null;
+      Map<String, String> headers = frame.getHeadersMap();
+      String login = headers.get(Stomp.Headers.Connect.LOGIN);
+      String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
+      String clientID = headers.get(Stomp.Headers.Connect.CLIENT_ID);
+      String requestID = headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+      if (connection.validateUser(login, passcode))
+      {
+         connection.setClientID(clientID);
+         connection.setValid(true);
+         
+         response = new StompFrame(Stomp.Responses.CONNECTED);
+         
+         //version
+         response.addHeader(Stomp.Headers.Connected.VERSION, connection.getVersion());
+         
+         //session
+         response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
+         
+         //server
+         response.addHeader(Stomp.Headers.Connected.SERVER, connection.getHornetQServerName());
+         
+         if (requestID != null)
+         {
+            response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+         }
+      }
+      else
+      {
+         //not valid
+         response = new StompFrame(Stomp.Responses.ERROR);
+         response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1");
+         
+         response.setBody("Supported protocol versions are 1.0 and 1.1");
+         
+         connection.sendFrame(response);
+         connection.destroy();
+         
+         return null;
+      }
+      return response;
+   }
+
+   @Override
+   public StompFrame onDisconnect(StompFrame frame)
+   {
+      connection.destroy();
+      return null;
+   }
+
+   @Override
+   public StompFrame onSend(StompFrame frame)
+   {
+      StompFrame response = null;
+      try
+      {
+         connection.validate();
+         String destination = frame.getHeader(Stomp.Headers.Send.DESTINATION);
+         String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+
+         long timestamp = System.currentTimeMillis();
+
+         ServerMessageImpl message = connection.createServerMessage();
+         message.setTimestamp(timestamp);
+         message.setAddress(SimpleString.toSimpleString(destination));
+         StompUtils.copyStandardHeadersFromFrameToMessage(frame, message);
+         if (frame.hasHeader(Stomp.Headers.CONTENT_LENGTH))
+         {
+            message.setType(Message.BYTES_TYPE);
+            message.getBodyBuffer().writeBytes(frame.getBodyAsBytes());
+         }
+         else
+         {
+            message.setType(Message.TEXT_TYPE);
+            String text = frame.getBody();
+            message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(text));
+         }
+
+         connection.sendServerMessage(message, txID);
+      }
+      catch (HornetQStompException e)
+      {
+         response = e.getFrame();
+      }
+      catch (Exception e)
+      {
+         response = new HornetQStompException("Error handling send", e).getFrame();
+      }
+
+      return response;
+   }
+
+   @Override
+   public StompFrame onBegin(StompFrame frame)
+   {
+      StompFrame response = null;
+      String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
+      if (txID == null)
+      {
+         response = new HornetQStompException("Need a transaction id to begin").getFrame();
+      }
+      else
+      {
+         try
+         {
+            connection.beginTransaction(txID);
+         }
+         catch (HornetQStompException e)
+         {
+            response = e.getFrame();
+         }
+      }
+      return response;
+   }
+
+   @Override
+   public StompFrame onCommit(StompFrame request)
+   {
+      StompFrame response = null;
+      
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+      if (txID == null)
+      {
+         response = new HornetQStompException("transaction header is mandatory to COMMIT a transaction").getFrame();
+         return response;
+      }
+
+      try
+      {
+         connection.commitTransaction(txID);
+      }
+      catch (HornetQStompException e)
+      {
+         response = e.getFrame();
+      }
+      return response;
+   }
+
+   @Override
+   public StompFrame onAbort(StompFrame request)
+   {
+      StompFrame response = null;
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+
+      if (txID == null)
+      {
+         response = new HornetQStompException("transaction header is mandatory to ABORT a transaction").getFrame();
+         return response;
+      }
+      
+      try
+      {
+         connection.abortTransaction(txID);
+      }
+      catch (HornetQStompException e)
+      {
+         response = e.getFrame();
+      }
+      
+      return response;
+   }
+
+   @Override
+   public StompFrame onSubscribe(StompFrame request)
+   {
+      StompFrame response = null;
+      String destination = request.getHeader(Stomp.Headers.Subscribe.DESTINATION);
+      
+      String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
+      String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
+      String id = request.getHeader(Stomp.Headers.Subscribe.ID);
+      String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
+      boolean noLocal = false;
+      
+      if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL))
+      {
+         noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
+      }
+      
+      try
+      {
+         connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal);
+      }
+      catch (HornetQStompException e)
+      {
+         response = e.getFrame();
+      }
+      
+      return response;
+   }
+
+   @Override
+   public StompFrame onUnsubscribe(StompFrame request)
+   {
+      StompFrame response = null;
+      //unsubscribe in 1.1 only needs id header
+      String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
+
+      String subscriptionID = null;
+      if (id != null)
+      {
+         subscriptionID = id;
+      }
+      else
+      {
+          response = new HornetQStompException("Must specify the subscription's id").getFrame();
+          return response;
+      }
+      
+      try
+      {
+         connection.unsubscribe(subscriptionID);
+      }
+      catch (HornetQStompException e)
+      {
+         response = e.getFrame();
+      }
+      return response;
+   }
+
+   @Override
+   public StompFrame onAck(StompFrame request)
+   {
+      StompFrame response = null;
+      
+      String messageID = request.getHeader(Stomp.Headers.Ack.MESSAGE_ID);
+      String txID = request.getHeader(Stomp.Headers.TRANSACTION);
+      String subscriptionID = request.getHeader(Stomp.Headers.Ack.SUBSCRIPTION);
+
+      if (txID != null)
+      {
+         log.warn("Transactional acknowledgement is not supported");
+      }
+      
+      if (subscriptionID == null)
+      {
+         response = new HornetQStompException("subscription header is required").getFrame();
+         return response;
+      }
+      
+      try
+      {
+         connection.acknowledge(messageID, subscriptionID);
+      }
+      catch (HornetQStompException e)
+      {
+         response = e.getFrame();
+      }
+
+      return response;
+   }
+
+   @Override
+   public StompFrame onStomp(StompFrame request)
+   {
+      return onConnect(request);
+   }
+
+   @Override
+   public StompFrame onNack(StompFrame request)
+   {
+      //this eventually means discard the message (it never be redelivered again).
+      //we can consider supporting redeliver to a different sub.
+      return onAck(request);
+   }
+
+   @Override
+   public StompFrame createMessageFrame(ServerMessage serverMessage,
+         StompSubscription subscription, int deliveryCount)
+         throws Exception
+   {
+      StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
+      
+      if (subscription.getID() != null)
+      {
+         frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID());
+      }
+      
+      HornetQBuffer buffer = serverMessage.getBodyBuffer();
+
+      int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
+                                                              : serverMessage.getEndOfBodyPosition();
+      int size = bodyPos - buffer.readerIndex();
+      buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
+      byte[] data = new byte[size];
+      if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
+      {
+         frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
+         buffer.readBytes(data);
+      }
+      else
+      {
+         SimpleString text = buffer.readNullableSimpleString();
+         if (text != null)
+         {
+            data = text.toString().getBytes("UTF-8");
+         }
+         else
+         {
+            data = new byte[0];
+         }
+      }
+      frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
+      
+      serverMessage.getBodyBuffer().resetReaderIndex();
+
+      StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
+      
+      return frame;
+
+   }
+
+}



More information about the hornetq-commits mailing list