[jboss-cvs] JBoss Messaging SVN: r5455 - in trunk: examples/messaging/src/org/jboss/messaging/example and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Dec 3 08:56:24 EST 2008


Author: ataylor
Date: 2008-12-03 08:56:24 -0500 (Wed, 03 Dec 2008)
New Revision: 5455

Added:
   trunk/examples/messaging/src/org/jboss/messaging/example/HttpClient.java
   trunk/tests/src/org/jboss/messaging/tests/integration/http/
   trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java
Modified:
   trunk/build-thirdparty.xml
   trunk/src/config/jbm-configuration.xml
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
   trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
   trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
netty http integration

Modified: trunk/build-thirdparty.xml
===================================================================
--- trunk/build-thirdparty.xml	2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/build-thirdparty.xml	2008-12-03 13:56:24 UTC (rev 5455)
@@ -85,7 +85,7 @@
            Dependencies required to build the transport jar
       -->
 
-      <componentref name="netty" version="3.0.2.GA"/>
+      <componentref name="netty" version="3.1.0.ALPHA2"/>
       <componentref name="apache-mina" version="2.0.0-M3-20080730.120633-1"/>
 
       <!--

Added: trunk/examples/messaging/src/org/jboss/messaging/example/HttpClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/HttpClient.java	                        (rev 0)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/HttpClient.java	2008-12-03 13:56:24 UTC (rev 5455)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.example;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+import java.util.HashMap;
+
+/**
+ * Uses the core messaging API to send and receive a message to a queue via http. You will need to enable the server with
+ * the netty http acceptor for this to work.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class HttpClient
+{ public static void main(final String[] args)
+   {
+      ClientSession clientSession = null;
+      try
+      {
+         HashMap<String, Object> params = new HashMap<String, Object>();
+         params.put("jbm.remoting.netty.httpenabled", true);
+         params.put("jbm.remoting.netty.port", 8080);
+         ClientSessionFactory sessionFactory =
+            new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory", params));
+         clientSession = sessionFactory.createSession(false, true, true);
+         SimpleString queue = new SimpleString("queuejms.testQueue");
+         ClientProducer clientProducer = clientSession.createProducer(queue);
+         ClientMessage message = clientSession.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                       System.currentTimeMillis(), (byte) 1);
+         message.getBody().putString("Hello!");
+         clientProducer.send(message);
+         ClientConsumer clientConsumer = clientSession.createConsumer(queue);
+         clientSession.start();
+         ClientMessage msg = clientConsumer.receive(5000);
+         System.out.println("msg.getPayload() = " + msg.getBody().getString());
+         msg.acknowledge();
+      }
+      catch(Exception e)
+      {
+         e.printStackTrace();
+      }
+      finally
+      {
+         if (clientSession != null)
+         {
+            try
+            {
+               clientSession.close();
+            }
+            catch (MessagingException ignore)
+            {
+            }
+         }
+      }
+   }
+}
+

Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml	2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/src/config/jbm-configuration.xml	2008-12-03 13:56:24 UTC (rev 5455)
@@ -120,16 +120,24 @@
       </acceptor>
       <!-- Netty standard TCP acceptor -->
       <acceptor>
-         <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>            	                                 
+         <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
       </acceptor>   
       <!-- Netty SSL Acceptor
       <acceptor>
-         <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>            	            
+         <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
          <param key="jbm.remoting.netty.host" value="localhost" type="String"/>
          <param key="jbm.remoting.netty.port" value="5500" type="Integer"/>	                     
          <param key="jbm.remoting.netty.sslenabled" value="true" type="Boolean"/>
       </acceptor> 
-      -->      
+      -->
+       <!-- Netty HTTP Acceptor
+      <acceptor>
+         <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
+         <param key="jbm.remoting.netty.host" value="localhost" type="String"/>
+         <param key="jbm.remoting.netty.port" value="8080" type="Integer"/>
+         <param key="jbm.remoting.netty.httpenabled" value="true" type="Boolean"/>  
+      </acceptor>-->
+
       <!-- Mina Acceptor -->
       <acceptor>
          <factory-class>org.jboss.messaging.integration.transports.mina.MinaAcceptorFactory</factory-class>           	            

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java	2008-12-03 13:56:24 UTC (rev 5455)
@@ -22,16 +22,6 @@
 
 package org.jboss.messaging.integration.transports.netty;
 
-import static org.jboss.netty.channel.Channels.pipeline;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
 import org.jboss.messaging.core.remoting.spi.Acceptor;
@@ -41,6 +31,7 @@
 import org.jboss.messaging.util.ConfigurationHelper;
 import org.jboss.messaging.util.JBMThreadFactory;
 import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
@@ -50,96 +41,123 @@
 import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelStateEvent;
+import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.write;
+import org.jboss.netty.channel.DefaultMessageEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.jboss.netty.handler.ssl.SslHandler;
 
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 /**
  * A Netty TCP Acceptor that supports SSL
  *
  * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  * @author <a href="tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="tlee at redhat.com">Trustin Lee</a>
- *
  * @version $Rev$, $Date$
  */
 public class NettyAcceptor implements Acceptor
 {
    private static final Logger log = Logger.getLogger(NettyAcceptor.class);
-   
+
    private ExecutorService bossExecutor;
+
    private ExecutorService workerExecutor;
+
    private ChannelFactory channelFactory;
+
    private Channel serverChannel;
+
    private ServerBootstrap bootstrap;
 
    private final BufferHandler handler;
 
    private final ConnectionLifeCycleListener listener;
-   
+
    private final boolean sslEnabled;
-   
+
+   private final boolean httpEnabled;
+
    private final boolean useNio;
-   
+
    private final String host;
 
    private final int port;
-         
+
    private final String keyStorePath;
- 
+
    private final String keyStorePassword;
-   
+
    private final String trustStorePath;
-   
+
    private final String trustStorePassword;
-   
+
    private final boolean tcpNoDelay;
-   
+
    private final int tcpSendBufferSize;
-   
+
    private final int tcpReceiveBufferSize;
 
-   public NettyAcceptor(final Map<String, Object> configuration,  final BufferHandler handler,
-                       final ConnectionLifeCycleListener listener)
-   {   
+   public NettyAcceptor(final Map<String, Object> configuration, final BufferHandler handler,
+                        final ConnectionLifeCycleListener listener)
+   {
       this.handler = handler;
 
       this.listener = listener;
-      
+
       this.sslEnabled =
-         ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration);
-      this.useNio = 
-         ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME, TransportConstants.DEFAULT_USE_NIO, configuration);
+            ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration);
+      this.httpEnabled =
+            ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+      this.useNio =
+            ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME, TransportConstants.DEFAULT_USE_NIO, configuration);
       this.host =
