[hornetq-commits] JBoss hornetq SVN: r11341 - in branches/STOMP11: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11 and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Sep 13 10:13:23 EDT 2011


Author: gaohoward
Date: 2011-09-13 10:13:22 -0400 (Tue, 13 Sep 2011)
New Revision: 11341

Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
more tests


Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-13 14:13:22 UTC (rev 11341)
@@ -20,6 +20,7 @@
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -184,13 +185,41 @@
          return escape(val);
       }
       
-      private String escape(String str)
+      public static String escape(String str)
       {
-         str = str.replaceAll("\n", "\\n");
-         str = str.replaceAll("\\", "\\\\");
-         str = str.replaceAll(":", "\\:");
+         int len = str.length();
          
-         return str;
+         char[] buffer = new char[2*len];
+         int iBuffer = 0;
+         for (int i = 0; i < len; i++)
+         {
+            char c = str.charAt(i);
+            if (c == '\n')
+            {
+               buffer[iBuffer++] = '\\';
+               buffer[iBuffer] = 'n';
+            }
+            else if (c == '\\')
+            {
+               buffer[iBuffer++] = '\\';
+               buffer[iBuffer] = '\\';
+            }
+            else if (c == ':')
+            {
+               buffer[iBuffer++] = '\\';
+               buffer[iBuffer] = ':';
+            }
+            else
+            {
+               buffer[iBuffer] = c;
+            }
+            iBuffer++;
+         }
+         
+         char[] total = new char[iBuffer];
+         System.arraycopy(buffer, 0, total, 0, iBuffer);
+         
+         return new String(total);
       }
    }
 
@@ -232,4 +261,9 @@
    {
       this.bytesBody = content;
    }
+
+   public void setNeedsDisconnect(boolean b)
+   {
+      disconnect = b;
+   }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java	2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompVersions.java	2011-09-13 14:13:22 UTC (rev 11341)
@@ -21,6 +21,15 @@
  */
 public enum StompVersions
 {
-	V1_0,
-	V1_1
+   V1_0,
+   V1_1;
+
+   public String toString()
+   {
+	   if (this == V1_0)
+      {
+         return "1.0";
+      }
+      return "1.1";
+   }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/VersionedStompFrameHandler.java	2011-09-13 14:13:22 UTC (rev 11341)
@@ -15,6 +15,7 @@
 import java.io.UnsupportedEncodingException;
 
 import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
 import org.hornetq.core.protocol.stomp.v11.StompFrameHandlerV11;
 import org.hornetq.core.server.ServerMessage;
@@ -24,7 +25,9 @@
  * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
  */
 public abstract class VersionedStompFrameHandler
-{   
+{
+   private static final Logger log = Logger.getLogger(VersionedStompFrameHandler.class);
+
    protected StompConnection connection;
    
    public static VersionedStompFrameHandler getHandler(StompConnection connection, StompVersions version)
@@ -93,9 +96,13 @@
          response = onUnknown(request.getCommand());
       }
       
-      if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED) && (response == null))
+      log.error("-------------------- handled " + request);
+
+      if (response == null)
       {
-         response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+         response = postprocess(request);
+         
+         log.error("---------------postprocessed response: " + response);
       }
       
       return response;
@@ -126,6 +133,11 @@
       
       return receipt;
    }
+   
+   public StompFrame postprocess(StompFrame request)
+   {
+      return null;
+   }
 
    public abstract StompFrame createMessageFrame(ServerMessage serverMessage,
          StompSubscription subscription, int deliveryCount) throws Exception;

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-13 14:13:22 UTC (rev 11341)
@@ -146,9 +146,25 @@
    @Override
    public StompFrame onDisconnect(StompFrame frame)
    {
-      connection.destroy();
+      log.error("----------------- frame: " + frame);
+      
       return null;
    }
