[hornetq-commits] JBoss hornetq SVN: r8830 - in branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq: core/server/impl and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Jan 21 12:05:07 EST 2010


Author: jmesnil
Date: 2010-01-21 12:05:06 -0500 (Thu, 21 Jan 2010)
New Revision: 8830

Added:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameDecoder.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompUtils.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
Removed:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
Modified:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* code cleanup

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -85,8 +85,6 @@
 
    void removeSession(String name) throws Exception;
 
-   ServerSession getSession(String name);
-
    Set<ServerSession> getSessions();
 
    boolean isStarted();

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -598,11 +598,6 @@
       sessions.remove(name);
    }
 
-   public ServerSession getSession(final String name)
-   {
-      return sessions.get(name);
-   }
-
    public synchronized List<ServerSession> getSessions(final String connectionID)
    {
       Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -1,50 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.integration.protocol.stomp;
-
-import java.io.IOException;
-
-/**
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-class ProtocolException extends IOException {
-    private static final long serialVersionUID = -2869735532997332242L;
-    private final boolean fatal;
-
-    public ProtocolException() {
-        this(null);
-    }
-
-    public ProtocolException(String s) {
-        this(s, false);
-    }
-
-    public ProtocolException(String s, boolean fatal) {
-        this(s, fatal, null);
-    }
-
-    public ProtocolException(String s, boolean fatal, Throwable cause) {
-        super(s);
-        this.fatal = fatal;
-        initCause(cause);
-    }
-
-    public boolean isFatal() {
-        return fatal;
-    }
-}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -14,6 +14,7 @@
 package org.hornetq.integration.protocol.stomp;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.util.HashMap;
@@ -51,7 +52,7 @@
 
    private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
 
-   private ServerHolder serverHandler;
+   private ServerHolder serverHolder;
 
    public StompChannelHandler(ServerHolder serverHolder,
                               final ChannelGroup group,
@@ -59,18 +60,18 @@
                               final ConnectionLifeCycleListener listener)
    {
       super(group, listener, acceptor);
-      this.serverHandler = serverHolder;
+      this.serverHolder = serverHolder;
       this.marshaller = new StompMarshaller();
    }
 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
    {
       StompFrame frame = (StompFrame)e.getMessage();
-      System.out.println(">>> got frame " + frame);
+      System.out.println("RECEIVED " + frame);
 
       // need to interact with HornetQ server & session
-      HornetQServer server = serverHandler.getServer();
-      RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
+      HornetQServer server = serverHolder.getServer();
+      RemotingConnection connection = serverHolder.getRemotingConnection(e.getChannel().getId());
 
       try
       {
@@ -104,11 +105,7 @@
 
          if (response != null)
          {
-            System.out.println(">>> will reply " + response);
-            byte[] bytes = marshaller.marshal(response);
-            HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
-            System.out.println("ready to send reply: " + buffer);
-            connection.getTransportConnection().write(buffer, true);
+            send(connection, response);
          }
       }
       catch (StompException ex)
@@ -129,11 +126,7 @@
          }
 
          StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
-         byte[] bytes = marshaller.marshal(errorMessage);
-         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
-         System.out.println("ready to send reply: " + buffer);
-         connection.getTransportConnection().write(buffer, true);
-
+         send(connection, errorMessage);
       }
       catch (Exception ex)
       {
@@ -141,21 +134,13 @@
       }
    }
 
-   /**
-    * @param frame
-    * @param server
-    * @param connection
-    * @return
-    * @throws StompException 
-    * @throws HornetQException 
-    */
    private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception,
                                                                                                         StompException,
                                                                                                         HornetQException
    {
       Map<String, Object> headers = frame.getHeaders();
       String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
-      SimpleString queueName = StompDestinationConverter.toHornetQAddress(queue);
+      SimpleString queueName = StompUtils.toHornetQAddress(queue);
 
       ServerSession session = checkAndGetSession(connection);
       long consumerID = server.getStorageManager().generateUniqueID();
@@ -215,7 +200,7 @@
       boolean durable = false;
       long expiration = -1;
       byte priority = 9;
-      SimpleString address = StompDestinationConverter.toHornetQAddress(queue);
+      SimpleString address = StompUtils.toHornetQAddress(queue);
 
       ServerMessage message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
       message.setType(type);
@@ -252,17 +237,16 @@
       String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
 
       String name = UUIDGenerator.getInstance().generateStringUUID();
-      server.createSession(name,
-                           login,
-                           passcode,
-                           HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
-                           connection,
-                           true,
-                           true,
-                           false,
-                           false,
-                           new StompSessionCallback(marshaller, connection));
-      ServerSession session = server.getSession(name);
+      ServerSession session = server.createSession(name,
+                                                   login,
+                                                   passcode,
+                                                   HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                   connection,
+                                                   true,
+                                                   true,
+                                                   false,
+                                                   false,
+                                                   new StompSessionCallback(marshaller, connection));
       sessions.put(connection, session);
       System.out.println(">>> created session " + session);
       HashMap<String, Object> h = new HashMap<String, Object>();
@@ -270,4 +254,13 @@
       h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
       return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
    }
+
+   private void send(RemotingConnection connection, StompFrame frame) throws IOException
+   {
+      System.out.println("SENDING >>> " + frame);
+      byte[] bytes = marshaller.marshal(frame);
+      HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+      System.out.println("ready to send reply: " + buffer);
+      connection.getTransportConnection().write(buffer, true);
+   }
 }
