[hornetq-commits] JBoss hornetq SVN: r11351 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10 and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Sep 15 10:34:00 EDT 2011


Author: gaohoward
Date: 2011-09-15 10:34:00 -0400 (Thu, 15 Sep 2011)
New Revision: 11351

Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests


Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java	2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java	2011-09-15 14:34:00 UTC (rev 11351)
@@ -27,6 +27,7 @@
    private List<Header> headers = new ArrayList<Header>(10);
    private String body;
    private VersionedStompFrameHandler handler;
+   private boolean disconnect;
    
    public HornetQStompException(StompConnection connection, String msg)
    {
@@ -85,6 +86,7 @@
          frame = handler.createStompFrame("ERROR");
          frame.addHeader("message", this.getMessage());
       }
+      frame.setNeedsDisconnect(disconnect);
       return frame;
    }
 
@@ -99,4 +101,9 @@
          this.val = val;
       }
    }
+
+   public void setDisconnect(boolean b)
+   {
+      disconnect = b;
+   }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-15 14:34:00 UTC (rev 11351)
@@ -371,8 +371,6 @@
    {
       String acceptVersion = frame.getHeader(Stomp.Headers.ACCEPT_VERSION);
       
-      log.error("----------------- acceptVersion: " + acceptVersion);
-      
       if (acceptVersion == null)
       {
          this.version = StompVersions.V1_0;
@@ -401,6 +399,7 @@
             error.addHeader("version", acceptVersion);
             error.addHeader("content-type", "text/plain");
             error.setBody("Supported protocol version are " + manager.getSupportedVersionsAsString());
+            error.setDisconnect(true);
             throw error;
          }
          log.error("------------------ negotiated version is " + this.version);
@@ -438,11 +437,11 @@
          stompListener.requestAccepted(request);
       }
 
+      String cmd = request.getCommand();
       try
       {
          if (!initialized)
          {
-            String cmd = request.getCommand();
             if ( ! (Stomp.Commands.CONNECT.equals(cmd) || Stomp.Commands.STOMP.equals(cmd)))
             {
                throw new HornetQStompException("Connection hasn't been established.");
@@ -461,6 +460,11 @@
       {
          sendFrame(reply);
       }
+      
+      if (Stomp.Commands.DISCONNECT.equals(cmd))
+      {
+         this.disconnect();
+      }
    }
 
    public void sendFrame(StompFrame frame)

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java	2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompDecoder.java	2011-09-15 14:34:00 UTC (rev 11351)
@@ -115,6 +115,8 @@
    public static final byte U = (byte)'U';
 
    public static final byte N = (byte)'N';
+   
+   public static final byte LN = (byte)'n';
 
    public static final byte HEADER_SEPARATOR = (byte)':';
 

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-15 14:34:00 UTC (rev 11351)
@@ -177,11 +177,15 @@
 
       public String getEscapedKey()
       {
+         log.error("----------------key is : |" + key + "|");
+         log.error("----------------esc'd: |" + escape(key) + "|");
          return escape(key);
       }
 
       public String getEscapedValue()
       {
+         log.error("----------------val is : |" + val + "|");
+         log.error("----------------esc'd v: |" + escape(val) + "|");
          return escape(val);
       }
       

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	2011-09-15 14:34:00 UTC (rev 11351)
@@ -134,10 +134,7 @@
       return receipt;
    }
    
-   public StompFrame postprocess(StompFrame request)
-   {
-      return null;
-   }
+   public abstract StompFrame postprocess(StompFrame request);
 
    public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
          StompSubscription subscription, int deliveryCount) throws Exception;

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java	2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java	2011-09-15 14:34:00 UTC (rev 11351)
@@ -104,7 +104,6 @@
    @Override
    public StompFrame onDisconnect(StompFrame frame)
    {
-      connection.destroy();
       return null;
    }
 
@@ -398,5 +397,20 @@
       // TODO Auto-generated method stub
       
    }
