[hornetq-commits] JBoss hornetq SVN: r8809 - in branches/HORNETQ-129_STOMP_protocol: src/main/org/hornetq/core/remoting/server/impl and 5 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Jan 20 06:02:07 EST 2010


Author: jmesnil
Date: 2010-01-20 06:02:05 -0500 (Wed, 20 Jan 2010)
New Revision: 8809

Added:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java
Removed:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
Modified:
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
   branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
   branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-129: Implement STOMP v1.0

* barely enough ugly code to make StompTest.testConnect pass :)

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -23,6 +23,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
+import org.hornetq.integration.transports.netty.ServerHolder;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
@@ -60,6 +61,7 @@
 
    public InVMAcceptor(final Map<String, Object> configuration,
                        final BufferHandler handler,
+                       final ServerHolder holder,
                        final ConnectionLifeCycleListener listener,
                        final Executor threadPool)
    {

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptorFactory.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -17,6 +17,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.hornetq.integration.transports.netty.ServerHolder;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
 import org.hornetq.spi.core.remoting.BufferHandler;
@@ -32,11 +33,12 @@
 {
    public Acceptor createAcceptor(final Map<String, Object> configuration,
                                   final BufferHandler handler,
+                                  final ServerHolder holder,
                                   final ConnectionLifeCycleListener listener,
                                   final Executor threadPool,
                                   final ScheduledExecutorService scheduledThreadPool)
    {
-      return new InVMAcceptor(configuration, handler, listener, threadPool);
+      return new InVMAcceptor(configuration, handler, holder, listener, threadPool);
    }
 
    public Set<String> getAllowableProperties()

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -41,6 +41,7 @@
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.impl.HornetQPacketHandler;
 import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.integration.transports.netty.ServerHolder;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.AcceptorFactory;
 import org.hornetq.spi.core.remoting.BufferHandler;
@@ -161,6 +162,27 @@
 
             Acceptor acceptor = factory.createAcceptor(info.getParams(),
                                                        bufferHandler,
+                                                       new ServerHolder()
+            {
+               public HornetQServer getServer()
+               {
+                  return server;
+               }
+               
+               public RemotingConnection getRemotingConnection(int connectionID)
+               {
+                  ConnectionEntry conn = connections.get(connectionID);
+
+                  if (conn != null)
+                  {
+                     return conn.connection;
+                  }
+                  else
+                  {
+                     return null;
+                  }
+               }
+            },
                                                        this,
                                                        threadPool,
                                                        scheduledThreadPool);

Deleted: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/ProtocolConverter.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -1,86 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *    http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.  See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.integration.stomp;
-
-import java.util.Map;
-
-import org.hornetq.api.core.client.HornetQClient;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.impl.wireformat.CreateSessionMessage;
-import org.hornetq.core.remoting.impl.wireformat.SessionCloseMessage;
-import org.hornetq.utils.UUIDGenerator;
-import org.hornetq.utils.VersionLoader;
-
-/**
- * A ProtocolConverter
- *
- * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
- *
- *
- */
-public class ProtocolConverter
-{
-
-   public Packet toPacket(StompFrame frame)
-   {
-      String command = frame.getCommand();
-      Map<String, Object> headers = frame.getHeaders();
-      if (Stomp.Commands.CONNECT.equals(command))
-      {
-         String login = (String)headers.get("login");
-         String password = (String)headers.get("passcode");
-
-         String name = UUIDGenerator.getInstance().generateStringUUID();
-         long sessionChannelID = 12;
-         return new CreateSessionMessage(name,
-                                         sessionChannelID,
-                                         VersionLoader.getVersion().getIncrementingVersion(),
-                                         login,
-                                         password,
-                                         HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
-                                         false,
-                                         true,
-                                         true,
-                                         false,
-                                         HornetQClient.DEFAULT_CONFIRMATION_WINDOW_SIZE);
-      }
-      if (Stomp.Commands.DISCONNECT.equals(command))
-      {
-         return new SessionCloseMessage();
-      }
-      else
-      {
-         throw new RuntimeException("frame not supported: " + frame);
-      }
-   }
-
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
-
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
-
-   // Public --------------------------------------------------------
-
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
-
-   // Inner classes -------------------------------------------------
-
-}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/stomp/StompPacketDecoder.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -14,39 +14,38 @@
 package org.hornetq.integration.stomp;
 
 import org.hornetq.api.core.HornetQBuffer;
-import org.hornetq.core.remoting.Packet;
-import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
 
 /**
  * A StompPacketDecoder
  *
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  */
-public class StompPacketDecoder implements PacketDecoder
+@ChannelPipelineCoverage("one")
+public class StompPacketDecoder extends SimpleChannelHandler
 {
-   private final StompMarshaller marshaller = new StompMarshaller();
-
-   private final ProtocolConverter converter = new ProtocolConverter();
-
+   private final StompMarshaller marshaller;
+   
    // PacketDecoder implementation ----------------------------------
 
-   public Packet decode(HornetQBuffer in)
+   public StompPacketDecoder(final StompMarshaller marshaller)
    {
-      StompFrame frame;
-      try
-      {
-         frame = marshaller.unmarshal(in);
-         System.out.println(">>> " + frame);
-         Packet packet = converter.toPacket(frame);
-         packet.setChannelID(1);
-         System.out.println(">>> " + packet);
+      this.marshaller = marshaller;
+   }
 
-         return packet;
-      }
-      catch (Exception e)
-      {
-         e.printStackTrace();
-         return null;
-      }
+   @Override
+   public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+   {
+      ChannelBuffer in = (ChannelBuffer)e.getMessage();
+      HornetQBuffer buffer = new ChannelBufferWrapper(in);
+      StompFrame frame = marshaller.unmarshal(buffer);
+      
+      Channels.fireMessageReceived(ctx, frame);
    }
 }

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -17,6 +17,8 @@
 import javax.net.ssl.SSLEngine;
 
 import org.hornetq.integration.stomp.StompFrameDelimiter;
+import org.hornetq.integration.stomp.StompMarshaller;
+import org.hornetq.integration.stomp.StompPacketDecoder;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.handler.ssl.SslHandler;
@@ -45,10 +47,12 @@
 
    // Public --------------------------------------------------------
 
-   public static void addStompCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
+   public static void addStompStack(final ChannelPipeline pipeline, final ServerHolder serverHandler)
    {
       assert pipeline != null;
+      StompMarshaller marshaller = new StompMarshaller();
       pipeline.addLast("delimiter", new StompFrameDelimiter());
+      pipeline.addLast("codec", new StompPacketDecoder(marshaller));
    }
 
    public static void addHornetQCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/HornetQChannelHandler.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -13,12 +13,8 @@
 package org.hornetq.integration.transports.netty;
 
 import org.hornetq.api.core.HornetQException;
-import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.logging.Logger;
-import org.hornetq.core.remoting.PacketDecoder;
-import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
-import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
@@ -32,28 +28,20 @@
  * @author <a href="mailto:tlee at redhat.com">Trustin Lee</a>
  * @version $Rev$, $Date$
  */
-public class HornetQChannelHandler extends SimpleChannelHandler
+public abstract class HornetQChannelHandler extends SimpleChannelHandler
 {
    private static final Logger log = Logger.getLogger(HornetQChannelHandler.class);
 
    private final ChannelGroup group;
 
-   private final PacketDecoder decoder;
-
-   private final BufferHandler handler;
-
    private final ConnectionLifeCycleListener listener;
 
    volatile boolean active;
 
    public HornetQChannelHandler(final ChannelGroup group,
-                                final PacketDecoder decoder,
-                                final BufferHandler handler,
                                 final ConnectionLifeCycleListener listener)
    {
       this.group = group;
-      this.decoder = decoder;
-      this.handler = handler;
       this.listener = listener;
    }
 
@@ -63,16 +51,10 @@
       group.add(e.getChannel());
       ctx.sendUpstream(e);
    }
+   
+   public abstract void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception;
 
    @Override
-   public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
-   {
-      ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
-
-      handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
-   }
-
-   @Override
    public void channelDisconnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception
    {
       synchronized (this)

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -15,6 +15,7 @@
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,25 +27,36 @@
 
 import javax.net.ssl.SSLContext;
 
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.api.core.HornetQBuffers;
 import org.hornetq.api.core.HornetQException;
 import org.hornetq.api.core.SimpleString;
 import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.HornetQClient;
 import org.hornetq.api.core.management.NotificationType;
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.PacketDecoder;
+import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.impl.CorePacketDecoder;
 import org.hornetq.core.remoting.impl.ssl.SSLSupport;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerSession;
 import org.hornetq.core.server.management.Notification;
 import org.hornetq.core.server.management.NotificationService;
-import org.hornetq.integration.stomp.StompPacketDecoder;
+import org.hornetq.integration.stomp.Stomp;
+import org.hornetq.integration.stomp.StompFrame;
+import org.hornetq.integration.stomp.StompMarshaller;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
 import org.hornetq.utils.ConfigurationHelper;
 import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUIDGenerator;
 import org.hornetq.utils.VersionLoader;
 import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
@@ -55,6 +67,7 @@
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.group.ChannelGroup;
 import org.jboss.netty.channel.group.ChannelGroupFuture;
 import org.jboss.netty.channel.group.DefaultChannelGroup;
@@ -106,6 +119,7 @@
    private final boolean useInvm;
 
    private final String protocol;
+
    private final String host;
 
    private final int port;
@@ -134,14 +148,19 @@
 
    private VirtualExecutorService bossExecutor;
 
+   private ServerHolder serverHandler;
+
    public NettyAcceptor(final Map<String, Object> configuration,
                         final BufferHandler handler,
+                        final ServerHolder serverHandler,
                         final ConnectionLifeCycleListener listener,
                         final Executor threadPool,
                         final ScheduledExecutorService scheduledThreadPool)
    {
       this.handler = handler;
 
+      this.serverHandler = serverHandler;
+      
       this.listener = listener;
 
       sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
@@ -181,8 +200,8 @@
                                                        TransportConstants.DEFAULT_USE_INVM,
                                                        configuration);
       protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME,
-                                                TransportConstants.DEFAULT_PROTOCOL,
-                                                configuration);
+                                                       TransportConstants.DEFAULT_PROTOCOL,
+                                                       configuration);
       host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME,
                                                    TransportConstants.DEFAULT_HOST,
                                                    configuration);
