[hornetq-commits] JBoss hornetq SVN: r11296 - in branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v11 and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Sep 6 23:36:07 EDT 2011


Author: gaohoward
Date: 2011-09-06 23:36:06 -0400 (Tue, 06 Sep 2011)
New Revision: 11296

Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
heart beat


Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java	2011-09-05 14:18:21 UTC (rev 11295)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java	2011-09-07 03:36:06 UTC (rev 11296)
@@ -26,4 +26,6 @@
 
    void replySent(StompFrame reply);
 
+   void requestAccepted(StompFrame request);
+
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-05 14:18:21 UTC (rev 11295)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-07 03:36:06 UTC (rev 11296)
@@ -425,6 +425,12 @@
    public void handleFrame(StompFrame request)
    {
       StompFrame reply = null;
+      
+      if (stompListener != null)
+      {
+         stompListener.requestAccepted(request);
+      }
+
       try
       {
          if (!initialized)
@@ -446,10 +452,6 @@
       if (reply != null)
       {
          sendFrame(reply);
-         if (stompListener != null)
-         {
-            stompListener.replySent(reply);
-         }
       }
    }
 
@@ -677,4 +679,22 @@
    {
       this.stompListener = listener;
    }
+
+   //send a ping (heart-beat) stomp frame through manager.sendReply()
+   public void ping(StompFrame pingFrame)
+   {
+      manager.sendReply(this, pingFrame);
+   }
+
+   public void physicalSend(StompFrame frame) throws Exception
+   {
+      HornetQBuffer buffer = frame.toHornetQBuffer();
+      getTransportConnection().write(buffer, false, false);
+
+      if (stompListener != null)
+      {
+         stompListener.replySent(frame);
+      }
+
+   }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-09-05 14:18:21 UTC (rev 11295)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2011-09-07 03:36:06 UTC (rev 11296)
@@ -157,8 +157,7 @@
 
          try
          {
-            HornetQBuffer buffer = frame.toHornetQBuffer();
-            connection.getTransportConnection().write(buffer, false, false);
+            connection.physicalSend(frame);
          }
          catch (Exception e)
          {

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-05 14:18:21 UTC (rev 11295)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-07 03:36:06 UTC (rev 11296)
@@ -13,6 +13,7 @@
 package org.hornetq.core.protocol.stomp.v11;
 
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.Message;
@@ -409,10 +410,19 @@
          //kick off the pinger
          startHeartBeat();
       }
+      
       if (reply.needsDisconnect())
       {
          connection.destroy();
       }
+      else
+      {
+         //record the current time as the last ping time so the heart beater measures its next ping from now
+         if (heartBeater != null)
+         {
+            heartBeater.pinged();
+         }
+      }
    }
    
    private void startHeartBeat()
@@ -423,6 +433,13 @@
       }
    }
    
+   public StompFrame createPingFrame()
+   {
+      StompFrame frame = new StompFrame(Stomp.Commands.STOMP);
+      frame.setBody("\n");
+      return frame;
+   }
+   
    //server heart beat (20,100) (hard coded)
    //algorithm: 
    //(a) server ping: if server hasn't sent any frame within serverPing 
@@ -433,10 +450,10 @@
    {
       long serverPing = 0;
       long serverAcceptPing = 0;
-      long waitingTime = 0;
       volatile boolean shutdown = false;
-      volatile long pings = 0;
-      volatile long accepts = 0;
+      AtomicLong lastPingTime = new AtomicLong(0);
+      AtomicLong lastAccepted = new AtomicLong(0);
+      StompFrame pingFrame;
 
       public HeartBeater(long clientPing, long clientAcceptPing)
       {
@@ -448,21 +465,18 @@
          if (clientAcceptPing != 0)
          {
             serverPing = clientAcceptPing > 20 ? clientAcceptPing : 20;
-            if (serverAcceptPing != 0)
-            {
-               waitingTime = serverPing > serverAcceptPing ? serverAcceptPing : serverPing;
-            }
-            else
-            {
-               waitingTime = serverPing;
-            }
          }
       }
       
+      public void pinged()
+      {
+         lastPingTime.set(System.currentTimeMillis());
+      }
+
       public void run()
       {
-         long lastPing = 0;
-         long lastAccepted = System.currentTimeMillis();
+         lastAccepted.set(System.currentTimeMillis());
+         pingFrame = createPingFrame();
          
          synchronized (this)
          {
@@ -473,44 +487,28 @@
                
                if (serverPing != 0)
                {
-                  if (pings == 0)
+                  dur1 = System.currentTimeMillis() - lastPingTime.get();
+                  if (dur1 >= serverPing)
                   {
-                     dur1 = System.currentTimeMillis() - lastPing;
-                     if (dur1 >= serverPing)
-                     {
-                        lastPing = System.currentTimeMillis();
-                        connection.ping();
-                        dur1 = 0;
-                     }
+                     lastPingTime.set(System.currentTimeMillis());
+                     connection.ping(pingFrame);
+                     dur1 = 0;
                   }
-                  else
-                  {
-                     dur1 = 5;
-                     pings = 0;
-                  }
                }
 
                if (serverAcceptPing != 0)
                {
-                  if (accepts == 0)
+                  dur2 = System.currentTimeMillis() - lastAccepted.get();
+                  if (dur2 > (2 * serverAcceptPing))
                   {
-                     dur2 = System.currentTimeMillis() - lastAccepted;
-                     if (dur2 > (2 * serverAcceptPing))
-                     {
-                        connection.setValid(false);
-                        shutdown = true;
-                        break;
-                     }
+                     connection.disconnect();
+                     shutdown = true;
+                     break;
                   }
-                  else
-                  {
-                     lastAccepted = System.currentTimeMillis();
-                     accepts = 0;
-                  }
                }
                
                long waitTime1 = serverPing - dur1;
-               long waitTime2 = serverAcceptPing*2 - dur2;
+               long waitTime2 = serverAcceptPing * 2 - dur2;
 
                long waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
                
@@ -524,6 +522,20 @@
             }
          }
       }
+
+      public void pingAccepted()
+      {
+         this.lastAccepted.set(System.currentTimeMillis());
+      }
    }
 
+   @Override
+   public void requestAccepted(StompFrame request)
+   {
+      if (heartBeater != null)
+      {
+         heartBeater.pingAccepted();
+      }
+   }
+
 }



More information about the hornetq-commits mailing list