[hornetq-commits] JBoss hornetq SVN: r8855 - trunk/src/main/org/hornetq/core/protocol/stomp.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jan 28 08:27:55 EST 2010


Author: jmesnil
Date: 2010-01-28 08:27:55 -0500 (Thu, 28 Jan 2010)
New Revision: 8855

Modified:
   trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
   trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
Log:
fix code format

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2010-01-28 13:27:55 UTC (rev 8855)
@@ -17,108 +17,165 @@
  */
 package org.hornetq.core.protocol.stomp;
 
-
 /**
  * The standard verbs and headers used for the <a href="http://stomp.codehaus.org/">STOMP</a> protocol.
  *
  * @version $Revision: 57 $
  */
-public interface Stomp {
-    String NULL = "\u0000";
-    String NEWLINE = "\n";
+public interface Stomp
+{
+   String NULL = "\u0000";
 
-    public static interface Commands {
-        String CONNECT = "CONNECT";
-        String SEND = "SEND";
-        String DISCONNECT = "DISCONNECT";
-        String SUBSCRIBE = "SUBSCRIBE";
-        String UNSUBSCRIBE = "UNSUBSCRIBE";
-        String BEGIN_TRANSACTION = "BEGIN";
-        String COMMIT_TRANSACTION = "COMMIT";
-        String ABORT_TRANSACTION = "ABORT";
-        String BEGIN = "BEGIN";
-        String COMMIT = "COMMIT";
-        String ABORT = "ABORT";
-        String ACK = "ACK";
-    }
+   String NEWLINE = "\n";
 
-    public interface Responses {
-        String CONNECTED = "CONNECTED";
-        String ERROR = "ERROR";
-        String MESSAGE = "MESSAGE";
-        String RECEIPT = "RECEIPT";
-    }
+   public static interface Commands
+   {
+      String CONNECT = "CONNECT";
 
-    public interface Headers {
-        String SEPERATOR = ":";
-        String RECEIPT_REQUESTED = "receipt";
-        String TRANSACTION = "transaction";
-        String CONTENT_LENGTH = "content-length";
+      String SEND = "SEND";
 
-        public interface Response {
-            String RECEIPT_ID = "receipt-id";
-        }
+      String DISCONNECT = "DISCONNECT";
 
-        public interface Send {
-            String DESTINATION = "destination";
-            String CORRELATION_ID = "correlation-id";
-            String REPLY_TO = "reply-to";
-            String EXPIRATION_TIME = "expires";
-            String PRIORITY = "priority";
-            String TYPE = "type";
-            Object PERSISTENT = "persistent";
-        }
+      String SUBSCRIBE = "SUBSCRIBE";
 
-        public interface Message {
-            String MESSAGE_ID = "message-id";
-            String DESTINATION = "destination";
-            String CORRELATION_ID = "correlation-id";
-            String EXPIRATION_TIME = "expires";
-            String REPLY_TO = "reply-to";
-            String PRORITY = "priority";
-            String REDELIVERED = "redelivered";
-            String TIMESTAMP = "timestamp";
-            String TYPE = "type";
-            String SUBSCRIPTION = "subscription";
-        }
+      String UNSUBSCRIBE = "UNSUBSCRIBE";
 
-        public interface Subscribe {
-            String DESTINATION = "destination";
-            String ACK_MODE = "ack";
-            String ID = "id";
-            String SELECTOR = "selector";
-            String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
-            String NO_LOCAL = "no-local";
+      String BEGIN_TRANSACTION = "BEGIN";
 
-            public interface AckModeValues {
-                String AUTO = "auto";
-                String CLIENT = "client";
-            }
-        }
+      String COMMIT_TRANSACTION = "COMMIT";
 
-        public interface Unsubscribe {
-            String DESTINATION = "destination";
-            String ID = "id";
-        }
+      String ABORT_TRANSACTION = "ABORT";
 
-        public interface Connect {
-            String LOGIN = "login";
-            String PASSCODE = "passcode";
-            String CLIENT_ID = "client-id";
-            String REQUEST_ID = "request-id";
-        }
+      String BEGIN = "BEGIN";
 
-        public interface Error {
-            String MESSAGE = "message";
-        }
+      String COMMIT = "COMMIT";
 
-        public interface Connected {
-            String SESSION = "session";
-            String RESPONSE_ID = "response-id";
-        }
+      String ABORT = "ABORT";
 
-        public interface Ack {
-            String MESSAGE_ID = "message-id";
-        }
-    }
+      String ACK = "ACK";
+   }
+
+   public interface Responses
+   {
+      String CONNECTED = "CONNECTED";
+
+      String ERROR = "ERROR";
+
+      String MESSAGE = "MESSAGE";
+
+      String RECEIPT = "RECEIPT";
+   }
+
+   public interface Headers
+   {
+      String SEPERATOR = ":";
+
+      String RECEIPT_REQUESTED = "receipt";
+
+      String TRANSACTION = "transaction";
+
+      String CONTENT_LENGTH = "content-length";
+
+      public interface Response
+      {
+         String RECEIPT_ID = "receipt-id";
+      }
+
+      public interface Send
+      {
+         String DESTINATION = "destination";
+
+         String CORRELATION_ID = "correlation-id";
+
+         String REPLY_TO = "reply-to";
+
+         String EXPIRATION_TIME = "expires";
+
+         String PRIORITY = "priority";
+
+         String TYPE = "type";
+
+         Object PERSISTENT = "persistent";
+      }
+
+      public interface Message
+      {
+         String MESSAGE_ID = "message-id";
+
+         String DESTINATION = "destination";
+
+         String CORRELATION_ID = "correlation-id";
+
+         String EXPIRATION_TIME = "expires";
+
+         String REPLY_TO = "reply-to";
+
+         String PRORITY = "priority";
+
+         String REDELIVERED = "redelivered";
+
+         String TIMESTAMP = "timestamp";
+
+         String TYPE = "type";
+
+         String SUBSCRIPTION = "subscription";
+      }
+
+      public interface Subscribe
+      {
+         String DESTINATION = "destination";
+
+         String ACK_MODE = "ack";
+
+         String ID = "id";
+
+         String SELECTOR = "selector";
+
+         String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+
+         String NO_LOCAL = "no-local";
+
+         public interface AckModeValues
+         {
+            String AUTO = "auto";
+
+            String CLIENT = "client";
+         }
+      }
+
+      public interface Unsubscribe
+      {
+         String DESTINATION = "destination";
+
+         String ID = "id";
+      }
+
+      public interface Connect
+      {
+         String LOGIN = "login";
+
+         String PASSCODE = "passcode";
+
+         String CLIENT_ID = "client-id";
+
+         String REQUEST_ID = "request-id";
+      }
+
+      public interface Error
+      {
+         String MESSAGE = "message";
+      }
+
+      public interface Connected
+      {
+         String SESSION = "session";
+
+         String RESPONSE_ID = "response-id";
+      }
+
+      public interface Ack
+      {
+         String MESSAGE_ID = "message-id";
+      }
+   }
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-01-28 13:27:55 UTC (rev 8855)
@@ -39,11 +39,11 @@
    private static final Logger log = Logger.getLogger(StompConnection.class);
 
    private final StompProtocolManager manager;
-   
+
    private final Connection transportConnection;
-      
+
    private String login;
-   
+
    private String passcode;
 
    private String clientID;
@@ -52,12 +52,10 @@
 
    private boolean destroyed = false;
 
-   private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>();
-
    StompConnection(final Connection transportConnection, final StompProtocolManager manager)
    {
       this.transportConnection = transportConnection;
-      
+
       this.manager = manager;
    }
 
@@ -67,12 +65,6 @@
 
    public void addFailureListener(FailureListener listener)
    {
-      if (listener == null)
-      {
-         throw new IllegalStateException("FailureListener cannot be null");
-      }
-
-      failureListeners.add(listener);
    }
 
    public boolean checkDataReceived()
@@ -95,8 +87,8 @@
       destroyed = true;
 
       transportConnection.close();
-      
-      callFailureListeners(new HornetQException(HornetQException.INTERNAL_ERROR, "Stomp connection destroyed"));
+
+      manager.cleanup(this);
    }
 
    public void disconnect()
@@ -108,7 +100,7 @@
    }
 
    public void flush()