@@ -286,18 +305,21 @@
                pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
                pipeline.addLast("httphandler", new HttpAcceptorHandler(httpKeepAliveRunnable, httpResponseTime));
             }
-            PacketDecoder decoder;
             if (protocol.equals(TransportConstants.STOMP_PROTOCOL))
             {
-               ChannelPipelineSupport.addStompCodecFilter(pipeline, handler);
-               decoder = new StompPacketDecoder();
-            } else
+               ChannelPipelineSupport.addStompStack(pipeline, serverHandler);
+               pipeline.addLast("handler", new StompChannelHandler(serverHandler, new StompMarshaller(), channelGroup, new Listener()));
+            }
+            else
             {
                ChannelPipelineSupport.addHornetQCodecFilter(pipeline, handler);
-               decoder = new CorePacketDecoder();
+               PacketDecoder decoder = new CorePacketDecoder();
+               pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup,
+                                                                           decoder,
+                                                                           handler,
+                                                                           new Listener()));
             }
-            
-            pipeline.addLast("handler", new HornetQServerChannelHandler(channelGroup, decoder, handler, new Listener()));
+
             return pipeline;
          }
       };
@@ -487,15 +509,115 @@
 
    // Inner classes -----------------------------------------------------------------------------
 
+   private final class HornetQServerChannelHandler extends AbstractServerChannelHandler
+   {
+      private PacketDecoder decoder;
+      private BufferHandler handler;
+
+      HornetQServerChannelHandler(final ChannelGroup group,
+                                   final PacketDecoder decoder,
+                                   final BufferHandler handler,
+                                   final ConnectionLifeCycleListener listener)
+       {
+          super(group, listener);
+          
+          this.decoder = decoder;
+          this.handler = handler;
+       }
+
+      @Override
+      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+      {
+         ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
+
+         handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
+      }
+      
+   }
+   
    @ChannelPipelineCoverage("one")
