From do-not-reply at jboss.org Mon Sep 5 10:18:22 2011
Content-Type: multipart/mixed; boundary="===============9016937098290607611=="
MIME-Version: 1.0
From: do-not-reply at jboss.org
To: hornetq-commits at lists.jboss.org
Subject: [hornetq-commits] JBoss hornetq SVN: r11295 - in
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp:
v11 and 1 other directory.
Date: Mon, 05 Sep 2011 10:18:21 -0400
Message-ID: <201109051418.p85EILui027835@svn01.web.mwc.hst.phx2.redhat.com>
--===============9016937098290607611==
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: quoted-printable
Author: gaohoward
Date: 2011-09-05 10:18:21 -0400 (Mon, 05 Sep 2011)
New Revision: 11295
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st=
omp/FrameEventListener.java
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st=
omp/HornetQStompException.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st=
omp/Stomp.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st=
omp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st=
omp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st=
omp/StompSession.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/st=
omp/v11/StompFrameHandlerV11.java
Log:
more coding
Added: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protoco=
l/stomp/FrameEventListener.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/FrameEventListener.java (rev 0)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/FrameEventListener.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+/**
+ * =
+ * @author Howard Gao
+ */
+public interface FrameEventListener
+{
+
+ void replySent(StompFrame reply);
+
+}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot=
ocol/stomp/HornetQStompException.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/HornetQStompException.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/HornetQStompException.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -15,6 +15,10 @@
import java.util.ArrayList;
import java.util.List;
=
+/**
+ * =
+ * @author Howard Gao
+ */
public class HornetQStompException extends Exception {
=
private static final long serialVersionUID =3D -274452327574950068L;
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot=
ocol/stomp/Stomp.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/Stomp.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/Stomp.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -168,6 +168,8 @@
//1.1
String ACCEPT_VERSION =3D "accept-version";
String HOST =3D "host";
+
+ Object HEART_BEAT =3D "heart-beat";
}
=
public interface Error
@@ -189,6 +191,8 @@
String VERSION =3D "version";
=
String SERVER =3D "server";
+
+ String HEART_BEAT =3D "heart-beat";
}
=
public interface Ack
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot=
ocol/stomp/StompConnection.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/StompConnection.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/StompConnection.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -76,6 +76,8 @@
private VersionedStompFrameHandler frameHandler;
=
private boolean initialized;
+ =
+ private FrameEventListener stompListener;
=
public StompDecoder getDecoder()
{
@@ -444,6 +446,10 @@
if (reply !=3D null)
{
sendFrame(reply);
+ if (stompListener !=3D null)
+ {
+ stompListener.replySent(reply);
+ }
}
}
=
@@ -662,8 +668,13 @@
}
=
public StompFrame createStompMessage(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount)
+ StompSubscription subscription, int deliveryCount) throws Excepti=
on
{
return frameHandler.createMessageFrame(serverMessage, subscription, =
deliveryCount);
}
+
+ public void addStompEventListener(FrameEventListener listener)
+ {
+ this.stompListener =3D listener;
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot=
ocol/stomp/StompFrame.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/StompFrame.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/StompFrame.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -56,10 +56,18 @@
=
private int size;
=
+ private boolean disconnect;
+ =
public StompFrame(String command)
{
+ this(command, false);
+ }
+
+ public StompFrame(String command, boolean disconnect)
+ {
this.command =3D command;
this.headers =3D new LinkedHashMap();
+ this.disconnect =3D disconnect;
}
=
public String getCommand()
@@ -179,4 +187,9 @@
}
return new byte[0];
}
+
+ public boolean needsDisconnect()
+ {
+ return disconnect;
+ }
}
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot=
ocol/stomp/StompSession.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/StompSession.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/StompSession.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -105,7 +105,7 @@
}
catch (Exception e)
{
- e.printStackTrace();
+ log.error("Error delivering stomp messages", e);
return 0;
}
=
Modified: branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/prot=
ocol/stomp/v11/StompFrameHandlerV11.java
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=
=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D=3D
--- branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/v11/StompFrameHandlerV11.java 2011-09-05 00:37:40 UTC (rev 11294)
+++ branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/s=
tomp/v11/StompFrameHandlerV11.java 2011-09-05 14:18:21 UTC (rev 11295)
@@ -12,7 +12,6 @@
*/
package org.hornetq.core.protocol.stomp.v11;
=
-import java.io.UnsupportedEncodingException;
import java.util.Map;
=
import org.hornetq.api.core.HornetQBuffer;
@@ -20,6 +19,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.FrameEventListener;
import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompConnection;
@@ -27,20 +27,24 @@
import org.hornetq.core.protocol.stomp.StompSubscription;
import org.hornetq.core.protocol.stomp.StompUtils;
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
-import org.hornetq.core.protocol.stomp.Stomp.Headers;
-import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.DataConstants;
=
-public class StompFrameHandlerV11 extends VersionedStompFrameHandler
+/**
+ * =
+ * @author Howard Gao
+ */
+public class StompFrameHandlerV11 extends VersionedStompFrameHandler imple=
ments FrameEventListener
{
private static final Logger log =3D Logger.getLogger(StompFrameHandlerV=
11.class);
+ =
+ private HeartBeater heartBeater;
=
public StompFrameHandlerV11(StompConnection connection)
{
this.connection =3D connection;
+ connection.addStompEventListener(this);
}
=
@Override
@@ -53,43 +57,81 @@
String clientID =3D headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID =3D headers.get(Stomp.Headers.Connect.REQUEST_ID);
=
- if (connection.validateUser(login, passcode))
+ try
{
- connection.setClientID(clientID);
- connection.setValid(true);
- =
- response =3D new StompFrame(Stomp.Responses.CONNECTED);
- =
- //version
- response.addHeader(Stomp.Headers.Connected.VERSION, connection.ge=
tVersion());
- =
- //session
- response.addHeader(Stomp.Headers.Connected.SESSION, connection.ge=
tID().toString());
- =
- //server
- response.addHeader(Stomp.Headers.Connected.SERVER, connection.get=
HornetQServerName());
- =
- if (requestID !=3D null)
+ if (connection.validateUser(login, passcode))
{
- response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, reques=
tID);
+ connection.setClientID(clientID);
+ connection.setValid(true);
+
+ response =3D new StompFrame(Stomp.Responses.CONNECTED);
+
+ // version
+ response.addHeader(Stomp.Headers.Connected.VERSION,
+ connection.getVersion());
+
+ // session
+ response.addHeader(Stomp.Headers.Connected.SESSION, connection
+ .getID().toString());
+
+ // server
+ response.addHeader(Stomp.Headers.Connected.SERVER,
+ connection.getHornetQServerName());
+
+ if (requestID !=3D null)
+ {
+ response.addHeader(Stomp.Headers.Connected.RESPONSE_ID,
+ requestID);
+ }
+
+ // heart-beat: start only after the CONNECTED frame has been sent,
+ // otherwise the client may receive a heart-beat before it receives
+ // the CONNECTED frame.
+ String heartBeat =3D headers.get(Stomp.Headers.Connect.HEART_B=
EAT);
+
+ if (heartBeat !=3D null)
+ {
+ handleHeartBeat(heartBeat);
+ response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "20,=
100");
+ }
}
+ else
+ {
+ // not valid
+ response =3D new StompFrame(Stomp.Responses.ERROR, true);
+ response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1");
+
+ response.setBody("Supported protocol versions are 1.0 and 1.1"=
);
+ }
}
- else
+ catch (HornetQStompException e)
{
- //not valid
- response =3D new StompFrame(Stomp.Responses.ERROR);
- response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1");
- =
- response.setBody("Supported protocol versions are 1.0 and 1.1");
- =
- connection.sendFrame(response);
- connection.destroy();
- =
- return null;
+ response =3D e.getFrame();
}
return response;
}
=
//ping parameters, hard-coded for now
//the server can send pings as often as every 20 milliseconds and expects =
to receive a ping every 100 milliseconds (20,100)
+ private void handleHeartBeat(String heartBeatHeader) throws HornetQStom=
pException
+ {
+ String[] params =3D heartBeatHeader.split(",");
+ if (params.length !=3D 2)
+ {
+ throw new HornetQStompException("Incorrect heartbeat header " + h=
eartBeatHeader);
+ }
+ =
+ //client ping
+ long minPingInterval =3D Long.valueOf(params[0]);
+ //client receive ping
+ long minAcceptInterval =3D Long.valueOf(params[1]);
+ =
+ if ((minPingInterval !=3D 0) || (minAcceptInterval !=3D 0))
+ {
+ heartBeater =3D new HeartBeater(minPingInterval, minAcceptInterva=
l);
+ }
+ }
+
@Override
public StompFrame onDisconnect(StompFrame frame)
{
@@ -359,4 +401,129 @@
=
}
=
+ @Override
+ public void replySent(StompFrame reply)
+ {
+ if (reply.getCommand().equals(Stomp.Responses.CONNECTED))
+ {
+ //kick off the pinger
+ startHeartBeat();
+ }
+ if (reply.needsDisconnect())
+ {
+ connection.destroy();
+ }
+ }
+ =
+ private void startHeartBeat()
+ {
+ if (heartBeater !=3D null)
+ {
+ heartBeater.start();
+ }
+ }
+ =
+ //server heart beat (20,100) (hard coded)
+ //algorithm: =
+ //(a) server ping: if server hasn't sent any frame within serverPing =
+ //interval, send a ping. =
+ //(b) accept ping: if server hasn't received any frame within
+ // 2*serverAcceptPing, disconnect!
+ private class HeartBeater extends Thread
+ {
+ long serverPing =3D 0;
+ long serverAcceptPing =3D 0;
+ long waitingTime =3D 0;
+ volatile boolean shutdown =3D false;
+ volatile long pings =3D 0;
+ volatile long accepts =3D 0;
+
+ public HeartBeater(long clientPing, long clientAcceptPing)
+ {
+ if (clientPing !=3D 0)
+ {
+ serverAcceptPing =3D clientPing > 100 ? clientPing : 100;
+ }
+ =
+ if (clientAcceptPing !=3D 0)
+ {
+ serverPing =3D clientAcceptPing > 20 ? clientAcceptPing : 20;
+ if (serverAcceptPing !=3D 0)
+ {
+ waitingTime =3D serverPing > serverAcceptPing ? serverAccep=
tPing : serverPing;
+ }
+ else
+ {
+ waitingTime =3D serverPing;
+ }
+ }
+ }
+ =
+ public void run()
+ {
+ long lastPing =3D 0;
+ long lastAccepted =3D System.currentTimeMillis();
+ =
+ synchronized (this)
+ {
+ while (!shutdown)
+ {
+ long dur1 =3D 0;
+ long dur2 =3D 0;
+ =
+ if (serverPing !=3D 0)
+ {
+ if (pings =3D=3D 0)
+ {
+ dur1 =3D System.currentTimeMillis() - lastPing;
+ if (dur1 >=3D serverPing)
+ {
+ lastPing =3D System.currentTimeMillis();
+ connection.ping();
+ dur1 =3D 0;
+ }
+ }
+ else
+ {
+ dur1 =3D 5;
+ pings =3D 0;
+ }
+ }
+
+ if (serverAcceptPing !=3D 0)
+ {
+ if (accepts =3D=3D 0)
+ {
+ dur2 =3D System.currentTimeMillis() - lastAccepted;
+ if (dur2 > (2 * serverAcceptPing))
+ {
+ connection.setValid(false);
+ shutdown =3D true;
+ break;
+ }
+ }
+ else
+ {
+ lastAccepted =3D System.currentTimeMillis();
+ accepts =3D 0;
+ }
+ }
+ =
+ long waitTime1 =3D serverPing - dur1;
+ long waitTime2 =3D serverAcceptPing*2 - dur2;
+
+ long waitTime =3D waitTime1 < waitTime2 ? waitTime1 : waitT=
ime2;
+ =
+ try
+ {
+ this.wait(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ }
+ }
+
}
--===============9016937098290607611==--