[hornetq-commits] JBoss hornetq SVN: r11295 - in branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v11 and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 5 10:18:21 EDT 2011


Author: gaohoward
Date: 2011-09-05 10:18:21 -0400 (Mon, 05 Sep 2011)
New Revision: 11295

Added:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
Modified:
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
   branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
more coding


Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java	                        (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java	2011-09-05 14:18:21 UTC (rev 11295)
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+/**
+ * Callback notified by {@link StompConnection} after it has sent a reply frame.
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ */
+public interface FrameEventListener
+{
+   // called with the reply frame once it has been sent
+   void replySent(StompFrame reply);
+
+}

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java	2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java	2011-09-05 14:18:21 UTC (rev 11295)
@@ -15,6 +15,10 @@
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * 
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ */
 public class HornetQStompException extends Exception {
 
    private static final long serialVersionUID = -274452327574950068L;

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java	2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java	2011-09-05 14:18:21 UTC (rev 11295)
@@ -168,6 +168,8 @@
          //1.1
          String ACCEPT_VERSION = "accept-version";
          String HOST = "host";
+
+         String HEART_BEAT = "heart-beat"; // header name is a String like its sibling constants
       }
 
       public interface Error
@@ -189,6 +191,8 @@
          String VERSION = "version";
 
          String SERVER = "server";
+
+         String HEART_BEAT = "heart-beat";
       }
 
       public interface Ack

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java	2011-09-05 14:18:21 UTC (rev 11295)
@@ -76,6 +76,8 @@
    private VersionedStompFrameHandler frameHandler;
    
    private boolean initialized;
+   
+   private FrameEventListener stompListener;
 
    public StompDecoder getDecoder()
    {
@@ -444,6 +446,10 @@
       if (reply != null)
       {
          sendFrame(reply);
+         if (stompListener != null)
+         {
+            stompListener.replySent(reply);
+         }
       }
    }
 
@@ -662,8 +668,13 @@
    }
 
    public StompFrame createStompMessage(ServerMessage serverMessage,
-         StompSubscription subscription, int deliveryCount)
+         StompSubscription subscription, int deliveryCount) throws Exception
    {
       return frameHandler.createMessageFrame(serverMessage, subscription, deliveryCount);
    }
+
+   public void addStompEventListener(FrameEventListener listener)
+   {
+      this.stompListener = listener; // single slot: replaces any listener registered earlier
+   }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java	2011-09-05 14:18:21 UTC (rev 11295)
@@ -56,10 +56,18 @@
 
    private int size;
    
+   private boolean disconnect;
+   
    public StompFrame(String command)
    {
+      this(command, false);
+   }
+
+   public StompFrame(String command, boolean disconnect)
+   {
       this.command = command;
       this.headers = new LinkedHashMap<String, String>();
+      this.disconnect = disconnect;
    }
 
    public String getCommand()
@@ -179,4 +187,9 @@
       }
       return new byte[0];
    }
+
+   public boolean needsDisconnect()
+   {
+      return disconnect;
+   }
 }

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java	2011-09-05 14:18:21 UTC (rev 11295)
@@ -105,7 +105,7 @@
       }
       catch (Exception e)
       {
-         e.printStackTrace();
+         log.error("Error delivering stomp messages", e);
          return 0;
       }
 

Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2011-09-05 14:18:21 UTC (rev 11295)
@@ -12,7 +12,6 @@
  */
 package org.hornetq.core.protocol.stomp.v11;
 
-import java.io.UnsupportedEncodingException;
 import java.util.Map;
 
 import org.hornetq.api.core.HornetQBuffer;
@@ -20,6 +19,7 @@
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.FrameEventListener;
 import org.hornetq.core.protocol.stomp.HornetQStompException;
 import org.hornetq.core.protocol.stomp.Stomp;
 import org.hornetq.core.protocol.stomp.StompConnection;
@@ -27,20 +27,24 @@
 import org.hornetq.core.protocol.stomp.StompSubscription;
 import org.hornetq.core.protocol.stomp.StompUtils;
 import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