-   {  
+   {
    }
 
    public List<FailureListener> getFailureListeners()
@@ -124,7 +116,7 @@
    }
 
    public String getRemoteAddress()
-   {      
+   {
       return transportConnection.getRemoteAddress();
    }
 
@@ -150,22 +142,13 @@
 
    public boolean removeFailureListener(FailureListener listener)
    {
-      if (listener == null)
-      {
-         throw new IllegalStateException("FailureListener cannot be null");
-      }
-
-      return failureListeners.remove(listener);
+      return false;
    }
 
    public void setFailureListeners(List<FailureListener> listeners)
    {
-      failureListeners.clear();
-
-      failureListeners.addAll(listeners);
    }
 
-   
    public void bufferReceived(Object connectionID, HornetQBuffer buffer)
    {
       manager.handleBuffer(this, buffer);
@@ -200,30 +183,9 @@
    {
       return valid;
    }
-   
+
    public void setValid(boolean valid)
    {
       this.valid = valid;
    }
-   
-   private void callFailureListeners(final HornetQException me)
-   {
-      final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners);
-
-      for (final FailureListener listener : listenersClone)
-      {
-         try
-         {
-            listener.connectionFailed(me);
-         }
-         catch (final Throwable t)
-         {
-            // Failure of one listener to execute shouldn't prevent others
-            // from
-            // executing
-            log.error("Failed to execute failure listener", t);
-         }
-      }
-   }
-
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java	2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompException.java	2010-01-28 13:27:55 UTC (rev 8855)
@@ -22,29 +22,36 @@
 /**
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
-class StompException extends IOException {
-    private static final long serialVersionUID = -2869735532997332242L;
-    private final boolean fatal;
+class StompException extends IOException
+{
+   private static final long serialVersionUID = -2869735532997332242L;
 
-    public StompException() {
-        this(null);
-    }
+   private final boolean fatal;
 
-    public StompException(String s) {
-        this(s, false);
-    }
+   public StompException()
+   {
+      this(null);
+   }
 
-    public StompException(String s, boolean fatal) {
-        this(s, fatal, null);
-    }
+   public StompException(String s)
+   {
+      this(s, false);
+   }
 
-    public StompException(String s, boolean fatal, Throwable cause) {
-        super(s);
-        this.fatal = fatal;
-        initCause(cause);
-    }
+   public StompException(String s, boolean fatal)
+   {
+      this(s, fatal, null);
+   }
 
-    public boolean isFatal() {
-        return fatal;
-    }
+   public StompException(String s, boolean fatal, Throwable cause)
+   {
+      super(s);
+      this.fatal = fatal;
+      initCause(cause);
+   }
+
+   public boolean isFatal()
+   {
+      return fatal;
+   }
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java	2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java	2010-01-28 13:27:55 UTC (rev 8855)
@@ -22,14 +22,17 @@
  *
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
-class StompFrameError extends StompFrame {
-    private final StompException exception;
+class StompFrameError extends StompFrame
+{
+   private final StompException exception;
 
-    public StompFrameError(StompException exception) {
-        this.exception = exception;
-    }
+   public StompFrameError(StompException exception)
+   {
+      this.exception = exception;
+   }
 
-    public StompException getException() {
-        return exception;
-    }
+   public StompException getException()
+   {
+      return exception;
+   }
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-01-28 13:27:55 UTC (rev 8855)
@@ -30,169 +30,206 @@
 /**
  * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
  */
-class StompMarshaller {
-    public static final byte[] NO_DATA = new byte[]{};
-    private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
-    private static final int MAX_COMMAND_LENGTH = 1024;
-    private static final int MAX_HEADER_LENGTH = 1024 * 10;
-    private static final int MAX_HEADERS = 1000;
-    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
-    private int version = 1;
+class StompMarshaller
+{
+   public static final byte[] NO_DATA = new byte[] {};
 
-    public int getVersion() {
-        return version;
-    }
+   private static final byte[] END_OF_FRAME = new byte[] { 0, '\n' };
 
-    public void setVersion(int version) {
-        this.version = version;
-    }
+   private static final int MAX_COMMAND_LENGTH = 1024;
 
-    public byte[] marshal(StompFrame command) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        marshal(command, dos);
-        dos.close();
-        return baos.toByteArray();
-    }
+   private static final int MAX_HEADER_LENGTH = 1024 * 10;
 
-    public void marshal(StompFrame stomp, DataOutput os) throws IOException {
-        StringBuffer buffer = new StringBuffer();
-        buffer.append(stomp.getCommand());
-        buffer.append(Stomp.NEWLINE);
+   private static final int MAX_HEADERS = 1000;
 
-        // Output the headers.
-        for (Iterator<Map.Entry<String, Object>> iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
-            Map.Entry<String, Object> entry = iter.next();
-            buffer.append(entry.getKey());
-            buffer.append(Stomp.Headers.SEPERATOR);
-            buffer.append(entry.getValue());
-            buffer.append(Stomp.NEWLINE);
-        }
+   private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
 
-        // Add a newline to seperate the headers from the content.
-        buffer.append(Stomp.NEWLINE);
+   private int version = 1;
 
-        os.write(buffer.toString().getBytes("UTF-8"));
-        os.write(stomp.getContent());
-        os.write(END_OF_FRAME);
-    }
+   public int getVersion()
+   {
+      return version;
+   }
 
-    public StompFrame unmarshal(HornetQBuffer in) throws IOException {
+   public void setVersion(int version)
+   {
+      this.version = version;
+   }
 
-        try {
-            String action = null;
+   public byte[] marshal(StompFrame command) throws IOException
+   {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dos = new DataOutputStream(baos);
+      marshal(command, dos);
+      dos.close();
+      return baos.toByteArray();
+   }
 
-            // skip white space to next real action line
-            while (true) {
-                action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
-                if (action == null) {
-                    throw new IOException("connection was closed");
-                }
-                else {
-                    action = action.trim();
-                    if (action.length() > 0) {
-                        break;
-                    }
-                }
+   public void marshal(StompFrame stomp, DataOutput os) throws IOException
+   {
+      StringBuffer buffer = new StringBuffer();
+      buffer.append(stomp.getCommand());
+      buffer.append(Stomp.NEWLINE);
+
+      // Output the headers.
+      for (Iterator<Map.Entry<String, Object>> iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();)
+      {
+         Map.Entry<String, Object> entry = iter.next();
+         buffer.append(entry.getKey());
+         buffer.append(Stomp.Headers.SEPERATOR);
+         buffer.append(entry.getValue());
+         buffer.append(Stomp.NEWLINE);
+      }
+
+      // Add a newline to seperate the headers from the content.
+      buffer.append(Stomp.NEWLINE);
+
+      os.write(buffer.toString().getBytes("UTF-8"));
+      os.write(stomp.getContent());
+      os.write(END_OF_FRAME);
+   }
+
+   public StompFrame unmarshal(HornetQBuffer in) throws IOException
+   {
+
+      try
+      {
+         String action = null;
+
+         // skip white space to next real action line
+         while (true)
+         {
+            action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+            if (action == null)
+            {
+               throw new IOException("connection was closed");
             }
+            else
+            {
+               action = action.trim();
+               if (action.length() > 0)
+               {
+                  break;
+               }
+            }
+         }
 
-            // Parse the headers
-            HashMap<String, Object> headers = new HashMap<String, Object>(25);
-            while (true) {
-                String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
-                if (line != null && line.trim().length() > 0) {
+         // Parse the headers
+         HashMap<String, Object> headers = new HashMap<String, Object>(25);
+         while (true)
+         {
+            String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+            if (line != null && line.trim().length() > 0)
+            {
 
-                    if (headers.size() > MAX_HEADERS) {
-                        throw new StompException("The maximum number of headers was exceeded", true);
-                    }
+               if (headers.size() > MAX_HEADERS)
+               {
+                  throw new StompException("The maximum number of headers was exceeded", true);
+               }
 
-                    try {
-                        int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
-                        String name = line.substring(0, seperator_index).trim();
-                        String value = line.substring(seperator_index + 1, line.length()).trim();
-                        headers.put(name, value);
-                    }
-                    catch (Exception e) {
-                        throw new StompException("Unable to parser header line [" + line + "]", true);
-                    }
-                }
-                else {
-                    break;
-                }
+               try
+               {
+                  int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+                  String name = line.substring(0, seperator_index).trim();
+                  String value = line.substring(seperator_index + 1, line.length()).trim();
+                  headers.put(name, value);
+               }
+               catch (Exception e)
+               {
+                  throw new StompException("Unable to parser header line [" + line + "]", true);
+               }
             }
+            else
+            {
+               break;
+            }
+         }
 
-            // Read in the data part.
-            byte[] data = NO_DATA;
-            String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
-            if (contentLength != null) {
+         // Read in the data part.
+         byte[] data = NO_DATA;
+         String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
+         if (contentLength != null)
+         {
 
-                // Bless the client, he's telling us how much data to read in.
-                int length;
-                try {
-                    length = Integer.parseInt(contentLength.trim());
-                }
-                catch (NumberFormatException e) {
-                    throw new StompException("Specified content-length is not a valid integer", true);
-                }
+            // Bless the client, he's telling us how much data to read in.
+            int length;
+            try
+            {
+               length = Integer.parseInt(contentLength.trim());
+            }
+            catch (NumberFormatException e)
+            {
+               throw new StompException("Specified content-length is not a valid integer", true);
+            }
 
-                if (length > MAX_DATA_LENGTH) {
-                    throw new StompException("The maximum data length was exceeded", true);
-                }
+            if (length > MAX_DATA_LENGTH)
+            {
+               throw new StompException("The maximum data length was exceeded", true);
+            }
 
-                data = new byte[length];
-                in.readBytes(data);
+            data = new byte[length];
+            in.readBytes(data);
 
-                if (in.readByte() != 0) {
-                    throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
-                }
+            if (in.readByte() != 0)
+            {
+               throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " +
+                                        "there was no trailing null byte", true);
             }
-            else {
+         }
+         else
+         {
 
-                // We don't know how much to read.. data ends when we hit a 0
-                byte b;
-                ByteArrayOutputStream baos = null;
-                while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
+            // We don't know how much to read.. data ends when we hit a 0
+            byte b;
+            ByteArrayOutputStream baos = null;
+            while (in.readableBytes() > 0 && (b = in.readByte()) != 0)
+            {
 
-                    if (baos == null) {
-                        baos = new ByteArrayOutputStream();
-                    }
-                    else if (baos.size() > MAX_DATA_LENGTH) {
-                        throw new StompException("The maximum data length was exceeded", true);
-                    }
+               if (baos == null)
+               {
+                  baos = new ByteArrayOutputStream();
+               }
+               else if (baos.size() > MAX_DATA_LENGTH)
+               {
+                  throw new StompException("The maximum data length was exceeded", true);
+               }
 
-                    baos.write(b);
-                }
+               baos.write(b);
+            }
 
-                if (baos != null) {
-                    baos.close();
-                    data = baos.toByteArray();
-                }
+            if (baos != null)
+            {
+               baos.close();
+               data = baos.toByteArray();
             }
+         }
 
-            return new StompFrame(action, headers, data);
-        }
-        catch (StompException e) {
-            return new StompFrameError(e);
-        }
-    }
+         return new StompFrame(action, headers, data);
+      }
+      catch (StompException e)
+      {
+         return new StompFrameError(e);
+      }
+   }
 
-    protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException {
-       char[] chars = new char[MAX_HEADER_LENGTH];
-       
-       int count = 0;
-       while (in.readable())
-       {
-          byte b = in.readByte();
-          
-          if (b == (byte)'\n')
-          {
-             break;
-          }             
-          else
-          {
-             chars[count++] = (char)b;
-          }
-       }
-       return new String(chars, 0, count);
-    }
+   protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException
+   {
+      char[] chars = new char[MAX_HEADER_LENGTH];
+
+      int count = 0;
+      while (in.readable())
+      {
+         byte b = in.readByte();
+
+         if (b == (byte)'\n')
+         {
+            break;
+         }
+         else
+         {
+            chars[count++] = (char)b;
+         }
+      }
+      return new String(chars, 0, count);
+   }
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-28 13:27:55 UTC (rev 8855)
@@ -191,7 +191,7 @@
          {
             send(conn, response);
          }
-         
+
          if (Stomp.Commands.DISCONNECT.equals(command))
          {
             conn.destroy();
@@ -285,7 +285,7 @@
    private StompFrame onAck(StompFrame frame, HornetQServer server, StompConnection connection) throws Exception
    {
       Map<String, Object> headers = frame.getHeaders();
-      String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);      
+      String messageID = (String)headers.get(Stomp.Headers.Ack.MESSAGE_ID);
       String txID = (String)headers.get(Stomp.Headers.TRANSACTION);
       StompSession stompSession = null;
       if (txID != null)
@@ -294,7 +294,7 @@
       }
       stompSession = getSession(connection);
       stompSession.acknowledge(messageID);
-      
+
       return null;
    }
 
@@ -537,5 +537,47 @@
       }
    }
 
+   public void cleanup(StompConnection connection)
+   {
+      connection.setValid(false);
+
+      StompSession session = sessions.remove(connection);
+      if (session != null)
+      {
+         try
+         {
+            session.getSession().rollback(true);
+            session.getSession().close();
+            session.getSession().runConnectionFailureRunners();
+         }
+         catch (Exception e)
+         {
+            log.warn(e.getMessage(), e);
+         }
+      }
+
+      // removed the transacted session belonging to the connection
+      Iterator<Entry<String, StompSession>> iterator = transactedSessions.entrySet().iterator();
+      while (iterator.hasNext())
+      {
+         Map.Entry<String, StompSession> entry = (Map.Entry<String, StompSession>)iterator.next();
+         if (entry.getValue().getConnection() == connection)
+         {
+            ServerSession serverSession = entry.getValue().getSession();
+            try
+            {
+               serverSession.rollback(true);
+               serverSession.close();
+               serverSession.runConnectionFailureRunners();
+            }
+            catch (Exception e)
+            {
+               log.warn(e.getMessage(), e);
+            }
+            iterator.remove();
+         }
+      }
+   }
+
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSession.java	2010-01-28 13:27:55 UTC (rev 8855)
@@ -97,10 +97,10 @@
             buffer.readBytes(data);
             headers.put(Headers.CONTENT_LENGTH, data.length);
          }