+   
+   @Override
+   public StompFrame postprocess(StompFrame request)
+   {
+      StompFrame response = null;
+      if (request.hasHeader(Stomp.Headers.RECEIPT_REQUESTED))
+      {
+         response = handleReceipt(request.getHeader(Stomp.Headers.RECEIPT_REQUESTED));
+         if (request.getCommand().equals(Stomp.Commands.DISCONNECT))
+         {
+            response.setNeedsDisconnect(true);
+         }
+      }
+      return response;
+   }
 
    @Override
    public StompFrame onSend(StompFrame frame)
@@ -415,6 +431,8 @@
    @Override
    public void replySent(StompFrame reply)
    {
+      log.error("----------------------- reply sent notified: " + reply);
+      
       if (reply.getCommand().equals(Stomp.Responses.CONNECTED))
       {
          //kick off the pinger
@@ -807,8 +825,10 @@
          // Now the headers
 
          boolean isEscaping = false;
-         SimpleBytes holder = new SimpleBytes(1024);         
+         SimpleBytes holder = new SimpleBytes(1024);      
          
+         log.error("--------------------------------- Decoding command: " + decoder.command);
+         
          outer: while (true)
          {
             byte b = decoder.workingBuffer[decoder.pos++];
@@ -887,6 +907,8 @@
                   }
                   holder.reset();
                   
+                  log.error("---------- A new header decoded: " + decoder.headerName + " : " + headerValue);
+                  
                   decoder.headers.put(decoder.headerName, headerValue);
 
                   if (decoder.headerName.equals(StompDecoder.CONTENT_LENGTH_HEADER_NAME))
@@ -914,6 +936,8 @@
                   decoder.whiteSpaceOnly = false;
 
                   decoder.headerValueWhitespace = false;
+                  
+                  holder.append(b);
                }
             }
             if (decoder.pos == decoder.data)

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java	2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameV11.java	2011-09-13 14:13:22 UTC (rev 11341)
@@ -49,7 +49,14 @@
    {
       if (buffer == null)
       {
-         buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+         if (bytesBody != null)
+         {
+            buffer = HornetQBuffers.dynamicBuffer(bytesBody.length + 512);
+         }
+         else
+         {
+            buffer = HornetQBuffers.dynamicBuffer(512);
+         }
 
          StringBuffer head = new StringBuffer();
          head.append(command);
@@ -66,7 +73,11 @@
          head.append(Stomp.NEWLINE);
 
          buffer.writeBytes(head.toString().getBytes("UTF-8"));
-         buffer.writeBytes(bytesBody);
+         if (bytesBody != null)
+         {
+            buffer.writeBytes(bytesBody);
+         }
+
          buffer.writeBytes(END_OF_FRAME);
 
          size = buffer.writerIndex();

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java	2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java	2011-09-13 14:13:22 UTC (rev 11341)
@@ -64,10 +64,15 @@
          sb.append(headers.get(i).key + ":" + headers.get(i).val + "\n");
       }
       sb.append("\n");
-      sb.append(body);
+      if (body != null)
+      {
+         sb.append(body);
+      }
       sb.append((char)0);
       
       String data = new String(sb.toString());
+      
+      System.out.println("---------------------------full frame is : " + data);
       byte[] byteValue = data.getBytes("UTF-8");
       
       ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length);

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java	2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java	2011-09-13 14:13:22 UTC (rev 11341)
@@ -129,7 +129,6 @@
          }
          else
          {
-            System.out.println("Added to list: " + b);
             receiveList.add(b);
          }
       }
@@ -180,11 +179,22 @@
    public void connect() throws Exception
    {
       connect(null, null);
+      connected = true;
    }
    
    public void connect(String username, String password) throws Exception
    {
       throw new RuntimeException("connect method not implemented!");
    }
+   
+   public boolean isConnected()
+   {
+      return connected;
+   }
+   
+   public String getVersion()
+   {
+      return version;
+   }
 
 }

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java	2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java	2011-09-13 14:13:22 UTC (rev 11341)
@@ -29,6 +29,14 @@
    void connect() throws Exception;
 
    void disconnect() throws IOException, InterruptedException;