-import org.hornetq.core.protocol.stomp.Stomp.Headers;
-import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.spi.core.security.HornetQSecurityManager;
 import org.hornetq.utils.DataConstants;
 
-public class StompFrameHandlerV11 extends VersionedStompFrameHandler
+/**
+ * 
+ * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
+ */
+public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements FrameEventListener
 {
    private static final Logger log = Logger.getLogger(StompFrameHandlerV11.class);
+   
+   private HeartBeater heartBeater;
 
    public StompFrameHandlerV11(StompConnection connection)
    {
       this.connection = connection;
+      connection.addStompEventListener(this);
    }
 
    @Override
@@ -53,43 +57,81 @@
       String clientID = headers.get(Stomp.Headers.Connect.CLIENT_ID);
       String requestID = headers.get(Stomp.Headers.Connect.REQUEST_ID);
 
-      if (connection.validateUser(login, passcode))
+      try
       {
-         connection.setClientID(clientID);
-         connection.setValid(true);
-         
-         response = new StompFrame(Stomp.Responses.CONNECTED);
-         
-         //version
-         response.addHeader(Stomp.Headers.Connected.VERSION, connection.getVersion());
-         
-         //session
-         response.addHeader(Stomp.Headers.Connected.SESSION, connection.getID().toString());
-         
-         //server
-         response.addHeader(Stomp.Headers.Connected.SERVER, connection.getHornetQServerName());
-         
-         if (requestID != null)
+         if (connection.validateUser(login, passcode))
          {
-            response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+            connection.setClientID(clientID);
+            connection.setValid(true);
+
+            response = new StompFrame(Stomp.Responses.CONNECTED);
+
+            // version
+            response.addHeader(Stomp.Headers.Connected.VERSION,
+                  connection.getVersion());
+
+            // session
+            response.addHeader(Stomp.Headers.Connected.SESSION, connection
+                  .getID().toString());
+
+            // server
+            response.addHeader(Stomp.Headers.Connected.SERVER,
+                  connection.getHornetQServerName());
+
+            if (requestID != null)
+            {
+               response.addHeader(Stomp.Headers.Connected.RESPONSE_ID,
+                     requestID);
+            }
+
+            // heart-beat. We need to start after connected frame has been sent.
+            // otherwise the client may receive heart-beat before it receives
+            // connected frame.
+            String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
+
+            if (heartBeat != null)
+            {
+               handleHeartBeat(heartBeat);
+               response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "20,100");
+            }
          }
+         else
+         {
+            // not valid
+            response = new StompFrame(Stomp.Responses.ERROR, true);
+            response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1");
+
+            response.setBody("Supported protocol versions are 1.0 and 1.1");
+         }
       }
-      else
+      catch (HornetQStompException e)
       {
-         //not valid
-         response = new StompFrame(Stomp.Responses.ERROR);
-         response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1");
-         
-         response.setBody("Supported protocol versions are 1.0 and 1.1");
-         
-         connection.sendFrame(response);
-         connection.destroy();
-         
-         return null;
+         response = e.getFrame();
       }
       return response;
    }
 
+   //Parses the client's heart-beat header ("<ping>,<accept>") and prepares the HeartBeater.
+   //Server ping parameters are hard-coded for now: min 20 ms to send, 100 ms to receive (20,100).
+   private void handleHeartBeat(String heartBeatHeader) throws HornetQStompException
+   {
+      String[] params = heartBeatHeader.split(",");
+      if (params.length != 2)
+      {
+         throw new HornetQStompException("Incorrect heartbeat header " + heartBeatHeader);
+      }
+      //NOTE(review): Long.valueOf throws NumberFormatException on non-numeric values; it is not mapped to HornetQStompException
+      //client ping
+      long minPingInterval = Long.valueOf(params[0]);
+      //client receive ping
+      long minAcceptInterval = Long.valueOf(params[1]);
+      
+      if ((minPingInterval != 0) || (minAcceptInterval != 0))
+      {
+         heartBeater = new HeartBeater(minPingInterval, minAcceptInterval);
+      }
+   }
+
    @Override
    public StompFrame onDisconnect(StompFrame frame)
    {
@@ -359,4 +401,129 @@
 
    }
 
