[hornetq-commits] JBoss hornetq SVN: r11404 - in branches/STOMP11: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 23 12:09:49 EDT 2011


Author: gaohoward
Date: 2011-09-23 12:09:48 -0400 (Fri, 23 Sep 2011)
New Revision: 11404

Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests


Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java	2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v10/StompFrameHandlerV10.java	2011-09-23 16:09:48 UTC (rev 11404)
@@ -13,7 +13,6 @@
 package org.hornetq.core.protocol.stomp.v10;
 
 import java.io.UnsupportedEncodingException;
-import java.util.List;
 import java.util.Map;
 
 import org.hornetq.api.core.HornetQBuffer;
@@ -27,7 +26,6 @@
 import org.hornetq.core.protocol.stomp.StompConnection;
 import org.hornetq.core.protocol.stomp.StompDecoder;
 import org.hornetq.core.protocol.stomp.StompFrame;
-import org.hornetq.core.protocol.stomp.StompFrame.Header;
 import org.hornetq.core.protocol.stomp.StompSubscription;
 import org.hornetq.core.protocol.stomp.StompUtils;
 import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
@@ -53,7 +51,6 @@
    @Override
    public StompFrame onConnect(StompFrame frame)
    {
-      log.error("-----------------onConnection ()");
       StompFrame response = null;
       Map<String, String> headers = frame.getHeadersMap();
       String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
@@ -61,10 +58,8 @@
       String clientID = (String)headers.get(Stomp.Headers.Connect.CLIENT_ID);
       String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
 
-      log.error("------------ validating user: " + login + " code " + passcode);
       if (connection.validateUser(login, passcode))
       {
-         log.error("-------user OK!!!");
          connection.setClientID(clientID);
          connection.setValid(true);
          
@@ -84,7 +79,6 @@
       }
       else
       {
-         log.error("--------user NOT ok!!");
          //not valid
          response = new StompFrameV10(Stomp.Responses.ERROR);
          response.addHeader(Stomp.Headers.Error.MESSAGE, "Failed to connect");
@@ -95,7 +89,6 @@
          catch (UnsupportedEncodingException e)
          {
             log.error("Encoding problem", e);
-            //then we will send a null body message.
          }
       }
       return response;

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java	2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java	2011-09-23 16:09:48 UTC (rev 11404)
@@ -70,7 +70,7 @@
       }
       sb.append((char)0);
       
-      String data = new String(sb.toString());
+      String data = sb.toString();
       
       byte[] byteValue = data.getBytes("UTF-8");
       
@@ -82,6 +82,35 @@
    }
 
    @Override
+   public ByteBuffer toByteBufferWithExtra(String str) throws UnsupportedEncodingException
+   { // Serializes this frame to UTF-8 bytes and appends str AFTER the terminating NUL, so the extra text follows the frame on the wire.
+      StringBuffer sb = new StringBuffer();
+      sb.append(command + "\n"); // first line: the command
+      int n = headers.size();
+      for (int i = 0; i < n; i++)
+      {
+         sb.append(headers.get(i).key + ":" + headers.get(i).val + "\n"); // one "key:val" line per header, in list order
+      }
+      sb.append("\n"); // empty line separates the headers from the body
+      if (body != null)
+      {
+         sb.append(body); // a null body is simply omitted
+      }
+      sb.append((char)0); // NUL character ends the frame
+      sb.append(str); // extra text goes after the NUL, i.e. outside the frame itself
+      
+      String data = sb.toString();
+      
+      byte[] byteValue = data.getBytes("UTF-8");
+      
+      ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length); // sized to the encoded byte count, not the character count
+      buffer.put(byteValue);
+      
+      buffer.rewind(); // put() advanced the position; reset it to 0 so the caller reads from the start
+      return buffer;
+   }
+
+   @Override
    public boolean needsReply()
    {
       if ("CONNECT".equals(command) || headerKeys.contains(HEADER_RECEIPT))

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java	2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java	2011-09-23 16:09:48 UTC (rev 11404)
@@ -113,7 +113,38 @@
       }
       return response;
    }
-   
+
+   public ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException
+   { // Sends the frame followed by an extra "\n" after its NUL terminator; returns the reply frame, or null if no reply is expected or none is received.
+      ClientStompFrame response = null;
+      ByteBuffer buffer = frame.toByteBufferWithExtra("\n"); // the extra newline is what makes the frame "wicked"
+      
+      while (buffer.remaining() > 0) // keep writing until the whole buffer has been handed to the channel
+      {
+         socketChannel.write(buffer);
+      }
+      
+      //now wait for the response, but only for frames that expect one
+      if (frame.needsReply())
+      {
+         response = receiveFrame(); // receiveFrame() polls the frame queue with a 10 second timeout
+         
+         //filter out server pings: skip frames whose command is "STOMP" and read the next one
+         while (response != null)
+         {
+            if (response.getCommand().equals("STOMP"))
+            {
+               response = receiveFrame();
+            }
+            else
+            {
+               break; // first non-ping frame is the reply
+            }
+         }
+      }
+      return response;
+   }
+
    public ClientStompFrame receiveFrame() throws InterruptedException
    {
       return frameQueue.poll(10, TimeUnit.SECONDS);
@@ -138,9 +169,16 @@
                   frameBytes[j] = receiveList.get(j);
                }
                ClientStompFrame frame = factory.createFrame(new String(frameBytes, "UTF-8"));