\ No newline at end of file

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -1,118 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTemporaryQueue;
-import org.hornetq.jms.client.HornetQTemporaryTopic;
-import org.hornetq.jms.client.HornetQTopic;
-
-/**
- * A StompDestinationConverter
- *
- * @author jmesnil
- *
- *
- */
-class StompDestinationConverter
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   public static SimpleString toHornetQAddress(String stompDestination) throws HornetQException
-   {
-      if (stompDestination == null)
-      {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
-      }
-      else if (stompDestination.startsWith("/queue/"))
-      {
-         String queueName = stompDestination.substring("/queue/".length(), stompDestination.length());
-         return HornetQQueue.createAddressFromName(queueName);
-      }
-      else if (stompDestination.startsWith("/topic/"))
-      {
-         String topicName = stompDestination.substring("/topic/".length(), stompDestination.length());
-         return HornetQTopic.createAddressFromName(topicName);
-      }
-      else if (stompDestination.startsWith("/temp-queue/"))
-      {
-         String tempName = stompDestination.substring("/temp-queue/".length(), stompDestination.length());
-         return HornetQTemporaryQueue.createAddressFromName(tempName);
-      }
-      else if (stompDestination.startsWith("/temp-topic/"))
-      {
-         String tempName = stompDestination.substring("/temp-topic/".length(), stompDestination.length());
-         return HornetQTemporaryTopic.createAddressFromName(tempName);
-      }
-      else
-      {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + stompDestination +
-                                                                    "] -- StompConnect destinations " +
-                                                                    "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
-      }
-   }
-
-   public static String toStompDestination(String hornetqAddress) throws HornetQException
-   {
-      if (hornetqAddress == null)
-      {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
-      }
-      else if (hornetqAddress.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
-      {
-         return "/queue/" + hornetqAddress.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length(),
-                                                     hornetqAddress.length());
-      }
-      else if (hornetqAddress.startsWith(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
-      {
-         return "/temp-queue/" + hornetqAddress.substring(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length(),
-                                                          hornetqAddress.length());
-      }
-      else if (hornetqAddress.startsWith(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX))
-      {
-         return "/topic/" + hornetqAddress.substring(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX.length(),
-                                                     hornetqAddress.length());
-      }
-      else if (hornetqAddress.startsWith(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
-      {
-         return "/temp-topic/" + hornetqAddress.substring(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length(),
-                                                          hornetqAddress.length());
-      }
-      else
-      {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + hornetqAddress +
-                                                                    "] -- Acceptable address must comply to JMS semantics");
-      }
-   }
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -1,50 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-/**
- * A StompException
- *
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- */
-class StompException extends Exception
-{
-
-   /**
-    * @param string
-    */
-   public StompException(String string)
-   {
-      super(string);
-   }
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java (from rev 8829, branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.protocol.stomp;
+
+import java.io.IOException;
+
+/**
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompException extends IOException {
+    private static final long serialVersionUID = -2869735532997332242L;
+    private final boolean fatal;
+
+    public StompException() {
+        this(null);
+    }
+
+    public StompException(String s) {
+        this(s, false);
+    }
+
+    public StompException(String s, boolean fatal) {
+        this(s, fatal, null);
+    }
+
+    public StompException(String s, boolean fatal, Throwable cause) {
+        super(s);
+        this.fatal = fatal;
+        initCause(cause);
+    }
+
+    public boolean isFatal() {
+        return fatal;
+    }
+}

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameDecoder.java (from rev 8829, branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameDecoder.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameDecoder.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.protocol.stomp;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * A StompFrameDecoder
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+ at ChannelPipelineCoverage("one")
+public class StompFrameDecoder extends SimpleChannelHandler
+{
+   private final StompMarshaller marshaller;
+   
+   // PacketDecoder implementation ----------------------------------
+
+   public StompFrameDecoder()
+   {
+      this.marshaller = new StompMarshaller();
+   }
+
+   @Override
+   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+   {
+      ChannelBuffer in = (ChannelBuffer)e.getMessage();
+      HornetQBuffer buffer = new ChannelBufferWrapper(in);
+      StompFrame frame = marshaller.unmarshal(buffer);
+      
+      Channels.fireMessageReceived(ctx, frame);
+   }
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -23,13 +23,13 @@
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
 class StompFrameError extends StompFrame {
-    private final ProtocolException exception;
+    private final StompException exception;
 
-    public StompFrameError(ProtocolException exception) {
+    public StompFrameError(StompException exception) {
         this.exception = exception;
     }
 
-    public ProtocolException getException() {
+    public StompException getException() {
         return exception;
     }
 }

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -103,7 +103,7 @@
                 if (line != null && line.trim().length() > 0) {
 
                     if (headers.size() > MAX_HEADERS) {
-                        throw new ProtocolException("The maximum number of headers was exceeded", true);
+                        throw new StompException("The maximum number of headers was exceeded", true);
                     }
 
                     try {
@@ -113,7 +113,7 @@
                         headers.put(name, value);
                     }
                     catch (Exception e) {
-                        throw new ProtocolException("Unable to parser header line [" + line + "]", true);
+                        throw new StompException("Unable to parser header line [" + line + "]", true);
                     }
                 }
                 else {
@@ -132,18 +132,18 @@
                     length = Integer.parseInt(contentLength.trim());
                 }
                 catch (NumberFormatException e) {
-                    throw new ProtocolException("Specified content-length is not a valid integer", true);
+                    throw new StompException("Specified content-length is not a valid integer", true);
                 }
 
                 if (length > MAX_DATA_LENGTH) {
-                    throw new ProtocolException("The maximum data length was exceeded", true);
+                    throw new StompException("The maximum data length was exceeded", true);
                 }
 
                 data = new byte[length];
                 in.readBytes(data);
 
                 if (in.readByte() != 0) {
-                    throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
+                    throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
                 }
             }
             else {
@@ -157,7 +157,7 @@
                         baos = new ByteArrayOutputStream();
                     }
                     else if (baos.size() > MAX_DATA_LENGTH) {
-                        throw new ProtocolException("The maximum data length was exceeded", true);
+                        throw new StompException("The maximum data length was exceeded", true);
                     }
 
                     baos.write(b);
@@ -171,7 +171,7 @@
 
             return new StompFrame(action, headers, data);
         }
-        catch (ProtocolException e) {
+        catch (StompException e) {
             return new StompFrameError(e);
         }
     }
@@ -181,7 +181,7 @@
         ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
         while ((b = in.readByte()) != '\n') {
             if (baos.size() > maxLength) {
-                throw new ProtocolException(errorMessage, true);
+                throw new StompException(errorMessage, true);
             }
             baos.write(b);
         }

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -1,51 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-
-/**
- * A StompPacketDecoder
- *
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- */
- at ChannelPipelineCoverage("one")
-public class StompPacketDecoder extends SimpleChannelHandler
-{
-   private final StompMarshaller marshaller;
-   
-   // PacketDecoder implementation ----------------------------------
-
-   public StompPacketDecoder()
-   {
-      this.marshaller = new StompMarshaller();
-   }
-
-   @Override
-   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
-   {
-      ChannelBuffer in = (ChannelBuffer)e.getMessage();
-      HornetQBuffer buffer = new ChannelBufferWrapper(in);
-      StompFrame frame = marshaller.unmarshal(buffer);
-      
-      Channels.fireMessageReceived(ctx, frame);
-   }
-}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -49,7 +49,7 @@
       try
       {
          Map<String, Object> headers = new HashMap<String, Object>();
-         headers.put(Stomp.Headers.Message.DESTINATION, StompDestinationConverter.toStompDestination(serverMessage.getAddress()
+         headers.put(Stomp.Headers.Message.DESTINATION, StompUtils.toStompDestination(serverMessage.getAddress()
                                                                                                        .toString()));
          byte[] data = new byte[] {};
          if (serverMessage.getType() == HornetQTextMessage.TYPE)

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompUtils.java (from rev 8829, branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompUtils.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompUtils.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.protocol.stomp;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.jms.client.HornetQTemporaryQueue;
+import org.hornetq.jms.client.HornetQTemporaryTopic;
+import org.hornetq.jms.client.HornetQTopic;
+
+/**
+ * A StompUtils
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+class StompUtils
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   public static SimpleString toHornetQAddress(String stompDestination) throws HornetQException
+   {
+      if (stompDestination == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+      }
+      else if (stompDestination.startsWith("/queue/"))
+      {
+         String queueName = stompDestination.substring("/queue/".length(), stompDestination.length());
+         return HornetQQueue.createAddressFromName(queueName);
+      }
+      else if (stompDestination.startsWith("/topic/"))
+      {
+         String topicName = stompDestination.substring("/topic/".length(), stompDestination.length());
+         return HornetQTopic.createAddressFromName(topicName);
+      }
+      else if (stompDestination.startsWith("/temp-queue/"))
+      {
+         String tempName = stompDestination.substring("/temp-queue/".length(), stompDestination.length());
+         return HornetQTemporaryQueue.createAddressFromName(tempName);
+      }
+      else if (stompDestination.startsWith("/temp-topic/"))
+      {
+         String tempName = stompDestination.substring("/temp-topic/".length(), stompDestination.length());
+         return HornetQTemporaryTopic.createAddressFromName(tempName);
+      }
+      else
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + stompDestination +
+                                                                    "] -- StompConnect destinations " +
+                                                                    "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+      }
+   }
+
+   public static String toStompDestination(String hornetqAddress) throws HornetQException
+   {
+      if (hornetqAddress == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+      }
+      else if (hornetqAddress.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
+      {
+         return "/queue/" + hornetqAddress.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length(),
+                                                     hornetqAddress.length());
+      }
+      else if (hornetqAddress.startsWith(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+      {
+         return "/temp-queue/" + hornetqAddress.substring(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length(),
+                                                          hornetqAddress.length());
+      }
+      else if (hornetqAddress.startsWith(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX))
+      {
+         return "/topic/" + hornetqAddress.substring(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX.length(),
+                                                     hornetqAddress.length());
+      }
+      else if (hornetqAddress.startsWith(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+      {
+         return "/temp-topic/" + hornetqAddress.substring(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length(),
+                                                          hornetqAddress.length());
+      }
+      else
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + hornetqAddress +
+                                                                    "] -- Acceptable address must comply to JMS semantics");
+      }
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -16,8 +16,6 @@
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 
-import org.hornetq.integration.protocol.stomp.StompFrameDelimiter;
-import org.hornetq.integration.protocol.stomp.StompPacketDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.handler.ssl.SslHandler;
@@ -46,13 +44,6 @@
 
    // Public --------------------------------------------------------
 
-   public static void addStompStack(final ChannelPipeline pipeline, final ServerHolder serverHandler)
-   {
-      assert pipeline != null;
-      pipeline.addLast("delimiter", new StompFrameDelimiter());
-      pipeline.addLast("codec", new StompPacketDecoder());
-   }
-
    public static void addHornetQCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
    {
       assert pipeline != null;

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -37,6 +37,8 @@
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
 import org.hornetq.integration.protocol.stomp.StompChannelHandler;
+import org.hornetq.integration.protocol.stomp.StompFrameDelimiter;
+import org.hornetq.integration.protocol.stomp.StompFrameDecoder;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
@@ -132,7 +134,7 @@
 
    private VirtualExecutorService bossExecutor;
 
-   private ServerHolder serverHandler;
+   private ServerHolder serverHolder;
 
    public NettyAcceptor(final Map<String, Object> configuration,
                         final BufferHandler handler,
@@ -143,7 +145,7 @@
    {
       this.handler = handler;
 
-      this.serverHandler = serverHandler;
+      this.serverHolder = serverHandler;
 
       this.listener = listener;
 
@@ -291,8 +293,9 @@
             }
             if (protocol.equals(TransportConstants.STOMP_PROTOCOL))
             {
-               ChannelPipelineSupport.addStompStack(pipeline, serverHandler);
-               pipeline.addLast("handler", new StompChannelHandler(serverHandler,
+               pipeline.addLast("delimiter", new StompFrameDelimiter());
+               pipeline.addLast("codec", new StompFrameDecoder());
+               pipeline.addLast("handler", new StompChannelHandler(serverHolder,
                                                                    channelGroup,
                                                                    NettyAcceptor.this,
                                                                    new Listener()));

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java	2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -1,31 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.transports.netty;
-
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.server.HornetQServer;
-
-/**
- * A ServerHolder
- *
- * @author jmesnil
- *
- *
- */
-public interface ServerHolder
-{
-   HornetQServer getServer();
-
-   RemotingConnection getRemotingConnection(int connectionID);
-}

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java (from rev 8829, branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java	2010-01-21 17:05:06 UTC (rev 8830)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+
+/**
+ * A ServerHolder
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public interface ServerHolder
+{
+   HornetQServer getServer();
+
+   RemotingConnection getRemotingConnection(int connectionID);
+}



More information about the hornetq-commits mailing list