Author: jmesnil
Date: 2010-01-20 06:02:05 -0500 (Wed, 20 Jan 2010)
New Revision: 8809
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java
Removed:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0
* barely enough ugly code to make StompTest.testConnect pass :)
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -23,6 +23,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.integration.transports.netty.ServerHolder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -60,6 +61,7 @@
public InVMAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final ServerHolder holder,
final ConnectionLifeCycleListener listener,
final Executor threadPool)
{
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -17,6 +17,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.integration.transports.netty.ServerHolder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -32,11 +33,12 @@
{
public Acceptor createAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final ServerHolder holder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new InVMAcceptor(configuration, handler, listener, threadPool);
+ return new InVMAcceptor(configuration, handler, holder, listener, threadPool);
}
public Set<String> getAllowableProperties()
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -41,6 +41,7 @@
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.HornetQPacketHandler;
import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.transports.netty.ServerHolder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.AcceptorFactory;
import org.hornetq.spi.core.remoting.BufferHandler;
@@ -161,6 +162,27 @@
Acceptor acceptor = factory.createAcceptor(info.getParams(),
bufferHandler,
+ new ServerHolder()
+ {
+ public HornetQServer getServer()
+ {
+ return server;
+ }
+
+ public RemotingConnection getRemotingConnection(int connectionID)
+ {
+ ConnectionEntry conn = connections.get(connectionID);
+
+ if (conn != null)
+ {
+ return conn.connection;
+ }
+ else
+ {
+ return null;
+ }
+ }
+ },
this,
threadPool,
scheduledThreadPool);
Deleted:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -1,86 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.stomp;
-
-import java.util.Map;
-
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
-import org.hornetq.utils.UUIDGenerator;
-import org.hornetq.utils.VersionLoader;
-
-/**
- * A ProtocolConverter
- *
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- *
- */
-public class ProtocolConverter
-{
-
- public Packet toPacket(StompFrame frame)
- {
- String command = frame.getCommand();
- Map<String, Object> headers = frame.getHeaders();
- if (Stomp.Commands.CONNECT.equals(command))
- {
- String login = (String)headers.get("login");
- String password = (String)headers.get("passcode");
-
- String name = UUIDGenerator.getInstance().generateStringUUID();
- long sessionChannelID = 12;
- return new CreateSessionMessage(name,
- sessionChannelID,
- VersionLoader.getVersion().getIncrementingVersion(),
- login,
- password,
- HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- false,
- true,
- true,
- false,
- HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE);
- }
- if (Stomp.Commands.DISCONNECT.equals(command))
- {
- return new SessionCloseMessage();
- }
- else
- {
- throw new RuntimeException("frame not supported: " + frame);
- }
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -14,39 +14,38 @@
package org.hornetq.integration.stomp;
import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
/**
* A StompPacketDecoder
*
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
-public class StompPacketDecoder implements PacketDecoder
+@ChannelPipelineCoverage("one")
+public class StompPacketDecoder extends SimpleChannelHandler
{
- private final StompMarshaller marshaller = new StompMarshaller();
-
- private final ProtocolConverter converter = new ProtocolConverter();
-
+ private final StompMarshaller marshaller;
+
// PacketDecoder implementation ----------------------------------
- public Packet decode(HornetQBuffer in)
+ public StompPacketDecoder(final StompMarshaller marshaller)
{
- StompFrame frame;
- try
- {
- frame = marshaller.unmarshal(in);
- System.out.println(">>> " + frame);
- Packet packet = converter.toPacket(frame);
- packet.setChannelID(1);
- System.out.println(">>> " + packet);
+ this.marshaller = marshaller;
+ }
- return packet;
- }
- catch (Exception e)
- {
- e.printStackTrace();
- return null;
- }
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ {
+ ChannelBuffer in = (ChannelBuffer)e.getMessage();
+ HornetQBuffer buffer = new ChannelBufferWrapper(in);
+ StompFrame frame = marshaller.unmarshal(buffer);
+
+ Channels.fireMessageReceived(ctx, frame);
}
}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -17,6 +17,8 @@
import javax.net.ssl.SSLEngine;
import org.hornetq.integration.stomp.StompFrameDelimiter;
+import org.hornetq.integration.stomp.StompMarshaller;
+import org.hornetq.integration.stomp.StompPacketDecoder;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.handler.ssl.SslHandler;
@@ -45,10 +47,12 @@
// Public --------------------------------------------------------
- public static void addStompCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
+ public static void addStompStack(final ChannelPipeline pipeline, final ServerHolder serverHandler)
{
assert pipeline != null;
+ StompMarshaller marshaller = new StompMarshaller();
pipeline.addLast("delimiter", new StompFrameDelimiter());
+ pipeline.addLast("codec", new StompPacketDecoder(marshaller));
}
public static void addHornetQCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -13,12 +13,8 @@
package org.hornetq.integration.transports.netty;
import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.PacketDecoder;
-import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
-import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
@@ -32,28 +28,20 @@
* @author <a href="mailto:tlee@redhat.com">Trustin Lee</a>
* @version $Rev$, $Date$
*/
-public class HornetQChannelHandler extends SimpleChannelHandler
+public abstract class HornetQChannelHandler extends SimpleChannelHandler
{
private static final Logger log = Logger.getLogger(HornetQChannelHandler.class);
private final ChannelGroup group;
- private final PacketDecoder decoder;
-
- private final BufferHandler handler;
-
private final ConnectionLifeCycleListener listener;
volatile boolean active;
public HornetQChannelHandler(final ChannelGroup group,
- final PacketDecoder decoder,
- final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
this.group = group;
- this.decoder = decoder;
- this.handler = handler;
this.listener = listener;
}
@@ -63,16 +51,10 @@
group.add(e.getChannel());
ctx.sendUpstream(e);
}
+
+ public abstract void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception;
@Override
- public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
- {
- ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
-
- handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
- }
-
- @Override
public void channelDisconnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
{
synchronized (this)
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -15,6 +15,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,25 +27,36 @@
import javax.net.ssl.SSLContext;
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.CorePacketDecoder;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerSession;
import org.hornetq.core.server.management.Notification;
import org.hornetq.core.server.management.NotificationService;
-import org.hornetq.integration.stomp.StompPacketDecoder;
+import org.hornetq.integration.stomp.Stomp;
+import org.hornetq.integration.stomp.StompFrame;
+import org.hornetq.integration.stomp.StompMarshaller;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUIDGenerator;
import org.hornetq.utils.VersionLoader;
import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
@@ -55,6 +67,7 @@
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
@@ -106,6 +119,7 @@
private final boolean useInvm;
private final String protocol;
+
private final String host;
private final int port;
@@ -134,14 +148,19 @@
private VirtualExecutorService bossExecutor;
+ private ServerHolder serverHandler;
+
public NettyAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final ServerHolder serverHandler,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
this.handler = handler;
+ this.serverHandler = serverHandler;
+
this.listener = listener;
sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
@@ -181,8 +200,8 @@
TransportConstants.DEFAULT_USE_INVM,
configuration);
protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
- TransportConstants.DEFAULT_PROTOCOL,
- configuration);
+ TransportConstants.DEFAULT_PROTOCOL,
+ configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
TransportConstants.DEFAULT_HOST,
configuration);
@@ -286,18 +305,21 @@
pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
}
- PacketDecoder decoder;
if (protocol.equals(TransportConstants.STOMP_PROTOCOL))
{
- ChannelPipelineSupport.addStompCodecFilter(pipeline, handler);
- decoder = new StompPacketDecoder();
- } else
+ ChannelPipelineSupport.addStompStack(pipeline, serverHandler);
+ pipeline.addLast("handler", new StompChannelHandler(serverHandler, new StompMarshaller(), channelGroup, new Listener()));
+ }
+ else
{
ChannelPipelineSupport.addHornetQCodecFilter(pipeline, handler);
- decoder = new CorePacketDecoder();
+ PacketDecoder decoder = new CorePacketDecoder();
+ pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup,
+ decoder,
+ handler,
+ new Listener()));
}
-
- pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, decoder, handler, new Listener()));
+
return pipeline;
}
};
@@ -487,15 +509,115 @@
// Inner classes -----------------------------------------------------------------------------
+ private final class HornetQServerChannelHandler extends AbstractServerChannelHandler
+ {
+ private PacketDecoder decoder;
+ private BufferHandler handler;
+
+ HornetQServerChannelHandler(final ChannelGroup group,
+ final PacketDecoder decoder,
+ final BufferHandler handler,
+ final ConnectionLifeCycleListener listener)
+ {
+ super(group, listener);
+
+ this.decoder = decoder;
+ this.handler = handler;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ {
+ ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
+
+ handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
+ }
+
+ }
+
@ChannelPipelineCoverage("one")
- private final class HornetQServerChannelHandler extends HornetQChannelHandler
+ public final class StompChannelHandler extends AbstractServerChannelHandler
{
- HornetQServerChannelHandler(final ChannelGroup group,
- final PacketDecoder decoder,
- final BufferHandler handler,
+ private final StompMarshaller marshaller;
+
+ private ServerHolder serverHandler;
+
+ public StompChannelHandler(ServerHolder serverHolder,
+ StompMarshaller marshaller,
+ final ChannelGroup group,
+ final ConnectionLifeCycleListener listener)
+ {
+ super(group, listener);
+ this.serverHandler = serverHolder;
+ this.marshaller = marshaller;
+ }
+
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ {
+ StompFrame frame = (StompFrame)e.getMessage();
+ System.out.println(">>> got frame " + frame);
+
+ // need to interact with HornetQ server & session
+ HornetQServer server = serverHandler.getServer();
+ RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
+
+ String command = frame.getCommand();
+
+ StompFrame response = null;
+ if (Stomp.Commands.CONNECT.equals(command))
+ {
+ response = onConnect(frame, server, connection);
+ }
+ else
+ {
+ log.error("Unsupported Stomp frame: " + frame);
+ }
+ if (response != null)
+ {
+ System.out.println(">>> will reply " + response);
+ byte[] bytes = marshaller.marshal(response);
+ HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+ System.out.println("ready to send reply: " + buffer);
+ connection.getTransportConnection().write(buffer, true);
+ }
+ }
+
+ private StompFrame onConnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
+ {
+ Map<String, Object> headers = frame.getHeaders();
+ String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+ String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+ String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+ String name = UUIDGenerator.getInstance().generateStringUUID();
+ server.createSession(name,
+ 1,
+ login,
+ passcode,
+ HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ VersionLoader.getVersion().getIncrementingVersion(),
+ connection,
+ true,
+ true,
+ false,
+ false,
+ -1);
+ ServerSession session = server.getSession(name);
+ System.out.println(">>> created session " + session);
+ HashMap<String, Object> h = new HashMap<String, Object>();
+ h.put(Stomp.Headers.Connected.SESSION, name);
+ h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+ return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+ }
+ }
+
+ @ChannelPipelineCoverage("one")
+ public abstract class AbstractServerChannelHandler extends HornetQChannelHandler
+ {
+ protected AbstractServerChannelHandler(final ChannelGroup group,
final ConnectionLifeCycleListener listener)
{
- super(group, decoder, handler, listener);
+ super(group, listener);
}
@Override
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -32,11 +32,12 @@
{
public Acceptor createAcceptor(final Map<String, Object> configuration,
final BufferHandler handler,
+ final ServerHolder serverHolder,
final ConnectionLifeCycleListener listener,
final Executor threadPool,
final ScheduledExecutorService scheduledThreadPool)
{
- return new NettyAcceptor(configuration, handler, listener, threadPool, scheduledThreadPool);
+ return new NettyAcceptor(configuration, handler, serverHolder, listener, threadPool, scheduledThreadPool);
}
public Set<String> getAllowableProperties()
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -29,6 +29,7 @@
import javax.net.ssl.SSLException;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.core.remoting.impl.CorePacketDecoder;
@@ -441,12 +442,23 @@
@ChannelPipelineCoverage("one")
private final class HornetQClientChannelHandler extends HornetQChannelHandler
{
+ private BufferHandler handler;
+
HornetQClientChannelHandler(final ChannelGroup group,
final BufferHandler handler,
final ConnectionLifeCycleListener listener)
{
- super(group, decoder, handler, listener);
+ super(group, listener);
+ this.handler = handler;
}
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+ {
+ ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
+
+ handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
+ }
}
@ChannelPipelineCoverage("one")
Added:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+
+/**
+ * A ServerHolder
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public interface ServerHolder
+{
+ HornetQServer getServer();
+
+ RemotingConnection getRemotingConnection(int connectionID);
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -18,6 +18,8 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import org.hornetq.integration.transports.netty.ServerHolder;
+
/**
* A factory for creating acceptors.
* <p/>
@@ -41,6 +43,7 @@
*/
Acceptor createAcceptor(final Map<String, Object> configuration,
BufferHandler handler,
+ ServerHolder holder,
ConnectionLifeCycleListener listener,
Executor threadPool,
ScheduledExecutorService scheduledThreadPool);
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -32,6 +32,7 @@
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.hornetq.tests.unit.core.remoting.impl.netty.DummyServerHolder;
import org.hornetq.tests.util.UnitTestCase;
/**
@@ -98,7 +99,8 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -152,7 +154,8 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -210,7 +213,8 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -269,7 +273,8 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -327,7 +332,8 @@
conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -381,7 +387,8 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -428,7 +435,8 @@
conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
DummyConnectionLifeCycleListener acceptorListener = new
DummyConnectionLifeCycleListener(connCreatedLatch);
SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
- acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool,
scheduledThreadPool);
+ DummyServerHolder serverHolder = new DummyServerHolder();
+ acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener,
threadPool, scheduledThreadPool);
acceptor.start();
BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
Added:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.remoting.impl.netty;
+
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.integration.transports.netty.ServerHolder;
+
+/**
+ * A DummyServerHolder: a stub ServerHolder instantiated by the Netty acceptor tests.
+ * Both getServer() and getRemotingConnection(int) unconditionally return null.
+ *
+ * @author jmesnil
+ *
+ */
+public class DummyServerHolder implements ServerHolder
+{
+ public HornetQServer getServer()
+ {
+ return null;
+ }
+
+ public RemotingConnection getRemotingConnection(int connectionID)
+ {
+ return null;
+ }
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -24,6 +24,7 @@
import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.integration.transports.netty.NettyAcceptor;
import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.ServerHolder;
import org.hornetq.spi.core.remoting.Acceptor;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -67,9 +68,10 @@
{
}
};
-
+ ServerHolder holder = new DummyServerHolder();
Acceptor acceptor = factory.createAcceptor(params,
handler,
+ holder,
listener,
Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));
Modified:
branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java 2010-01-20 11:02:05 UTC (rev 8809)
@@ -24,6 +24,7 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.PacketDecoder;
import org.hornetq.integration.transports.netty.NettyAcceptor;
+import org.hornetq.integration.transports.netty.ServerHolder;
import org.hornetq.integration.transports.netty.TransportConstants;
import org.hornetq.spi.core.remoting.BufferHandler;
import org.hornetq.spi.core.remoting.Connection;
@@ -81,8 +82,11 @@
{
}
};
+ ServerHolder holder = new DummyServerHolder();
+
NettyAcceptor acceptor = new NettyAcceptor(params,
handler,
+ holder,
listener,
Executors.newCachedThreadPool(),
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));