-   private final class HornetQServerChannelHandler extends HornetQChannelHandler
+   public final class StompChannelHandler extends AbstractServerChannelHandler
    {
-      HornetQServerChannelHandler(final ChannelGroup group,
-                                  final PacketDecoder decoder,
-                                  final BufferHandler handler,
+      private final StompMarshaller marshaller;
+
+      private ServerHolder serverHandler;
+
+      public StompChannelHandler(ServerHolder serverHolder,
+                                 StompMarshaller marshaller,
+                                 final ChannelGroup group,
+                                 final ConnectionLifeCycleListener listener)
+      {
+         super(group, listener);
+         this.serverHandler = serverHolder;
+         this.marshaller = marshaller;
+      }
+
+      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+      {
+         StompFrame frame = (StompFrame)e.getMessage();
+         System.out.println(">>> got frame " + frame);
+
+         // need to interact with HornetQ server & session
+         HornetQServer server = serverHandler.getServer();
+         RemotingConnection connection = serverHandler.getRemotingConnection(e.getChannel().getId());
+
+         String command = frame.getCommand();
+
+         StompFrame response = null;
+         if (Stomp.Commands.CONNECT.equals(command))
+         {
+            response = onConnect(frame, server, connection);
+         }
+         else
+         {
+            log.error("Unsupported Stomp frame: " + frame);
+         }
+         if (response != null)
+         {
+            System.out.println(">>> will reply " + response);
+            byte[] bytes = marshaller.marshal(response);
+            HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bytes);
+            System.out.println("ready to send reply: " + buffer);
+            connection.getTransportConnection().write(buffer, true);
+         }
+      }
+
+      private StompFrame onConnect(StompFrame frame, HornetQServer server, RemotingConnection connection) throws Exception
+      {
+         Map<String, Object> headers = frame.getHeaders();
+         String login = (String)headers.get(Stomp.Headers.Connect.LOGIN);
+         String passcode = (String)headers.get(Stomp.Headers.Connect.PASSCODE);
+         String requestID = (String)headers.get(Stomp.Headers.Connect.REQUEST_ID);
+
+         String name = UUIDGenerator.getInstance().generateStringUUID();
+         server.createSession(name,
+                              1,
+                              login,
+                              passcode,
+                              HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+                              VersionLoader.getVersion().getIncrementingVersion(),
+                              connection,
+                              true,
+                              true,
+                              false,
+                              false,
+                              -1);
+         ServerSession session = server.getSession(name);
+         System.out.println(">>> created session " + session);
+         HashMap<String, Object> h = new HashMap<String, Object>();
+         h.put(Stomp.Headers.Connected.SESSION, name);
+         h.put(Stomp.Headers.Connected.RESPONSE_ID, requestID);
+         return new StompFrame(Stomp.Responses.CONNECTED, h, new byte[] {});
+      }
+   }
+   
+   @ChannelPipelineCoverage("one")
+   public abstract class AbstractServerChannelHandler extends HornetQChannelHandler
+   {
+      protected AbstractServerChannelHandler(final ChannelGroup group,
                                   final ConnectionLifeCycleListener listener)
       {
-         super(group, decoder, handler, listener);
+         super(group, listener);
       }
 
       @Override

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyAcceptorFactory.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -32,11 +32,12 @@
 {
    public Acceptor createAcceptor(final Map<String, Object> configuration,
                                   final BufferHandler handler,
+                                  final ServerHolder serverHolder,
                                   final ConnectionLifeCycleListener listener,
                                   final Executor threadPool,
                                   final ScheduledExecutorService scheduledThreadPool)
    {
-      return new NettyAcceptor(configuration, handler, listener, threadPool, scheduledThreadPool);
+      return new NettyAcceptor(configuration, handler, serverHolder, listener, threadPool, scheduledThreadPool);
    }
 
    public Set<String> getAllowableProperties()

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/NettyConnector.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -29,6 +29,7 @@
 import javax.net.ssl.SSLException;
 
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.core.buffers.impl.ChannelBufferWrapper;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.PacketDecoder;
 import org.hornetq.core.remoting.impl.CorePacketDecoder;
@@ -441,12 +442,23 @@
    @ChannelPipelineCoverage("one")
    private final class HornetQClientChannelHandler extends HornetQChannelHandler
    {
+      private BufferHandler handler;
+
       HornetQClientChannelHandler(final ChannelGroup group,
                                   final BufferHandler handler,
                                   final ConnectionLifeCycleListener listener)
       {
-         super(group, decoder, handler, listener);
+         super(group, listener);
+         this.handler = handler;
       }
+
+      @Override
+      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
+      {
+         ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
+
+         handler.bufferReceived(e.getChannel().getId(), new ChannelBufferWrapper(buffer), decoder);
+      }
    }
 
    @ChannelPipelineCoverage("one")

Added: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/integration/transports/netty/ServerHolder.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.integration.transports.netty;
+
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+
+/**
+ * Gives access to a {@link HornetQServer} and looks up a
+ * {@link RemotingConnection} by its connection ID.
+ *
+ * @author jmesnil
+ *
+ */
+public interface ServerHolder
+{
+   HornetQServer getServer();
+
+   RemotingConnection getRemotingConnection(int connectionID);
+}

Modified: branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/src/main/org/hornetq/spi/core/remoting/AcceptorFactory.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -18,6 +18,8 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.hornetq.integration.transports.netty.ServerHolder;
+
 /**
  * A factory for creating acceptors.
  * <p/>
@@ -41,6 +43,7 @@
     */
    Acceptor createAcceptor(final Map<String, Object> configuration,
                            BufferHandler handler,
+                           ServerHolder holder,
                            ConnectionLifeCycleListener listener,
                            Executor threadPool,
                            ScheduledExecutorService scheduledThreadPool);

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/integration/http/NettyHttpTest.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -32,6 +32,7 @@
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
 import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
+import org.hornetq.tests.unit.core.remoting.impl.netty.DummyServerHolder;
 import org.hornetq.tests.util.UnitTestCase;
 
 /**
@@ -98,7 +99,8 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      DummyServerHolder serverHolder = new DummyServerHolder();
+      acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -152,7 +154,8 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      DummyServerHolder serverHolder = new DummyServerHolder();
+      acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler2 connectorHandler = new SimpleBufferHandler2(connectorLatch);
@@ -210,7 +213,8 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      DummyServerHolder serverHolder = new DummyServerHolder();
+      acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -269,7 +273,8 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_SCAN_PERIOD, -1l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      DummyServerHolder serverHolder = new DummyServerHolder();
+      acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -327,7 +332,8 @@
       conf.put(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME, 500l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      DummyServerHolder serverHolder = new DummyServerHolder();
+      acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       SimpleBufferHandler connectorHandler = new SimpleBufferHandler(connectorLatch);
@@ -381,7 +387,8 @@
       conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler acceptorHandler = new SimpleBufferHandler(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      DummyServerHolder serverHolder = new DummyServerHolder();
+      acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);
@@ -428,7 +435,8 @@
       conf.put(TransportConstants.HTTP_SERVER_SCAN_PERIOD_PROP_NAME, 5000l);
       DummyConnectionLifeCycleListener acceptorListener = new DummyConnectionLifeCycleListener(connCreatedLatch);
       SimpleBufferHandler2 acceptorHandler = new SimpleBufferHandler2(acceptorLatch);
-      acceptor = new NettyAcceptor(conf, acceptorHandler, acceptorListener, threadPool, scheduledThreadPool);
+      DummyServerHolder serverHolder = new DummyServerHolder();
+      acceptor = new NettyAcceptor(conf, acceptorHandler, serverHolder, acceptorListener, threadPool, scheduledThreadPool);
       acceptor.start();
 
       BogusResponseHandler connectorHandler = new BogusResponseHandler(connectorLatch);

Added: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java	                        (rev 0)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/DummyServerHolder.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.remoting.impl.netty;
+
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.integration.transports.netty.ServerHolder;
+
+/**
+ * A {@link ServerHolder} stub for tests: both accessors return
+ * {@code null}.
+ *
+ * @author jmesnil
+ *
+ */
+public class DummyServerHolder implements ServerHolder
+{
+   public HornetQServer getServer()
+   {
+      return null;
+   }
+   
+   public RemotingConnection getRemotingConnection(int connectionID)
+   {
+      return null;
+   }
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -24,6 +24,7 @@
 import org.hornetq.core.remoting.PacketDecoder;
 import org.hornetq.integration.transports.netty.NettyAcceptor;
 import org.hornetq.integration.transports.netty.NettyAcceptorFactory;
+import org.hornetq.integration.transports.netty.ServerHolder;
 import org.hornetq.spi.core.remoting.Acceptor;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
@@ -67,9 +68,10 @@
          {
          }
       };
-
+      ServerHolder holder = new DummyServerHolder();
       Acceptor acceptor = factory.createAcceptor(params,
                                                  handler,
+                                                 holder,
                                                  listener,
                                                  Executors.newCachedThreadPool(),
                                                  Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));

Modified: branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
===================================================================
--- branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-20 09:17:21 UTC (rev 8808)
+++ branches/HORNETQ-129_STOMP_protocol/tests/src/org/hornetq/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java	2010-01-20 11:02:05 UTC (rev 8809)
@@ -24,6 +24,7 @@
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.remoting.PacketDecoder;
 import org.hornetq.integration.transports.netty.NettyAcceptor;
+import org.hornetq.integration.transports.netty.ServerHolder;
 import org.hornetq.integration.transports.netty.TransportConstants;
 import org.hornetq.spi.core.remoting.BufferHandler;
 import org.hornetq.spi.core.remoting.Connection;
@@ -81,8 +82,11 @@
          {
          }
       };
+      ServerHolder holder = new DummyServerHolder();
+
       NettyAcceptor acceptor = new NettyAcceptor(params,
                                                  handler,
+                                                 holder,
                                                  listener,
                                                  Executors.newCachedThreadPool(),
                                                  Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE));



More information about the hornetq-commits mailing list