+   @Override
+   public void replySent(StompFrame reply)
+   {
+      if (reply.getCommand().equals(Stomp.Responses.CONNECTED))
+      {
+         //kick off the pinger only now, so the client cannot receive a heart-beat before CONNECTED
+         startHeartBeat();
+      }
+      if (reply.needsDisconnect())
+      {
+         connection.destroy(); // the frame was created with its disconnect flag set
+      }
+   }
+   
+   private void startHeartBeat()
+   {
+      if (heartBeater != null)
+      {
+         heartBeater.start();
+      }
+   }
+   
+   //server heart beat (20,100) (hard coded)
+   //algorithm: 
+   //(a) server ping: if server hasn't sent any frame within serverPing 
+   //interval, send a ping. 
+   //(b) accept ping: if server hasn't received any frame within
+   // 2*serverAcceptPing, disconnect!
+   private class HeartBeater extends Thread
+   {
+      long serverPing = 0;
+      long serverAcceptPing = 0;
+      long waitingTime = 0;
+      volatile boolean shutdown = false;
+      volatile long pings = 0;
+      volatile long accepts = 0;
+      // clientPing/clientAcceptPing come from the client's heart-beat header; 0 leaves that direction disabled
+      public HeartBeater(long clientPing, long clientAcceptPing)
+      {
+         if (clientPing != 0)
+         {
+            serverAcceptPing = clientPing > 100 ? clientPing : 100;
+         }
+         
+         if (clientAcceptPing != 0)
+         {
+            serverPing = clientAcceptPing > 20 ? clientAcceptPing : 20;
+            if (serverAcceptPing != 0)
+            {
+               waitingTime = serverPing > serverAcceptPing ? serverAcceptPing : serverPing;
+            }
+            else
+            {
+               waitingTime = serverPing;
+            }
+         }
+      }
+      // pings the client every serverPing ms; marks the connection invalid once accepts stays 0 for 2*serverAcceptPing ms
+      public void run()
+      {
+         long lastPing = 0;
+         long lastAccepted = System.currentTimeMillis();
+         
+         synchronized (this)
+         {
+            while (!shutdown)
+            {
+               long dur1 = 0;
+               long dur2 = 0;
+               
+               if (serverPing != 0)
+               {
+                  if (pings == 0)
+                  {
+                     dur1 = System.currentTimeMillis() - lastPing;
+                     if (dur1 >= serverPing)
+                     {
+                        lastPing = System.currentTimeMillis();
+                        connection.ping();
+                        dur1 = 0;
+                     }
+                  }
+                  else
+                  {
+                     dur1 = 5;
+                     pings = 0;
+                  }
+               }
+
+               if (serverAcceptPing != 0)
+               {
+                  if (accepts == 0)
+                  {
+                     dur2 = System.currentTimeMillis() - lastAccepted;
+                     if (dur2 > (2 * serverAcceptPing))
+                     {
+                        connection.setValid(false);
+                        shutdown = true;
+                        break;
+                     }
+                  }
+                  else
+                  {
+                     lastAccepted = System.currentTimeMillis();
+                     accepts = 0;
+                  }
+               }
+               // A disabled timer (interval 0) must not drive the wait, and Object.wait(0) blocks
+               // until notified, so the timeout is taken from enabled timers and clamped to >= 1 ms.
+               long waitTime1 = serverPing != 0 ? serverPing - dur1 : Long.MAX_VALUE;
+               long waitTime2 = serverAcceptPing != 0 ? serverAcceptPing*2 - dur2 : Long.MAX_VALUE;
+               long waitTime = Math.max(1, Math.min(waitTime1, waitTime2));
+               try
+               {
+                  this.wait(waitTime);
+               }
+               catch (InterruptedException ignored)
+               {
+                  // deliberately ignored: the loop condition re-checks the shutdown flag
+               }
+            }
+         }
+      }
+   }
+
 }



More information about the hornetq-commits mailing list