[hornetq-commits] JBoss hornetq SVN: r11629 - in trunk: hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11 and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Nov 1 23:05:58 EDT 2011


Author: gaohoward
Date: 2011-11-01 23:05:57 -0400 (Tue, 01 Nov 2011)
New Revision: 11629

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
Fix STOMP heart-beat issue:
A heart-beat should be sent as a single newline byte rather than as a STOMP frame, as per the spec.


Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-11-02 03:05:57 UTC (rev 11629)
@@ -58,6 +58,8 @@
    
    protected boolean disconnect;
    
+   protected boolean isPing;
+   
    public StompFrame(String command)
    {
       this(command, false);
@@ -109,6 +111,16 @@
       out += body;
       return out;
    }
+   
+   public boolean isPing()
+   {
+      return isPing;
+   }
+   
+   public void setPing(boolean ping)
+   {
+      isPing = ping;
+   }
  
    public HornetQBuffer toHornetQBuffer() throws Exception
    {
@@ -123,6 +135,12 @@
             buffer = HornetQBuffers.dynamicBuffer(512);
          }
 
+         if (isPing())
+         {
+            buffer.writeByte((byte)10);
+            return buffer;
+         }
+
          StringBuffer head = new StringBuffer();
          head.append(command);
          head.append(Stomp.NEWLINE);

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-11-02 03:05:57 UTC (rev 11629)
@@ -126,6 +126,7 @@
       {
          response = new HornetQStompException("Encoding error.", e).getFrame();
       }
+
       return response;
    }
 
@@ -193,7 +194,7 @@
 
    @Override
    public StompFrame onSend(StompFrame frame)
-   {
+   {      
       StompFrame response = null;
       try
       {
@@ -492,7 +493,7 @@
    public StompFrame createPingFrame() throws UnsupportedEncodingException
    {
       StompFrame frame = new StompFrame(Stomp.Commands.STOMP);
-      frame.setBody("\n");
+      frame.setPing(true);
       return frame;
    }
    
@@ -658,31 +659,37 @@
       buffer.readBytes(decoder.workingBuffer, decoder.data, readable);
 
       decoder.data += readable;
-
+      
       if (decoder.command == null)
       {
-         if (decoder.data < 4)
+         int offset = 0;
+         
+         //check for ping
+         while (decoder.workingBuffer[offset] == StompDecoder.NEW_LINE)
          {
-            // Need at least four bytes to identify the command
-            // - up to 3 bytes for the command name + potentially another byte for a leading \n
-
-            return null;
-         }
-
-         int offset;
-
-         if (decoder.workingBuffer[0] == StompDecoder.NEW_LINE)
-         {
+            if (heartBeater != null)
+            {
+               //client ping
+               heartBeater.pingAccepted();
+            }
             // Yuck, some badly behaved STOMP clients add a \n *after* the terminating NUL char at the end of the
             // STOMP
             // frame this can manifest as an extra \n at the beginning when the next STOMP frame is read - we need to
             // deal
             // with this
-            offset = 1;
+            offset++;
+            if (offset >= decoder.data)
+            {
+               decoder.data = 0;
+               return null;
+            }
          }
-         else
+            
+         if (decoder.data < 4)
          {
-            offset = 0;
+            // Need at least four bytes to identify the command
+            // - up to 3 bytes for the command name + potentially another byte for a leading \n
+            return null;
          }
 
          byte b = decoder.workingBuffer[offset];
@@ -1025,7 +1032,7 @@
       }
 
       // Now the body
-
+      
       byte[] content = null;
 
       if (decoder.contentLength != -1)

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java	2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractClientStompFrame.java	2011-11-02 03:05:57 UTC (rev 11629)
@@ -25,7 +25,7 @@
  * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
  *
  */
-public class AbstractClientStompFrame implements ClientStompFrame
+public abstract class AbstractClientStompFrame implements ClientStompFrame
 {
    protected static final String HEADER_RECEIPT = "receipt";
    
@@ -56,6 +56,13 @@
    @Override
    public ByteBuffer toByteBuffer() throws UnsupportedEncodingException
    {
+      if (isPing())
+      {
+         ByteBuffer buffer = ByteBuffer.allocateDirect(1);
+         buffer.put((byte)0x0A);
+         buffer.rewind();
+         return buffer;
+      }
       StringBuffer sb = new StringBuffer();
       sb.append(command + "\n");
       int n = headers.size();

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java	2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java	2011-11-02 03:05:57 UTC (rev 11629)
@@ -177,8 +177,8 @@
                
                if (validateFrame(frame))
                {
-                 frameQueue.offer(frame);
-                 receiveList.clear();
+                  frameQueue.offer(frame);
+                  receiveList.clear();
                }
                else
                {
@@ -188,13 +188,25 @@
          }
          else
          {
-            receiveList.add(b);
+            if (b == 10 && receiveList.size() == 0)
+            {
+               //may be a ping
+               incrementServerPing();
+            }
+            else
+            {
+               receiveList.add(b);
+            }
          }
       }
       //clear readbuffer
       readBuffer.rewind();
    }
    
+   protected void incrementServerPing()
+   {
+   }
+
    private boolean validateFrame(ClientStompFrame f) throws UnsupportedEncodingException
    {
       String h = f.getHeader("content-length");

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java	2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrame.java	2011-11-02 03:05:57 UTC (rev 11629)
@@ -42,4 +42,6 @@
 
    public ByteBuffer toByteBufferWithExtra(String str)  throws UnsupportedEncodingException;
    
+   public boolean isPing();
+   
 }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java	2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV10.java	2011-11-02 03:05:57 UTC (rev 11629)
@@ -25,5 +25,11 @@
    {
       super(command);
    }
+
+   @Override
+   public boolean isPing()
+   {
+      return false;
+   }
    
 }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java	2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java	2011-11-02 03:05:57 UTC (rev 11629)
@@ -21,6 +21,7 @@
 public class ClientStompFrameV11 extends AbstractClientStompFrame
 {
    boolean forceOneway = false;
+   boolean isPing = false;
    
    public ClientStompFrameV11(String command)
    {
@@ -43,4 +44,14 @@
       }
       return false;
    }
+
+   public void setPing(boolean b)
+   {
+      isPing = b;
+   }
+   
+   public boolean isPing()
+   {
+      return isPing;
+   }
 }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java	2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java	2011-11-02 03:05:57 UTC (rev 11629)
@@ -52,6 +52,8 @@
    void destroy();
 
    ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException;
+
+   int getServerPingNumber();
    
 }
 

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java	2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java	2011-11-02 03:05:57 UTC (rev 11629)
@@ -88,14 +88,16 @@
    @Override
    public void startPinger(long interval)
    {
-      // TODO Auto-generated method stub
-      
    }
 
    @Override
    public void stopPinger()
    {
-      // TODO Auto-generated method stub
-      
    }