+
+   void connect(String defUser, String defPass) throws Exception;
+
+   boolean isConnected();
+
+   String getVersion();
+
+   ClientStompFrame createFrame(String command);
    
 }
 

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java	2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java	2011-09-13 14:13:22 UTC (rev 11341)
@@ -36,6 +36,8 @@
       
       ClientStompFrame response = this.sendFrame(frame);
       System.out.println("Got response : " + response);
+      
+      connected = true;
    }
 
    @Override
@@ -45,5 +47,15 @@
       this.sendFrame(frame);
       
       close();
+      
+      connected = false;
    }
+
+   @Override
+   public ClientStompFrame createFrame(
+         String command)
+   {
+      // TODO Auto-generated method stub
+      return null;
+   }
 }

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java	2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java	2011-09-13 14:13:22 UTC (rev 11341)
@@ -55,6 +55,7 @@
          this.passcode = passcode;
          this.connected = true;
       }
+      connected = true;
    }
 
    public void connect1(String username, String passcode) throws IOException, InterruptedException
@@ -85,11 +86,18 @@
    public void disconnect() throws IOException, InterruptedException
    {
       ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
-      frame.addHeader(RECEIPT_HEADER, "77");
       
-      this.sendFrame(frame);
+      ClientStompFrame result = this.sendFrame(frame);
       
       close();
+      
+      connected = false;
    }
 
+   @Override
+   public ClientStompFrame createFrame(String command)
+   {
+      return new ClientStompFrameV11(command);
+   }
+
 }

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-13 11:15:17 UTC (rev 11340)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-13 14:13:22 UTC (rev 11341)
@@ -18,14 +18,89 @@
 package org.hornetq.tests.integration.stomp.v11;
 
 import org.hornetq.core.logging.Logger;
+import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
 import org.hornetq.tests.integration.stomp.util.StompClientConnection;
+import org.hornetq.tests.integration.stomp.util.StompClientConnectionFactory;
 
+
 public class StompTestV11 extends StompTestBase2
 {
    private static final transient Logger log = Logger.getLogger(StompTestV11.class);
    
+   private StompClientConnection connV10;
+   private StompClientConnection connV11;
+   
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+      connV10 = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+   }
+   
+   protected void tearDown() throws Exception
+   {
+      if (connV10.isConnected())
+      {
+         connV10.disconnect();
+      }
+      if (connV11.isConnected())
+      {
+         connV11.disconnect();
+      }
+      super.tearDown();
+   }
+   
    public void testConnection() throws Exception
    {
-      StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+      
+      connection.connect(defUser, defPass);
+      
+      assertTrue(connection.isConnected());
+      
+      assertEquals("1.0", connection.getVersion());
+      
+      connection.disconnect();
+
+      connection = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      
+      connection.connect(defUser, defPass);
+      
+      assertTrue(connection.isConnected());
+      
+      assertEquals("1.1", connection.getVersion());
+      
+      connection.disconnect();
    }
+   
+   public void testNegotiation() throws Exception
+   {
+      ClientStompFrame frame = connV11.createFrame("CONNECT");
+      frame.addHeader("accept-version", "1.0,1.1");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      
+      ClientStompFrame reply = connV11.sendFrame(frame);
+      
+      assertEquals("CONNECTED", reply.getCommand());
+      
+      //reply headers: version, session, server
+      assertEquals("1.1", reply.getHeader("version"));
+      
+      String sessionId = reply.getHeader("session");
+      
+      log.info("session id: " + sessionId);
+
+      assertNotNull(sessionId);
+      
+      String server = reply.getHeader("server");
+      
+      log.info("server: " + server);
+      
+      assertNotNull(server);
+      
+      connV11.disconnect();
+      
+   }
 }



More information about the hornetq-commits mailing list