-         ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
+            ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
       this.port =
-         ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration);
+            ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration);
       if (sslEnabled)
       {
          this.keyStorePath =
-            ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PATH, configuration);
+               ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PATH, configuration);
          this.keyStorePassword =
-            ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PASSWORD, configuration);
+               ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PASSWORD, configuration);
          this.trustStorePath =
-            ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PATH, configuration);
+               ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PATH, configuration);
          this.trustStorePassword =
-            ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD, configuration);
-      }   
+               ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD, configuration);
+      }
       else
       {
          this.keyStorePath = null;
          this.keyStorePassword = null;
          this.trustStorePath = null;
-         this.trustStorePassword = null; 
+         this.trustStorePassword = null;
       }
-      
+
       this.tcpNoDelay =
-         ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration);
+            ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration);
       this.tcpSendBufferSize =
-         ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
+            ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
       this.tcpReceiveBufferSize =
-         ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
-    
+            ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
+
    }
 
    public synchronized void start() throws Exception
@@ -149,7 +167,7 @@
          //Already started
          return;
       }
- 
+
       bossExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-boss-threads"));
       workerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-worker-threads"));
       if (useNio)
@@ -187,7 +205,8 @@
          context = null; // Unused
       }
 
-      bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+      bootstrap.setPipelineFactory(new ChannelPipelineFactory()
+      {
          public ChannelPipeline getPipeline() throws Exception
          {
             ChannelPipeline pipeline = pipeline();
@@ -195,6 +214,13 @@
             {
                ChannelPipelineSupport.addSSLFilter(pipeline, context, false);
             }
+            if (httpEnabled)
+            {
+               pipeline.addLast("httpRequestDecoder", new HttpRequestDecoder());
+               pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
+               pipeline.addLast("httphandler", new HttpHandler());
+            }
+
             ChannelPipelineSupport.addCodecFilter(pipeline, handler);
             pipeline.addLast("handler", new MessagingServerChannelHandler(handler, listener));
             return pipeline;
@@ -203,11 +229,11 @@
 
       // Bind
       bootstrap.setOption("localAddress", new InetSocketAddress(host, port));
-      bootstrap.setOption("child.tcpNoDelay", tcpNoDelay);    
+      bootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
       if (tcpReceiveBufferSize != -1)
       {
          bootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize);
-      }     
+      }
       if (tcpSendBufferSize != -1)
       {
          bootstrap.setOption("child.sendBufferSize", tcpSendBufferSize);
@@ -229,7 +255,7 @@
       serverChannel.close().awaitUninterruptibly();
       bossExecutor.shutdown();
       workerExecutor.shutdown();
-      for (;;)
+      for (; ;)
       {
          try
          {
@@ -268,10 +294,13 @@
             {
                public void operationComplete(ChannelFuture future) throws Exception
                {
-                  if (future.isSuccess()) {
+                  if (future.isSuccess())
+                  {
                      listener.connectionCreated(tc);
                      active = true;
-                  } else {
+                  }
+                  else
+                  {
                      future.getChannel().close();
                   }
                }
@@ -284,4 +313,26 @@
          }
       }
    }
+
+   @ChannelPipelineCoverage("all")
+   class HttpHandler extends SimpleChannelHandler
+   {
+      @Override
+      public void messageReceived(final ChannelHandlerContext ctx,final MessageEvent e) throws Exception
+      {
+         HttpRequest request = (HttpRequest) e.getMessage();
+         MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), request.getContent(), e.getRemoteAddress());
+         ctx.sendUpstream(event);
+      }
+
+      @Override
+      public void writeRequested(final ChannelHandlerContext ctx,final MessageEvent e) throws Exception
+      {
+         HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+         ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+         response.setContent(buf);
+         response.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
+         write(ctx, e.getChannel(), e.getFuture(), response, e.getRemoteAddress());
+      }
+   }
 }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java	2008-12-03 13:56:24 UTC (rev 5455)