+
+   @Override
+   public int getServerPingNumber()
+   {
+      return 0;
+   }
 }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java	2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java	2011-11-02 03:05:57 UTC (rev 11629)
@@ -29,6 +29,7 @@
    public static final String RECEIPT_HEADER = "receipt";
    
    private Pinger pinger;
+   private volatile int serverPingCounter;
 
    public StompClientConnectionV11(String host, int port) throws IOException
    {
@@ -126,6 +127,7 @@
    public void disconnect() throws IOException, InterruptedException
    {
       stopPinger();
+      
       ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
       frame.addHeader("receipt", "1");
       
@@ -184,6 +186,7 @@
          pingFrame = (ClientStompFrameV11) createFrame("STOMP");
          pingFrame.setBody("\n");
          pingFrame.setForceOneway();
+         pingFrame.setPing(true);
       }
       
       public void startPing()
@@ -205,12 +208,8 @@
             {
                try
                {
-                  System.out.println("============sending ping");
-                  
                   sendFrame(pingFrame);
                   
-                  System.out.println("Pinged " + pingFrame);
-                  
                   this.wait(pingInterval);
                }
                catch (Exception e)
@@ -219,9 +218,20 @@
                   e.printStackTrace();
                }
             }
-            System.out.println("Pinger stopped");
          }
       }
    }
 
+   @Override
+   public int getServerPingNumber()
+   {
+      return serverPingCounter;
+   }
+   
+   protected void incrementServerPing()
+   {
+      serverPingCounter++;
+   }
+
+
 }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-11-02 01:16:08 UTC (rev 11628)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-11-02 03:05:57 UTC (rev 11629)
@@ -55,6 +55,7 @@
    
    protected void tearDown() throws Exception
    {
+      System.out.println("Connection 11 : " + connV11.isConnected());
       if (connV11.isConnected())
       {
          connV11.disconnect();
@@ -238,6 +239,7 @@
       //unsub
       ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
       unsubFrame.addHeader("id", "a-sub");
+      newConn.sendFrame(unsubFrame);
       
       newConn.disconnect();
    }
@@ -491,9 +493,8 @@
       ClientStompFrame reply = connV11.sendFrame(frame);
       
       assertEquals("CONNECTED", reply.getCommand());
-      
       assertEquals("500,500", reply.getHeader("heart-beat"));
-      
+
       connV11.disconnect();
       
       //heart-beat (500,1000)
@@ -518,7 +519,7 @@
       Thread.sleep(10000);
       
       //now check the frame size
-      int size = connV11.getFrameQueueSize();
+      int size = connV11.getServerPingNumber();
       
       System.out.println("ping received: " + size);
       
@@ -533,12 +534,211 @@
       //send will be ok
       connV11.sendFrame(frame);
       
-      connV11.stopPinger();
-      
       connV11.disconnect();
+   }
+   
+   public void testSendWithHeartBeatsAndReceive() throws Exception
+   {
+      StompClientConnection newConn = null;
+      try
+      {
+         ClientStompFrame frame = connV11.createFrame("CONNECT");
+         frame.addHeader("host", "127.0.0.1");
+         frame.addHeader("login", this.defUser);
+         frame.addHeader("passcode", this.defPass);
+         frame.addHeader("heart-beat", "500,1000");
+         frame.addHeader("accept-version", "1.0,1.1");
 
+         connV11.sendFrame(frame);
+
+         connV11.startPinger(500);
+
+         frame = connV11.createFrame("SEND");
+         frame.addHeader("destination", getQueuePrefix() + getQueueName());
+         frame.addHeader("content-type", "text/plain");
+
+         for (int i = 0; i < 10; i++)
+         {
+            frame.setBody("Hello World " + i + "!");
+            connV11.sendFrame(frame);
+            Thread.sleep(500);
+         }
+
+         // subscribe
+         newConn = StompClientConnectionFactory.createClientConnection("1.1",
+               hostname, port);
+         newConn.connect(defUser, defPass);
+
+         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+         subFrame.addHeader("id", "a-sub");
+         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+         subFrame.addHeader("ack", "auto");
+
+         newConn.sendFrame(subFrame);
+
+         int cnt = 0;
+
+         frame = newConn.receiveFrame();
+
+         while (frame != null)
+         {
+            cnt++;
+            Thread.sleep(500);
+            frame = newConn.receiveFrame(5000);
+         }
+
+         assertEquals(10, cnt);
+
+         // unsub
+         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+         unsubFrame.addHeader("id", "a-sub");
+         newConn.sendFrame(unsubFrame);
+      }
+      finally
+      {
+         if (newConn != null)
+            newConn.disconnect();
+         connV11.disconnect();
+      }
    }
    
