Author: gaohoward
Date: 2011-09-06 23:36:06 -0400 (Tue, 06 Sep 2011)
New Revision: 11296
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
heart beat
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java 2011-09-05
14:18:21 UTC (rev 11295)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/FrameEventListener.java 2011-09-07
03:36:06 UTC (rev 11296)
@@ -26,4 +26,6 @@
void replySent(StompFrame reply);
+ void requestAccepted(StompFrame request);
+
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-05
14:18:21 UTC (rev 11295)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompConnection.java 2011-09-07
03:36:06 UTC (rev 11296)
@@ -425,6 +425,12 @@
public void handleFrame(StompFrame request)
{
StompFrame reply = null;
+
+ if (stompListener != null)
+ {
+ stompListener.requestAccepted(request);
+ }
+
try
{
if (!initialized)
@@ -446,10 +452,6 @@
if (reply != null)
{
sendFrame(reply);
- if (stompListener != null)
- {
- stompListener.replySent(reply);
- }
}
}
@@ -677,4 +679,22 @@
{
this.stompListener = listener;
}
+
+ //send a ping stomp frame
+ public void ping(StompFrame pingFrame)
+ {
+ manager.sendReply(this, pingFrame);
+ }
+
+ public void physicalSend(StompFrame frame) throws Exception
+ {
+ HornetQBuffer buffer = frame.toHornetQBuffer();
+ getTransportConnection().write(buffer, false, false);
+
+ if (stompListener != null)
+ {
+ stompListener.replySent(frame);
+ }
+
+ }
}
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-05
14:18:21 UTC (rev 11295)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2011-09-07
03:36:06 UTC (rev 11296)
@@ -157,8 +157,7 @@
try
{
- HornetQBuffer buffer = frame.toHornetQBuffer();
- connection.getTransportConnection().write(buffer, false, false);
+ connection.physicalSend(frame);
}
catch (Exception e)
{
Modified:
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
---
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-05
14:18:21 UTC (rev 11295)
+++
branches/STOMP11/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java 2011-09-07
03:36:06 UTC (rev 11296)
@@ -13,6 +13,7 @@
package org.hornetq.core.protocol.stomp.v11;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Message;
@@ -409,10 +410,19 @@
//kick off the pinger
startHeartBeat();
}
+
if (reply.needsDisconnect())
{
connection.destroy();
}
+ else
+ {
+ //update ping
+ if (heartBeater != null)
+ {
+ heartBeater.pinged();
+ }
+ }
}
private void startHeartBeat()
@@ -423,6 +433,13 @@
}
}
+ public StompFrame createPingFrame()
+ {
+ StompFrame frame = new StompFrame(Stomp.Commands.STOMP);
+ frame.setBody("\n");
+ return frame;
+ }
+
//server heart beat (20,100) (hard coded)
//algorithm:
//(a) server ping: if server hasn't sent any frame within serverPing
@@ -433,10 +450,10 @@
{
long serverPing = 0;
long serverAcceptPing = 0;
- long waitingTime = 0;
volatile boolean shutdown = false;
- volatile long pings = 0;
- volatile long accepts = 0;
+ AtomicLong lastPingTime = new AtomicLong(0);
+ AtomicLong lastAccepted = new AtomicLong(0);
+ StompFrame pingFrame;
public HeartBeater(long clientPing, long clientAcceptPing)
{
@@ -448,21 +465,18 @@
if (clientAcceptPing != 0)
{
serverPing = clientAcceptPing > 20 ? clientAcceptPing : 20;
- if (serverAcceptPing != 0)
- {
- waitingTime = serverPing > serverAcceptPing ? serverAcceptPing :
serverPing;
- }
- else
- {
- waitingTime = serverPing;
- }
}
}
+ public void pinged()
+ {
+ lastPingTime.set(System.currentTimeMillis());
+ }
+
public void run()
{
- long lastPing = 0;
- long lastAccepted = System.currentTimeMillis();
+ lastAccepted.set(System.currentTimeMillis());
+ pingFrame = createPingFrame();
synchronized (this)
{
@@ -473,44 +487,28 @@
if (serverPing != 0)
{
- if (pings == 0)
+ dur1 = System.currentTimeMillis() - lastPingTime.get();
+ if (dur1 >= serverPing)
{
- dur1 = System.currentTimeMillis() - lastPing;
- if (dur1 >= serverPing)
- {
- lastPing = System.currentTimeMillis();
- connection.ping();
- dur1 = 0;
- }
+ lastPingTime.set(System.currentTimeMillis());
+ connection.ping(pingFrame);
+ dur1 = 0;
}
- else
- {
- dur1 = 5;
- pings = 0;
- }
}
if (serverAcceptPing != 0)
{
- if (accepts == 0)
+ dur2 = System.currentTimeMillis() - lastAccepted.get();
+ if (dur2 > (2 * serverAcceptPing))
{
- dur2 = System.currentTimeMillis() - lastAccepted;
- if (dur2 > (2 * serverAcceptPing))
- {
- connection.setValid(false);
- shutdown = true;
- break;
- }
+ connection.disconnect();
+ shutdown = true;
+ break;
}
- else
- {
- lastAccepted = System.currentTimeMillis();
- accepts = 0;
- }
}
long waitTime1 = serverPing - dur1;
- long waitTime2 = serverAcceptPing*2 - dur2;
+ long waitTime2 = serverAcceptPing * 2 - dur2;
long waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
@@ -524,6 +522,20 @@
}
}
}
+
+ public void pingAccepted()
+ {
+ this.lastAccepted.set(System.currentTimeMillis());
+ }
}
+ @Override
+ public void requestAccepted(StompFrame request)
+ {
+ if (heartBeater != null)
+ {
+ heartBeater.pingAccepted();
+ }
+ }
+
}