-               frameQueue.offer(frame);
                
-               receiveList.clear();
+               if (validateFrame(frame))
+               {
+                 frameQueue.offer(frame);
+                 receiveList.clear();
+               }
+               else
+               {
+                  receiveList.add(b);
+               }
             }
          }
          else
@@ -152,6 +190,20 @@
       readBuffer.rewind();
    }
    
+   private boolean validateFrame(ClientStompFrame f) throws UnsupportedEncodingException
+   { // Returns false only when a content-length header is present and the body holds fewer UTF-8 bytes than it declares; true otherwise.
+      String h = f.getHeader("content-length");
+      if (h != null) // frames without content-length are always accepted
+      {
+         int len = Integer.valueOf(h); // NOTE(review): a non-numeric header value would throw NumberFormatException here
+         if (f.getBody().getBytes("UTF-8").length < len) // compare encoded byte length, not character count
+         {
+            return false; // body is shorter than declared, so the frame is treated as incomplete
+         }
+      }
+      return true;
+   }
+   
    protected void close() throws IOException
    {
       socketChannel.close();

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java	2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java	2011-09-23 16:09:48 UTC (rev 11404)
@@ -39,5 +39,7 @@
    public String getHeader(String header);
 
    public String getBody();
+
+   public ByteBuffer toByteBufferWithExtra(String str)  throws UnsupportedEncodingException; // serialized frame with str appended to the output
    
 }

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java	2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java	2011-09-23 16:09:48 UTC (rev 11404)
@@ -48,6 +48,8 @@
    void stopPinger();
 
    void destroy();
+
+   ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException; // sends a frame with extra data appended; returns the reply frame, if any
    
 }
 

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-23 13:53:53 UTC (rev 11403)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-23 16:09:48 UTC (rev 11404)
@@ -22,6 +22,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.BytesMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -1210,7 +1211,104 @@
       connV11.disconnect();
    }
 
+   public void testSendMessage() throws Exception
+   { // Sends a plain SEND frame over the V1.1 connection and checks the text, priority and timestamp of the message the JMS consumer receives.
+      MessageConsumer consumer = session.createConsumer(queue);
 
+      connV11.connect(defUser, defPass);
+
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.setBody("Hello World");
+      
+      connV11.sendFrame(frame);
+
+      TextMessage message = (TextMessage)consumer.receive(1000); // receive with a timeout of 1000
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+      // Assert default priority 4 is used when priority header is not set
+      Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
+
+      // Make sure that the timestamp is valid - should
+      // be very close to the current time.
+      long tnow = System.currentTimeMillis();
+      long tmsg = message.getJMSTimestamp();
+      Assert.assertTrue(Math.abs(tnow - tmsg) < 1000); // tolerance: less than 1000 ms either way
+   }
+
+   public void testSendMessageWithContentLength() throws Exception
+   { // Sends a SEND frame with a content-length header and a body containing zero bytes, and checks it arrives as a BytesMessage.
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+
+      byte[] data = new byte[] { 1, 0, 0, 4 }; // payload includes 0 bytes, the same value used as the frame terminator
+      
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.setBody(new String(data, "UTF-8"));
+      
+      frame.addHeader("content-length", String.valueOf(data.length)); // declares the body size in bytes
+
+      connV11.sendFrame(frame);
+      
+      BytesMessage message = (BytesMessage)consumer.receive(10000);
+      Assert.assertNotNull(message);
+      //there is one extra null byte, so the expected body length is data.length + 1
+      assertEquals(data.length + 1, message.getBodyLength());
+      assertEquals(data[0], message.readByte()); // the leading bytes must match the payload in order
+      assertEquals(data[1], message.readByte());
+      assertEquals(data[2], message.readByte());
+      assertEquals(data[3], message.readByte());
+   }
+
+   public void testSendMessageWithCustomHeadersAndSelector() throws Exception
+   { // Sends a SEND frame with custom headers and checks that a selector-based consumer receives it with those headers as string properties.
+      MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'"); // selector matches the "foo" header set below
+
+      connV11.connect(defUser, defPass);
+
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("foo", "abc");
+      frame.addHeader("bar", "123");
+      
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.setBody("Hello World");
+      
+      connV11.sendFrame(frame);
+
+      TextMessage message = (TextMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+      Assert.assertEquals("foo", "abc", message.getStringProperty("foo")); // custom headers are read back as string properties
+      Assert.assertEquals("bar", "123", message.getStringProperty("bar"));
+   }
+   
+   public void testSendMessageWithLeadingNewLine() throws Exception
+   { // Sends a SEND frame through sendWickedFrame and checks the message is still delivered intact.
+      MessageConsumer consumer = session.createConsumer(queue);
+
+      connV11.connect(defUser, defPass);
+      
+      ClientStompFrame frame = connV11.createFrame("SEND");
+
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.setBody("Hello World");
+
+      connV11.sendWickedFrame(frame); // unlike sendFrame, this is the "wicked" send path under test
+
+      TextMessage message = (TextMessage)consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+
+      // Make sure that the timestamp is valid - should
+      // be very close to the current time.
+      long tnow = System.currentTimeMillis();
+      long tmsg = message.getJMSTimestamp();
+      Assert.assertTrue(Math.abs(tnow - tmsg) < 1000); // tolerance: less than 1000 ms either way
+   }
+
    //-----------------private help methods
    
    private void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException



More information about the hornetq-commits mailing list