+   
+   @Override
+   public StompFrame postprocess(StompFrame request)
+   {
+      StompFrame response = null;
+      if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
+      {
+         response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+         if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+         {
+            response.setNeedsDisconnect(true);
+         }
+      }
+      return response;
+   }
 
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-15 14:34:00 UTC (rev 11351)
@@ -387,7 +387,7 @@
          StompSubscription subscription, int deliveryCount)
          throws Exception
    {
-      StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE);
+      StompFrame frame = new StompFrameV11(Stomp.Responses.MESSAGE);
       
       if (subscription.getID() != null)
       {
@@ -403,7 +403,7 @@
       byte[] data = new byte[size];
       if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE)
       {
-         frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
+         frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length > 0 ? (data.length - 1) : data.length));
          buffer.readBytes(data);
       }
       else
@@ -417,7 +417,6 @@
          {
             data = new byte[0];
          }
-         frame.addHeader(Stomp.Headers.CONTENT_TYPE, "text/plain");
       }
       
       frame.setByteBody(data);
@@ -426,6 +425,8 @@
 
       StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
       
+      log.error("-------------------- frame created: " + frame);
+      
       return frame;
 
    }
@@ -888,6 +889,19 @@
 
                   break;
                }
+               case StompDecoder.LN:
+               {
+                  if (isEscaping)
+                  {
+                     holder.append(StompDecoder.NEW_LINE);
+                     isEscaping = false;
+                  }
+                  else
+                  {
+                     holder.append(b);
+                  }
+                  break;
+               }
                case StompDecoder.NEW_LINE:
                {
                   if (decoder.whiteSpaceOnly)

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java	2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java	2011-09-15 14:34:00 UTC (rev 11351)
@@ -71,6 +71,8 @@
          }
          // Add a newline to separate the headers from the content.
          head.append(Stomp.NEWLINE);
+         
+         log.error("------------------------_______now head: " + head);
 
          buffer.writeBytes(head.toString().getBytes("UTF-8"));
          if (bytesBody != null)
@@ -91,8 +93,12 @@
       if (!headers.containsKey(key))
       {
          headers.put(key, val);
+         allHeaders.add(new Header(key, val));
       }
