Author: jmesnil
Date: 2010-01-21 12:05:06 -0500 (Thu, 21 Jan 2010)
New Revision: 8830
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameDecoder.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompUtils.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* code cleanup
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -85,8 +85,6 @@
void removeSession(String name) throws Exception;
- ServerSession getSession(String name);
-
Set<ServerSession> getSessions();
boolean isStarted();
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -598,11 +598,6 @@
sessions.remove(name);
}
- public ServerSession getSession(final String name)
- {
- return sessions.get(name);
- }
-
public synchronized List<ServerSession> getSessions(final String connectionID)
{
Set<Entry<String, ServerSession>> sessionEntries =
sessions.entrySet();
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -1,50 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *
http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.hornetq.integration.protocol.stomp;
-
-import java.io.IOException;
-
-/**
- * @author <a href="http://hiramchirino.com">chirino</a>
- */
-class ProtocolException extends IOException {
- private static final long serialVersionUID = -2869735532997332242L;
- private final boolean fatal;
-
- public ProtocolException() {
- this(null);
- }
-
- public ProtocolException(String s) {
- this(s, false);
- }
-
- public ProtocolException(String s, boolean fatal) {
- this(s, fatal, null);
- }
-
- public ProtocolException(String s, boolean fatal, Throwable cause) {
- super(s);
- this.fatal = fatal;
- initCause(cause);
- }
-
- public boolean isFatal() {
- return fatal;
- }
-}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -14,6 +14,7 @@
package org.hornetq.integration.protocol.stomp;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.HashMap;
@@ -51,7 +52,7 @@
private final Map<RemotingConnection, ServerSession> sessions = new
HashMap<RemotingConnection, ServerSession>();
- private ServerHolder serverHandler;
+ private ServerHolder serverHolder;
public StompChannelHandler(ServerHolder serverHolder,
final ChannelGroup group,
@@ -59,18 +60,18 @@
final ConnectionLifeCycleListener listener)
{
super(group, listener, acceptor);
- this.serverHandler = serverHolder;
+ this.serverHolder = serverHolder;
this.marshaller = new StompMarshaller();
}
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
Exception
{
StompFrame frame = (StompFrame)e.getMessage();
- System.out.println(">>> got frame " + frame);
+ System.out.println("RECEIVED " + frame);
// need to interact with HornetQ server & session
- HornetQServer server = serverHandler.getServer();
- RemotingConnection connection =
serverHandler.getRemotingConnection(e.getChannel().getId());
+ HornetQServer server = serverHolder.getServer();
+ RemotingConnection connection =
serverHolder.getRemotingConnection(e.getChannel().getId());
try
{
@@ -104,11 +105,7 @@
if (response != null)
{
- System.out.println(">>> will reply " + response);
- byte[] bytes = marshaller.marshal(response);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- System.out.println("ready to send reply: " + buffer);
- connection.getTransportConnection().write(buffer, true);
+ send(connection, response);
}
}
catch (StompException ex)
@@ -129,11 +126,7 @@
}
StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers,
baos.toByteArray());
- byte[] bytes = marshaller.marshal(errorMessage);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- System.out.println("ready to send reply: " + buffer);
- connection.getTransportConnection().write(buffer, true);
-
+ send(connection, errorMessage);
}
catch (Exception ex)
{
@@ -141,21 +134,13 @@
}
}
- /**
- * @param frame
- * @param server
- * @param connection
- * @return
- * @throws StompException
- * @throws HornetQException
- */
private StompFrame onSubscribe(StompFrame frame, HornetQServer server,
RemotingConnection connection) throws Exception,
StompException,
HornetQException
{
Map<String, Object> headers = frame.getHeaders();
String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
- SimpleString queueName = StompDestinationConverter.toHornetQAddress(queue);
+ SimpleString queueName = StompUtils.toHornetQAddress(queue);
ServerSession session = checkAndGetSession(connection);
long consumerID = server.getStorageManager().generateUniqueID();
@@ -215,7 +200,7 @@
boolean durable = false;
long expiration = -1;
byte priority = 9;
- SimpleString address = StompDestinationConverter.toHornetQAddress(queue);
+ SimpleString address = StompUtils.toHornetQAddress(queue);
ServerMessage message = new
ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
message.setType(type);
@@ -252,17 +237,16 @@
String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
String name = UUIDGenerator.getInstance().generateStringUUID();
- server.createSession(name,
- login,
- passcode,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- connection,
- true,
- true,
- false,
- false,
- new StompSessionCallback(marshaller, connection));
- ServerSession session = server.getSession(name);
+ ServerSession session = server.createSession(name,
+ login,
+ passcode,
+
HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ connection,
+ true,
+ true,
+ false,
+ false,
+ new StompSessionCallback(marshaller,
connection));
sessions.put(connection, session);
System.out.println(">>> created session " + session);
HashMap<String, Object> h = new HashMap<String, Object>();
@@ -270,4 +254,13 @@
h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
}
+
+ private void send(RemotingConnection connection, StompFrame frame) throws IOException
+ {
+ System.out.println("SENDING >>> " + frame);
+ byte[] bytes = marshaller.marshal(frame);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ System.out.println("ready to send reply: " + buffer);
+ connection.getTransportConnection().write(buffer, true);
+ }
}
\ No newline at end of file
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -1,118 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
-import org.hornetq.jms.client.HornetQQueue;
-import org.hornetq.jms.client.HornetQTemporaryQueue;
-import org.hornetq.jms.client.HornetQTemporaryTopic;
-import org.hornetq.jms.client.HornetQTopic;
-
-/**
- * A StompDestinationConverter
- *
- * @author jmesnil
- *
- *
- */
-class StompDestinationConverter
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- public static SimpleString toHornetQAddress(String stompDestination) throws
HornetQException
- {
- if (stompDestination == null)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination
is specified!");
- }
- else if (stompDestination.startsWith("/queue/"))
- {
- String queueName = stompDestination.substring("/queue/".length(),
stompDestination.length());
- return HornetQQueue.createAddressFromName(queueName);
- }
- else if (stompDestination.startsWith("/topic/"))
- {
- String topicName = stompDestination.substring("/topic/".length(),
stompDestination.length());
- return HornetQTopic.createAddressFromName(topicName);
- }
- else if (stompDestination.startsWith("/temp-queue/"))
- {
- String tempName = stompDestination.substring("/temp-queue/".length(),
stompDestination.length());
- return HornetQTemporaryQueue.createAddressFromName(tempName);
- }
- else if (stompDestination.startsWith("/temp-topic/"))
- {
- String tempName = stompDestination.substring("/temp-topic/".length(),
stompDestination.length());
- return HornetQTemporaryTopic.createAddressFromName(tempName);
- }
- else
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal
destination name: [" + stompDestination +
- "] --
StompConnect destinations " +
- "must begine
with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
- }
- }
-
- public static String toStompDestination(String hornetqAddress) throws
HornetQException
- {
- if (hornetqAddress == null)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination
is specified!");
- }
- else if (hornetqAddress.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
- {
- return "/queue/" +
hornetqAddress.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length(),
- hornetqAddress.length());
- }
- else if
(hornetqAddress.startsWith(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
- {
- return "/temp-queue/" +
hornetqAddress.substring(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length(),
- hornetqAddress.length());
- }
- else if (hornetqAddress.startsWith(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX))
- {
- return "/topic/" +
hornetqAddress.substring(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX.length(),
- hornetqAddress.length());
- }
- else if
(hornetqAddress.startsWith(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
- {
- return "/temp-topic/" +
hornetqAddress.substring(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length(),
- hornetqAddress.length());
- }
- else
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address
name: [" + hornetqAddress +
- "] -- Acceptable
address must comply to JMS semantics");
- }
- }
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -1,50 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-/**
- * A StompException
- *
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- */
-class StompException extends Exception
-{
-
- /**
- * @param string
- */
- public StompException(String string)
- {
- super(string);
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java
(from rev 8829,
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.hornetq.integration.protocol.stomp;
+
+import java.io.IOException;
+
+/**
+ * @author <a href="http://hiramchirino.com">chirino</a>
+ */
+class StompException extends IOException {
+ private static final long serialVersionUID = -2869735532997332242L;
+ private final boolean fatal;
+
+ public StompException() {
+ this(null);
+ }
+
+ public StompException(String s) {
+ this(s, false);
+ }
+
+ public StompException(String s, boolean fatal) {
+ this(s, fatal, null);
+ }
+
+ public StompException(String s, boolean fatal, Throwable cause) {
+ super(s);
+ this.fatal = fatal;
+ initCause(cause);
+ }
+
+ public boolean isFatal() {
+ return fatal;
+ }
+}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameDecoder.java
(from rev 8829,
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameDecoder.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameDecoder.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.protocol.stomp;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * A StompFrameDecoder
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+@ChannelPipelineCoverage("one")
+public class StompFrameDecoder extends SimpleChannelHandler
+{
+ private final StompMarshaller marshaller;
+
+ // PacketDecoder implementation ----------------------------------
+
+ public StompFrameDecoder()
+ {
+ this.marshaller = new StompMarshaller();
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
Exception
+ {
+ ChannelBuffer in = (ChannelBuffer)e.getMessage();
+ HornetQBuffer buffer = new ChannelBufferWrapper(in);
+ StompFrame frame = marshaller.unmarshal(buffer);
+
+ Channels.fireMessageReceived(ctx, frame);
+ }
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -23,13 +23,13 @@
* @author <a href="http://hiramchirino.com">chirino</a>
*/
class StompFrameError extends StompFrame {
- private final ProtocolException exception;
+ private final StompException exception;
- public StompFrameError(ProtocolException exception) {
+ public StompFrameError(StompException exception) {
this.exception = exception;
}
- public ProtocolException getException() {
+ public StompException getException() {
return exception;
}
}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -103,7 +103,7 @@
if (line != null && line.trim().length() > 0) {
if (headers.size() > MAX_HEADERS) {
- throw new ProtocolException("The maximum number of headers
was exceeded", true);
+ throw new StompException("The maximum number of headers was
exceeded", true);
}
try {
@@ -113,7 +113,7 @@
headers.put(name, value);
}
catch (Exception e) {
- throw new ProtocolException("Unable to parser header line
[" + line + "]", true);
+ throw new StompException("Unable to parser header line
[" + line + "]", true);
}
}
else {
@@ -132,18 +132,18 @@
length = Integer.parseInt(contentLength.trim());
}
catch (NumberFormatException e) {
- throw new ProtocolException("Specified content-length is not a
valid integer", true);
+ throw new StompException("Specified content-length is not a
valid integer", true);
}
if (length > MAX_DATA_LENGTH) {
- throw new ProtocolException("The maximum data length was
exceeded", true);
+ throw new StompException("The maximum data length was
exceeded", true);
}
data = new byte[length];
in.readBytes(data);
if (in.readByte() != 0) {
- throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + "
bytes were read and " + "there was no trailing null byte", true);
+ throw new StompException(Stomp.Headers.CONTENT_LENGTH + " bytes
were read and " + "there was no trailing null byte", true);
}
}
else {
@@ -157,7 +157,7 @@
baos = new ByteArrayOutputStream();
}
else if (baos.size() > MAX_DATA_LENGTH) {
- throw new ProtocolException("The maximum data length was
exceeded", true);
+ throw new StompException("The maximum data length was
exceeded", true);
}
baos.write(b);
@@ -171,7 +171,7 @@
return new StompFrame(action, headers, data);
}
- catch (ProtocolException e) {
+ catch (StompException e) {
return new StompFrameError(e);
}
}
@@ -181,7 +181,7 @@
ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
while ((b = in.readByte()) != '\n') {
if (baos.size() > maxLength) {
- throw new ProtocolException(errorMessage, true);
+ throw new StompException(errorMessage, true);
}
baos.write(b);
}
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -1,51 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.protocol.stomp;
-
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-
-/**
- * A StompPacketDecoder
- *
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- */
-@ChannelPipelineCoverage("one")
-public class StompPacketDecoder extends SimpleChannelHandler
-{
- private final StompMarshaller marshaller;
-
- // PacketDecoder implementation ----------------------------------
-
- public StompPacketDecoder()
- {
- this.marshaller = new StompMarshaller();
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
Exception
- {
- ChannelBuffer in = (ChannelBuffer)e.getMessage();
- HornetQBuffer buffer = new ChannelBufferWrapper(in);
- StompFrame frame = marshaller.unmarshal(buffer);
-
- Channels.fireMessageReceived(ctx, frame);
- }
-}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -49,7 +49,7 @@
try
{
Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Message.DESTINATION,
StompDestinationConverter.toStompDestination(serverMessage.getAddress()
+ headers.put(Stomp.Headers.Message.DESTINATION,
StompUtils.toStompDestination(serverMessage.getAddress()
.toString()));
byte[] data = new byte[] {};
if (serverMessage.getType() == HornetQTextMessage.TYPE)
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompUtils.java
(from rev 8829,
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompUtils.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompUtils.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.protocol.stomp;
+
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.jms.client.HornetQTemporaryQueue;
+import org.hornetq.jms.client.HornetQTemporaryTopic;
+import org.hornetq.jms.client.HornetQTopic;
+
+/**
+ * A StompUtils
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ *
+ */
+class StompUtils
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ public static SimpleString toHornetQAddress(String stompDestination) throws
HornetQException
+ {
+ if (stompDestination == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination
is specified!");
+ }
+ else if (stompDestination.startsWith("/queue/"))
+ {
+ String queueName = stompDestination.substring("/queue/".length(),
stompDestination.length());
+ return HornetQQueue.createAddressFromName(queueName);
+ }
+ else if (stompDestination.startsWith("/topic/"))
+ {
+ String topicName = stompDestination.substring("/topic/".length(),
stompDestination.length());
+ return HornetQTopic.createAddressFromName(topicName);
+ }
+ else if (stompDestination.startsWith("/temp-queue/"))
+ {
+ String tempName = stompDestination.substring("/temp-queue/".length(),
stompDestination.length());
+ return HornetQTemporaryQueue.createAddressFromName(tempName);
+ }
+ else if (stompDestination.startsWith("/temp-topic/"))
+ {
+ String tempName = stompDestination.substring("/temp-topic/".length(),
stompDestination.length());
+ return HornetQTemporaryTopic.createAddressFromName(tempName);
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal
destination name: [" + stompDestination +
+ "] --
StompConnect destinations " +
+ "must begine
with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
+ }
+ }
+
+ public static String toStompDestination(String hornetqAddress) throws
HornetQException
+ {
+ if (hornetqAddress == null)
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination
is specified!");
+ }
+ else if (hornetqAddress.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
+ {
+ return "/queue/" +
hornetqAddress.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length(),
+ hornetqAddress.length());
+ }
+ else if
(hornetqAddress.startsWith(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+ {
+ return "/temp-queue/" +
hornetqAddress.substring(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length(),
+ hornetqAddress.length());
+ }
+ else if (hornetqAddress.startsWith(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX))
+ {
+ return "/topic/" +
hornetqAddress.substring(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX.length(),
+ hornetqAddress.length());
+ }
+ else if
(hornetqAddress.startsWith(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+ {
+ return "/temp-topic/" +
hornetqAddress.substring(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length(),
+ hornetqAddress.length());
+ }
+ else
+ {
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address
name: [" + hornetqAddress +
+ "] -- Acceptable
address must comply to JMS semantics");
+ }
+ }
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -16,8 +16,6 @@
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
-import org.hornetq.integration.protocol.stomp.StompFrameDelimiter;
-import org.hornetq.integration.protocol.stomp.StompPacketDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -46,13 +44,6 @@
// Public --------------------------------------------------------
- public static void addStompStack(final ChannelPipeline pipeline, final ServerHolder serverHandler)
- {
- assert pipeline != null;
- pipeline.addLast("delimiter", new StompFrameDelimiter());
- pipeline.addLast("codec", new StompPacketDecoder());
- }
-
public static void addHornetQCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
{
assert pipeline != null;
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -37,6 +37,8 @@
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
import org.hornetq.integration.protocol.stomp.StompChannelHandler;
+import org.hornetq.integration.protocol.stomp.StompFrameDelimiter;
+import org.hornetq.integration.protocol.stomp.StompFrameDecoder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -132,7 +134,7 @@
private VirtualExecutorService bossExecutor;
- private ServerHolder serverHandler;
+ private ServerHolder serverHolder;
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
@@ -143,7 +145,7 @@
{
this.handler = handler;
- this.serverHandler = serverHandler;
+ this.serverHolder = serverHandler;
this.listener = listener;
@@ -291,8 +293,9 @@
}
if (protocol.equals(TransportConstants.STOMP_PROTOCOL))
{
- ChannelPipelineSupport.addStompStack(pipeline, serverHandler);
- pipeline.addLast("handler", new StompChannelHandler(serverHandler,
+ pipeline.addLast("delimiter", new StompFrameDelimiter());
+ pipeline.addLast("codec", new StompFrameDecoder());
+ pipeline.addLast("handler", new StompChannelHandler(serverHolder,
channelGroup,
NettyAcceptor.this,
new Listener()));
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java 2010-01-21 16:46:55 UTC (rev 8829)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -1,31 +0,0 @@
-/*
- * Copyright 2010 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.transports.netty;
-
-import org.hornetq.core.remoting.RemotingConnection;
-import org.hornetq.core.server.HornetQServer;
-
-/**
- * A ServerHolder
- *
- * @author jmesnil
- *
- *
- */
-public interface ServerHolder
-{
- HornetQServer getServer();
-
- RemotingConnection getRemotingConnection(int connectionID);
-}
Copied:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
(from rev 8829,
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java)
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java 2010-01-21 17:05:06 UTC (rev 8830)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+
+/**
+ * A ServerHolder
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public interface ServerHolder
+{
+ HornetQServer getServer();
+
+ RemotingConnection getRemotingConnection(int connectionID);
+}