[hornetq-commits] JBoss hornetq SVN: r9746 - in trunk: src/main/org/hornetq/core/protocol/stomp and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Oct 5 05:51:28 EDT 2010


Author: timfox
Date: 2010-10-05 05:51:27 -0400 (Tue, 05 Oct 2010)
New Revision: 9746

Removed:
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java
Modified:
   trunk/docs/user-manual/en/interoperability.xml
   trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
   trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java
   trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
   trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-544

Modified: trunk/docs/user-manual/en/interoperability.xml
===================================================================
--- trunk/docs/user-manual/en/interoperability.xml	2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/docs/user-manual/en/interoperability.xml	2010-10-05 09:51:27 UTC (rev 9746)
@@ -57,6 +57,18 @@
             When a Stomp client subscribes (or unsubscribes) for a destination (using a <literal>SUBSCRIBE</literal>
             or <literal>UNSUBSCRIBE</literal> frame), the destination is mapped to a HornetQ queue.</para>
         </section>
+      <section>
+        <title>STOMP and connection-ttl</title>
+        <para>Well behaved STOMP clients will always send a DISCONNECT frame before closing their connections. In this case the server
+          will clear up any server side resources such as sessions and consumers synchronously. However if STOMP clients exit without
+        sending a DISCONNECT frame or if they crash the server will have no way of knowing immediately whether the client is still alive
+        or not. STOMP connections therefore default to a connection-ttl value of 1 minute (see chapter on <link linkend="connection-ttl"
+          >connection-ttl</link> for more information. This value can be overridden using connection-ttl-override.
+        </para>
+        <note><para>Please note that the STOMP protocol does not contain any heartbeat frame. It is therefore the user's responsibility to make sure
+        data is sent within connection-ttl or the server will assume the client is dead and clean up server side resources.</para></note>
+      </section>
+      
         <section>
           <title>Stomp and JMS interoperabilty</title>
           <section>

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-10-05 09:51:27 UTC (rev 9746)
@@ -49,6 +49,13 @@
    private boolean valid;
 
    private boolean destroyed = false;
+   
+   private StompDecoder decoder = new StompDecoder();
+   
+   public StompDecoder getDecoder()
+   {
+      return decoder;
+   }
 
    StompConnection(final Connection transportConnection, final StompProtocolManager manager)
    {

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-10-05 09:51:27 UTC (rev 9746)
@@ -22,31 +22,40 @@
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.core.logging.Logger;
 
 /**
  * Represents all the data in a STOMP frame.
  *
  * @author <a href="http://hiramchirino.com">chirino</a>
+ * @author Tim Fox
+ * 
  */
 class StompFrame
 {
+   private static final Logger log = Logger.getLogger(StompFrame.class);
+
    public static final byte[] NO_DATA = new byte[] {};
+
    private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
 
    private final String command;
+
    private final Map<String, Object> headers;
+
    private final byte[] content;
-   
+
    private HornetQBuffer buffer = null;
+
    private int size;
-
+   
    public StompFrame(String command, Map<String, Object> headers, byte[] data)
    {
       this.command = command;
       this.headers = headers;
       this.content = data;
    }
-   
+
    public StompFrame(String command, Map<String, Object> headers)
    {
       this.command = command;
@@ -63,7 +72,7 @@
    {
       return content;
    }
-
+   
    public Map<String, Object> getHeaders()
    {
       return headers;
@@ -95,7 +104,8 @@
       out += new String(content);
       return out;
    }
-   
+
+ 
    public HornetQBuffer toHornetQBuffer() throws Exception
    {
       if (buffer == null)

Deleted: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java	2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameDecoder.java	2010-10-05 09:51:27 UTC (rev 9746)
@@ -1,209 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.core.protocol.stomp;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.logging.Logger;
-
-/**
- * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
- */
-class StompFrameDecoder
-{
-   private static final Logger log = Logger.getLogger(StompFrameDecoder.class);
-
-   private static final int MAX_COMMAND_LENGTH = 1024;
-
-   private static final int MAX_HEADER_LENGTH = 1024 * 10;
-
-   private static final int MAX_HEADERS = 1000;
-
-   private static final int MAX_DATA_LENGTH = 1024 * 1024 * 10;
-
-   public StompFrame decode(HornetQBuffer buffer)
-   {
-      try
-      {
-         String command = null;
-
-         // skip white space to next real action line
-         while (true) {
-            command = StompFrameDecoder.readLine(buffer, StompFrameDecoder.MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
-             if (command == null) {
-                return null;
-             }
-             else {
-                command = command.trim();
-                 if (command.length() > 0) {
-                     break;
-                 }
-             }
-         }
-         
-         // Parse the headers
-         HashMap<String, Object> headers = new HashMap<String, Object>(25);
-         while (true)
-         {
-            String line = StompFrameDecoder.readLine(buffer, StompFrameDecoder.MAX_HEADER_LENGTH, "The maximum header length was exceeded");
-            if (line == null)
-            {
-               return null;
-            }
-
-            if (headers.size() > StompFrameDecoder.MAX_HEADERS)
-            {
-               throw new StompException("The maximum number of headers was exceeded", true);
-            }
-
-            if (line.trim().length() == 0)
-            {
-               break;
-            }
-
-            try
-            {
-               int seperator_index = line.indexOf(Stomp.Headers.SEPARATOR);
-               if (seperator_index == -1)
-               {
-                  return null;
-               }
-               String name = line.substring(0, seperator_index).trim();
-               String value = line.substring(seperator_index + 1, line.length()).trim();
-               headers.put(name, value);
-            }
-            catch (Exception e)
-            {
-               throw new StompException("Unable to parse header line [" + line + "]", true);
-            }
-         }
-         // Read in the data part.
-         byte[] data = StompFrame.NO_DATA;
-         String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
-         if (contentLength != null)
-         {
-
-            // Bless the client, he's telling us how much data to read in.
-            int length;
-            try
-            {
-               length = Integer.parseInt(contentLength.trim());
-            }
-            catch (NumberFormatException e)
-            {
-               throw new StompException("Specified content-length is not a valid integer", true);
-            }
-
-            if (length > StompFrameDecoder.MAX_DATA_LENGTH)
-            {
-               throw new StompException("The maximum data length was exceeded", true);
-            }
-
-            if (buffer.readableBytes() < length)
-            {
-               return null;
-            }
-            
-            data = new byte[length];
-            buffer.readBytes(data);
-
-            if (!buffer.readable())
-            {
-               return null;
-            }
-            if (buffer.readByte() != 0)
-            {
-               throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " +
-                                        "there was no trailing null byte", true);
-            }
-         }
-         else
-         {
-            byte[] body = new byte[StompFrameDecoder.MAX_DATA_LENGTH];
-            boolean bodyCorrectlyEnded = false;
-            int count = 0;
-            while (buffer.readable())
-            {
-               byte b = buffer.readByte();
-
-               if (b == (byte)'\0')
-               {
-                  bodyCorrectlyEnded = true;
-                  break;
-               }
-               else
-               {
-                  body[count++] = b;
-               }
-            }
-
-            if (!bodyCorrectlyEnded)
-            {
-               return null;
-            }
-
-            data = new byte[count];
-            System.arraycopy(body, 0, data, 0, count);
-         }
-
-         return new StompFrame(command, headers, data);
-      }
-      catch (IOException e)
-      {
-         log.error("Unable to decode stomp frame", e);
-         return null;
-      }
-   }
-   
-   private static String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException
-   {
-      char[] chars = new char[MAX_HEADER_LENGTH];
-
-      if (!in.readable())
-      {
-         return null;
-      }
-      
-      boolean properString = false;
-      int count = 0;
-      while (in.readable())
-      {
-         byte b = in.readByte();
-
-         if (b == (byte)'\n')
-         {
-            properString = true;
-            break;
-         }
-         else
-         {
-            chars[count++] = (char)b;
-         }
-      }
-      if (properString)
-      {
-         return new String(chars, 0, count);
-      }
-      else
-      {
-         return null;
-      }
-   }
-}

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-10-05 09:51:27 UTC (rev 9746)
@@ -59,8 +59,6 @@
 
    private final HornetQServer server;
 
-   private final StompFrameDecoder frameDecoder;
-
    private final Executor executor;
 
    private final Map<String, StompSession> transactedSessions = new HashMap<String, StompSession>();
@@ -105,7 +103,6 @@
    public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
    {
       this.server = server;
-      this.frameDecoder = new StompFrameDecoder();
       this.executor = server.getExecutorFactory().getExecutor();
    }
 
@@ -115,8 +112,9 @@
    {
       StompConnection conn = new StompConnection(connection, this);
 
-      //Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection will be timed out and closed!
-      
+      // Note that STOMP has no heartbeat, so if connection ttl is non zero, data must continue to be sent or connection
+      // will be timed out and closed!
+
       long ttl = server.getConfiguration().getConnectionTTLOverride();
 
       if (ttl != -1)
@@ -127,7 +125,7 @@
       {
          // Default to 1 minute - which is same as core protocol
          return new ConnectionEntry(conn, System.currentTimeMillis(), 1 * 60 * 1000);
-      }            
+      }
    }
 
    public void removeHandler(String name)
@@ -136,121 +134,123 @@
 
    public int isReadyToHandle(HornetQBuffer buffer)
    {
-      int start = buffer.readerIndex();
+      // This never gets called
 
-      StompFrame frame = frameDecoder.decode(buffer);
-
-      if (frame == null)
-      {
-         return -1;
-      }
-      else
-      {
-         return buffer.readerIndex() - start;
-      }
+      return -1;
    }
 
    public void handleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
    {
-      try
-      {
-         doHandleBuffer(connection, buffer);
-      }
-      finally
-      {
-         server.getStorageManager().clearContext();
-      }
-   }
+      StompConnection conn = (StompConnection)connection;
+      
+      StompDecoder decoder = conn.getDecoder();
 
-   private void doHandleBuffer(final RemotingConnection connection, final HornetQBuffer buffer)
-   {
-      StompConnection conn = (StompConnection)connection;
-      StompFrame request = null;
-      try
+      do
       {
-         request = frameDecoder.decode(buffer);
-         if (log.isTraceEnabled())
+         StompFrame request;
+         
+         try
          {
-            log.trace("received " + request);
+            request = decoder.decode(buffer);
          }
+         catch (Exception e)
+         {
+            log.error("Failed to decode", e);
 
-         String command = request.getCommand();
-         StompFrame response = null;
-
-         if (Stomp.Commands.CONNECT.equals(command))
-         {
-            response = onConnect(request, conn);
+            return;
          }
-         else if (Stomp.Commands.DISCONNECT.equals(command))
+         
+         if (request == null)
          {
-            response = onDisconnect(request, conn);
+            return;
          }
-         else if (Stomp.Commands.SEND.equals(command))
+
+         try
          {
-            response = onSend(request, conn);
-         }
-         else if (Stomp.Commands.SUBSCRIBE.equals(command))
-         {
-            response = onSubscribe(request, conn);
-         }
-         else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
-         {
-            response = onUnsubscribe(request, conn);
-         }
-         else if (Stomp.Commands.ACK.equals(command))
-         {
-            response = onAck(request, conn);
-         }
-         else if (Stomp.Commands.BEGIN.equals(command))
-         {
-            response = onBegin(request, server, conn);
-         }
-         else if (Stomp.Commands.COMMIT.equals(command))
-         {
-            response = onCommit(request, conn);
-         }
-         else if (Stomp.Commands.ABORT.equals(command))
-         {
-            response = onAbort(request, conn);
-         }
-         else
-         {
-            log.error("Unsupported Stomp frame: " + request);
-            response = new StompFrame(Stomp.Responses.ERROR,
-                                      new HashMap<String, Object>(),
-                                      ("Unsupported frame: " + command).getBytes());
-         }
+            String command = request.getCommand();
 
-         if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
-         {
-            if (response == null)
+            StompFrame response = null;
+
+            if (Stomp.Commands.CONNECT.equals(command))
             {
-               Map<String, Object> h = new HashMap<String, Object>();
-               response = new StompFrame(Stomp.Responses.RECEIPT, h);
+               response = onConnect(request, conn);
             }
-            response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
-                                      request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
-         }
+            else if (Stomp.Commands.DISCONNECT.equals(command))
+            {
+               response = onDisconnect(request, conn);
+            }
+            else if (Stomp.Commands.SEND.equals(command))
+            {
+               response = onSend(request, conn);
+            }
+            else if (Stomp.Commands.SUBSCRIBE.equals(command))
+            {
+               response = onSubscribe(request, conn);
+            }
+            else if (Stomp.Commands.UNSUBSCRIBE.equals(command))
+            {
+               response = onUnsubscribe(request, conn);
+            }
+            else if (Stomp.Commands.ACK.equals(command))
+            {
+               response = onAck(request, conn);
+            }
+            else if (Stomp.Commands.BEGIN.equals(command))
+            {
+               response = onBegin(request, server, conn);
+            }
+            else if (Stomp.Commands.COMMIT.equals(command))
+            {
+               response = onCommit(request, conn);
+            }
+            else if (Stomp.Commands.ABORT.equals(command))
+            {
+               response = onAbort(request, conn);
+            }
+            else
+            {
+               log.error("Unsupported Stomp frame: " + request);
+               response = new StompFrame(Stomp.Responses.ERROR,
+                                         new HashMap<String, Object>(),
+                                         ("Unsupported frame: " + command).getBytes());
+            }
 
-         if (response != null)
-         {
-            sendReply(conn, response);
+            if (request.getHeaders().containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+            {
+               log.info("receipt requested");
+               if (response == null)
+               {
+                  Map<String, Object> h = new HashMap<String, Object>();
+                  response = new StompFrame(Stomp.Responses.RECEIPT, h);
+               }
+               response.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID,
+                                         request.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED));
+            }
+
+            if (response != null)
+            {
+               sendReply(conn, response);
+            }
+
+            if (Stomp.Commands.DISCONNECT.equals(command))
+            {
+               conn.destroy();
+            }
          }
-
-         if (Stomp.Commands.DISCONNECT.equals(command))
+         catch (Exception e)
          {
-            conn.destroy();
+            e.printStackTrace();
+            StompFrame error = createError(e, request);
+            if (error != null)
+            {
+               sendReply(conn, error);
+            }
          }
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         StompFrame error = createError(e, request);
-         if (error != null)
+         finally
          {
-            sendReply(conn, error);
+            server.getStorageManager().clearContext();
          }
-      }
+      } while (decoder.hasBytes());
    }
 
    // Public --------------------------------------------------------
@@ -466,7 +466,8 @@
       StompSession stompSession = sessions.get(connection.getID());
       if (stompSession == null)
       {
-         stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
+         stompSession = new StompSession(connection, this, server.getStorageManager()
+                                                                 .newContext(server.getExecutorFactory().getExecutor()));
          String name = UUIDGenerator.getInstance().generateStringUUID();
          ServerSession session = server.createSession(name,
                                                       connection.getLogin(),
@@ -516,7 +517,7 @@
       cleanup(connection);
       return null;
    }
-
+   
    private StompFrame onSend(StompFrame frame, StompConnection connection) throws Exception
    {
       checkConnected(connection);
@@ -554,7 +555,8 @@
       {
          message.putStringProperty(CONNECTION_ID_PROP, connection.getID().toString());
       }
-      stompSession.getSession().send(message, true);
+      stompSession.getSession().send(message, true);           
+      
       return null;
    }
 

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-10-05 09:51:27 UTC (rev 9746)
@@ -93,7 +93,7 @@
          HornetQBuffer buffer = serverMessage.getBodyBuffer();
 
          int bodyPos = serverMessage.getEndOfBodyPosition() == -1 ? buffer.writerIndex()
-                                                                  : serverMessage.getEndOfBodyPosition();
+                                                                 : serverMessage.getEndOfBodyPosition();
          int size = bodyPos - buffer.readerIndex();
          buffer.readerIndex(MessageImpl.BUFFER_HEADER_SPACE + DataConstants.SIZE_INT);
          byte[] data = new byte[size];
@@ -108,7 +108,8 @@
             if (text != null)
             {
                data = text.toString().getBytes("UTF-8");
-            } else
+            }
+            else
             {
                data = new byte[0];
             }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2010-10-05 09:51:27 UTC (rev 9746)
@@ -21,6 +21,7 @@
 import org.hornetq.api.core.Message;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.client.impl.ClientMessageImpl;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageInternal;
 import org.hornetq.core.server.impl.ServerMessageImpl;
 
@@ -34,7 +35,10 @@
 class StompUtils
 {
    // Constants -----------------------------------------------------
+   
+   private static final Logger log = Logger.getLogger(StompUtils.class);
 
+
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
@@ -53,6 +57,7 @@
       {
          msg.setDurable(Boolean.parseBoolean(persistent));
       }
+      
       // FIXME should use a proper constant
       msg.putObjectProperty("JMSCorrelationID", headers.remove(Stomp.Headers.Send.CORRELATION_ID));
       msg.putObjectProperty("JMSType", headers.remove(Stomp.Headers.Send.TYPE));

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java	2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/WebSocketStompFrameEncoder.java	2010-10-05 09:51:27 UTC (rev 9746)
@@ -38,20 +38,25 @@
  */
 public class WebSocketStompFrameEncoder extends OneToOneEncoder
 {
-
-   private final StompFrameDecoder decoder = new StompFrameDecoder();
-   
    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception
    {
       
       if (msg instanceof ChannelBuffer)
       {
+         // FIXME - this is a silly way to do this - a better way to do this would be to create a new protocol, with protocol manager etc
+         // and re-use some of the STOMP codec stuff - Tim
+         
+         
          // this is ugly and slow!
          // we have to go ChannelBuffer -> HornetQBuffer -> StompFrame -> String -> WebSocketFrame
          // since HornetQ protocol SPI requires to return HornetQBuffer to the transport
          HornetQBuffer buffer = new ChannelBufferWrapper((ChannelBuffer)msg);
+         
+         StompDecoder decoder = new StompDecoder();
+         
          StompFrame frame = decoder.decode(buffer);
+         
          if (frame != null)
          {
             WebSocketFrame wsFrame = new DefaultWebSocketFrame(frame.asString());

Modified: trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/src/main/org/hornetq/core/remoting/impl/netty/NettyAcceptor.java	2010-10-05 09:51:27 UTC (rev 9746)
@@ -356,8 +356,8 @@
             if (protocol == ProtocolType.CORE)
             {
                // Core protocol uses its own optimised decoder
-               
-               handlers.put("hornetq-decode", new HornetQFrameDecoder2());
+
+               handlers.put("hornetq-decoder", new HornetQFrameDecoder2());
             }
             else if (protocol == ProtocolType.STOMP_WS)
             {
@@ -367,13 +367,17 @@
                handlers.put("hornetq-decoder", new HornetQFrameDecoder(decoder));
                handlers.put("websocket-handler", new WebSocketServerHandler());
             }
+            else if (protocol == ProtocolType.STOMP)
+            {
+               //With STOMP the decoding is handled in the StompFrame class
+            }
             else
             {
-                handlers.put("hornetq-decoder", new HornetQFrameDecoder(decoder));
+               handlers.put("hornetq-decoder", new HornetQFrameDecoder(decoder));
             }
 
             handlers.put("handler", new HornetQServerChannelHandler(channelGroup, handler, new Listener()));
-            
+
             /**
              * STOMP_WS protocol mandates use of named handlers to be able to replace http codecs
              * by websocket codecs after handshake.

Modified: trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-10-04 22:20:52 UTC (rev 9745)
+++ trunk/tests/src/org/hornetq/tests/integration/stomp/StompTest.java	2010-10-05 09:51:27 UTC (rev 9746)
@@ -90,7 +90,7 @@
 
    private JMSServerManager server;
    
-   public void _testSendManyMessages() throws Exception
+   public void testSendManyMessages() throws Exception
    {
       MessageConsumer consumer = session.createConsumer(queue);
 
@@ -106,7 +106,7 @@
 
          public void onMessage(Message arg0)
          {
-            System.out.println("<<< " + (1000 - latch.getCount()));
+            //System.out.println("<<< " + (1000 - latch.getCount()));
             latch.countDown();
          }
       });
@@ -115,12 +115,40 @@
       for (int i = 1; i <= count; i++)
       {
          // Thread.sleep(1);
-         System.out.println(">>> " + i);
+         //System.out.println(">>> " + i);
          sendFrame(frame);
       }
 
       assertTrue(latch.await(60, TimeUnit.SECONDS));
+   }
+   
+   public void testPerf() throws Exception
+   {
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      frame = receiveFrame(10000);
 
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+            
+      int count = 100000;
+      
+      frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "ABCDJIMTEST<GRV>http://techcrunch.com/2010/09/23/thelikestream-digg-for-facebook-likes/<GRV>0" + Stomp.NULL;
+      
+      long start = System.currentTimeMillis();
+      
+      for (int i = 1; i <= count; i++)
+      {
+         sendFrame(frame);
+         
+         if (i % 1000 == 0)
+         {
+            log.info("Sent " + i);
+         }
+      }
+      
+      long end = System.currentTimeMillis();
+
+      log.info("That took " + (end-start));
    }
 
    public void testConnect() throws Exception
@@ -185,7 +213,7 @@
       frame = receiveFrame(10000);
       Assert.assertTrue(frame.startsWith("CONNECTED"));
 
-      frame = "\nSEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+      frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
 
       sendFrame(frame);
 
@@ -199,7 +227,38 @@
       long tmsg = message.getJMSTimestamp();
       Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
    }
+   
+   /*
+    * Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame
+    * This means next frame read might have a \n a the beginning.
+    * This is contrary to STOMP spec but we deal with it so we can work nicely with crappy STOMP clients
+    */
+   public void testSendMessageWithLeadingNewLine() throws Exception
+   {
 
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL + "\n";
+      sendFrame(frame);
+
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL + "\n";
+
+      sendFrame(frame);
+
+      TextMessage message = (TextMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+      
+      // Make sure that the timestamp is valid - should
+      // be very close to the current time.
+      long tnow = System.currentTimeMillis();
+      long tmsg = message.getJMSTimestamp();
+      Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
+   }
+
    public void testSendMessageWithReceipt() throws Exception
    {
 



More information about the hornetq-commits mailing list