[hornetq-commits] JBoss hornetq SVN: r11985 - trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jan 6 07:15:08 EST 2012


Author: borges
Date: 2012-01-06 07:15:07 -0500 (Fri, 06 Jan 2012)
New Revision: 11985

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
Log:
Fix waitTime comparison (which used waitTime1 twice instead of using waitTime2).

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2012-01-06 12:10:51 UTC (rev 11984)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/protocol/stomp/v11/StompFrameHandlerV11.java	2012-01-06 12:15:07 UTC (rev 11985)
@@ -28,15 +28,15 @@
 import org.hornetq.core.protocol.stomp.VersionedStompFrameHandler;
 
 /**
- * 
+ *
  * @author <a href="mailto:hgao at redhat.com">Howard Gao</a>
  */
 public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements FrameEventListener
 {
    private static final Logger log = Logger.getLogger(StompFrameHandlerV11.class);
-   
+
    private static final char ESC_CHAR = '\\';
-   
+
    private HeartBeater heartBeater;
 
    public StompFrameHandlerV11(StompConnection connection)
@@ -130,12 +130,12 @@
       {
          throw new HornetQStompException("Incorrect heartbeat header " + heartBeatHeader);
       }
-      
+
       //client ping
       long minPingInterval = Long.valueOf(params[0]);
       //client receive ping
       long minAcceptInterval = Long.valueOf(params[1]);
-      
+
       if ((minPingInterval != 0) || (minAcceptInterval != 0))
       {
          heartBeater = new HeartBeater(minPingInterval, minAcceptInterval);
@@ -177,7 +177,7 @@
           response = new HornetQStompException("Must specify the subscription's id").getFrame();
           return response;
       }
-      
+
       try
       {
          connection.unsubscribe(subscriptionID);
@@ -193,7 +193,7 @@
    public StompFrame onAck(StompFrame request)
    {
       StompFrame response = null;
-      
+
       String messageID = request.getHeader(Stomp.Headers.Ack.MESSAGE_ID);
       String txID = request.getHeader(Stomp.Headers.TRANSACTION);
       String subscriptionID = request.getHeader(Stomp.Headers.Ack.SUBSCRIPTION);
@@ -202,13 +202,13 @@
       {
          log.warn("Transactional acknowledgement is not supported");
       }
-      
+
       if (subscriptionID == null)
       {
          response = new HornetQStompException("subscription header is required").getFrame();
          return response;
       }
-      
+
       try
       {
          connection.acknowledge(messageID, subscriptionID);
@@ -247,7 +247,7 @@
          //kick off the pinger
          startHeartBeat();
       }
-      
+
       if (reply.needsDisconnect())
       {
          connection.disconnect();
@@ -261,7 +261,7 @@
          }
       }
    }
-   
+
    private void startHeartBeat()
    {
       if (heartBeater != null)
@@ -269,25 +269,25 @@
          heartBeater.start();
       }
    }
-   
+
    public StompFrame createPingFrame() throws UnsupportedEncodingException
    {
       StompFrame frame = new StompFrame(Stomp.Commands.STOMP);
       frame.setPing(true);
       return frame;
    }
-   
-   //server heart beat 
-   //algorithm: 
-   //(a) server ping: if server hasn't sent any frame within serverPing 
-   //interval, send a ping. 
+
+   //server heart beat
+   //algorithm:
+   //(a) server ping: if server hasn't sent any frame within serverPing
+   //interval, send a ping.
    //(b) accept ping: if server hasn't received any frame within
    // 2*serverAcceptPing, disconnect!
    private class HeartBeater extends Thread
    {
       final int MIN_SERVER_PING = 500;
       final int MIN_CLIENT_PING = 500;
-      
+
       long serverPing = 0;
       long serverAcceptPing = 0;
       volatile boolean shutdown = false;
@@ -301,13 +301,13 @@
          {
             serverAcceptPing = clientPing > MIN_CLIENT_PING ? clientPing : MIN_CLIENT_PING;
          }
-         
+
          if (clientAcceptPing != 0)
          {
             serverPing = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING;
          }
       }
