Author: gaohoward
Date: 2011-09-05 10:18:21 -0400 (Mon, 05 Sep 2011)
New Revision: 11295
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
more coding
Added:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
(rev 0)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java 2011-09-05
14:18:21 UTC (rev 11295)
@@ -0,0 +1,29 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public interface FrameEventListener
+{
+
+ void replySent(StompFrame reply);
+
+}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-05
00:37:40 UTC (rev 11294)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/HornetQStompException.java 2011-09-05
14:18:21 UTC (rev 11295)
@@ -15,6 +15,10 @@
import java.util.ArrayList;
import java.util.List;
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
public class HornetQStompException extends Exception {
private static final long serialVersionUID = -274452327574950068L;
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java 2011-09-05
00:37:40 UTC (rev 11294)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/Stomp.java 2011-09-05
14:18:21 UTC (rev 11295)
@@ -168,6 +168,8 @@
//1.1
String ACCEPT_VERSION = "accept-version";
String HOST = "host";
+
+ Object HEART_BEAT = "heart-beat";
}
public interface Error
@@ -189,6 +191,8 @@
String VERSION = "version";
String SERVER = "server";
+
+ String HEART_BEAT = "heart-beat";
}
public interface Ack
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-05
00:37:40 UTC (rev 11294)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-05
14:18:21 UTC (rev 11295)
@@ -76,6 +76,8 @@
private VersionedStompFrameHandler frameHandler;
private boolean initialized;
+
+ private FrameEventListener stompListener;
public StompDecoder getDecoder()
{
@@ -444,6 +446,10 @@
if (reply != null)
{
sendFrame(reply);
+ if (stompListener != null)
+ {
+ stompListener.replySent(reply);
+ }
}
}
@@ -662,8 +668,13 @@
}
public StompFrame createStompMessage(ServerMessage serverMessage,
- StompSubscription subscription, int deliveryCount)
+ StompSubscription subscription, int deliveryCount) throws Exception
{
return frameHandler.createMessageFrame(serverMessage, subscription,
deliveryCount);
}
+
+ public void addStompEventListener(FrameEventListener listener)
+ {
+ this.stompListener = listener;
+ }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-05
00:37:40 UTC (rev 11294)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompFrame.java 2011-09-05
14:18:21 UTC (rev 11295)
@@ -56,10 +56,18 @@
private int size;
+ private boolean disconnect;
+
public StompFrame(String command)
{
+ this(command, false);
+ }
+
+ public StompFrame(String command, boolean disconnect)
+ {
this.command = command;
this.headers = new LinkedHashMap<String, String>();
+ this.disconnect = disconnect;
}
public String getCommand()
@@ -179,4 +187,9 @@
}
return new byte[0];
}
+
+ public boolean needsDisconnect()
+ {
+ return disconnect;
+ }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-05
00:37:40 UTC (rev 11294)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompSession.java 2011-09-05
14:18:21 UTC (rev 11295)
@@ -105,7 +105,7 @@
}
catch (Exception e)
{
- e.printStackTrace();
+ log.error("Error delivering stomp messages", e);
return 0;
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-05
00:37:40 UTC (rev 11294)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-05
14:18:21 UTC (rev 11295)
@@ -12,7 +12,6 @@
*/
package org.hornetq.core.protocol.stomp.v11;
-import java.io.UnsupportedEncodingException;
import java.util.Map;
import org.hornetq.api.core.HornetQBuffer;
@@ -20,6 +19,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.protocol.stomp.FrameEventListener;
import org.hornetq.core.protocol.stomp.HornetQStompException;
import org.hornetq.core.protocol.stomp.Stomp;
import org.hornetq.core.protocol.stomp.StompConnection;
@@ -27,20 +27,24 @@
import org.hornetq.core.protocol.stomp.StompSubscription;
import org.hornetq.core.protocol.stomp.StompUtils;
import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
-import org.hornetq.core.protocol.stomp.Stomp.Headers;
-import org.hornetq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.spi.core.security.HornetQSecurityManager;
import org.hornetq.utils.DataConstants;
-public class StompFrameHandlerV11 extends VersionedStompFrameHandler
+/**
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements
FrameEventListener
{
private static final Logger log = Logger.getLogger(StompFrameHandlerV11.class);
+
+ private HeartBeater heartBeater;
public StompFrameHandlerV11(StompConnection connection)
{
this.connection = connection;
+ connection.addStompEventListener(this);
}
@Override
@@ -53,43 +57,81 @@
String clientID = headers.get(Stomp.Headers.Connect.CLIENT_ID);
String requestID = headers.get(Stomp.Headers.Connect.REQUEST_ID);
- if (connection.validateUser(login, passcode))
+ try
{
- connection.setClientID(clientID);
- connection.setValid(true);
-
- response = new StompFrame(Stomp.Responses.CONNECTED);
-
- //version
- response.addHeader(Stomp.Headers.Connected.VERSION, connection.getVersion());
-
- //session
- response.addHeader(Stomp.Headers.Connected.SESSION,
connection.getID().toString());
-
- //server
- response.addHeader(Stomp.Headers.Connected.SERVER,
connection.getHornetQServerName());
-
- if (requestID != null)
+ if (connection.validateUser(login, passcode))
{
- response.addHeader(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+ connection.setClientID(clientID);
+ connection.setValid(true);
+
+ response = new StompFrame(Stomp.Responses.CONNECTED);
+
+ // version
+ response.addHeader(Stomp.Headers.Connected.VERSION,
+ connection.getVersion());
+
+ // session
+ response.addHeader(Stomp.Headers.Connected.SESSION, connection
+ .getID().toString());
+
+ // server
+ response.addHeader(Stomp.Headers.Connected.SERVER,
+ connection.getHornetQServerName());
+
+ if (requestID != null)
+ {
+ response.addHeader(Stomp.Headers.Connected.RESPONSE_ID,
+ requestID);
+ }
+
+ // heart-beat. We need to start after connected frame has been sent.
+ // otherwise the client may receive heart-beat before it receives
+ // connected frame.
+ String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
+
+ if (heartBeat != null)
+ {
+ handleHeartBeat(heartBeat);
+ response.addHeader(Stomp.Headers.Connected.HEART_BEAT,
"20,100");
+ }
}
+ else
+ {
+ // not valid
+ response = new StompFrame(Stomp.Responses.ERROR, true);
+ response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1");
+
+ response.setBody("Supported protocol versions are 1.0 and 1.1");
+ }
}
- else
+ catch (HornetQStompException e)
{
- //not valid
- response = new StompFrame(Stomp.Responses.ERROR);
- response.addHeader(Stomp.Headers.Error.VERSION, "1.0,1.1");
-
- response.setBody("Supported protocol versions are 1.0 and 1.1");
-
- connection.sendFrame(response);
- connection.destroy();
-
- return null;
+ response = e.getFrame();
}
return response;
}
+ //ping parameters, hard-code for now
+ //the server can support min 20 milliseconds and receive ping at 100 milliseconds
(20,100)
+ private void handleHeartBeat(String heartBeatHeader) throws HornetQStompException
+ {
+ String[] params = heartBeatHeader.split(",");
+ if (params.length != 2)
+ {
+ throw new HornetQStompException("Incorrect heartbeat header " +
heartBeatHeader);
+ }
+
+ //client ping
+ long minPingInterval = Long.valueOf(params[0]);
+ //client receive ping
+ long minAcceptInterval = Long.valueOf(params[1]);
+
+ if ((minPingInterval != 0) || (minAcceptInterval != 0))
+ {
+ heartBeater = new HeartBeater(minPingInterval, minAcceptInterval);
+ }
+ }
+
@Override
public StompFrame onDisconnect(StompFrame frame)
{
@@ -359,4 +401,129 @@
}
+ @Override
+ public void replySent(StompFrame reply)
+ {
+ if (reply.getCommand().equals(Stomp.Responses.CONNECTED))
+ {
+ //kick off the pinger
+ startHeartBeat();
+ }
+ if (reply.needsDisconnect())
+ {
+ connection.destroy();
+ }
+ }
+
+ private void startHeartBeat()
+ {
+ if (heartBeater != null)
+ {
+ heartBeater.start();
+ }
+ }
+
+ //server heart beat (20,100) (hard coded)
+ //algorithm:
+ //(a) server ping: if server hasn't sent any frame within serverPing
+ //interval, send a ping.
+ //(b) accept ping: if server hasn't received any frame within
+ // 2*serverAcceptPing, disconnect!
+ private class HeartBeater extends Thread
+ {
+ long serverPing = 0;
+ long serverAcceptPing = 0;
+ long waitingTime = 0;
+ volatile boolean shutdown = false;
+ volatile long pings = 0;
+ volatile long accepts = 0;
+
+ public HeartBeater(long clientPing, long clientAcceptPing)
+ {
+ if (clientPing != 0)
+ {
+ serverAcceptPing = clientPing > 100 ? clientPing : 100;
+ }
+
+ if (clientAcceptPing != 0)
+ {
+ serverPing = clientAcceptPing > 20 ? clientAcceptPing : 20;
+ if (serverAcceptPing != 0)
+ {
+ waitingTime = serverPing > serverAcceptPing ? serverAcceptPing :
serverPing;
+ }
+ else
+ {
+ waitingTime = serverPing;
+ }
+ }
+ }
+
+ public void run()
+ {
+ long lastPing = 0;
+ long lastAccepted = System.currentTimeMillis();
+
+ synchronized (this)
+ {
+ while (!shutdown)
+ {
+ long dur1 = 0;
+ long dur2 = 0;
+
+ if (serverPing != 0)
+ {
+ if (pings == 0)
+ {
+ dur1 = System.currentTimeMillis() - lastPing;
+ if (dur1 >= serverPing)
+ {
+ lastPing = System.currentTimeMillis();
+ connection.ping();
+ dur1 = 0;
+ }
+ }
+ else
+ {
+ dur1 = 5;
+ pings = 0;
+ }
+ }
+
+ if (serverAcceptPing != 0)
+ {
+ if (accepts == 0)
+ {
+ dur2 = System.currentTimeMillis() - lastAccepted;
+ if (dur2 > (2 * serverAcceptPing))
+ {
+ connection.setValid(false);
+ shutdown = true;
+ break;
+ }
+ }
+ else
+ {
+ lastAccepted = System.currentTimeMillis();
+ accepts = 0;
+ }
+ }
+
+ long waitTime1 = serverPing - dur1;
+ long waitTime2 = serverAcceptPing*2 - dur2;
+
+ long waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
+
+ try
+ {
+ this.wait(waitTime);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+ }
+ }
+ }
+
}