-         
+
          StompFrame frame = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
          StompUtils.copyStandardHeadersFromMessageToFrame(serverMessage, frame, deliveryCount);
-         
+
          int length = manager.send(connection, frame);
 
          if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO))
@@ -150,14 +150,11 @@
       SimpleString queue = SimpleString.toSimpleString(destination);
       if (destination.startsWith(StompUtils.HQ_TOPIC_PREFIX))
       {
-         //subscribes to a topic
+         // subscribes to a topic
          queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
          session.createQueue(SimpleString.toSimpleString(destination), queue, null, true, false);
       }
-      session.createConsumer(consumerID,
-                             queue,
-                             SimpleString.toSimpleString(selector),
-                             false);
+      session.createConsumer(consumerID, queue, SimpleString.toSimpleString(selector), false);
       session.receiveConsumerCredits(consumerID, -1);
       StompSubscription subscription = new StompSubscription(subscriptionID, destination, ack);
       subscriptions.put(consumerID, subscription);
@@ -185,7 +182,7 @@
    }
 
    boolean containsSubscription(String subscriptionID)
-   {     
+   {
       Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
       while (iterator.hasNext())
       {

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java	2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompSubscription.java	2010-01-28 13:27:55 UTC (rev 8855)
@@ -54,7 +54,7 @@
    {
       return destination;
    }
-   
+
    public String getID()
    {
       return subID;

Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2010-01-28 13:26:39 UTC (rev 8854)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2010-01-28 13:27:55 UTC (rev 8855)
@@ -124,11 +124,11 @@
    public static void copyStandardHeadersFromFrameToMessage(StompFrame frame, ServerMessageImpl msg) throws Exception
    {
       Map<String, Object> headers = new HashMap<String, Object>(frame.getHeaders());
-      
+
       String priority = (String)headers.remove(Stomp.Headers.Send.PRIORITY);
       if (priority != null)
       {
-      msg.setPriority(Byte.parseByte(priority));
+         msg.setPriority(Byte.parseByte(priority));
       }
       String persistent = (String)headers.remove(Stomp.Headers.Send.PERSISTENT);
       if (persistent != null)
@@ -160,34 +160,39 @@
       }
    }
 
-   public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame command, int deliveryCount) throws Exception {
+   public static void copyStandardHeadersFromMessageToFrame(Message message, StompFrame command, int deliveryCount) throws Exception
+   {
       final Map<String, Object> headers = command.getHeaders();
       headers.put(Stomp.Headers.Message.DESTINATION, toStompDestination(message.getAddress().toString()));
       headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getMessageID());
 
-      if (message.getObjectProperty("JMSCorrelationID") != null) {
-          headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty("JMSCorrelationID"));
+      if (message.getObjectProperty("JMSCorrelationID") != null)
+      {
+         headers.put(Stomp.Headers.Message.CORRELATION_ID, message.getObjectProperty("JMSCorrelationID"));
       }
       headers.put(Stomp.Headers.Message.EXPIRATION_TIME, "" + message.getExpiration());
       headers.put(Stomp.Headers.Message.REDELIVERED, deliveryCount > 1);
       headers.put(Stomp.Headers.Message.PRORITY, "" + message.getPriority());
 
-      if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null) {
-          headers.put(Stomp.Headers.Message.REPLY_TO, toStompDestination(message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME)));
+      if (message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME) != null)
+      {
+         headers.put(Stomp.Headers.Message.REPLY_TO,
+                     toStompDestination(message.getStringProperty(ClientMessageImpl.REPLYTO_HEADER_NAME)));
       }
       headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getTimestamp());
 
-      if (message.getObjectProperty("JMSType") != null) {
-          headers.put(Stomp.Headers.Message.TYPE, message.getObjectProperty("JMSType"));
+      if (message.getObjectProperty("JMSType") != null)
+      {
+         headers.put(Stomp.Headers.Message.TYPE, message.getObjectProperty("JMSType"));
       }
 
       // now lets add all the message headers
       Set<SimpleString> names = message.getPropertyNames();
       for (SimpleString name : names)
       {
-          headers.put(name.toString(), message.getObjectProperty(name));
+         headers.put(name.toString(), message.getObjectProperty(name));
       }
-  }
+   }
    // Constructors --------------------------------------------------
 
    // Public --------------------------------------------------------



More information about the hornetq-commits mailing list