[hornetq-commits] JBoss hornetq SVN: r11352 - in branches/STOMP11: tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Sep 16 01:20:30 EDT 2011


Author: gaohoward
Date: 2011-09-16 01:20:30 -0400 (Fri, 16 Sep 2011)
New Revision: 11352

Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
   branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
Log:
tests


Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-16 05:20:30 UTC (rev 11352)
@@ -99,7 +99,14 @@
             if (heartBeat != null)
             {
                handleHeartBeat(heartBeat);
-               response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "20,100");
+               if (heartBeater == null)
+               {
+                  response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "0,0");
+               }
+               else
+               {
+                  response.addHeader(Stomp.Headers.Connected.HEART_BEAT, heartBeater.getServerHeartBeatValue());
+               }
             }
          }
          else
@@ -147,7 +154,18 @@
    public StompFrame onDisconnect(StompFrame frame)
    {
       log.error("----------------- frame: " + frame);
-      
+      if (this.heartBeater != null)
+      {
+         heartBeater.shutdown();
+         try
+         {
+            heartBeater.join();
+         }
+         catch (InterruptedException e)
+         {
+            log.warn("Interrupted while waiting for heart beater to die", e);
+         }
+      }
       return null;
    }
    
@@ -371,7 +389,11 @@
    @Override
    public StompFrame onStomp(StompFrame request)
    {
-      return onConnect(request);
+      if (!connection.isValid()) // NOTE(review): presumably means "not yet connected" - confirm isValid() semantics
+      {
+         return onConnect(request);
+      }
+      return null; // a STOMP frame on an already-valid connection yields no response frame
    }
 
    @Override
@@ -479,6 +501,9 @@
    // 2*serverAcceptPing, disconnect!
    private class HeartBeater extends Thread
    {
+      final int MIN_SERVER_PING = 200;
+      final int MIN_CLIENT_PING = 500;
+      
       long serverPing = 0;
       long serverAcceptPing = 0;
       volatile boolean shutdown = false;
@@ -490,15 +515,26 @@
       {
          if (clientPing != 0)
          {
-            serverAcceptPing = clientPing > 100 ? clientPing : 100;
+            serverAcceptPing = clientPing > MIN_CLIENT_PING ? clientPing : MIN_CLIENT_PING;
          }
          
          if (clientAcceptPing != 0)
          {
-            serverPing = clientAcceptPing > 20 ? clientAcceptPing : 20;
+            serverPing = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING;
          }
       }
       
+      public synchronized void shutdown()
+      {
+         shutdown = true;
+         this.notify(); // wakes the heart-beat thread if it is blocked in this.wait(waitTime)
+      }
+
+      public String getServerHeartBeatValue() // value placed in the CONNECTED heart-beat header
+      {
+         return serverPing + "," + serverAcceptPing;
+      }
+
       public void pinged()
       {
          lastPingTime.set(System.currentTimeMillis());
@@ -537,6 +573,9 @@
                if (serverAcceptPing != 0)
                {
                   dur2 = System.currentTimeMillis() - lastAccepted.get();
+                  
+                  log.error("-------------------------- dur2 is " + dur2);
+                  
                   if (dur2 > (2 * serverAcceptPing))
                   {
                      connection.disconnect();
@@ -545,24 +584,51 @@
                   }
                }
                
-               long waitTime1 = serverPing - dur1;
-               long waitTime2 = serverAcceptPing * 2 - dur2;
-
-               long waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
+               long waitTime1 = 0;
+               long waitTime2 = 0;
                
+               if (serverPing > 0)
+               {
+                  waitTime1 = serverPing - dur1;
+               }
+               
+               if (serverAcceptPing > 0)
+               {
+                  waitTime2 = serverAcceptPing * 2 - dur2;
+               }
+               
+               long waitTime = 10l;
+               
+               if ((waitTime1 > 0) && (waitTime1 > 0))
+               {
+                  waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
+               }
+               else if (waitTime1 > 0)
+               {
+                  waitTime = waitTime1;
+               }
+               else if (waitTime2 > 0)
+               {
+                  waitTime = waitTime2;
+               }
+               
                try
                {
+                  log.error("-------------------waiting for " + waitTime);
                   this.wait(waitTime);
+                  log.error("--------------------wake up " );
                }
                catch (InterruptedException e)
                {
                }
             }
+            log.error("-------------------------HeartBeat thread shut down!");
          }
       }
 
       public void pingAccepted()
       {
+         log.error("------------------------Ping accepted!");
          this.lastAccepted.set(System.currentTimeMillis());
       }
    }