@@ -21,17 +21,6 @@
  */
 package org.jboss.messaging.integration.transports.netty;
 
-import static org.jboss.netty.channel.Channels.pipeline;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLException;
-
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
 import org.jboss.messaging.core.remoting.spi.BufferHandler;
@@ -41,16 +30,39 @@
 import org.jboss.messaging.util.ConfigurationHelper;
 import org.jboss.messaging.util.JBMThreadFactory;
 import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineCoverage;
 import org.jboss.netty.channel.ChannelPipelineFactory;
+import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.write;
+import org.jboss.netty.channel.DefaultMessageEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
+import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.jboss.netty.handler.ssl.SslHandler;
 
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 /**
  *
  * A NettyConnector
@@ -80,6 +92,8 @@
 
    private final boolean sslEnabled;
 
+   private final boolean httpEnabled;
+
    private final boolean useNio;
 
    private final String host;
@@ -123,6 +137,9 @@
       this.sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
                                                                TransportConstants.DEFAULT_SSL_ENABLED,
                                                                configuration);
+      this.httpEnabled =
+                  ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+
       this.useNio = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME,
                                                            TransportConstants.DEFAULT_USE_NIO,
                                                            configuration);
@@ -219,6 +236,12 @@
             {
                ChannelPipelineSupport.addSSLFilter(pipeline, context, true);
             }
+            if (httpEnabled)
+            {
+               pipeline.addLast("httpRequestEncoder", new HttpRequestEncoder());
+               pipeline.addLast("httpResponseDecoder", new HttpResponseDecoder());
+               pipeline.addLast("httphandler", new HttpHandler());
+            }
             ChannelPipelineSupport.addCodecFilter(pipeline, handler);
             pipeline.addLast("handler", new MessagingClientChannelHandler(handler, listener));
             return pipeline;
@@ -327,4 +350,28 @@
          super(handler, listener);
       }
    }
+
+   @ChannelPipelineCoverage("all")
+   class HttpHandler extends SimpleChannelHandler
+   {
+      @Override
+      public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
+      {
+         HttpResponse response = (HttpResponse) e.getMessage();
+         MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), response.getContent(), e.getRemoteAddress());
+         ctx.sendUpstream(event);
+      }
+
+      @Override
+      public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
+      {
+         HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/jbm/");
+         ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+         httpRequest.setContent(buf);
+         httpRequest.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.writerIndex()));
+         write(ctx, e.getChannel(), e.getFuture(), httpRequest, e.getRemoteAddress());
+      }
+   }
+
+
 }

Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java	2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java	2008-12-03 13:56:24 UTC (rev 5455)
@@ -31,6 +31,8 @@
 {
    public static final String SSL_ENABLED_PROP_NAME = "jbm.remoting.netty.sslenabled";
    
+   public static final String HTTP_ENABLED_PROP_NAME = "jbm.remoting.netty.httpenabled";
+   
    public static final String USE_NIO_PROP_NAME = "jbm.remoting.netty.usenio";
    
    public static final String HOST_PROP_NAME = "jbm.remoting.netty.host";
@@ -71,5 +73,7 @@
    
    public static final int DEFAULT_TCP_SENDBUFFER_SIZE = 32768;
    
-   public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;  
+   public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
+
+   public static final boolean DEFAULT_HTTP_ENABLED = false;
 }

Added: trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java	2008-12-03 13:56:24 UTC (rev 5455)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.http;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+import java.util.HashMap;
+
+/**
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class CoreClientOverHttpTest extends UnitTestCase
+{
+   public void testCoreHttpClient() throws Exception
+   {
+      final SimpleString QUEUE = new SimpleString("CoreClientOverHttpTestQueue");
+
+      Configuration conf = new ConfigurationImpl();
+
+      conf.setSecurityEnabled(false);
+
+      HashMap<String, Object> params = new HashMap<String, Object>();
+      params.put("jbm.remoting.netty.httpenabled", true);
+      conf.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
+
+      MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+
+      messagingService.start();
+
+      ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params));
+
+      ClientSession session = sf.createSession(false, true, true);
+
+      session.createQueue(QUEUE, QUEUE, null, false, false, true);
+      
+      ClientProducer producer = session.createProducer(QUEUE);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+               System.currentTimeMillis(), (byte) 1);
+         message.getBody().putString("CoreClientOverHttpTest");
+         message.getBody().flip();
+         producer.send(message);
+      }
+
+      ClientConsumer consumer = session.createConsumer(QUEUE);
+
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message2 = consumer.receive();
+
+         assertEquals("CoreClientOverHttpTest", message2.getBody().getString());
+
+         message2.acknowledge();
+      }
+
+      session.close();
+
+      messagingService.stop();
+   }
+}

Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java	2008-12-03 13:56:24 UTC (rev 5455)
@@ -22,22 +22,7 @@
 
 package org.jboss.messaging.tests.util;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-
-import javax.transaction.xa.Xid;
-
 import junit.framework.TestCase;
-
 import org.easymock.EasyMock;
 import org.easymock.IArgumentMatcher;
 import org.jboss.messaging.core.client.ClientMessage;
@@ -52,10 +37,23 @@
 import org.jboss.messaging.core.server.impl.ServerMessageImpl;
 import org.jboss.messaging.jms.client.JBossTextMessage;
 
+import javax.transaction.xa.Xid;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
 /**
- * 
+ *
  * Helper base class for our unit tests
- * 
+ *
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
  * @author <a href="mailto:csuconic at redhat.com">Clebert</a>
  *
@@ -68,6 +66,9 @@
 
    public static final String INVM_CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
 
+   public static final String NETTY_ACCEPTOR_FACTORY = "org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory";
+
+   public static final String NETTY_CONNECTOR_FACTORY = "org.jboss.messaging.integration.transports.netty.NettyConnectorFactory";
    // Attributes ----------------------------------------------------
 
    private String testDir = System.getProperty("java.io.tmpdir", "/tmp") + "/jbm-unit-test";




More information about the jboss-cvs-commits mailing list