[hornetq-commits] JBoss hornetq SVN: r8840 - in branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol: stomp and 1 other directory.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Jan 22 07:46:07 EST 2010


Author: jmesnil
Date: 2010-01-22 07:46:06 -0500 (Fri, 22 Jan 2010)
New Revision: 8840

Added:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
Removed:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/ProtocolException.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompDestinationConverter.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompPacketDecoder.java
Modified:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/Stomp.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* updated code with trunk's refactoring of protocol support

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp (from rev 8826, branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp)

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/ProtocolException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/ProtocolException.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -1,50 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.integration.protocol.stomp;
-
-import java.io.IOException;
-
-/**
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-public class ProtocolException extends IOException {
-    private static final long serialVersionUID = -2869735532997332242L;
-    private final boolean fatal;
-
-    public ProtocolException() {
-        this(null);
-    }
-
-    public ProtocolException(String s) {
-        this(s, false);
-    }
-
-    public ProtocolException(String s, boolean fatal) {
-        this(s, fatal, null);
-    }
-
-    public ProtocolException(String s, boolean fatal, Throwable cause) {
-        super(s);
-        this.fatal = fatal;
-        initCause(cause);
-    }
-
-    public boolean isFatal() {
-        return fatal;
-    }
-}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/Stomp.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/Stomp.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.hornetq.integration.protocol.stomp;
+package org.hornetq.core.protocol.stomp;
 
 
 /**

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompConnection.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+
+/**
+ * A StompConnection
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class StompConnection implements RemotingConnection
+{
+   private static final Logger log = Logger.getLogger(StompConnection.class);
+
+   private final ProtocolManager manager;
+   
+   private final Connection transportConnection;
+      
+   StompConnection(final Connection transportConnection, final ProtocolManager manager)
+   {
+      this.transportConnection = transportConnection;
+      
+      this.manager = manager;
+   }
+
+   public void addCloseListener(CloseListener listener)
+   {
+   }
+
+   public void addFailureListener(FailureListener listener)
+   {
+   }
+
+   public boolean checkDataReceived()
+   {
+      return true;
+   }
+
+   public HornetQBuffer createBuffer(int size)
+   {
+      return HornetQBuffers.dynamicBuffer(size);
+   }
+
+   public void destroy()
+   {
+   }
+
+   public void disconnect()
+   {
+   }
+
+   public void fail(HornetQException me)
+   {
+   }
+
+   public void flush()
+   {  
+   }
+
+   public List<FailureListener> getFailureListeners()
+   {
+      return Collections.EMPTY_LIST;
+   }
+
+   public Object getID()
+   {
+      return transportConnection.getID();
+   }
+
+   public String getRemoteAddress()
+   {      
+      return transportConnection.getRemoteAddress();
+   }
+
+   public Connection getTransportConnection()
+   {
+      return transportConnection;
+   }
+
+   public boolean isClient()
+   {
+      return false;
+   }
+
+   public boolean isDestroyed()
+   {
+      return false;
+   }
+
+   public boolean removeCloseListener(CloseListener listener)
+   {
+      return false;
+   }
+
+   public boolean removeFailureListener(FailureListener listener)
+   {
+      return false;
+   }
+
+   public void setFailureListeners(List<FailureListener> listeners)
+   {
+   }
+
+   
+   public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+   {
+      manager.handleBuffer(this, buffer);
+   }
+
+   public int isReadyToHandle(HornetQBuffer buffer)
+   {
+      return -1;
+   }
+
+}

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompDestinationConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompDestinationConverter.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -1,85 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTemporaryQueue;
-import org.hornetq.jms.client.HornetQTemporaryTopic;
-import org.hornetq.jms.client.HornetQTopic;
-
-/**
- * A StompDestinationConverter
- *
- * @author jmesnil
- *
- *
- */
-public class StompDestinationConverter
-{
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   public static SimpleString convertDestination(String name) throws HornetQException
-   {
-      if (name == null)
-      {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
-      }
-      else if (name.startsWith("/queue/"))
-      {
-         String queueName = name.substring("/queue/".length(), name.length());
-         return HornetQQueue.createAddressFromName(queueName);
-      }
-      else if (name.startsWith("/topic/"))
-      {
-         String topicName = name.substring("/topic/".length(), name.length());
-         return HornetQTopic.createAddressFromName(topicName);
-      }
-      else if (name.startsWith("/temp-queue/"))
-      {
-         String tempName = name.substring("/temp-queue/".length(), name.length());
-         return HornetQTemporaryQueue.createAddressFromName(tempName);
-      }
-      else if (name.startsWith("/temp-topic/"))
-      {
-         String tempName = name.substring("/temp-topic/".length(), name.length());
-         return HornetQTemporaryTopic.createAddressFromName(tempName);
-      }
-      else
-      {
-         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + name +
-                                                                    "] -- StompConnect destinations " +
-                                                                    "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
-      }
-   }
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -1,52 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-/**
- * A StompException
- *
- * @author jmesnil
- *
- *
- */
-public class StompException extends Exception
-{
-
-   /**
-    * @param string
-    */
-   public StompException(String string)
-   {
-      super(string);
-   }
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java (from rev 8830, branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.IOException;
+
+/**
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompException extends IOException {
+    private static final long serialVersionUID = -2869735532997332242L;
+    private final boolean fatal;
+
+    public StompException() {
+        this(null);
+    }
+
+    public StompException(String s) {
+        this(s, false);
+    }
+
+    public StompException(String s, boolean fatal) {
+        this(s, fatal, null);
+    }
+
+    public StompException(String s, boolean fatal, Throwable cause) {
+        super(s);
+        this.fatal = fatal;
+        initCause(cause);
+    }
+
+    public boolean isFatal() {
+        return fatal;
+    }
+}

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -1,70 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.integration.protocol.stomp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Represents all the data in a STOMP frame.
- *
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-public class StompFrame
-{
-   private static final byte[] NO_DATA = new byte[] {};
-
-   private String command;
-
-   private Map<String, Object> headers;
-
-   private byte[] content = StompFrame.NO_DATA;
-
-   public StompFrame()
-   {
-      this.headers = new HashMap<String, Object>();
-   }
-
-   public StompFrame(String command, Map<String, Object> headers, byte[] data)
-   {
-      this.command = command;
-      this.headers = headers;
-      this.content = data;
-   }
-
-   public String getCommand()
-   {
-      return command;
-   }
-
-   public byte[] getContent()
-   {
-      return content;
-   }
-
-   public Map<String, Object> getHeaders()
-   {
-      return headers;
-   }
-
-   @Override
-   public String toString()
-   {
-      return "StompFrame[command=" + command + ", headers=" + headers + ",content-length=" + content.length + "]";
-   }
-}

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java (from rev 8829, branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompFrame
+{
+   private static final byte[] NO_DATA = new byte[] {};
+
+   private String command;
+
+   private Map<String, Object> headers;
+
+   private byte[] content = StompFrame.NO_DATA;
+
+   public StompFrame()
+   {
+      this.headers = new HashMap<String, Object>();
+   }
+
+   public StompFrame(String command, Map<String, Object> headers, byte[] data)
+   {
+      this.command = command;
+      this.headers = headers;
+      this.content = data;
+   }
+
+   public String getCommand()
+   {
+      return command;
+   }
+
+   public byte[] getContent()
+   {
+      return content;
+   }
+
+   public Map<String, Object> getHeaders()
+   {
+      return headers;
+   }
+
+   @Override
+   public String toString()
+   {
+      return "StompFrame[command=" + command + ", headers=" + headers + ",content-length=" + content.length + "]";
+   }
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameDelimiter.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -11,7 +11,7 @@
  * permissions and limitations under the License.
  */
 
-package org.hornetq.integration.protocol.stomp;
+package org.hornetq.core.protocol.stomp;
 
 import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
 import org.jboss.netty.handler.codec.frame.Delimiters;

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -1,35 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.integration.protocol.stomp;
-
-/**
- * Command indicating that an invalid Stomp Frame was received.
- *
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-public class StompFrameError extends StompFrame {
-    private final ProtocolException exception;
-
-    public StompFrameError(ProtocolException exception) {
-        this.exception = exception;
-    }
-
-    public ProtocolException getException() {
-        return exception;
-    }
-}

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java (from rev 8830, branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+/**
+ * Command indicating that an invalid Stomp Frame was received.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompFrameError extends StompFrame {
+    private final StompException exception;
+
+    public StompFrameError(StompException exception) {
+        this.exception = exception;
+    }
+
+    public StompException getException() {
+        return exception;
+    }
+}

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -1,191 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.integration.protocol.stomp;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.hornetq.api.core.HornetQBuffer;
-
-/**
- * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
- */
-public class StompMarshaller {
-    private static final byte[] NO_DATA = new byte[]{};
-    private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
-    private static final int MAX_COMMAND_LENGTH = 1024;
-    private static final int MAX_HEADER_LENGTH = 1024 * 10;
-    private static final int MAX_HEADERS = 1000;
-    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
-    private int version = 1;
-
-    public int getVersion() {
-        return version;
-    }
-
-    public void setVersion(int version) {
-        this.version = version;
-    }
-
-    public byte[] marshal(StompFrame command) throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-        marshal(command, dos);
-        dos.close();
-        return baos.toByteArray();
-    }
-
-    public void marshal(StompFrame stomp, DataOutput os) throws IOException {
-        StringBuffer buffer = new StringBuffer();
-        buffer.append(stomp.getCommand());
-        buffer.append(Stomp.NEWLINE);
-
-        // Output the headers.
-        for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Map.Entry) iter.next();
-            buffer.append(entry.getKey());
-            buffer.append(Stomp.Headers.SEPERATOR);
-            buffer.append(entry.getValue());
-            buffer.append(Stomp.NEWLINE);
-        }
-
-        // Add a newline to seperate the headers from the content.
-        buffer.append(Stomp.NEWLINE);
-
-        os.write(buffer.toString().getBytes("UTF-8"));
-        os.write(stomp.getContent());
-        os.write(END_OF_FRAME);
-    }
-
-    public StompFrame unmarshal(HornetQBuffer in) throws IOException {
-
-        try {
-            String action = null;
-
-            // skip white space to next real action line
-            while (true) {
-                action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
-                if (action == null) {
-                    throw new IOException("connection was closed");
-                }
-                else {
-                    action = action.trim();
-                    if (action.length() > 0) {
-                        break;
-                    }
-                }
-            }
-
-            // Parse the headers
-            HashMap headers = new HashMap(25);
-            while (true) {
-                String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
-                if (line != null && line.trim().length() > 0) {
-
-                    if (headers.size() > MAX_HEADERS) {
-                        throw new ProtocolException("The maximum number of headers was exceeded", true);
-                    }
-
-                    try {
-                        int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
-                        String name = line.substring(0, seperator_index).trim();
-                        String value = line.substring(seperator_index + 1, line.length()).trim();
-                        headers.put(name, value);
-                    }
-                    catch (Exception e) {
-                        throw new ProtocolException("Unable to parser header line [" + line + "]", true);
-                    }
-                }
-                else {
-                    break;
-                }
-            }
-
-            // Read in the data part.
-            byte[] data = NO_DATA;
-            String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
-            if (contentLength != null) {
-
-                // Bless the client, he's telling us how much data to read in.
-                int length;
-                try {
-                    length = Integer.parseInt(contentLength.trim());
-                }
-                catch (NumberFormatException e) {
-                    throw new ProtocolException("Specified content-length is not a valid integer", true);
-                }
-
-                if (length > MAX_DATA_LENGTH) {
-                    throw new ProtocolException("The maximum data length was exceeded", true);
-                }
-
-                data = new byte[length];
-                in.readBytes(data);
-
-                if (in.readByte() != 0) {
-                    throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
-                }
-            }
-            else {
-
-                // We don't know how much to read.. data ends when we hit a 0
-                byte b;
-                ByteArrayOutputStream baos = null;
-                while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
-
-                    if (baos == null) {
-                        baos = new ByteArrayOutputStream();
-                    }
-                    else if (baos.size() > MAX_DATA_LENGTH) {
-                        throw new ProtocolException("The maximum data length was exceeded", true);
-                    }
-
-                    baos.write(b);
-                }
-
-                if (baos != null) {
-                    baos.close();
-                    data = baos.toByteArray();
-                }
-            }
-
-            return new StompFrame(action, headers, data);
-        }
-        catch (ProtocolException e) {
-            return new StompFrameError(e);
-        }
-    }
-
-    protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException {
-        byte b;
-        ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
-        while ((b = in.readByte()) != '\n') {
-            if (baos.size() > maxLength) {
-                throw new ProtocolException(errorMessage, true);
-            }
-            baos.write(b);
-        }
-        byte[] sequence = baos.toByteArray();
-        return new String(sequence, "UTF-8");
-    }
-}

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java (from rev 8830, branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -0,0 +1,191 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Implements marshalling and unmarshalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+class StompMarshaller {
+    private static final byte[] NO_DATA = new byte[]{};
+    private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
+    private static final int MAX_COMMAND_LENGTH = 1024;
+    private static final int MAX_HEADER_LENGTH = 1024 * 10;
+    private static final int MAX_HEADERS = 1000;
+    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+    private int version = 1;
+
+    public int getVersion() {
+        return version;
+    }
+
+    public void setVersion(int version) {
+        this.version = version;
+    }
+
+    public byte[] marshal(StompFrame command) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos);
+        marshal(command, dos);
+        dos.close();
+        return baos.toByteArray();
+    }
+
+    public void marshal(StompFrame stomp, DataOutput os) throws IOException {
+        StringBuffer buffer = new StringBuffer();
+        buffer.append(stomp.getCommand());
+        buffer.append(Stomp.NEWLINE);
+
+        // Output the headers.
+        for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Map.Entry) iter.next();
+            buffer.append(entry.getKey());
+            buffer.append(Stomp.Headers.SEPERATOR);
+            buffer.append(entry.getValue());
+            buffer.append(Stomp.NEWLINE);
+        }
+
+        // Add a newline to separate the headers from the content.
+        buffer.append(Stomp.NEWLINE);
+
+        os.write(buffer.toString().getBytes("UTF-8"));
+        os.write(stomp.getContent());
+        os.write(END_OF_FRAME);
+    }
+
+    public StompFrame unmarshal(HornetQBuffer in) throws IOException {
+
+        try {
+            String action = null;
+
+            // skip white space to next real action line
+            while (true) {
+                action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+                if (action == null) {
+                    throw new IOException("connection was closed");
+                }
+                else {
+                    action = action.trim();
+                    if (action.length() > 0) {
+                        break;
+                    }
+                }
+            }
+
+            // Parse the headers
+            HashMap headers = new HashMap(25);
+            while (true) {
+                String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+                if (line != null && line.trim().length() > 0) {
+
+                    if (headers.size() > MAX_HEADERS) {
+                        throw new StompException("The maximum number of headers was exceeded", true);
+                    }
+
+                    try {
+                        int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+                        String name = line.substring(0, seperator_index).trim();
+                        String value = line.substring(seperator_index + 1, line.length()).trim();
+                        headers.put(name, value);
+                    }
+                    catch (Exception e) {
+                        throw new StompException("Unable to parser header line [" + line + "]", true);
+                    }
+                }
+                else {
+                    break;
+                }
+            }
+
+            // Read in the data part.
+            byte[] data = NO_DATA;
+            String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
+            if (contentLength != null) {
+
+                // Bless the client, he's telling us how much data to read in.
+                int length;
+                try {
+                    length = Integer.parseInt(contentLength.trim());
+                }
+                catch (NumberFormatException e) {
+                    throw new StompException("Specified content-length is not a valid integer", true);
+                }
+
+                if (length > MAX_DATA_LENGTH) {
+                    throw new StompException("The maximum data length was exceeded", true);
+                }
+
+                data = new byte[length];
+                in.readBytes(data);
+
+                if (in.readByte() != 0) {
+                    throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
+                }
+            }
+            else {
+
+                // We don't know how much to read.. data ends when we hit a 0
+                byte b;
+                ByteArrayOutputStream baos = null;
+                while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
+
+                    if (baos == null) {
+                        baos = new ByteArrayOutputStream();
+                    }
+                    else if (baos.size() > MAX_DATA_LENGTH) {
+                        throw new StompException("The maximum data length was exceeded", true);
+                    }
+
+                    baos.write(b);
+                }
+
+                if (baos != null) {
+                    baos.close();
+                    data = baos.toByteArray();
+                }
+            }
+
+            return new StompFrame(action, headers, data);
+        }
+        catch (StompException e) {
+            return new StompFrameError(e);
+        }
+    }
+
+    protected String readLine(HornetQBuffer in, int maxLength, String errorMessage) throws IOException {
+        byte b;
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
+        while ((b = in.readByte()) != '\n') {
+            if (baos.size() > maxLength) {
+                throw new StompException(errorMessage, true);
+            }
+            baos.write(b);
+        }
+        byte[] sequence = baos.toByteArray();
+        return new String(sequence, "UTF-8");
+    }
+}

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompPacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java	2010-01-21 14:41:55 UTC (rev 8826)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompPacketDecoder.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -1,51 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-
-/**
- * A StompPacketDecoder
- *
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- */
-@ChannelPipelineCoverage("one")
-public class StompPacketDecoder extends SimpleChannelHandler
-{
-   private final StompMarshaller marshaller;
-   
-   // PacketDecoder implementation ----------------------------------
-
-   public StompPacketDecoder(final StompMarshaller marshaller)
-   {
-      this.marshaller = marshaller;
-   }
-
-   @Override
-   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
-   {
-      ChannelBuffer in = (ChannelBuffer)e.getMessage();
-      HornetQBuffer buffer = new ChannelBufferWrapper(in);
-      StompFrame frame = marshaller.unmarshal(buffer);
-      
-      Channels.fireMessageReceived(ctx, frame);
-   }
-}

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -0,0 +1,296 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * StompProtocolManager
+ * 
+ * Handles the Stomp protocol in HornetQ: unmarshals incoming frames and dispatches CONNECT, DISCONNECT, SEND and SUBSCRIBE commands.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class StompProtocolManager implements ProtocolManager
+{
+   private static final Logger log = Logger.getLogger(StompProtocolManager.class);
+
+   private final HornetQServer server;
+
+   private final StompMarshaller marshaller;
+
+   private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
+
+   public StompProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
+   {
+      this.server = server;
+      this.marshaller = new StompMarshaller();
+   }
+
+   public ConnectionEntry createConnectionEntry(final Connection connection)
+   {
+      StompConnection conn = new StompConnection(connection, this);
+
+      return new ConnectionEntry(conn, 0, 0);
+   }
+
+   public void removeHandler(String name)
+   {
+   }
+
+   public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
+   {
+      StompFrame frame = null;
+      try
+      {
+         frame = marshaller.unmarshal(buffer);
+         System.out.println("RECEIVED " + frame);
+
+         String command = frame.getCommand();
+
+         StompFrame response = null;
+         if (Stomp.Commands.CONNECT.equals(command))
+         {
+            response = onConnect(frame, server, connection);
+         }
+         else if (Stomp.Commands.DISCONNECT.equals(command))
+         {
+            response = onDisconnect(frame, server, connection);
+         }
+         else if (Stomp.Commands.SEND.equals(command))
+         {
+            response = onSend(frame, server, connection);
+         }
+         else if (Stomp.Commands.SUBSCRIBE.equals(command))
+         {
+            response = onSubscribe(frame, server, connection);
+         }
+         else
+         {
+            log.error("Unsupported Stomp frame: " + frame);
+            response = new StompFrame(Stomp.Responses.ERROR,
+                                      new HashMap<String, Object>(),
+                                      ("Unsupported frame: " + command).getBytes());
+         }
+
+         if (response != null)
+         {
+            send(connection, response);
+         }
+      }
+      catch (StompException ex)
+      {
+         ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         try
+         {
+            // Let the stomp client know about any protocol errors.
+            PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+            ex.printStackTrace(stream);
+            stream.close();
+         }
+         catch (UnsupportedEncodingException e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+
+         Map<String, Object> headers = new HashMap<String, Object>();
+         headers.put(Stomp.Headers.Error.MESSAGE, ex.getMessage());
+
+         final String receiptId = (String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+         if (receiptId != null)
+         {
+            headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+         }
+
+         StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+         try
+         {
+            send(connection, errorMessage);
+         }
+         catch (IOException e)
+         {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+         }
+      }
+      catch (Exception ex)
+      {
+         ex.printStackTrace();
+      }
+   }
+
+   private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception,
+                                                                                                        StompException,
+                                                                                                        HornetQException
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+      SimpleString queueName = SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+
+      ServerSession session = checkAndGetSession(connection);
+      long consumerID = server.getStorageManager().generateUniqueID();
+      session.createConsumer(consumerID, queueName, null, false);
+      session.receiveConsumerCredits(consumerID, -1);
+      session.start();
+
+      return null;
+   }
+
+   private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
+   {
+      ServerSession session = sessions.get(connection);
+      if (session == null)
+      {
+         throw new StompException("Not connected");
+      }
+      return session;
+   }
+
+   private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
+   {
+      ServerSession session = checkAndGetSession(connection);
+      if (session != null)
+      {
+         try
+         {
+            session.close();
+         }
+         catch (Exception e)
+         {
+            throw new StompException(e.getMessage());
+         }
+         sessions.remove(connection);
+      }
+      return null;
+   }
+
+   private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
+   {
+      ServerSession session = checkAndGetSession(connection);
+
+      Map<String, Object> headers = frame.getHeaders();
+      String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+      /*
+      String type = (String)headers.get(Stomp.Headers.Send.TYPE);
+      long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
+      byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
+      boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
+      */
+      byte type = Message.TEXT_TYPE;
+      if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+      {
+         type = Message.BYTES_TYPE;
+      }
+      long timestamp = System.currentTimeMillis();
+      boolean durable = false;
+      long expiration = -1;
+      byte priority = 9;
+      SimpleString address = SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+
+      ServerMessage message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+      message.setType(type);
+      message.setTimestamp(timestamp);
+      message.setAddress(address);
+      byte[] content = frame.getContent();
+      if (type == Message.TEXT_TYPE)
+      {
+         message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new String(content)));
+      }
+      else
+      {
+         message.getBodyBuffer().writeBytes(content);
+      }
+
+      session.send(message);
+      if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+      {
+         Map<String, Object> h = new HashMap<String, Object>();
+         h.put(Stomp.Headers.Response.RECEIPT_ID, headers.get(Stomp.Headers.RECEIPT_REQUESTED));
+         return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
+      }
+      else
+      {
+         return null;
+      }
+   }
+
+   private StompFrame onConnect(StompFrame frame, HornetQServer server, final RemotingConnection connection) throws Exception
+   {
+      Map<String, Object> headers = frame.getHeaders();
+      String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+      String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+      String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+      String name = UUIDGenerator.getInstance().generateStringUUID();
+      ServerSession session = server.createSession(name,
+                                                   login,
+                                                   passcode,
+                                                   HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                                                   connection,
+                                                   true,
+                                                   true,
+                                                   false,
+                                                   false,
+                                                   new StompSessionCallback(marshaller, connection));
+      sessions.put(connection, session);
+      System.out.println(">>> created session " + session);
+      HashMap<String, Object> h = new HashMap<String, Object>();
+      h.put(Stomp.Headers.Connected.SESSION, name);
+      h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+      return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+   }
+
+   private void send(RemotingConnection connection, StompFrame frame) throws IOException
+   {
+      System.out.println("SENDING >>> " + frame);
+      byte[] bytes = marshaller.marshal(frame);
+      HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+      System.out.println("ready to send reply: " + buffer);
+      connection.getTransportConnection().write(buffer, true);
+   }
+
+   public int isReadyToHandle(HornetQBuffer buffer)
+   {
+      return -1;
+   }
+
+}

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * A StompProtocolManagerFactory
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class StompProtocolManagerFactory implements ProtocolManagerFactory
+{
+
+   public ProtocolManager createProtocolManager(final HornetQServer server, final List<Interceptor> interceptors)
+   {
+      return new StompProtocolManager(server, interceptors);
+   }
+
+}

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java (from rev 8836, branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
+ * A StompSessionCallback
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+class StompSessionCallback implements SessionCallback
+{
+   private final RemotingConnection connection;
+
+   private final StompMarshaller marshaller;
+
+   StompSessionCallback(final StompMarshaller marshaller, final RemotingConnection connection)
+   {
+      this.marshaller = marshaller;
+      this.connection = connection;
+   }
+
+   public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+   {
+   }
+
+   public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
+   {
+      try
+      {
+         Map<String, Object> headers = new HashMap<String, Object>();
+         headers.put(Stomp.Headers.Message.DESTINATION, StompUtils.toStompDestination(serverMessage.getAddress()
+                                                                                                       .toString()));
+         byte[] data = new byte[] {};
+         if (serverMessage.getType() == Message.TEXT_TYPE)
+         {
+            SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
+            if (text != null)
+            {
+               data = text.toString().getBytes();
+            }
+         }
+         StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+         System.out.println("SENDING : " + msg);
+         byte[] bytes = marshaller.marshal(msg);
+         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+         connection.getTransportConnection().write(buffer, true);
+
+         return bytes.length;
+
+      }
+      catch (Exception e)
+      {
+         e.printStackTrace();
+         return 0;
+      }
+
+   }
+
+   public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+   {
+      return 0;
+   }
+
+   public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+   {
+      return 0;
+   }
+
+   public void closed()
+   {
+   }
+}
\ No newline at end of file

Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompUtils.java (from rev 8830, branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompUtils.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompUtils.java	2010-01-22 12:46:06 UTC (rev 8840)
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import org.hornetq.api.core.HornetQException;
+
+/**
+ * A StompUtils
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+class StompUtils
+{
+
+   public static String HQ_QUEUE_PREFIX = "jms.queue.";
+
+   public static String STOMP_QUEUE_PREFIX = "/queue/";
+
+   public static String HQ_TEMP_QUEUE_PREFIX = "jms.tempqueue.";
+
+   public static String STOMP_TEMP_QUEUE_PREFIX = "/temp-queue/";
+
+   public static String HQ_TOPIC_PREFIX = "jms.topic.";
+
+   public static String STOMP_TOPIC_PREFIX = "/topic/";
+
+   public static String HQ_TEMP_TOPIC_PREFIX = "jms.temptopic.";
+
+   public static String STOMP_TEMP_TOPIC_PREFIX = "/temp-topic/";
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   public static String toHornetQAddress(String stompDestination) throws HornetQException
+   {
+      if (stompDestination == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+      }
+      else if (stompDestination.startsWith(STOMP_QUEUE_PREFIX))
+      {
+         return convert(stompDestination, STOMP_QUEUE_PREFIX, HQ_QUEUE_PREFIX);
+      }
+      else if (stompDestination.startsWith(STOMP_TOPIC_PREFIX))
+      {
+         return convert(stompDestination, STOMP_TOPIC_PREFIX, HQ_TOPIC_PREFIX);
+      }
+      else if (stompDestination.startsWith(STOMP_TEMP_QUEUE_PREFIX))
+      {
+         return convert(stompDestination, STOMP_TEMP_QUEUE_PREFIX, HQ_TEMP_QUEUE_PREFIX);
+      }
+      else if (stompDestination.startsWith(STOMP_TEMP_TOPIC_PREFIX))
+      {
+         return convert(stompDestination, STOMP_TEMP_TOPIC_PREFIX, HQ_TEMP_TOPIC_PREFIX);
+      }
+      else
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + stompDestination +
+                                                                    "] -- StompConnect destinations " +
+                                                                    "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+      }
+   }
+
+   public static String toStompDestination(String hornetqAddress) throws HornetQException
+   {
+      if (hornetqAddress == null)
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
+      }
+      else if (hornetqAddress.startsWith(HQ_QUEUE_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_QUEUE_PREFIX, STOMP_QUEUE_PREFIX);
+      }
+      else if (hornetqAddress.startsWith(HQ_TOPIC_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_TOPIC_PREFIX, STOMP_TOPIC_PREFIX);
+      }
+      else if (hornetqAddress.startsWith(HQ_TEMP_QUEUE_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_TEMP_QUEUE_PREFIX, STOMP_TEMP_QUEUE_PREFIX);
+      }
+      else if (hornetqAddress.startsWith(HQ_TEMP_TOPIC_PREFIX))
+      {
+         return convert(hornetqAddress, HQ_TEMP_TOPIC_PREFIX, STOMP_TEMP_TOPIC_PREFIX);
+      }
+      else
+      {
+         throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + hornetqAddress +
+                                                                    "] -- Acceptable address must comply to JMS semantics");
+      }
+   }
+   
+   private static String convert(String str, String oldPrefix, String newPrefix) 
+   {
+      String sub = str.substring(oldPrefix.length(), str.length());
+      return new String(newPrefix + sub);
+   }
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}



More information about the hornetq-commits mailing list