+   public void testSendAndReceiveWithHeartBeats() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "text/plain");
+      
+      for (int i = 0; i < 10; i++)
+      {
+         frame.setBody("Hello World " + i + "!");
+         connV11.sendFrame(frame);
+         Thread.sleep(500);
+      }
+      
+      //subscribe
+      StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      try
+      {
+         frame = newConn.createFrame("CONNECT");
+         frame.addHeader("host", "127.0.0.1");
+         frame.addHeader("login", this.defUser);
+         frame.addHeader("passcode", this.defPass);
+         frame.addHeader("heart-beat", "500,1000");
+         frame.addHeader("accept-version", "1.0,1.1");
+
+         newConn.sendFrame(frame);
+
+         newConn.startPinger(500);
+
+         Thread.sleep(500);
+
+         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+         subFrame.addHeader("id", "a-sub");
+         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+         subFrame.addHeader("ack", "auto");
+
+         newConn.sendFrame(subFrame);
+
+         int cnt = 0;
+
+         frame = newConn.receiveFrame();
+
+         while (frame != null)
+         {
+            cnt++;
+            Thread.sleep(500);
+            frame = newConn.receiveFrame(5000);
+         }
+
+         assertEquals(10, cnt);
+
+         // unsub
+         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+         unsubFrame.addHeader("id", "a-sub");
+         newConn.sendFrame(unsubFrame);
+      }
+      finally
+      {
+         newConn.disconnect();
+      }
+   }
+
+   public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception
+   {
+      StompClientConnection newConn = null;
+      try
+      {
+         ClientStompFrame frame = connV11.createFrame("CONNECT");
+         frame.addHeader("host", "127.0.0.1");
+         frame.addHeader("login", this.defUser);
+         frame.addHeader("passcode", this.defPass);
+         frame.addHeader("heart-beat", "500,1000");
+         frame.addHeader("accept-version", "1.0,1.1");
+
+         connV11.sendFrame(frame);
+
+         connV11.startPinger(500);
+
+         frame = connV11.createFrame("SEND");
+         frame.addHeader("destination", getQueuePrefix() + getQueueName());
+         frame.addHeader("content-type", "text/plain");
+
+         for (int i = 0; i < 10; i++)
+         {
+            frame.setBody("Hello World " + i + "!");
+            connV11.sendFrame(frame);
+            Thread.sleep(500);
+         }
+
+         // subscribe
+         newConn = StompClientConnectionFactory.createClientConnection("1.1",
+               hostname, port);
+         frame = newConn.createFrame("CONNECT");
+         frame.addHeader("host", "127.0.0.1");
+         frame.addHeader("login", this.defUser);
+         frame.addHeader("passcode", this.defPass);
+         frame.addHeader("heart-beat", "500,1000");
+         frame.addHeader("accept-version", "1.0,1.1");
+
+         newConn.sendFrame(frame);
+
+         newConn.startPinger(500);
+
+         Thread.sleep(500);
+
+         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
+         subFrame.addHeader("id", "a-sub");
+         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+         subFrame.addHeader("ack", "auto");
+
+         newConn.sendFrame(subFrame);
+
+         int cnt = 0;
+
+         frame = newConn.receiveFrame();
+
+         while (frame != null)
+         {
+            cnt++;
+            Thread.sleep(500);
+            frame = newConn.receiveFrame(5000);
+         }
+         assertEquals(10, cnt);
+
+         // unsub
+         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
+         unsubFrame.addHeader("id", "a-sub");
+         newConn.sendFrame(unsubFrame);
+      }
+      finally
+      {
+         if (newConn != null)
+            newConn.disconnect();
+         connV11.disconnect();
+      }
+   }
+
    public void testNack() throws Exception
    {
       connV11.connect(defUser, defPass);



More information about the hornetq-commits mailing list