From do-not-reply at jboss.org Mon Sep 5 10:18:22 2011 Content-Type: multipart/mixed; boundary="===============9016937098290607611==" MIME-Version: 1.0 From: do-not-reply at jboss.org To: hornetq-commits at lists.jboss.org Subject: [hornetq-commits] JBoss hornetq SVN: r11295 - in branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp: v11 and 1 other directory. Date: Mon, 05 Sep 2011 10:18:21 -0400 Message-ID: <201109051418.p85EILui027835@svn01.web.mwc.hst.phx2.redhat.com> --===============9016937098290607611== Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable Author: gaohoward Date: 2011-09-05 10:18:21 -0400 (Mon, 05 Sep 2011) New Revision: 11295 Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st= omp/FrameEventListener.java Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st= omp/HornetQStompException.java branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st= omp/Stomp.java branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st= omp/StompConnection.java branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st= omp/StompFrame.java branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st= omp/StompSession.java branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st= omp/v11/StompFrameHandlerV11.java Log: more coding Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protoco= l/stomp/FrameEventListener.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/FrameEventListener.java (rev 0) +++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/FrameEventListener.java 2011-09-05 14:18:21 UTC (rev 11295) @@ -0,0 +1,29 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.hornetq.core.protocol.stomp; + +/** + * = + * @author Howard Gao + */ +public interface FrameEventListener +{ + + void replySent(StompFrame reply); + +} Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot= ocol/stomp/HornetQStompException.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/HornetQStompException.java 2011-09-05 00:37:40 UTC (rev 11294) +++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/HornetQStompException.java 2011-09-05 14:18:21 UTC (rev 11295) @@ -15,6 +15,10 @@ import java.util.ArrayList; import java.util.List; = +/** + * = + * @author Howard Gao + */ public class HornetQStompException extends Exception { = private static final long serialVersionUID =3D -274452327574950068L; Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot= ocol/stomp/Stomp.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/Stomp.java 2011-09-05 00:37:40 UTC (rev 11294) +++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/Stomp.java 2011-09-05 14:18:21 UTC (rev 11295) @@ -168,6 +168,8 @@ //1.1 String ACCEPT_VERSION =3D "accept-version"; String HOST =3D "host"; + + Object HEART_BEAT =3D "heart-beat"; } = public interface Error @@ -189,6 +191,8 @@ String VERSION =3D "version"; = String SERVER =3D "server"; + + String HEART_BEAT =3D "heart-beat"; } = public interface Ack Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot= ocol/stomp/StompConnection.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/StompConnection.java 2011-09-05 00:37:40 UTC (rev 11294) +++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/StompConnection.java 2011-09-05 14:18:21 UTC (rev 11295) @@ -76,6 +76,8 @@ private VersionedStompFrameHandler frameHandler; = private boolean initialized; + = + private FrameEventListener stompListener; = public StompDecoder getDecoder() { @@ -444,6 +446,10 @@ if (reply !=3D null) { sendFrame(reply); + if (stompListener !=3D null) + { + stompListener.replySent(reply); + } } } = @@ -662,8 +668,13 @@ } = public StompFrame createStompMessage(ServerMessage serverMessage, - StompSubscription subscription, int deliveryCount) + StompSubscription subscription, int deliveryCount) throws Excepti= on { return frameHandler.createMessageFrame(serverMessage, subscription, = deliveryCount); } + + public void addStompEventListener(FrameEventListener listener) + { + this.stompListener =3D listener; + } } Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot= ocol/stomp/StompFrame.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/StompFrame.java 2011-09-05 00:37:40 UTC (rev 11294) +++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/StompFrame.java 2011-09-05 14:18:21 UTC (rev 11295) @@ -56,10 +56,18 @@ = private int size; = + private boolean disconnect; + = public StompFrame(String command) { + this(command, false); + } + + public StompFrame(String command, boolean disconnect) + { this.command =3D command; this.headers =3D new LinkedHashMap(); + this.disconnect =3D disconnect; } = public String getCommand() @@ -179,4 +187,9 @@ } return new byte[0]; } + + public boolean needsDisconnect() + { + return disconnect; + } } Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot= ocol/stomp/StompSession.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/StompSession.java 2011-09-05 00:37:40 UTC (rev 11294) +++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/StompSession.java 2011-09-05 14:18:21 UTC (rev 11295) @@ -105,7 +105,7 @@ } catch (Exception e) { - e.printStackTrace(); + log.error("Error delivering stomp messages", e); return 0; } = Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot= ocol/stomp/v11/StompFrameHandlerV11.java =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D= =3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D --- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/v11/StompFrameHandlerV11.java 2011-09-05 00:37:40 UTC (rev 11294) +++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s= tomp/v11/StompFrameHandlerV11.java 2011-09-05 14:18:21 UTC (rev 11295) @@ -12,7 +12,6 @@ */ package org.hornetq.core.protocol.stomp.v11; = -import java.io.UnsupportedEncodingException; import java.util.Map; = import org.hornetq.api.core.HornetQBuffer; @@ -20,6 +19,7 @@ import org.hornetq.api.core.SimpleString; import org.hornetq.core.logging.Logger; import org.hornetq.core.message.impl.MessageImpl; +import org.hornetq.core.protocol.stomp.FrameEventListener; import org.hornetq.core.protocol.stomp.HornetQStompException; import org.hornetq.core.protocol.stomp.Stomp; import org.hornetq.core.protocol.stomp.StompConnection; @@ -27,20 +27,24 @@ import org.hornetq.core.protocol.stomp.StompSubscription; import org.hornetq.core.protocol.stomp.StompUtils; import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler; -import org.hornetq.core.protocol.stomp.Stomp.Headers; -import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10; import org.hornetq.core.server.ServerMessage; import org.hornetq.core.server.impl.ServerMessageImpl; -import org.hornetq.spi.core.security.HornetQSecurityManager; import org.hornetq.utils.DataConstants; = -public class StompFrameHandlerV11 extends VersionedStompFrameHandler +/** + * = + * @author Howard Gao + */ +public class StompFrameHandlerV11 extends VersionedStompFrameHandler imple= ments FrameEventListener { private static final Logger log =3D Logger.getLogger(StompFrameHandlerV= 11.class); + = + private HeartBeater heartBeater; = public StompFrameHandlerV11(StompConnection connection) { this.connection =3D connection; + connection.addStompEventListener(this); } = @Override @@ -53,43 +57,81 @@ String clientID =3D headers.get(Stomp.Headers.Connect.CLIENT_ID); String requestID =3D headers.get(Stomp.Headers.Connect.REQUEST_ID); = - if (connection.validateUser(login, passcode)) + try { - connection.setClientID(clientID); - connection.setValid(true); - = - response =3D new StompFrame(Stomp.Responses.CONNECTED); - = - //version - response.addHeader(Stomp.Headers.Connected.VERSION, connection.ge= tVersion()); - = - //session - response.addHeader(Stomp.Headers.Connected.SESSION, connection.ge= tID().toString()); - = - //server - response.addHeader(Stomp.Headers.Connected.SERVER, connection.get= HornetQServerName()); - = - if (requestID !=3D null) + if (connection.validateUser(login, passcode)) { - response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, reques= tID); + connection.setClientID(clientID); + connection.setValid(true); + + response =3D new StompFrame(Stomp.Responses.CONNECTED); + + // version + response.addHeader(Stomp.Headers.Connected.VERSION, + connection.getVersion()); + + // session + response.addHeader(Stomp.Headers.Connected.SESSION, connection + .getID().toString()); + + // server + response.addHeader(Stomp.Headers.Connected.SERVER, + connection.getHornetQServerName()); + + if (requestID !=3D null) + { + response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, + requestID); + } + + // heart-beat. We need to start after connected frame has been= sent. + // otherwise the client may receive heart-beat before it recei= ves + // connected frame. + String heartBeat =3D headers.get(Stomp.Headers.Connect.HEART_B= EAT); + + if (heartBeat !=3D null) + { + handleHeartBeat(heartBeat); + response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "20,= 100"); + } } + else + { + // not valid + response =3D new StompFrame(Stomp.Responses.ERROR, true); + response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1"); + + response.setBody("Supported protocol versions are 1.0 and 1.1"= ); + } } - else + catch (HornetQStompException e) { - //not valid - response =3D new StompFrame(Stomp.Responses.ERROR); - response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1"); - = - response.setBody("Supported protocol versions are 1.0 and 1.1"); - = - connection.sendFrame(response); - connection.destroy(); - = - return null; + response =3D e.getFrame(); } return response; } = + //ping parameters, hard-code for now + //the server can support min 20 milliseconds and receive ping at 100 mi= lliseconds (20,100) + private void handleHeartBeat(String heartBeatHeader) throws HornetQStom= pException + { + String[] params =3D heartBeatHeader.split(","); + if (params.length !=3D 2) + { + throw new HornetQStompException("Incorrect heartbeat header " + h= eartBeatHeader); + } + = + //client ping + long minPingInterval =3D Long.valueOf(params[0]); + //client receive ping + long minAcceptInterval =3D Long.valueOf(params[1]); + = + if ((minPingInterval !=3D 0) || (minAcceptInterval !=3D 0)) + { + heartBeater =3D new HeartBeater(minPingInterval, minAcceptInterva= l); + } + } + @Override public StompFrame onDisconnect(StompFrame frame) { @@ -359,4 +401,129 @@ = } = + @Override + public void replySent(StompFrame reply) + { + if (reply.getCommand().equals(Stomp.Responses.CONNECTED)) + { + //kick off the pinger + startHeartBeat(); + } + if (reply.needsDisconnect()) + { + connection.destroy(); + } + } + = + private void startHeartBeat() + { + if (heartBeater !=3D null) + { + heartBeater.start(); + } + } + = + //server heart beat (20,100) (hard coded) + //algorithm: = + //(a) server ping: if server hasn't sent any frame within serverPing = + //interval, send a ping. = + //(b) accept ping: if server hasn't received any frame within + // 2*serverAcceptPing, disconnect! + private class HeartBeater extends Thread + { + long serverPing =3D 0; + long serverAcceptPing =3D 0; + long waitingTime =3D 0; + volatile boolean shutdown =3D false; + volatile long pings =3D 0; + volatile long accepts =3D 0; + + public HeartBeater(long clientPing, long clientAcceptPing) + { + if (clientPing !=3D 0) + { + serverAcceptPing =3D clientPing > 100 ? clientPing : 100; + } + = + if (clientAcceptPing !=3D 0) + { + serverPing =3D clientAcceptPing > 20 ? clientAcceptPing : 20; + if (serverAcceptPing !=3D 0) + { + waitingTime =3D serverPing > serverAcceptPing ? serverAccep= tPing : serverPing; + } + else + { + waitingTime =3D serverPing; + } + } + } + = + public void run() + { + long lastPing =3D 0; + long lastAccepted =3D System.currentTimeMillis(); + = + synchronized (this) + { + while (!shutdown) + { + long dur1 =3D 0; + long dur2 =3D 0; + = + if (serverPing !=3D 0) + { + if (pings =3D=3D 0) + { + dur1 =3D System.currentTimeMillis() - lastPing; + if (dur1 >=3D serverPing) + { + lastPing =3D System.currentTimeMillis(); + connection.ping(); + dur1 =3D 0; + } + } + else + { + dur1 =3D 5; + pings =3D 0; + } + } + + if (serverAcceptPing !=3D 0) + { + if (accepts =3D=3D 0) + { + dur2 =3D System.currentTimeMillis() - lastAccepted; + if (dur2 > (2 * serverAcceptPing)) + { + connection.setValid(false); + shutdown =3D true; + break; + } + } + else + { + lastAccepted =3D System.currentTimeMillis(); + accepts =3D 0; + } + } + = + long waitTime1 =3D serverPing - dur1; + long waitTime2 =3D serverAcceptPing*2 - dur2; + + long waitTime =3D waitTime1 < waitTime2 ? waitTime1 : waitT= ime2; + = + try + { + this.wait(waitTime); + } + catch (InterruptedException e) + { + } + } + } + } + } + } --===============9016937098290607611==--