-      allHeaders.add(new Header(key, val));
+      else if (!key.equals(Stomp.Headers.CONTENT_LENGTH))
+      {
+         allHeaders.add(new Header(key, val));
+      }
    }
 
 

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java	2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java	2011-09-15 14:34:00 UTC (rev 11351)
@@ -94,9 +94,15 @@
    public void disconnect() throws IOException, InterruptedException
    {
       ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
+      frame.addHeader("receipt", "1");
       
       ClientStompFrame result = this.sendFrame(frame);
       
+      if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id"))))
+      {
+         throw new IOException("Disconnect failed! " + result);
+      }
+      
       close();
       
       connected = false;

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java	2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompFrameFactoryV11.java	2011-09-15 14:34:00 UTC (rev 11351)
@@ -12,8 +12,12 @@
  */
 package org.hornetq.tests.integration.stomp.util;
 
+import java.io.UnsupportedEncodingException;
 import java.util.StringTokenizer;
 
+import org.hornetq.core.protocol.stomp.HornetQStompException;
+import org.hornetq.core.protocol.stomp.StompDecoder;
+
 /**
  * 
  * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
@@ -40,11 +44,17 @@
 
    @Override
 
-   public ClientStompFrame createFrame(String data)
+   public ClientStompFrame createFrame(final String data)
    {
+      System.out.println("Data: |" + data + "|");
       //split the string at "\n\n"
       String[] dataFields = data.split("\n\n");
       
+      System.out.println("DataFields[0] |" + dataFields[0]);
+      if (dataFields.length > 1)
+      {
+         System.out.println("DataFields[1] |" + dataFields[1]);
+      }
       StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
       
       String command = tokenizer.nextToken();
@@ -53,7 +63,8 @@
       while (tokenizer.hasMoreTokens())
       {
          String header = tokenizer.nextToken();
-         String[] fields = header.split(":");
+         System.out.println("header is: " + header);
+         String[] fields = splitHeader(header);
          frame.addHeader(fields[0], fields[1]);
       }
       
@@ -64,7 +75,111 @@
       }
       return frame;
    }
+   
+   //find true :
+   private String[] splitHeader(String header)
+   {
+      StringBuffer sbKey = new StringBuffer();
+      StringBuffer sbVal = new StringBuffer();
+      boolean isEsc = false;
+      boolean isKey = true;
+      
+      for (int i = 0; i < header.length(); i++)
+      {
+         char b = header.charAt(i);
 
+         switch (b)
+         {
+            //escaping
+            case '\\':
+            {
+               if (isEsc)
+               {
+                  //this is a backslash
+                  if (isKey)
+                  {
+                     sbKey.append(b);
+                  }
+                  else
+                  {
+                     sbVal.append(b);
+                  }
+                  isEsc = false;
+               }
+               else
+               {
+                  //begin escaping
+                  isEsc = true;
+               }
+               break;
+            }
+            case ':':
+            {
+               if (isEsc)
+               {
+                  if (isKey)
+                  {
+                     sbKey.append(b);
+                  }
+                  else
+                  {
+                     sbVal.append(b);
+                  }
+                  isEsc = false;
+               }
+               else
+               {
+                  isKey = false;
+               }
+               break;
+            }
+            case 'n':
+            {
+               if (isEsc)
+               {
+                  if (isKey)
+                  {
+                     sbKey.append('\n');
+                  }
+                  else
+                  {
+                     sbVal.append('\n');
+                  }
+                  isEsc = false;
+               }
+               else
+               {
+                  if (isKey)
+                  {
+                     sbKey.append(b);
+                  }
+                  else
+                  {
+                     sbVal.append(b);
+                  }
+               }
+               break;
+            }
+            default:
+            {
+               if (isKey)
+               {
+                  sbKey.append(b);
+               }
+               else
+               {
+                  sbVal.append(b);
+               }
+            }
+         }
+      }
+      String[] result = new String[2];
+      result[0] = sbKey.toString();
+      result[1] = sbVal.toString();
+      
+      return result;
+   }
+
    @Override
    public ClientStompFrame newFrame(String command)
    {

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-15 06:25:53 UTC (rev 11350)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-15 14:34:00 UTC (rev 11351)
@@ -96,7 +96,7 @@
       
       //reply headers: version, session, server
       assertEquals(null, reply.getHeader("version"));
-      
+
       connV11.disconnect();
 
       // case 2 accept-version=1.0, result: 1.0
@@ -164,7 +164,6 @@
       
       System.out.println("Got error frame " + reply);
       
-      connV11.disconnect();
    }
    
    public void testSendAndReceive() throws Exception
@@ -303,6 +302,54 @@
       newConn.disconnect();
    }
 
+   public void testHeaderEncoding() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      
+      String body = "Hello World 1!";
+      String cLen = String.valueOf(body.getBytes("UTF-8").length);
+      
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "application/xml");
+      frame.addHeader("content-length", cLen);
+      String hKey = "special-header\\\\\\n\\:";
+      String hVal = "\\:\\\\\\ngood";
+      frame.addHeader(hKey, hVal);
+      
+      System.out.println("key: |" + hKey + "| val: |" + hVal);
+      
+      frame.setBody(body);
+      
+      connV11.sendFrame(frame);
+      
+      //subscribe
+      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      newConn.connect(defUser, defPass);
+      
+      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+      subFrame.addHeader("id", "a-sub");
+      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+      subFrame.addHeader("ack", "auto");
+      
+      newConn.sendFrame(subFrame);
+      
+      frame = newConn.receiveFrame();
+      
+      System.out.println("received " + frame);
+      
+      assertEquals("MESSAGE", frame.getCommand());
+      
+      String value = frame.getHeader("special-header" + "\\" + "\n" + ":");
+      
+      assertEquals(":" + "\\" + "\n" + "good", value);
+      
+      //unsub
+      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+      unsubFrame.addHeader("id", "a-sub");
+      
+      newConn.disconnect();
+   }
 }
 
 



More information about the hornetq-commits mailing list