[hornetq-commits] JBoss hornetq SVN: r8829 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/core/remoting/server and 7 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Jan 21 11:46:55 EST 2010
Author: jmesnil
Date: 2010-01-21 11:46:55 -0500 (Thu, 21 Jan 2010)
New Revision: 8829
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/CoreSessionCallback.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/AbstractServerChannelHandler.java
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/HornetQPacketHandler.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* refactored so that Stomp implementation is all located in o.h.integration.protocol.stomp
* modified HornetQServer.createSession to pass the SessionCallback directly
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/CoreSessionCallback.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/CoreSessionCallback.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/CoreSessionCallback.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -0,0 +1,72 @@
+package org.hornetq.core.protocol.core;
+
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.protocol.core.wireformat.SessionProducerCreditsMessage;
+import org.hornetq.core.protocol.core.wireformat.SessionReceiveContinuationMessage;
+import org.hornetq.core.protocol.core.wireformat.SessionReceiveLargeMessage;
+import org.hornetq.core.protocol.core.wireformat.SessionReceiveMessage;
+import org.hornetq.core.remoting.server.ProtocolManager;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.SessionCallback;
+
+/**
+ * A CoreSessionCallback
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public final class CoreSessionCallback implements SessionCallback
+{
+ private final Channel channel;
+
+ private ProtocolManager protocolManager;
+
+ private String name;
+
+ public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel)
+ {
+ this.name = name;
+ this.protocolManager = protocolManager;
+ this.channel = channel;
+ }
+
+ public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+ {
+ Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize, deliveryCount);
+
+ channel.send(packet);
+
+ return packet.getPacketSize();
+ }
+
+ public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+ {
+ Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues, requiresResponse);
+
+ channel.send(packet);
+
+ return packet.getPacketSize();
+ }
+
+ public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
+ {
+ Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
+
+ channel.send(packet);
+
+ return packet.getPacketSize();
+ }
+
+ public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+ {
+ Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
+
+ channel.send(packet);
+ }
+
+ public void closed()
+ {
+ protocolManager.removeHandler(name);
+ }
+}
\ No newline at end of file
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/HornetQPacketHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/HornetQPacketHandler.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/HornetQPacketHandler.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -148,7 +148,7 @@
"Server will not accept create session requests");
}
- Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
+ final Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
ServerSession session = server.createSession(request.getName(),
request.getUsername(),
@@ -158,18 +158,16 @@
request.isAutoCommitSends(),
request.isAutoCommitAcks(),
request.isPreAcknowledge(),
- request.isXA());
+ request.isXA(),
+ new CoreSessionCallback(request.getName(), protocolManager, channel));
- ServerSessionPacketHandler handler = new ServerSessionPacketHandler(protocolManager,
- session,
+ ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
server.getStorageManager()
.newContext(server.getExecutorFactory()
.getExecutor()),
server.getStorageManager(),
channel);
- session.setCallback(handler);
-
channel.setHandler(handler);
// TODO - where is this removed?
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -50,7 +50,6 @@
import javax.transaction.xa.Xid;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.api.core.SimpleString;
import org.hornetq.core.exception.HornetQXAException;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
@@ -69,12 +68,8 @@
import org.hornetq.core.protocol.core.wireformat.SessionDeleteQueueMessage;
import org.hornetq.core.protocol.core.wireformat.SessionExpiredMessage;
import org.hornetq.core.protocol.core.wireformat.SessionForceConsumerDelivery;
-import org.hornetq.core.protocol.core.wireformat.SessionProducerCreditsMessage;
import org.hornetq.core.protocol.core.wireformat.SessionQueueQueryMessage;
import org.hornetq.core.protocol.core.wireformat.SessionQueueQueryResponseMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveContinuationMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveLargeMessage;
-import org.hornetq.core.protocol.core.wireformat.SessionReceiveMessage;
import org.hornetq.core.protocol.core.wireformat.SessionRequestProducerCreditsMessage;
import org.hornetq.core.protocol.core.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.wireformat.SessionSendLargeMessage;
@@ -98,7 +93,6 @@
import org.hornetq.core.server.QueueQueryResult;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
-import org.hornetq.core.server.SessionCallback;
/**
* A ServerSessionPacketHandler
@@ -108,12 +102,10 @@
* @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
* @author <a href="mailto:clebert.suconic at jboss.org>Clebert Suconic</a>
*/
-public class ServerSessionPacketHandler implements ChannelHandler, CloseListener, FailureListener, SessionCallback
+public class ServerSessionPacketHandler implements ChannelHandler, CloseListener, FailureListener
{
private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
- private final CoreProtocolManager protocolManager;
-
private final ServerSession session;
private final OperationContext sessionContext;
@@ -125,14 +117,11 @@
private volatile CoreRemotingConnection remotingConnection;
- public ServerSessionPacketHandler(final CoreProtocolManager protocolManager,
- final ServerSession session,
+ public ServerSessionPacketHandler(final ServerSession session,
final OperationContext sessionContext,
final StorageManager storageManager,
final Channel channel)
{
- this.protocolManager = protocolManager;
-
this.session = session;
this.storageManager = storageManager;
@@ -565,46 +554,7 @@
channel.close();
}
}
-
- public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
- {
- Packet packet = new SessionReceiveLargeMessage(consumerID, headerBuffer, bodySize, deliveryCount);
-
- channel.send(packet);
-
- return packet.getPacketSize();
- }
-
- public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
- {
- Packet packet = new SessionReceiveContinuationMessage(consumerID, body, continues, requiresResponse);
-
- channel.send(packet);
-
- return packet.getPacketSize();
- }
-
- public int sendMessage(ServerMessage message, long consumerID, int deliveryCount)
- {
- Packet packet = new SessionReceiveMessage(consumerID, message, deliveryCount);
-
- channel.send(packet);
-
- return packet.getPacketSize();
- }
-
- public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
- {
- Packet packet = new SessionProducerCreditsMessage(credits, address, offset);
-
- channel.send(packet);
- }
- public void closed()
- {
- protocolManager.removeHandler(session.getName());
- }
-
public int transferConnection(final CoreRemotingConnection newConnection, final int lastReceivedCommandID)
{
// We need to disable delivery on all the consumers while the transfer is occurring- otherwise packets might get
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/ProtocolManager.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -26,4 +26,6 @@
{
ConnectionEntry createConnectionEntry(Connection connection);
+ void removeHandler(String name);
+
}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -29,7 +29,6 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.CoreProtocolManager;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.ProtocolType;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.server.ConnectionEntry;
@@ -170,14 +169,13 @@
return server;
}
- public CoreRemotingConnection getRemotingConnection(int connectionID)
+ public RemotingConnection getRemotingConnection(int connectionID)
{
ConnectionEntry conn = connections.get(connectionID);
if (conn != null)
{
- // FIXME ....
- return (CoreRemotingConnection)conn.connection;
+ return conn.connection;
}
else
{
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/HornetQServer.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -24,7 +24,7 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.security.Role;
@@ -76,11 +76,12 @@
String username,
String password,
int minLargeMessageSize,
- CoreRemotingConnection remotingConnection,
+ RemotingConnection remotingConnection,
boolean autoCommitSends,
boolean autoCommitAcks,
boolean preAcknowledge,
- boolean xa) throws Exception;
+ boolean xa,
+ final SessionCallback callback) throws Exception;
void removeSession(String name) throws Exception;
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/ServerSession.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -110,6 +110,4 @@
void setTransferring(boolean transferring);
void runConnectionFailureRunners();
-
- void setCallback(SessionCallback callback);
}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -69,7 +69,7 @@
import org.hornetq.core.postoffice.impl.LocalQueueBinding;
import org.hornetq.core.postoffice.impl.PostOfficeImpl;
import org.hornetq.core.protocol.core.Channel;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.replication.ReplicationEndpoint;
@@ -87,6 +87,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.SessionCallback;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.DivertConfiguration;
import org.hornetq.core.server.cluster.QueueConfiguration;
@@ -538,11 +539,12 @@
final String username,
final String password,
final int minLargeMessageSize,
- final CoreRemotingConnection connection,
+ final RemotingConnection connection,
final boolean autoCommitSends,
final boolean autoCommitAcks,
final boolean preAcknowledge,
- final boolean xa) throws Exception
+ final boolean xa,
+ final SessionCallback callback) throws Exception
{
if (securityStore != null)
{
@@ -565,7 +567,8 @@
securityStore,
managementService,
this,
- configuration.getManagementAddress());
+ configuration.getManagementAddress(),
+ callback);
sessions.put(name, session);
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -42,9 +42,9 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueBinding;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.CloseListener;
import org.hornetq.core.remoting.FailureListener;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.server.BindingQueryResult;
@@ -99,7 +99,7 @@
private final boolean strictUpdateDeliveryCount;
- private CoreRemotingConnection remotingConnection;
+ private RemotingConnection remotingConnection;
private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
@@ -134,7 +134,7 @@
private final RoutingContext routingContext = new RoutingContextImpl(null);
- private SessionCallback callback;
+ private final SessionCallback callback;
// Constructors ---------------------------------------------------------------------------------
@@ -147,14 +147,15 @@
final boolean preAcknowledge,
final boolean strictUpdateDeliveryCount,
final boolean xa,
- final CoreRemotingConnection remotingConnection,
+ final RemotingConnection remotingConnection,
final StorageManager storageManager,
final PostOffice postOffice,
final ResourceManager resourceManager,
final SecurityStore securityStore,
final ManagementService managementService,
final HornetQServer server,
- final SimpleString managementAddress) throws Exception
+ final SimpleString managementAddress,
+ final SessionCallback callback) throws Exception
{
this.username = username;
@@ -193,6 +194,8 @@
this.managementAddress = managementAddress;
+ this.callback = callback;
+
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
@@ -200,11 +203,6 @@
// ServerSession implementation ----------------------------------------------------------------------------
- public void setCallback(final SessionCallback callback)
- {
- this.callback = callback;
- }
-
public String getUsername()
{
return username;
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/ProtocolException.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -22,7 +22,7 @@
/**
* @author <a href="http://hiramchirino.com">chirino</a>
*/
-public class ProtocolException extends IOException {
+class ProtocolException extends IOException {
private static final long serialVersionUID = -2869735532997332242L;
private final boolean fatal;
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompChannelHandler.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.protocol.stomp;
+
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.integration.transports.netty.AbstractServerChannelHandler;
+import org.hornetq.integration.transports.netty.NettyAcceptor;
+import org.hornetq.integration.transports.netty.ServerHolder;
+import org.hornetq.jms.client.HornetQBytesMessage;
+import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.hornetq.utils.UUIDGenerator;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.group.ChannelGroup;
+
+ at ChannelPipelineCoverage("one")
+public final class StompChannelHandler extends AbstractServerChannelHandler
+{
+ static final Logger log = Logger.getLogger(StompChannelHandler.class);
+
+ final StompMarshaller marshaller;
+
+ private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
+
+ private ServerHolder serverHandler;
+
+ public StompChannelHandler(ServerHolder serverHolder,
+ final ChannelGroup group,
+ NettyAcceptor acceptor,
+ final ConnectionLifeCycleListener listener)
+ {
+ super(group, listener, acceptor);
+ this.serverHandler = serverHolder;
+ this.marshaller = new StompMarshaller();
+ }
+
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ {
+ StompFrame frame = (StompFrame)e.getMessage();
+ System.out.println(">>> got frame " + frame);
+
+ // need to interact with HornetQ server & session
+ HornetQServer server = serverHandler.getServer();
+ RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
+
+ try
+ {
+
+ String command = frame.getCommand();
+
+ StompFrame response = null;
+ if (Stomp.Commands.CONNECT.equals(command))
+ {
+ response = onConnect(frame, server, connection);
+ }
+ else if (Stomp.Commands.DISCONNECT.equals(command))
+ {
+ response = onDisconnect(frame, server, connection);
+ }
+ else if (Stomp.Commands.SEND.equals(command))
+ {
+ response = onSend(frame, server, connection);
+ }
+ else if (Stomp.Commands.SUBSCRIBE.equals(command))
+ {
+ response = onSubscribe(frame, server, connection);
+ }
+ else
+ {
+ log.error("Unsupported Stomp frame: " + frame);
+ response = new StompFrame(Stomp.Responses.ERROR,
+ new HashMap<String, Object>(),
+ ("Unsupported frame: " + command).getBytes());
+ }
+
+ if (response != null)
+ {
+ System.out.println(">>> will reply " + response);
+ byte[] bytes = marshaller.marshal(response);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ System.out.println("ready to send reply: " + buffer);
+ connection.getTransportConnection().write(buffer, true);
+ }
+ }
+ catch (StompException ex)
+ {
+ // Let the stomp client know about any protocol errors.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
+ ex.printStackTrace(stream);
+ stream.close();
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
+
+ final String receiptId = (String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if (receiptId != null)
+ {
+ headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ }
+
+ StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
+ byte[] bytes = marshaller.marshal(errorMessage);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ System.out.println("ready to send reply: " + buffer);
+ connection.getTransportConnection().write(buffer, true);
+
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ /**
+ * @param frame
+ * @param server
+ * @param connection
+ * @return
+ * @throws StompException
+ * @throws HornetQException
+ */
+ private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception,
+ StompException,
+ HornetQException
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+ SimpleString queueName = StompDestinationConverter.toHornetQAddress(queue);
+
+ ServerSession session = checkAndGetSession(connection);
+ long consumerID = server.getStorageManager().generateUniqueID();
+ session.createConsumer(consumerID, queueName, null, false);
+ session.receiveConsumerCredits(consumerID, -1);
+ session.start();
+
+ return null;
+ }
+
+ private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
+ {
+ ServerSession session = sessions.get(connection);
+ if (session == null)
+ {
+ throw new StompException("Not connected");
+ }
+ return session;
+ }
+
+ private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
+ {
+ ServerSession session = checkAndGetSession(connection);
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception e)
+ {
+ throw new StompException(e.getMessage());
+ }
+ sessions.remove(connection);
+ }
+ return null;
+ }
+
+ private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
+ {
+ ServerSession session = checkAndGetSession(connection);
+
+ Map<String, Object> headers = frame.getHeaders();
+ String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
+ /*
+ String type = (String)headers.get(Stomp.Headers.Send.TYPE);
+ long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
+ byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
+ boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
+ */
+ byte type = HornetQTextMessage.TYPE;
+ if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
+ {
+ type = HornetQBytesMessage.TYPE;
+ }
+ long timestamp = System.currentTimeMillis();
+ boolean durable = false;
+ long expiration = -1;
+ byte priority = 9;
+ SimpleString address = StompDestinationConverter.toHornetQAddress(queue);
+
+ ServerMessage message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
+ message.setType(type);
+ message.setTimestamp(timestamp);
+ message.setAddress(address);
+ byte[] content = frame.getContent();
+ if (type == HornetQTextMessage.TYPE)
+ {
+ message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new String(content)));
+ }
+ else
+ {
+ message.getBodyBuffer().writeBytes(content);
+ }
+
+ session.send(message);
+ if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
+ {
+ Map<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Response.RECEIPT_ID, headers.get(Stomp.Headers.RECEIPT_REQUESTED));
+ return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ private StompFrame onConnect(StompFrame frame, HornetQServer server, final RemotingConnection connection) throws Exception
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+ String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+ String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ server.createSession(name,
+ login,
+ passcode,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ connection,
+ true,
+ true,
+ false,
+ false,
+ new StompSessionCallback(marshaller, connection));
+ ServerSession session = server.getSession(name);
+ sessions.put(connection, session);
+ System.out.println(">>> created session " + session);
+ HashMap<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Connected.SESSION, name);
+ h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+ return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+ }
+}
\ No newline at end of file
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompDestinationConverter.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -27,7 +27,7 @@
*
*
*/
-public class StompDestinationConverter
+class StompDestinationConverter
{
// Constants -----------------------------------------------------
@@ -36,65 +36,69 @@
// Static --------------------------------------------------------
- public static SimpleString convertDestination(String name) throws HornetQException
+ public static SimpleString toHornetQAddress(String stompDestination) throws HornetQException
{
- if (name == null)
+ if (stompDestination == null)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
}
- else if (name.startsWith("/queue/"))
+ else if (stompDestination.startsWith("/queue/"))
{
- String queueName = name.substring("/queue/".length(), name.length());
+ String queueName = stompDestination.substring("/queue/".length(), stompDestination.length());
return HornetQQueue.createAddressFromName(queueName);
}
- else if (name.startsWith("/topic/"))
+ else if (stompDestination.startsWith("/topic/"))
{
- String topicName = name.substring("/topic/".length(), name.length());
+ String topicName = stompDestination.substring("/topic/".length(), stompDestination.length());
return HornetQTopic.createAddressFromName(topicName);
}
- else if (name.startsWith("/temp-queue/"))
+ else if (stompDestination.startsWith("/temp-queue/"))
{
- String tempName = name.substring("/temp-queue/".length(), name.length());
+ String tempName = stompDestination.substring("/temp-queue/".length(), stompDestination.length());
return HornetQTemporaryQueue.createAddressFromName(tempName);
}
- else if (name.startsWith("/temp-topic/"))
+ else if (stompDestination.startsWith("/temp-topic/"))
{
- String tempName = name.substring("/temp-topic/".length(), name.length());
+ String tempName = stompDestination.substring("/temp-topic/".length(), stompDestination.length());
return HornetQTemporaryTopic.createAddressFromName(tempName);
}
else
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + name +
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal destination name: [" + stompDestination +
"] -- StompConnect destinations " +
"must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");
}
}
- public static String toStomp(String address) throws HornetQException
+ public static String toStompDestination(String hornetqAddress) throws HornetQException
{
- if (address == null)
+ if (hornetqAddress == null)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "No destination is specified!");
}
- else if (address.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
+ else if (hornetqAddress.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
{
- return "/queue/" + address.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length(), address.length());
+ return "/queue/" + hornetqAddress.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length(),
+ hornetqAddress.length());
}
- else if (address.startsWith(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+ else if (hornetqAddress.startsWith(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
{
- return "/temp-queue/" + address.substring(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length(), address.length());
+ return "/temp-queue/" + hornetqAddress.substring(HornetQTemporaryQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length(),
+ hornetqAddress.length());
}
- else if (address.startsWith(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX))
+ else if (hornetqAddress.startsWith(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX))
{
- return "/topic/" + address.substring(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX.length(), address.length());
+ return "/topic/" + hornetqAddress.substring(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX.length(),
+ hornetqAddress.length());
}
- else if (address.startsWith(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+ else if (hornetqAddress.startsWith(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
{
- return "/temp-topic/" + address.substring(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length(), address.length());
+ return "/temp-topic/" + hornetqAddress.substring(HornetQTemporaryTopic.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length(),
+ hornetqAddress.length());
}
else
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + address +
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Illegal address name: [" + hornetqAddress +
"] -- Acceptable address must comply to JMS semantics");
}
}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompException.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -16,11 +16,9 @@
/**
* A StompException
*
- * @author jmesnil
- *
- *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
*/
-public class StompException extends Exception
+class StompException extends Exception
{
/**
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrame.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -25,7 +25,7 @@
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
-public class StompFrame
+class StompFrame
{
private static final byte[] NO_DATA = new byte[] {};
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompFrameError.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -22,7 +22,7 @@
*
* @author <a href="http://hiramchirino.com">chirino</a>
*/
-public class StompFrameError extends StompFrame {
+class StompFrameError extends StompFrame {
private final ProtocolException exception;
public StompFrameError(ProtocolException exception) {
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompMarshaller.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -30,7 +30,7 @@
/**
* Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
-public class StompMarshaller {
+class StompMarshaller {
private static final byte[] NO_DATA = new byte[]{};
private static final byte[] END_OF_FRAME = new byte[]{0, '\n'};
private static final int MAX_COMMAND_LENGTH = 1024;
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompPacketDecoder.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -34,9 +34,9 @@
// PacketDecoder implementation ----------------------------------
- public StompPacketDecoder(final StompMarshaller marshaller)
+ public StompPacketDecoder()
{
- this.marshaller = marshaller;
+ this.marshaller = new StompMarshaller();
}
@Override
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/protocol/stomp/StompSessionCallback.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package org.hornetq.integration.protocol.stomp;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
+import org.hornetq.api.core.SimpleString;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.SessionCallback;
+import org.hornetq.jms.client.HornetQTextMessage;
+
+/**
+ * A StompSessionCallback
+ *
+ * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
+ */
+class StompSessionCallback implements SessionCallback
+{
+ private final RemotingConnection connection;
+
+ private final StompMarshaller marshaller;
+
+ StompSessionCallback(final StompMarshaller marshaller, final RemotingConnection connection)
+ {
+ this.marshaller = marshaller;
+ this.connection = connection;
+ }
+
+ public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
+ {
+ }
+
+ public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
+ {
+ try
+ {
+ Map<String, Object> headers = new HashMap<String, Object>();
+ headers.put(Stomp.Headers.Message.DESTINATION, StompDestinationConverter.toStompDestination(serverMessage.getAddress()
+ .toString()));
+ byte[] data = new byte[] {};
+ if (serverMessage.getType() == HornetQTextMessage.TYPE)
+ {
+ SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
+ if (text != null)
+ {
+ data = text.toString().getBytes();
+ }
+ }
+ StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
+ System.out.println("SENDING : " + msg);
+ byte[] bytes = marshaller.marshal(msg);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ connection.getTransportConnection().write(buffer, true);
+
+ return bytes.length;
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ return 0;
+ }
+
+ }
+
+ public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse)
+ {
+ return 0;
+ }
+
+ public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
+ {
+ return 0;
+ }
+
+ public void closed()
+ {
+ }
+}
\ No newline at end of file
Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/AbstractServerChannelHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/AbstractServerChannelHandler.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/AbstractServerChannelHandler.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+ at ChannelPipelineCoverage("one")
+public abstract class AbstractServerChannelHandler extends HornetQChannelHandler
+{
+ private NettyAcceptor acceptor;
+
+ protected AbstractServerChannelHandler(final ChannelGroup group, final ConnectionLifeCycleListener listener, NettyAcceptor acceptor)
+ {
+ super(group, listener);
+ this.acceptor = acceptor;
+ }
+
+ @Override
+ public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
+ {
+ new NettyConnection(e.getChannel(), acceptor.newListener());
+
+ SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
+ if (sslHandler != null)
+ {
+ sslHandler.handshake(e.getChannel()).addListener(new ChannelFutureListener()
+ {
+ public void operationComplete(final ChannelFuture future) throws Exception
+ {
+ if (future.isSuccess())
+ {
+ active = true;
+ }
+ else
+ {
+ future.getChannel().close();
+ }
+ }
+ });
+ }
+ else
+ {
+ active = true;
+ }
+ }
+}
\ No newline at end of file
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -17,7 +17,6 @@
import javax.net.ssl.SSLEngine;
import org.hornetq.integration.protocol.stomp.StompFrameDelimiter;
-import org.hornetq.integration.protocol.stomp.StompMarshaller;
import org.hornetq.integration.protocol.stomp.StompPacketDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.jboss.netty.channel.ChannelPipeline;
@@ -50,9 +49,8 @@
public static void addStompStack(final ChannelPipeline pipeline, final ServerHolder serverHandler)
{
assert pipeline != null;
- StompMarshaller marshaller = new StompMarshaller();
pipeline.addLast("delimiter", new StompFrameDelimiter());
- pipeline.addLast("codec", new StompPacketDecoder(marshaller));
+ pipeline.addLast("codec", new StompPacketDecoder());
}
public static void addHornetQCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -13,12 +13,8 @@
package org.hornetq.integration.transports.netty;
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -30,52 +26,31 @@
import javax.net.ssl.SSLContext;
-import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.NotificationType;
import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
import org.hornetq.core.remoting.ProtocolType;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.ServerSession;
-import org.hornetq.core.server.SessionCallback;
-import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
-import org.hornetq.integration.protocol.stomp.Stomp;
-import org.hornetq.integration.protocol.stomp.StompDestinationConverter;
-import org.hornetq.integration.protocol.stomp.StompException;
-import org.hornetq.integration.protocol.stomp.StompFrame;
-import org.hornetq.integration.protocol.stomp.StompMarshaller;
-import org.hornetq.jms.client.HornetQBytesMessage;
-import org.hornetq.jms.client.HornetQTextMessage;
+import org.hornetq.integration.protocol.stomp.StompChannelHandler;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.TypedProperties;
-import org.hornetq.utils.UUIDGenerator;
import org.hornetq.utils.VersionLoader;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.group.ChannelGroup;
@@ -87,7 +62,6 @@
import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.util.Version;
import org.jboss.netty.util.VirtualExecutorService;
@@ -102,7 +76,7 @@
*/
public class NettyAcceptor implements Acceptor
{
- private static final Logger log = Logger.getLogger(NettyAcceptor.class);
+ static final Logger log = Logger.getLogger(NettyAcceptor.class);
private ChannelFactory channelFactory;
@@ -319,8 +293,8 @@
{
ChannelPipelineSupport.addStompStack(pipeline, serverHandler);
pipeline.addLast("handler", new StompChannelHandler(serverHandler,
- new StompMarshaller(),
channelGroup,
+ NettyAcceptor.this,
new Listener()));
}
else
@@ -516,6 +490,11 @@
this.notificationService = notificationService;
}
+ public ConnectionLifeCycleListener newListener()
+ {
+ return new Listener();
+ }
+
// Inner classes -----------------------------------------------------------------------------
private final class HornetQServerChannelHandler extends AbstractServerChannelHandler
@@ -526,7 +505,7 @@
final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
- super(group, listener);
+ super(group, listener, NettyAcceptor.this);
this.handler = handler;
}
@@ -541,329 +520,8 @@
}
- @ChannelPipelineCoverage("one")
- public final class StompChannelHandler extends AbstractServerChannelHandler
+ class Listener implements ConnectionLifeCycleListener
{
- private final StompMarshaller marshaller;
-
- private final Map<RemotingConnection, ServerSession> sessions = new HashMap<RemotingConnection, ServerSession>();
-
- private ServerHolder serverHandler;
-
- public StompChannelHandler(ServerHolder serverHolder,
- StompMarshaller marshaller,
- final ChannelGroup group,
- final ConnectionLifeCycleListener listener)
- {
- super(group, listener);
- this.serverHandler = serverHolder;
- this.marshaller = marshaller;
- }
-
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
- {
- StompFrame frame = (StompFrame)e.getMessage();
- System.out.println(">>> got frame " + frame);
-
- // need to interact with HornetQ server & session
- HornetQServer server = serverHandler.getServer();
- CoreRemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
-
- try
- {
-
- String command = frame.getCommand();
-
- StompFrame response = null;
- if (Stomp.Commands.CONNECT.equals(command))
- {
- response = onConnect(frame, server, connection);
- }
- else if (Stomp.Commands.DISCONNECT.equals(command))
- {
- response = onDisconnect(frame, server, connection);
- }
- else if (Stomp.Commands.SEND.equals(command))
- {
- response = onSend(frame, server, connection);
- }
- else if (Stomp.Commands.SUBSCRIBE.equals(command))
- {
- response = onSubscribe(frame, server, connection);
- }
- else
- {
- log.error("Unsupported Stomp frame: " + frame);
- response = new StompFrame(Stomp.Responses.ERROR,
- new HashMap<String, Object>(),
- ("Unsupported frame: " + command).getBytes());
- }
-
- if (response != null)
- {
- System.out.println(">>> will reply " + response);
- byte[] bytes = marshaller.marshal(response);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- System.out.println("ready to send reply: " + buffer);
- connection.getTransportConnection().write(buffer, true);
- }
- }
- catch (StompException ex)
- {
- // Let the stomp client know about any protocol errors.
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintWriter stream = new PrintWriter(new OutputStreamWriter(baos, "UTF-8"));
- ex.printStackTrace(stream);
- stream.close();
-
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Error.MESSAGE, e.getMessage());
-
- final String receiptId = (String)frame.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
- if (receiptId != null)
- {
- headers.put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
- }
-
- StompFrame errorMessage = new StompFrame(Stomp.Responses.ERROR, headers, baos.toByteArray());
- byte[] bytes = marshaller.marshal(errorMessage);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- System.out.println("ready to send reply: " + buffer);
- connection.getTransportConnection().write(buffer, true);
-
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
- }
- }
-
- /**
- * @param frame
- * @param server
- * @param connection
- * @return
- * @throws StompException
- * @throws HornetQException
- */
- private StompFrame onSubscribe(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception,
- StompException,
- HornetQException
- {
- Map<String, Object> headers = frame.getHeaders();
- String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
- SimpleString queueName = StompDestinationConverter.convertDestination(queue);
-
- ServerSession session = checkAndGetSession(connection);
- long consumerID = server.getStorageManager().generateUniqueID();
- session.createConsumer(consumerID, queueName, null, false);
- session.receiveConsumerCredits(consumerID, -1);
- session.start();
-
- return null;
- }
-
- private ServerSession checkAndGetSession(RemotingConnection connection) throws StompException
- {
- ServerSession session = sessions.get(connection);
- if (session == null)
- {
- throw new StompException("Not connected");
- }
- return session;
- }
-
- private StompFrame onDisconnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws StompException
- {
- ServerSession session = checkAndGetSession(connection);
- if (session != null)
- {
- try
- {
- session.close();
- }
- catch (Exception e)
- {
- throw new StompException(e.getMessage());
- }
- sessions.remove(connection);
- }
- return null;
- }
-
- private StompFrame onSend(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
- {
- ServerSession session = checkAndGetSession(connection);
-
- Map<String, Object> headers = frame.getHeaders();
- String queue = (String)headers.get(Stomp.Headers.Send.DESTINATION);
- /*
- String type = (String)headers.get(Stomp.Headers.Send.TYPE);
- long expiration = (Long)headers.get(Stomp.Headers.Send.EXPIRATION_TIME);
- byte priority = (Byte)headers.get(Stomp.Headers.Send.PRIORITY);
- boolean durable = (Boolean)headers.get(Stomp.Headers.Send.PERSISTENT);
- */
- byte type = HornetQTextMessage.TYPE;
- if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH))
- {
- type = HornetQBytesMessage.TYPE;
- }
- long timestamp = System.currentTimeMillis();
- boolean durable = false;
- long expiration = -1;
- byte priority = 9;
- SimpleString address = StompDestinationConverter.convertDestination(queue);
-
- ServerMessage message = new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512);
- message.setType(type);
- message.setTimestamp(timestamp);
- message.setAddress(address);
- byte[] content = frame.getContent();
- if (type == HornetQTextMessage.TYPE)
- {
- message.getBodyBuffer().writeNullableSimpleString(SimpleString.toSimpleString(new String(content)));
- }
- else
- {
- message.getBodyBuffer().writeBytes(content);
- }
-
- session.send(message);
- if (headers.containsKey(Stomp.Headers.RECEIPT_REQUESTED))
- {
- Map<String, Object> h = new HashMap<String, Object>();
- h.put(Stomp.Headers.Response.RECEIPT_ID, headers.get(Stomp.Headers.RECEIPT_REQUESTED));
- return new StompFrame(Stomp.Responses.RECEIPT, h, new byte[] {});
- }
- else
- {
- return null;
- }
- }
-
- private StompFrame onConnect(StompFrame frame, HornetQServer server, final CoreRemotingConnection connection) throws Exception
- {
- Map<String, Object> headers = frame.getHeaders();
- String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
- String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
- String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
-
- String name = UUIDGenerator.getInstance().generateStringUUID();
- server.createSession(name,
- login,
- passcode,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- connection,
- true,
- true,
- false,
- false);
- ServerSession session = server.getSession(name);
- session.setCallback(new SessionCallback()
- {
- public void sendProducerCreditsMessage(int credits, SimpleString address, int offset)
- {
- }
-
- public int sendMessage(ServerMessage serverMessage, long consumerID, int deliveryCount)
- {
- try
- {
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put(Stomp.Headers.Message.DESTINATION,
- StompDestinationConverter.toStomp(serverMessage.getAddress().toString()));
- byte[] data = new byte[] {};
- if (serverMessage.getType() == HornetQTextMessage.TYPE)
- {
- SimpleString text = serverMessage.getBodyBuffer().readNullableSimpleString();
- if (text != null)
- {
- data = text.toString().getBytes();
- }
- }
- StompFrame msg = new StompFrame(Stomp.Responses.MESSAGE, headers, data);
- System.out.println("SENDING : " + msg);
- byte[] bytes = marshaller.marshal(msg);
- HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
- connection.getTransportConnection().write(buffer, true);
-
- return bytes.length;
-
- }
- catch (Exception e)
- {
- e.printStackTrace();
- return 0;
- }
-
- }
-
- public int sendLargeMessageContinuation(long consumerID,
- byte[] body,
- boolean continues,
- boolean requiresResponse)
- {
- return 0;
- }
-
- public int sendLargeMessage(long consumerID, byte[] headerBuffer, long bodySize, int deliveryCount)
- {
- return 0;
- }
-
- public void closed()
- {
- }
- });
- sessions.put(connection, session);
- System.out.println(">>> created session " + session);
- HashMap<String, Object> h = new HashMap<String, Object>();
- h.put(Stomp.Headers.Connected.SESSION, name);
- h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
- return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
- }
- }
-
- @ChannelPipelineCoverage("one")
- public abstract class AbstractServerChannelHandler extends HornetQChannelHandler
- {
- protected AbstractServerChannelHandler(final ChannelGroup group, final ConnectionLifeCycleListener listener)
- {
- super(group, listener);
- }
-
- @Override
- public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
- {
- new NettyConnection(e.getChannel(), new Listener());
-
- SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
- if (sslHandler != null)
- {
- sslHandler.handshake(e.getChannel()).addListener(new ChannelFutureListener()
- {
- public void operationComplete(final ChannelFuture future) throws Exception
- {
- if (future.isSuccess())
- {
- active = true;
- }
- else
- {
- future.getChannel().close();
- }
- }
- });
- }
- else
- {
- active = true;
- }
- }
- }
-
- private class Listener implements ConnectionLifeCycleListener
- {
public void connectionCreated(final Connection connection, final ProtocolType protocol)
{
if (connections.putIfAbsent(connection.getID(), connection) != null)
@@ -896,4 +554,5 @@
}
}
+
}
Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -13,7 +13,7 @@
package org.hornetq.integration.transports.netty;
-import org.hornetq.core.protocol.core.CoreRemotingConnection;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.server.HornetQServer;
/**
@@ -27,6 +27,5 @@
{
HornetQServer getServer();
- // FIXME should NOT be CoreRemotingConnection but RemotingConnection
- CoreRemotingConnection getRemotingConnection(int connectionID);
+ RemotingConnection getRemotingConnection(int connectionID);
}
Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-21 15:40:24 UTC (rev 8828)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/stomp/StompTest.java 2010-01-21 16:46:55 UTC (rev 8829)
@@ -849,6 +849,7 @@
{
Configuration config = new ConfigurationImpl();
config.setSecurityEnabled(false);
+ config.setPersistenceEnabled(false);
Map<String, Object> params = new HashMap<String, Object>();
params.put(TransportConstants.PROTOCOL_PROP_NAME, TransportConstants.STOMP_PROTOCOL);
More information about the hornetq-commits
mailing list