@@ -572,6 +638,7 @@
    {
       if (heartBeater != null)
       {
+         log.error("----------------------PING accepted: " + request);
          heartBeater.pingAccepted();
       }
    }

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java	2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/AbstractStompClientConnection.java	2011-09-16 05:20:30 UTC (rev 11352)
@@ -195,5 +195,10 @@
    {
       return version;
    }
+   
+   public int getFrameQueueSize()
+   {
+      return this.frameQueue.size();
+   }
 
 }

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java	2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/ClientStompFrameV11.java	2011-09-16 05:20:30 UTC (rev 11352)
@@ -20,14 +20,23 @@
  */
 public class ClientStompFrameV11 extends AbstractClientStompFrame
 {
+   boolean forceOneway = false;
+   
    public ClientStompFrameV11(String command)
    {
       super(command);
    }
+   
+   public void setForceOneway()
+   {
+      forceOneway = true;
+   }
 
    @Override
    public boolean needsReply()
    {
+      if (forceOneway) return false;
+      
       if ("CONNECT".equals(command) || "STOMP".equals(command) || headerKeys.contains(HEADER_RECEIPT))
       {
          return true;

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java	2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnection.java	2011-09-16 05:20:30 UTC (rev 11352)
@@ -37,6 +37,13 @@
    String getVersion();
 
    ClientStompFrame createFrame(String command);
+
+   //number of frames at the queue
+   int getFrameQueueSize();
+
+   void startPinger(long interval);
+
+   void stopPinger();
    
 }
 

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java	2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV10.java	2011-09-16 05:20:30 UTC (rev 11352)
@@ -65,4 +65,18 @@
       // TODO Auto-generated method stub
       return null;
    }
+
+   @Override
+   public void startPinger(long interval)
+   {
+      // TODO Auto-generated method stub
+      
+   }
+
+   @Override
+   public void stopPinger()
+   {
+      // TODO Auto-generated method stub
+      
+   }
 }

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java	2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/util/StompClientConnectionV11.java	2011-09-16 05:20:30 UTC (rev 11352)
@@ -27,6 +27,8 @@
    public static final String HOST_HEADER = "host";
    public static final String VERSION_HEADER = "version";
    public static final String RECEIPT_HEADER = "receipt";
+   
+   private Pinger pinger;
 
    public StompClientConnectionV11(String host, int port) throws IOException
    {
@@ -93,6 +95,7 @@
    @Override
    public void disconnect() throws IOException, InterruptedException
    {
+      stopPinger();
       ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
       frame.addHeader("receipt", "1");
       
@@ -114,4 +117,81 @@
       return new ClientStompFrameV11(command);
    }
 
+   @Override
+   public void startPinger(long interval)
+   {
+      pinger = new Pinger(interval);
+      pinger.startPing();
+   }
+
+   @Override
+   public void stopPinger()
+   {
+      if (pinger != null)
+      {
+         pinger.stopPing();
+         try
+         {
+            pinger.join();
+         }
+         catch (InterruptedException e)
+         {
+            e.printStackTrace();
+         }
+         pinger = null;
+      }
+   }
+   
+   private class Pinger extends Thread // test helper: sends pingFrame, then waits pingInterval, until stopped
+   {
+      long pingInterval; // timeout handed to wait() between pings (milliseconds)
+      ClientStompFrameV11 pingFrame;
+      volatile boolean stop = false;
+      
+      public Pinger(long interval)
+      {
+         this.pingInterval = interval;
+         pingFrame = (ClientStompFrameV11) createFrame("STOMP");
+         pingFrame.setBody("\n");
+         pingFrame.setForceOneway(); // needsReply() returns false for this frame from now on
+      }
+      
+      public void startPing()
+      {
+         start();
+      }
+      
+      public synchronized void stopPing()
+      {
+         stop = true;
+         this.notify(); // wakes run() if it is blocked in wait(pingInterval)
+      }
+      
+      public void run()
+      {
+         synchronized (this) // same monitor as stopPing(), required for wait()/notify()
+         {
+            while (!stop)
+            {
+               try
+               {
+                  System.out.println("============sending ping");
+                  
+                  sendFrame(pingFrame);
+                  
+                  System.out.println("Pinged " + pingFrame);
+                  
+                  this.wait(pingInterval);
+               }
+               catch (Exception e) // also covers InterruptedException from wait(); any failure ends the loop
+               {
+                  stop = true;
+                  e.printStackTrace();
+               }
+            }
+            System.out.println("Pinger stopped");
+         }
+      }
+   }
+
 }