-      
+
       public synchronized void shutdown()
       {
          shutdown = true;
@@ -336,14 +336,14 @@
          {
             log.error("Cannot create ping frame due to encoding problem.", e1);
          }
-         
+
          synchronized (this)
          {
             while (!shutdown)
             {
                long dur1 = 0;
                long dur2 = 0;
-               
+
                if (serverPing != 0)
                {
                   dur1 = System.currentTimeMillis() - lastPingTime.get();
@@ -358,7 +358,7 @@
                if (serverAcceptPing != 0)
                {
                   dur2 = System.currentTimeMillis() - lastAccepted.get();
-                  
+
                   if (dur2 > (2 * serverAcceptPing))
                   {
                      connection.disconnect();
@@ -366,25 +366,25 @@
                      break;
                   }
                }
-               
+
                long waitTime1 = 0;
                long waitTime2 = 0;
-               
+
                if (serverPing > 0)
                {
                   waitTime1 = serverPing - dur1;
                }
-               
+
                if (serverAcceptPing > 0)
                {
                   waitTime2 = serverAcceptPing * 2 - dur2;
                }
-               
+
                long waitTime = 10l;
-               
-               if ((waitTime1 > 0) && (waitTime1 > 0))
+
+               if ((waitTime1 > 0) && (waitTime2 > 0))
                {
-                  waitTime = waitTime1 < waitTime2 ? waitTime1 : waitTime2;
+                  waitTime = Math.min(waitTime1, waitTime2);
                }
                else if (waitTime1 > 0)
                {
@@ -394,7 +394,7 @@
                {
                   waitTime = waitTime2;
                }
-               
+
                try
                {
                   this.wait(waitTime);
@@ -426,7 +426,7 @@
    {
       return new StompFrameV11(command);
    }
-   
+
    //all frame except CONNECT are decoded here.
    @Override
    public StompFrame decode(StompDecoder decoder, final HornetQBuffer buffer) throws HornetQStompException
@@ -441,11 +441,11 @@
       buffer.readBytes(decoder.workingBuffer, decoder.data, readable);
 
       decoder.data += readable;
-      
+
       if (decoder.command == null)
       {
          int offset = 0;
-         
+
          //check for ping
          while (decoder.workingBuffer[offset] == StompDecoder.NEW_LINE)
          {
@@ -466,7 +466,7 @@
                return null;
             }
          }
-            
+
          if (decoder.data < 4)
          {
             // Need at least four bytes to identify the command
@@ -527,7 +527,7 @@
                   decoder.command = StompDecoder.COMMAND_COMMIT;
                }
                /**** added by meddy, 27 april 2011, handle header parser for reply to websocket protocol ****/
-               else if (decoder.workingBuffer[offset+7] == StompDecoder.E) 
+               else if (decoder.workingBuffer[offset+7] == StompDecoder.E)
                {
                   if (!decoder.tryIncrement(offset + StompDecoder.COMMAND_CONNECTED_LENGTH + 1))
                   {
@@ -535,7 +535,7 @@
                   }
 
                   // CONNECTED
-                  decoder.command = StompDecoder.COMMAND_CONNECTED;                  
+                  decoder.command = StompDecoder.COMMAND_CONNECTED;
                }
                /**** end ****/
                else
@@ -680,8 +680,8 @@
          // Now the headers
 
          boolean isEscaping = false;
-         SimpleBytes holder = new SimpleBytes(1024);      
-         
+         SimpleBytes holder = new SimpleBytes(1024);
+
          outer: while (true)
          {
             byte b = decoder.workingBuffer[decoder.pos++];
@@ -724,7 +724,7 @@
                         {
                            throw new HornetQStompException("Encoding exception", e);
                         }
-                        
+
                         holder.reset();
 
                         decoder.inHeaderName = false;
@@ -772,14 +772,14 @@
                      throw new HornetQStompException("Encoding exception.", e);
                   }
                   holder.reset();
-                  
+
                   decoder.headers.put(decoder.headerName, headerValue);
 
                   if (decoder.headerName.equals(StompDecoder.CONTENT_LENGTH_HEADER_NAME))
                   {
                      decoder.contentLength = Integer.parseInt(headerValue);
                   }
-                  
+
                   if (decoder.headerName.equals(StompDecoder.CONTENT_TYPE_HEADER_NAME))
                   {
                      decoder.contentType = headerValue;
@@ -800,7 +800,7 @@
                   decoder.whiteSpaceOnly = false;
 
                   decoder.headerValueWhitespace = false;
-                  
+
                   holder.append(b);
                }
             }
@@ -814,7 +814,7 @@
       }
 
       // Now the body
-      
+
       byte[] content = null;
 
       if (decoder.contentLength != -1)
@@ -830,7 +830,7 @@
             System.arraycopy(decoder.workingBuffer, decoder.pos, content, 0, decoder.contentLength);
 
             decoder.pos += decoder.contentLength;
-            
+
             //drain all the rest
             if (decoder.bodyStart == -1)
             {
@@ -867,7 +867,7 @@
             }
          }
       }
-      
+
       if (content != null)
       {
          if (decoder.data > decoder.pos)
@@ -894,5 +894,5 @@
          return null;
       }
    }
-   
+
 }



More information about the hornetq-commits mailing list