Author: jmesnil
Date: 2010-01-22 07:46:06 -0500 (Fri, 22 Jan 2010)
New Revision: 8840
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/ProtocolException.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompDestinationConverter.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompPacketDecoder.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/Stomp.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* updated code with trunk's refactoring of protocol support
Copied: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp (from
rev 8826,
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp)
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/ProtocolException.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/ProtocolException.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -1,50 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *
http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.integration.protocol.stomp;
-
-import java.io.IOException;
-
-/**
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-public class ProtocolException extends IOException {
- private static final long serialVersionUID = -2869735532997332242L;
- private final boolean fatal;
-
- public ProtocolException() {
- this(null);
- }
-
- public ProtocolException(String s) {
- this(s, false);
- }
-
- public ProtocolException(String s, boolean fatal) {
- this(s, fatal, null);
- }
-
- public ProtocolException(String s, boolean fatal, Throwable cause) {
- super(s);
- this.fatal = fatal;
- initCause(cause);
- }
-
- public boolean isFatal() {
- return fatal;
- }
-}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/Stomp.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/Stomp.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/Stomp.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.hornetq.integration.protocol.stomp;
+package org.hornetq.core.protocol.stomp;
/**
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.CloseListener;
+import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+
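+/**
+ * STOMP implementation of {@link RemotingConnection}: received buffers are handed to the
+ * {@link ProtocolManager}, and the ID and remote address come from the transport connection.
+ * Listener registration, failure handling and lifecycle methods are currently no-op stubs.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */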
+public class StompConnection implements RemotingConnection
+{
+ private static final Logger log = Logger.getLogger(StompConnection.class);
+
+ private final ProtocolManager manager;
+
+ private final Connection transportConnection;
+
+ StompConnection(final Connection transportConnection, final ProtocolManager manager)
+ {
+ this.transportConnection = transportConnection;
+
+ this.manager = manager;
+ }
+
+ public void addCloseListener(CloseListener listener)
+ {
+ }
+
+ public void addFailureListener(FailureListener listener)
+ {
+ }
+
+ public boolean checkDataReceived()
+ {
+ return true;
+ }
+
+ public HornetQBuffer createBuffer(int size)
+ {
+ return HornetQBuffers.dynamicBuffer(size);
+ }
+
+ public void destroy()
+ {
+ }
+
+ public void disconnect()
+ {
+ }
+
+ public void fail(HornetQException me)
+ {
+ }
+
+ public void flush()
+ {
+ }
+
+ public List<FailureListener> getFailureListeners()
+ {
+ return Collections.EMPTY_LIST;
+ }
+
+ public Object getID()
+ {
+ return transportConnection.getID();
+ }
+
+ public String getRemoteAddress()
+ {
+ return transportConnection.getRemoteAddress();
+ }
+
+ public Connection getTransportConnection()
+ {
+ return transportConnection;
+ }
+
+ public boolean isClient()
+ {
+ return false;
+ }
+
+ public boolean isDestroyed()
+ {
+ return false;
+ }
+
+ public boolean removeCloseListener(CloseListener listener)
+ {
+ return false;
+ }
+
+ public boolean removeFailureListener(FailureListener listener)
+ {
+ return false;
+ }
+
+ public void setFailureListeners(List<FailureListener> listeners)
+ {
+ }
+
+
+ public void bufferReceived(Object connectionID, HornetQBuffer buffer)
+ {
+ manager.handleBuffer(this, buffer);
+ }
+
+ public int isReadyToHandle(HornetQBuffer buffer)
+ {
+ return -1;
+ }
+
+}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompDestinationConverter.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompDestinationConverter.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -1,85 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTemporaryQueue;
-import org.hornetq.jms.client.HornetQTemporaryTopic;
-import org.hornetq.jms.client.HornetQTopic;
-
-/**
- * A StompDestinationConverter
- *
- * @author jmesnil
- *
- *
- */
-public class StompDestinationConverter
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- public static SimpleString convertDestination(String name) throws HornetQException
- {
- if (name == null)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination
is specified!");
- }
- else if (name.startsWith("/queue/"))
- {
- String queueName = name.substring("/queue/".length(), name.length());
- return HornetQQueue.createAddressFromName(queueName);
- }
- else if (name.startsWith("/topic/"))
- {
- String topicName = name.substring("/topic/".length(), name.length());
- return HornetQTopic.createAddressFromName(topicName);
- }
- else if (name.startsWith("/temp-queue/"))
- {
- String tempName = name.substring("/temp-queue/".length(),
name.length());
- return HornetQTemporaryQueue.createAddressFromName(tempName);
- }
- else if (name.startsWith("/temp-topic/"))
- {
- String tempName = name.substring("/temp-topic/".length(),
name.length());
- return HornetQTemporaryTopic.createAddressFromName(tempName);
- }
- else
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal
destination name: [" + name +
- "] --
StompConnect destinations " +
- "must begine
with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
- }
- }
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -1,52 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-/**
- * A StompException
- *
- * @author jmesnil
- *
- *
- */
-public class StompException extends Exception
-{
-
- /**
- * @param string
- */
- public StompException(String string)
- {
- super(string);
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java
(from rev 8830,
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompException.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.IOException;
+
+/**
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompException extends IOException {
+ private static final long serialVersionUID = -2869735532997332242L;
+ private final boolean fatal;
+
+ public StompException() {
+ this(null);
+ }
+
+ public StompException(String s) {
+ this(s, false);
+ }
+
+ public StompException(String s, boolean fatal) {
+ this(s, fatal, null);
+ }
+
+ public StompException(String s, boolean fatal, Throwable cause) {
+ super(s);
+ this.fatal = fatal;
+ initCause(cause);
+ }
+
+ public boolean isFatal() {
+ return fatal;
+ }
+}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -1,70 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *
http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.integration.protocol.stomp;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Represents all the data in a STOMP frame.
- *
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-public class StompFrame
-{
- private static final byte[] NO_DATA = new byte[] {};
-
- private String command;
-
- private Map<String, Object> headers;
-
- private byte[] content = StompFrame.NO_DATA;
-
- public StompFrame()
- {
- this.headers = new HashMap<String, Object>();
- }
-
- public StompFrame(String command, Map<String, Object> headers, byte[] data)
- {
- this.command = command;
- this.headers = headers;
- this.content = data;
- }
-
- public String getCommand()
- {
- return command;
- }
-
- public byte[] getContent()
- {
- return content;
- }
-
- public Map<String, Object> getHeaders()
- {
- return headers;
- }
-
- @Override
- public String toString()
- {
- return "StompFrame[command=" + command + ", headers=" + headers
+ ",content-length=" + content.length + "]";
- }
-}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
(from rev 8829,
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrame.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Represents all the data in a STOMP frame.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompFrame
+{
+ private static final byte[] NO_DATA = new byte[] {};
+
+ private String command;
+
+ private Map<String, Object> headers;
+
+ private byte[] content = StompFrame.NO_DATA;
+
+ public StompFrame()
+ {
+ this.headers = new HashMap<String, Object>();
+ }
+
+ public StompFrame(String command, Map<String, Object> headers, byte[] data)
+ {
+ this.command = command;
+ this.headers = headers;
+ this.content = data;
+ }
+
+ public String getCommand()
+ {
+ return command;
+ }
+
+ public byte[] getContent()
+ {
+ return content;
+ }
+
+ public Map<String, Object> getHeaders()
+ {
+ return headers;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StompFrame[command=" + command + ", headers=" + headers
+ ",content-length=" + content.length + "]";
+ }
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameDelimiter.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameDelimiter.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -11,7 +11,7 @@
* permissions and limitations under the License.
*/
-package org.hornetq.integration.protocol.stomp;
+package org.hornetq.core.protocol.stomp;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -1,35 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *
http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.integration.protocol.stomp;
-
-/**
- * Command indicating that an invalid Stomp Frame was received.
- *
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-public class StompFrameError extends StompFrame {
- private final ProtocolException exception;
-
- public StompFrameError(ProtocolException exception) {
- this.exception = exception;
- }
-
- public ProtocolException getException() {
- return exception;
- }
-}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
(from rev 8830,
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompFrameError.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -0,0 +1,35 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+/**
+ * Command indicating that an invalid Stomp Frame was received.
+ *
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompFrameError extends StompFrame {
+ private final StompException exception;
+
+ public StompFrameError(StompException exception) {
+ this.exception = exception;
+ }
+
+ public StompException getException() {
+ return exception;
+ }
+}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -1,191 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *
http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.integration.protocol.stomp;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.hornetq.api.core.HornetQBuffer;
-
-/**
- * Implements marshalling and unmarsalling the <a
href="http://stomp.codehaus.org/">Stomp</a> protocol.
- */
-public class StompMarshaller {
- private static final byte[] NO_DATA = new byte[]{};
- private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
- private static final int MAX_COMMAND_LENGTH = 1024;
- private static final int MAX_HEADER_LENGTH = 1024 * 10;
- private static final int MAX_HEADERS = 1000;
- private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
- private int version = 1;
-
- public int getVersion() {
- return version;
- }
-
- public void setVersion(int version) {
- this.version = version;
- }
-
- public byte[] marshal(StompFrame command) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(baos);
- marshal(command, dos);
- dos.close();
- return baos.toByteArray();
- }
-
- public void marshal(StompFrame stomp, DataOutput os) throws IOException {
- StringBuffer buffer = new StringBuffer();
- buffer.append(stomp.getCommand());
- buffer.append(Stomp.NEWLINE);
-
- // Output the headers.
- for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();)
{
- Map.Entry entry = (Map.Entry) iter.next();
- buffer.append(entry.getKey());
- buffer.append(Stomp.Headers.SEPERATOR);
- buffer.append(entry.getValue());
- buffer.append(Stomp.NEWLINE);
- }
-
- // Add a newline to seperate the headers from the content.
- buffer.append(Stomp.NEWLINE);
-
- os.write(buffer.toString().getBytes("UTF-8"));
- os.write(stomp.getContent());
- os.write(END_OF_FRAME);
- }
-
- public StompFrame unmarshal(HornetQBuffer in) throws IOException {
-
- try {
- String action = null;
-
- // skip white space to next real action line
- while (true) {
- action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command
length was exceeded");
- if (action == null) {
- throw new IOException("connection was closed");
- }
- else {
- action = action.trim();
- if (action.length() > 0) {
- break;
- }
- }
- }
-
- // Parse the headers
- HashMap headers = new HashMap(25);
- while (true) {
- String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header
length was exceeded");
- if (line != null && line.trim().length() > 0) {
-
- if (headers.size() > MAX_HEADERS) {
- throw new ProtocolException("The maximum number of headers
was exceeded", true);
- }
-
- try {
- int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
- String name = line.substring(0, seperator_index).trim();
- String value = line.substring(seperator_index + 1,
line.length()).trim();
- headers.put(name, value);
- }
- catch (Exception e) {
- throw new ProtocolException("Unable to parser header line
[" + line + "]", true);
- }
- }
- else {
- break;
- }
- }
-
- // Read in the data part.
- byte[] data = NO_DATA;
- String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
- if (contentLength != null) {
-
- // Bless the client, he's telling us how much data to read in.
- int length;
- try {
- length = Integer.parseInt(contentLength.trim());
- }
- catch (NumberFormatException e) {
- throw new ProtocolException("Specified content-length is not a
valid integer", true);
- }
-
- if (length > MAX_DATA_LENGTH) {
- throw new ProtocolException("The maximum data length was
exceeded", true);
- }
-
- data = new byte[length];
- in.readBytes(data);
-
- if (in.readByte() != 0) {
- throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + "
bytes were read and " + "there was no trailing null byte", true);
- }
- }
- else {
-
- // We don't know how much to read.. data ends when we hit a 0
- byte b;
- ByteArrayOutputStream baos = null;
- while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
-
- if (baos == null) {
- baos = new ByteArrayOutputStream();
- }
- else if (baos.size() > MAX_DATA_LENGTH) {
- throw new ProtocolException("The maximum data length was
exceeded", true);
- }
-
- baos.write(b);
- }
-
- if (baos != null) {
- baos.close();
- data = baos.toByteArray();
- }
- }
-
- return new StompFrame(action, headers, data);
- }
- catch (ProtocolException e) {
- return new StompFrameError(e);
- }
- }
-
- protected String readLine(HornetQBuffer in, int maxLength, String errorMessage)
throws IOException {
- byte b;
- ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
- while ((b = in.readByte()) != '\n') {
- if (baos.size() > maxLength) {
- throw new ProtocolException(errorMessage, true);
- }
- baos.write(b);
- }
- byte[] sequence = baos.toByteArray();
- return new String(sequence, "UTF-8");
- }
-}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
(from rev 8830,
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompMarshaller.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -0,0 +1,191 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+
+/**
+ * Implements marshalling and unmarshalling the <a
href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ */
+class StompMarshaller {
+ private static final byte[] NO_DATA = new byte[]{};
+ private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
+ private static final int MAX_COMMAND_LENGTH = 1024;
+ private static final int MAX_HEADER_LENGTH = 1024 * 10;
+ private static final int MAX_HEADERS = 1000;
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+ private int version = 1;
+
+ public int getVersion() {
+ return version;
+ }
+
+ public void setVersion(int version) {
+ this.version = version;
+ }
+
+ public byte[] marshal(StompFrame command) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ marshal(command, dos);
+ dos.close();
+ return baos.toByteArray();
+ }
+
+ public void marshal(StompFrame stomp, DataOutput os) throws IOException {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(stomp.getCommand());
+ buffer.append(Stomp.NEWLINE);
+
+ // Output the headers.
+ for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();)
{
+ Map.Entry entry = (Map.Entry) iter.next();
+ buffer.append(entry.getKey());
+ buffer.append(Stomp.Headers.SEPERATOR);
+ buffer.append(entry.getValue());
+ buffer.append(Stomp.NEWLINE);
+ }
+
+ // Add a newline to separate the headers from the content.
+ buffer.append(Stomp.NEWLINE);
+
+ os.write(buffer.toString().getBytes("UTF-8"));
+ os.write(stomp.getContent());
+ os.write(END_OF_FRAME);
+ }
+
+ public StompFrame unmarshal(HornetQBuffer in) throws IOException {
+
+ try {
+ String action = null;
+
+ // skip white space to next real action line
+ while (true) {
+ action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command
length was exceeded");
+ if (action == null) {
+ throw new IOException("connection was closed");
+ }
+ else {
+ action = action.trim();
+ if (action.length() > 0) {
+ break;
+ }
+ }
+ }
+
+ // Parse the headers
+ HashMap headers = new HashMap(25);
+ while (true) {
+ String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header
length was exceeded");
+ if (line != null && line.trim().length() > 0) {
+
+ if (headers.size() > MAX_HEADERS) {
+ throw new StompException("The maximum number of headers was
exceeded", true);
+ }
+
+ try {
+ int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+ String name = line.substring(0, seperator_index).trim();
+ String value = line.substring(seperator_index + 1,
line.length()).trim();
+ headers.put(name, value);
+ }
+ catch (Exception e) {
+ throw new StompException("Unable to parser header line
[" + line + "]", true);
+ }
+ }
+ else {
+ break;
+ }
+ }
+
+ // Read in the data part.
+ byte[] data = NO_DATA;
+ String contentLength = (String) headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLength != null) {
+
+ // Bless the client, he's telling us how much data to read in.
+ int length;
+ try {
+ length = Integer.parseInt(contentLength.trim());
+ }
+ catch (NumberFormatException e) {
+ throw new StompException("Specified content-length is not a
valid integer", true);
+ }
+
+ if (length > MAX_DATA_LENGTH) {
+ throw new StompException("The maximum data length was
exceeded", true);
+ }
+
+ data = new byte[length];
+ in.readBytes(data);
+
+ if (in.readByte() != 0) {
+ throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes
were read and " + "there was no trailing null byte", true);
+ }
+ }
+ else {
+
+ // We don't know how much to read.. data ends when we hit a 0
+ byte b;
+ ByteArrayOutputStream baos = null;
+ while (in.readableBytes() > 0 && (b = in.readByte()) != 0) {
+
+ if (baos == null) {
+ baos = new ByteArrayOutputStream();
+ }
+ else if (baos.size() > MAX_DATA_LENGTH) {
+ throw new StompException("The maximum data length was
exceeded", true);
+ }
+
+ baos.write(b);
+ }
+
+ if (baos != null) {
+ baos.close();
+ data = baos.toByteArray();
+ }
+ }
+
+ return new StompFrame(action, headers, data);
+ }
+ catch (StompException e) {
+ return new StompFrameError(e);
+ }
+ }
+
+ protected String readLine(HornetQBuffer in, int maxLength, String errorMessage)
throws IOException {
+ byte b;
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
+ while ((b = in.readByte()) != '\n') {
+ if (baos.size() > maxLength) {
+ throw new StompException(errorMessage, true);
+ }
+ baos.write(b);
+ }
+ byte[] sequence = baos.toByteArray();
+ return new String(sequence, "UTF-8");
+ }
+}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompPacketDecoder.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java 2010-01-21
14:41:55 UTC (rev 8826)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompPacketDecoder.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -1,51 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-
-/**
- * A StompPacketDecoder
- *
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- */
-@ChannelPipelineCoverage("one")
-public class StompPacketDecoder extends SimpleChannelHandler
-{
- private final StompMarshaller marshaller;
-
- // PacketDecoder implementation ----------------------------------
-
- public StompPacketDecoder(final StompMarshaller marshaller)
- {
- this.marshaller = marshaller;
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
Exception
- {
- ChannelBuffer in = (ChannelBuffer)e.getMessage();
- HornetQBuffer buffer = new ChannelBufferWrapper(in);
- StompFrame frame = marshaller.unmarshal(buffer);
-
- Channels.fireMessageReceived(ctx, frame);
- }
-}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManager.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -0,0 +1,296 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.spi.core.protocol.ConnectionEntry;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.remoting.Connection;
+import org.hornetq.utils.UUIDGenerator;
+
+/**
+ * StompProtocolManager
+ *
+ * A stupid protocol to demonstrate how to implement a new protocol in HornetQ
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class StompProtocolManager implements ProtocolManager
+{
+ private static final Logger log = Logger.getLogger(StompProtocolManager.class);
+
+ private final HornetQServer server;
+
+ private final StompMarshaller marshaller;
+
+ private final Map<RemotingConnection, ServerSession> sessions = new
HashMap<RemotingConnection, ServerSession>();
+
+ public StompProtocolManager(final HornetQServer server, final List<Interceptor>
interceptors)
+ {
+ this.server = server;
+ this.marshaller = new StompMarshaller();
+ }
+
+ public ConnectionEntry createConnectionEntry(final Connection connection)
+ {
+ StompConnection conn = new StompConnection(connection, this);
+
+ return new ConnectionEntry(conn, 0, 0);
+ }
+
+ public void removeHandler(String name)
+ {
+ }
+
+ public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
+ {
+ StompFrame frame = null;
+ try
+ {
+ frame = marshaller.unmarshal(buffer);
+ System.out.println("RECEIVED " + frame);
+
+ String command = frame.getCommand();
+
+ StompFrame response = null;
+ if (Stomp.Commands.CONNECT.equals(command))
+ {
+ response = onConnect(frame, server, connection);
+ }
+ else if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ response = onDisconnect(frame, server, connection);
+ }
+ else if (Stomp.Commands.SEND.equals(command))
+ {
+ response = onSend(frame, server, connection);
+ }
+ else if (Stomp.Commands.SUBSCRIBE.equals(command))
+ {
+ response = onSubscribe(frame, server, connection);
+ }
+ else
+ {
+ log.error("Unsupported Stomp frame: " + frame);
+ response = new StompFrame(Stomp.Responses.ERROR,
+ new HashMap<String, Object>(),
+ ("Unsupported frame: " +
command).getBytes());
+ }
+
+ if (response != null)
+ {
+ send(connection, response);
+ }
+ }
+ catch (StompException ex)
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try
+ {
+ // Let the stomp client know about any protocol errors.
+ PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos,
"UTF-8"));
+ ex.printStackTrace(stream);
+ stream.close();
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(Stomp.Headers.Error.MESSAGE, ex.getMessage());
+
+ final String receiptId =
(String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if (receiptId != null)
+ {
+ headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ }
+
+ StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers,
baos.toByteArray());
+ try
+ {
+ send(connection, errorMessage);
+ }
+ catch (IOException e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ private StompFrame onSubscribe(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws Exception,
+
StompException,
+
HornetQException
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+ SimpleString queueName =
SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+
+ ServerSession session = checkAndGetSession(connection);
+ long consumerID = server.getStorageManager().generateUniqueID();
+ session.createConsumer(consumerID, queueName, null, false);
+ session.receiveConsumerCredits(consumerID, -1);
+ session.start();
+
+ return null;
+ }
+
+ private ServerSession checkAndGetSession(RemotingConnection connection) throws
StompException
+ {
+ ServerSession session = sessions.get(connection);
+ if (session == null)
+ {
+ throw new StompException("Not connected");
+ }
+ return session;
+ }
+
+ private StompFrame onDisconnect(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws StompException
+ {
+ ServerSession session = checkAndGetSession(connection);
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ throw new StompException(e.getMessage());
+ }
+ sessions.remove(connection);
+ }
+ return null;
+ }
+
+ private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection
connection) throws Exception
+ {
+ ServerSession session = checkAndGetSession(connection);
+
+ Map<String, Object> headers = frame.getHeaders();
+ String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+ /*
+ String type = (String)headers.get(Stomp.Headers.Send.TYPE);
+ long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
+ byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
+ boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
+ */
+ byte type = Message.TEXT_TYPE;
+ if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+ {
+ type = Message.BYTES_TYPE;
+ }
+ long timestamp = System.currentTimeMillis();
+ boolean durable = false;
+ long expiration = -1;
+ byte priority = 9;
+ SimpleString address =
SimpleString.toSimpleString(StompUtils.toHornetQAddress(queue));
+
+ ServerMessage message = new
ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+ message.setType(type);
+ message.setTimestamp(timestamp);
+ message.setAddress(address);
+ byte[] content = frame.getContent();
+ if (type == Message.TEXT_TYPE)
+ {
+
message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new
String(content)));
+ }
+ else
+ {
+ message.getBodyBuffer().writeBytes(content);
+ }
+
+ session.send(message);
+ if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ Map<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Response.RECEIPT_ID,
headers.get(Stomp.Headers.RECEIPT_REQUESTED));
+ return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private StompFrame onConnect(StompFrame frame, HornetQServer server, final
RemotingConnection connection) throws Exception
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+ String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+ String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ ServerSession session = server.createSession(name,
+ login,
+ passcode,
+
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ connection,
+ true,
+ true,
+ false,
+ false,
+ new StompSessionCallback(marshaller,
connection));
+ sessions.put(connection, session);
+ System.out.println(">>> created session " + session);
+ HashMap<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Connected.SESSION, name);
+ h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+ return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+ }
+
+ private void send(RemotingConnection connection, StompFrame frame) throws IOException
+ {
+ System.out.println("SENDING >>> " + frame);
+ byte[] bytes = marshaller.marshal(frame);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ System.out.println("ready to send reply: " + buffer);
+ connection.getTransportConnection().write(buffer, true);
+ }
+
+ public int isReadyToHandle(HornetQBuffer buffer)
+ {
+ return -1;
+ }
+
+}
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompProtocolManagerFactory.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import java.util.List;
+
+import org.hornetq.api.core.Interceptor;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.spi.core.protocol.ProtocolManager;
+import org.hornetq.spi.core.protocol.ProtocolManagerFactory;
+
+/**
+ * A StompProtocolManagerFactory
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+public class StompProtocolManagerFactory implements ProtocolManagerFactory
+{
+
+ public ProtocolManager createProtocolManager(final HornetQServer server, final
List<Interceptor> interceptors)
+ {
+ return new StompProtocolManager(server, interceptors);
+ }
+
+}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
(from rev 8836,
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompSessionCallback.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.core.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.Message;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.spi.core.protocol.SessionCallback;
+
+/**
+ * A StompSessionCallback
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+class StompSessionCallback implements SessionCallback
+{
+ private final RemotingConnection connection;
+
+ private final StompMarshaller marshaller;
+
+ StompSessionCallback(final StompMarshaller marshaller, final RemotingConnection
connection)
+ {
+ this.marshaller = marshaller;
+ this.connection = connection;
+ }
+
+ public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+ {
+ }
+
+ public int sendMessage(ServerMessage serverMessage, long consumerID, int
deliveryCount)
+ {
+ try
+ {
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(Stomp.Headers.Message.DESTINATION,
StompUtils.toStompDestination(serverMessage.getAddress()
+
.toString()));
+ byte[] data = new byte[] {};
+ if (serverMessage.getType() == Message.TEXT_TYPE)
+ {
+ SimpleString text =
serverMessage.getBodyBuffer().readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes();
+ }
+ }
+ StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+ System.out.println("SENDING : " + msg);
+ byte[] bytes = marshaller.marshal(msg);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ connection.getTransportConnection().write(buffer, true);
+
+ return bytes.length;
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return 0;
+ }
+
+ }
+
+ public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean
continues, boolean requiresResponse)
+ {
+ return 0;
+ }
+
+ public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int
deliveryCount)
+ {
+ return 0;
+ }
+
+ public void closed()
+ {
+ }
+}
\ No newline at end of file
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
(from rev 8830,
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompUtils.java)
===================================================================
---
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompUtils.java
(rev 0)
+++
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/stomp/StompUtils.java 2010-01-22
12:46:06 UTC (rev 8840)
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.stomp;
+
+import org.hornetq.api.core.HornetQException;
+
+/**
+ * A StompUtils
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+class StompUtils
+{
+
+ public static String HQ_QUEUE_PREFIX = "jms.queue.";
+
+ public static String STOMP_QUEUE_PREFIX = "/queue/";
+
+ public static String HQ_TEMP_QUEUE_PREFIX = "jms.tempqueue.";
+
+ public static String STOMP_TEMP_QUEUE_PREFIX = "/temp-queue/";
+
+ public static String HQ_TOPIC_PREFIX = "jms.topic.";
+
+ public static String STOMP_TOPIC_PREFIX = "/topic/";
+
+ public static String HQ_TEMP_TOPIC_PREFIX = "jms.temptopic.";
+
+ public static String STOMP_TEMP_TOPIC_PREFIX = "/temp-topic/";
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ public static String toHornetQAddress(String stompDestination) throws
HornetQException
+ {
+ if (stompDestination == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination
is specified!");
+ }
+ else if (stompDestination.startsWith(STOMP_QUEUE_PREFIX))
+ {
+ return convert(stompDestination, STOMP_QUEUE_PREFIX, HQ_QUEUE_PREFIX);
+ }
+ else if (stompDestination.startsWith(STOMP_TOPIC_PREFIX))
+ {
+ return convert(stompDestination, STOMP_TOPIC_PREFIX, HQ_TOPIC_PREFIX);
+ }
+ else if (stompDestination.startsWith(STOMP_TEMP_QUEUE_PREFIX))
+ {
+ return convert(stompDestination, STOMP_TEMP_QUEUE_PREFIX,
HQ_TEMP_QUEUE_PREFIX);
+ }
+ else if (stompDestination.startsWith(STOMP_TEMP_TOPIC_PREFIX))
+ {
+ return convert(stompDestination, STOMP_TEMP_TOPIC_PREFIX,
HQ_TEMP_TOPIC_PREFIX);
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal
destination name: [" + stompDestination +
+ "] --
StompConnect destinations " +
+ "must begin with
one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+ }
+ }
+
+ public static String toStompDestination(String hornetqAddress) throws
HornetQException
+ {
+ if (hornetqAddress == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination
is specified!");
+ }
+ else if (hornetqAddress.startsWith(HQ_QUEUE_PREFIX))
+ {
+ return convert(hornetqAddress, HQ_QUEUE_PREFIX, STOMP_QUEUE_PREFIX);
+ }
+ else if (hornetqAddress.startsWith(HQ_TOPIC_PREFIX))
+ {
+ return convert(hornetqAddress, HQ_TOPIC_PREFIX, STOMP_TOPIC_PREFIX);
+ }
+ else if (hornetqAddress.startsWith(HQ_TEMP_QUEUE_PREFIX))
+ {
+ return convert(hornetqAddress, HQ_TEMP_QUEUE_PREFIX, STOMP_TEMP_QUEUE_PREFIX);
+ }
+ else if (hornetqAddress.startsWith(HQ_TEMP_TOPIC_PREFIX))
+ {
+ return convert(hornetqAddress, HQ_TEMP_TOPIC_PREFIX, STOMP_TEMP_TOPIC_PREFIX);
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address
name: [" + hornetqAddress +
+ "] -- Acceptable
address must comply to JMS semantics");
+ }
+ }
+
+ private static String convert(String str, String oldPrefix, String newPrefix)
+ {
+ String sub = str.substring(oldPrefix.length(), str.length());
+ return new String(newPrefix + sub);
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}