Modified: branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java
===================================================================
--- branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-15 14:34:00 UTC (rev 11351)
+++ branches/STOMP11/tests/integration-tests/src/test/java/org/hornetq/tests/integration/stomp/v11/StompTestV11.java	2011-09-16 05:20:30 UTC (rev 11352)
@@ -17,6 +17,8 @@
  */
 package org.hornetq.tests.integration.stomp.v11;
 
+import java.io.IOException;
+
 import org.hornetq.core.logging.Logger;
 import org.hornetq.tests.integration.stomp.util.ClientStompFrame;
 import org.hornetq.tests.integration.stomp.util.StompClientConnection;
@@ -350,6 +352,116 @@
       
       newConn.disconnect();
    }
+   
+   public void testHeartBeat() throws Exception
+   {
+      //no heart beat at all if the heart-beat header is absent
+      ClientStompFrame frame = connV11.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      
+      ClientStompFrame reply = connV11.sendFrame(frame);
+      
+      assertEquals("CONNECTED", reply.getCommand());
+      
+      Thread.sleep(5000);
+      
+      assertEquals(0, connV11.getFrameQueueSize());
+      
+      connV11.disconnect();
+      
+      //no heart beat for (0,0)
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = connV11.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "0,0");
+      frame.addHeader("accept-version", "1.0,1.1");
+      
+      reply = connV11.sendFrame(frame);
+      
+      assertEquals("CONNECTED", reply.getCommand());
+      
+      assertEquals("0,0", reply.getHeader("heart-beat"));
+      
+      Thread.sleep(5000);
+      
+      assertEquals(0, connV11.getFrameQueueSize());
+      
+      connV11.disconnect();
+
+      //heart-beat (1,0): server should reply with its minimum accepted client ping interval (0,500)
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = connV11.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "1,0");
+      frame.addHeader("accept-version", "1.0,1.1");
+      
+      reply = connV11.sendFrame(frame);
+      
+      assertEquals("CONNECTED", reply.getCommand());
+      
+      assertEquals("0,500", reply.getHeader("heart-beat"));
+      
+      Thread.sleep(2000);
+      
+      //now server side should be disconnected because we didn't send ping for 2 sec
+      frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "text/plain");
+      frame.setBody("Hello World");
+
+      //send will fail
+      try
+      {
+         connV11.sendFrame(frame);
+         fail("connection should have been destroyed by now");
+      }
+      catch (IOException e)
+      {
+         //expected: the send is supposed to fail on the destroyed connection
+      }
+      
+      //heart-beat (1,0), start a ping, then send a message, should be ok.
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = connV11.createFrame("CONNECT");
+      frame.addHeader("host", "127.0.0.1");
+      frame.addHeader("login", this.defUser);
+      frame.addHeader("passcode", this.defPass);
+      frame.addHeader("heart-beat", "1,0");
+      frame.addHeader("accept-version", "1.0,1.1");
+      
+      reply = connV11.sendFrame(frame);
+      
+      assertEquals("CONNECTED", reply.getCommand());
+      
+      assertEquals("0,500", reply.getHeader("heart-beat"));
+      
+      System.out.println("========== start pinger!");
+      
+      connV11.startPinger(500);
+      
+      Thread.sleep(2000);
+      
+      //server side should still be connected because the pinger has been running for these 2 sec
+      frame = connV11.createFrame("SEND");
+      frame.addHeader("destination", getQueuePrefix() + getQueueName());
+      frame.addHeader("content-type", "text/plain");
+      frame.setBody("Hello World");
+
+      //send will be ok
+      connV11.sendFrame(frame);
+      
+      connV11.stopPinger();
+      
+      connV11.disconnect();
+
+   }
+   
 }
 
 



More information about the hornetq-commits mailing list