[jboss-cvs] JBoss Messaging SVN: r5455 - in trunk: examples/messaging/src/org/jboss/messaging/example and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Dec 3 08:56:24 EST 2008
Author: ataylor
Date: 2008-12-03 08:56:24 -0500 (Wed, 03 Dec 2008)
New Revision: 5455
Added:
trunk/examples/messaging/src/org/jboss/messaging/example/HttpClient.java
trunk/tests/src/org/jboss/messaging/tests/integration/http/
trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java
Modified:
trunk/build-thirdparty.xml
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
Log:
netty http integration
Modified: trunk/build-thirdparty.xml
===================================================================
--- trunk/build-thirdparty.xml 2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/build-thirdparty.xml 2008-12-03 13:56:24 UTC (rev 5455)
@@ -85,7 +85,7 @@
Dependencies required to build the transport jar
-->
- <componentref name="netty" version="3.0.2.GA"/>
+ <componentref name="netty" version="3.1.0.ALPHA2"/>
<componentref name="apache-mina" version="2.0.0-M3-20080730.120633-1"/>
<!--
Added: trunk/examples/messaging/src/org/jboss/messaging/example/HttpClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/HttpClient.java (rev 0)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/HttpClient.java 2008-12-03 13:56:24 UTC (rev 5455)
@@ -0,0 +1,85 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.example;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.util.SimpleString;
+
+import java.util.HashMap;
+
+/**
+ * Uses the core messaging API to send and receive a message to a queue via http. You will need to enable the server with
+ * the netty http acceptor for this to work.
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class HttpClient
+{ public static void main(final String[] args)
+ {
+ ClientSession clientSession = null;
+ try
+ {
+ HashMap<String, Object> params = new HashMap<String, Object>();
+ params.put("jbm.remoting.netty.httpenabled", true);
+ params.put("jbm.remoting.netty.port", 8080);
+ ClientSessionFactory sessionFactory =
+ new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.integration.transports.netty.NettyConnectorFactory", params));
+ clientSession = sessionFactory.createSession(false, true, true);
+ SimpleString queue = new SimpleString("queuejms.testQueue");
+ ClientProducer clientProducer = clientSession.createProducer(queue);
+ ClientMessage message = clientSession.createClientMessage(JBossTextMessage.TYPE, false, 0,
+ System.currentTimeMillis(), (byte) 1);
+ message.getBody().putString("Hello!");
+ clientProducer.send(message);
+ ClientConsumer clientConsumer = clientSession.createConsumer(queue);
+ clientSession.start();
+ ClientMessage msg = clientConsumer.receive(5000);
+ System.out.println("msg.getPayload() = " + msg.getBody().getString());
+ msg.acknowledge();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ finally
+ {
+ if (clientSession != null)
+ {
+ try
+ {
+ clientSession.close();
+ }
+ catch (MessagingException ignore)
+ {
+ }
+ }
+ }
+ }
+}
+
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/src/config/jbm-configuration.xml 2008-12-03 13:56:24 UTC (rev 5455)
@@ -120,16 +120,24 @@
</acceptor>
<!-- Netty standard TCP acceptor -->
<acceptor>
- <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
</acceptor>
<!-- Netty SSL Acceptor
<acceptor>
- <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
<param key="jbm.remoting.netty.host" value="localhost" type="String"/>
<param key="jbm.remoting.netty.port" value="5500" type="Integer"/>
<param key="jbm.remoting.netty.sslenabled" value="true" type="Boolean"/>
</acceptor>
- -->
+ -->
+ <!-- Netty HTTP Acceptor
+ <acceptor>
+ <factory-class>org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory</factory-class>
+ <param key="jbm.remoting.netty.host" value="localhost" type="String"/>
+ <param key="jbm.remoting.netty.port" value="8080" type="Integer"/>
+ <param key="jbm.remoting.netty.httpenabled" value="true" type="Boolean"/>
+ </acceptor>-->
+
<!-- Mina Acceptor -->
<acceptor>
<factory-class>org.jboss.messaging.integration.transports.mina.MinaAcceptorFactory</factory-class>
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyAcceptor.java 2008-12-03 13:56:24 UTC (rev 5455)
@@ -22,16 +22,6 @@
package org.jboss.messaging.integration.transports.netty;
-import static org.jboss.netty.channel.Channels.pipeline;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
import org.jboss.messaging.core.remoting.spi.Acceptor;
@@ -41,6 +31,7 @@
import org.jboss.messaging.util.ConfigurationHelper;
import org.jboss.messaging.util.JBMThreadFactory;
import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
@@ -50,96 +41,123 @@
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
+import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.write;
+import org.jboss.netty.channel.DefaultMessageEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.ssl.SslHandler;
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
/**
* A Netty TCP Acceptor that supports SSL
*
* @author <a href="ataylor at redhat.com">Andy Taylor</a>
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
* @author <a href="tlee at redhat.com">Trustin Lee</a>
- *
* @version $Rev$, $Date$
*/
public class NettyAcceptor implements Acceptor
{
private static final Logger log = Logger.getLogger(NettyAcceptor.class);
-
+
private ExecutorService bossExecutor;
+
private ExecutorService workerExecutor;
+
private ChannelFactory channelFactory;
+
private Channel serverChannel;
+
private ServerBootstrap bootstrap;
private final BufferHandler handler;
private final ConnectionLifeCycleListener listener;
-
+
private final boolean sslEnabled;
-
+
+ private final boolean httpEnabled;
+
private final boolean useNio;
-
+
private final String host;
private final int port;
-
+
private final String keyStorePath;
-
+
private final String keyStorePassword;
-
+
private final String trustStorePath;
-
+
private final String trustStorePassword;
-
+
private final boolean tcpNoDelay;
-
+
private final int tcpSendBufferSize;
-
+
private final int tcpReceiveBufferSize;
- public NettyAcceptor(final Map<String, Object> configuration, final BufferHandler handler,
- final ConnectionLifeCycleListener listener)
- {
+ public NettyAcceptor(final Map<String, Object> configuration, final BufferHandler handler,
+ final ConnectionLifeCycleListener listener)
+ {
this.handler = handler;
this.listener = listener;
-
+
this.sslEnabled =
- ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration);
- this.useNio =
- ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME, TransportConstants.DEFAULT_USE_NIO, configuration);
+ ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, TransportConstants.DEFAULT_SSL_ENABLED, configuration);
+ this.httpEnabled =
+ ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+ this.useNio =
+ ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME, TransportConstants.DEFAULT_USE_NIO, configuration);
this.host =
- ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
+ ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
this.port =
- ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration);
+ ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration);
if (sslEnabled)
{
this.keyStorePath =
- ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PATH, configuration);
+ ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PATH, configuration);
this.keyStorePassword =
- ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PASSWORD, configuration);
+ ConfigurationHelper.getStringProperty(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_KEYSTORE_PASSWORD, configuration);
this.trustStorePath =
- ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PATH, configuration);
+ ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PATH, configuration);
this.trustStorePassword =
- ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD, configuration);
- }
+ ConfigurationHelper.getStringProperty(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_TRUSTSTORE_PASSWORD, configuration);
+ }
else
{
this.keyStorePath = null;
this.keyStorePassword = null;
this.trustStorePath = null;
- this.trustStorePassword = null;
+ this.trustStorePassword = null;
}
-
+
this.tcpNoDelay =
- ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration);
+ ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration);
this.tcpSendBufferSize =
- ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
+ ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
this.tcpReceiveBufferSize =
- ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
-
+ ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
+
}
public synchronized void start() throws Exception
@@ -149,7 +167,7 @@
//Already started
return;
}
-
+
bossExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-boss-threads"));
workerExecutor = Executors.newCachedThreadPool(new JBMThreadFactory("jbm-netty-acceptor-worker-threads"));
if (useNio)
@@ -187,7 +205,8 @@
context = null; // Unused
}
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory()
+ {
public ChannelPipeline getPipeline() throws Exception
{
ChannelPipeline pipeline = pipeline();
@@ -195,6 +214,13 @@
{
ChannelPipelineSupport.addSSLFilter(pipeline, context, false);
}
+ if (httpEnabled)
+ {
+ pipeline.addLast("httpRequestDecoder", new HttpRequestDecoder());
+ pipeline.addLast("httpResponseEncoder", new HttpResponseEncoder());
+ pipeline.addLast("httphandler", new HttpHandler());
+ }
+
ChannelPipelineSupport.addCodecFilter(pipeline, handler);
pipeline.addLast("handler", new MessagingServerChannelHandler(handler, listener));
return pipeline;
@@ -203,11 +229,11 @@
// Bind
bootstrap.setOption("localAddress", new InetSocketAddress(host, port));
- bootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
+ bootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
if (tcpReceiveBufferSize != -1)
{
bootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize);
- }
+ }
if (tcpSendBufferSize != -1)
{
bootstrap.setOption("child.sendBufferSize", tcpSendBufferSize);
@@ -229,7 +255,7 @@
serverChannel.close().awaitUninterruptibly();
bossExecutor.shutdown();
workerExecutor.shutdown();
- for (;;)
+ for (; ;)
{
try
{
@@ -268,10 +294,13 @@
{
public void operationComplete(ChannelFuture future) throws Exception
{
- if (future.isSuccess()) {
+ if (future.isSuccess())
+ {
listener.connectionCreated(tc);
active = true;
- } else {
+ }
+ else
+ {
future.getChannel().close();
}
}
@@ -284,4 +313,26 @@
}
}
}
+
+   /**
+    * Server-side adapter between the HTTP codec and the handlers that follow it in the pipeline.
+    * Inbound, it extracts the content of each {@link HttpRequest} and forwards it upstream as a
+    * plain message; outbound, it wraps each {@link ChannelBuffer} in an HTTP 1.1 "200 OK" response.
+    */
+   @ChannelPipelineCoverage("all")
+   class HttpHandler extends SimpleChannelHandler
+   {
+      /**
+       * Forwards the body of the received HTTP request upstream in place of the request itself.
+       *
+       * @param ctx the handler context used to send the new event upstream
+       * @param e   the event whose message is expected to be an {@link HttpRequest}
+       */
+      @Override
+      public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
+      {
+         HttpRequest request = (HttpRequest) e.getMessage();
+         MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), request.getContent(), e.getRemoteAddress());
+         ctx.sendUpstream(event);
+      }
+
+      /**
+       * Wraps the outgoing buffer in an HTTP response and writes that downstream instead.
+       *
+       * @param ctx the handler context used to write the response
+       * @param e   the event whose message is expected to be a {@link ChannelBuffer}
+       */
+      @Override
+      public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
+      {
+         ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+         HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+         response.setContent(buf);
+         // Content-Length must be the number of readable bytes (writerIndex - readerIndex);
+         // writerIndex() alone over-reports the length whenever readerIndex is not zero.
+         response.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.readableBytes()));
+         write(ctx, e.getChannel(), e.getFuture(), response, e.getRemoteAddress());
+      }
+   }
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/NettyConnector.java 2008-12-03 13:56:24 UTC (rev 5455)
@@ -21,17 +21,6 @@
*/
package org.jboss.messaging.integration.transports.netty;
-import static org.jboss.netty.channel.Channels.pipeline;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLException;
-
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.ssl.SSLSupport;
import org.jboss.messaging.core.remoting.spi.BufferHandler;
@@ -41,16 +30,39 @@
import org.jboss.messaging.util.ConfigurationHelper;
import org.jboss.messaging.util.JBMThreadFactory;
import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelPipelineFactory;
+import static org.jboss.netty.channel.Channels.pipeline;
+import static org.jboss.netty.channel.Channels.write;
+import org.jboss.netty.channel.DefaultMessageEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpRequestEncoder;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseDecoder;
+import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.ssl.SslHandler;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLException;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
/**
*
* A NettyConnector
@@ -80,6 +92,8 @@
private final boolean sslEnabled;
+ private final boolean httpEnabled;
+
private final boolean useNio;
private final String host;
@@ -123,6 +137,9 @@
this.sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME,
TransportConstants.DEFAULT_SSL_ENABLED,
configuration);
+ this.httpEnabled =
+ ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_ENABLED, configuration);
+
this.useNio = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_NIO_PROP_NAME,
TransportConstants.DEFAULT_USE_NIO,
configuration);
@@ -219,6 +236,12 @@
{
ChannelPipelineSupport.addSSLFilter(pipeline, context, true);
}
+ if (httpEnabled)
+ {
+ pipeline.addLast("httpRequestEncoder", new HttpRequestEncoder());
+ pipeline.addLast("httpResponseDecoder", new HttpResponseDecoder());
+ pipeline.addLast("httphandler", new HttpHandler());
+ }
ChannelPipelineSupport.addCodecFilter(pipeline, handler);
pipeline.addLast("handler", new MessagingClientChannelHandler(handler, listener));
return pipeline;
@@ -327,4 +350,28 @@
super(handler, listener);
}
}
+
+   /**
+    * Client-side adapter between the HTTP codec and the handlers that follow it in the pipeline.
+    * Inbound, it extracts the content of each {@link HttpResponse} and forwards it upstream as a
+    * plain message; outbound, it wraps each {@link ChannelBuffer} in an HTTP 1.1 POST request
+    * to <code>/jbm/</code>.
+    */
+   @ChannelPipelineCoverage("all")
+   class HttpHandler extends SimpleChannelHandler
+   {
+      /**
+       * Forwards the body of the received HTTP response upstream in place of the response itself.
+       *
+       * @param ctx the handler context used to send the new event upstream
+       * @param e   the event whose message is expected to be an {@link HttpResponse}
+       */
+      @Override
+      public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
+      {
+         HttpResponse response = (HttpResponse) e.getMessage();
+         MessageEvent event = new DefaultMessageEvent(e.getChannel(), e.getFuture(), response.getContent(), e.getRemoteAddress());
+         ctx.sendUpstream(event);
+      }
+
+      /**
+       * Wraps the outgoing buffer in an HTTP POST request and writes that downstream instead.
+       *
+       * @param ctx the handler context used to write the request
+       * @param e   the event whose message is expected to be a {@link ChannelBuffer}
+       */
+      @Override
+      public void writeRequested(final ChannelHandlerContext ctx, final MessageEvent e) throws Exception
+      {
+         ChannelBuffer buf = (ChannelBuffer) e.getMessage();
+         HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/jbm/");
+         httpRequest.setContent(buf);
+         // Content-Length must be the number of readable bytes (writerIndex - readerIndex);
+         // writerIndex() alone over-reports the length whenever readerIndex is not zero.
+         httpRequest.addHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(buf.readableBytes()));
+         write(ctx, e.getChannel(), e.getFuture(), httpRequest, e.getRemoteAddress());
+      }
+   }
+
+
}
Modified: trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java
===================================================================
--- trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java 2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/src/main/org/jboss/messaging/integration/transports/netty/TransportConstants.java 2008-12-03 13:56:24 UTC (rev 5455)
@@ -31,6 +31,8 @@
{
public static final String SSL_ENABLED_PROP_NAME = "jbm.remoting.netty.sslenabled";
+ public static final String HTTP_ENABLED_PROP_NAME = "jbm.remoting.netty.httpenabled";
+
public static final String USE_NIO_PROP_NAME = "jbm.remoting.netty.usenio";
public static final String HOST_PROP_NAME = "jbm.remoting.netty.host";
@@ -71,5 +73,7 @@
public static final int DEFAULT_TCP_SENDBUFFER_SIZE = 32768;
- public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
+ public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 32768;
+
+ public static final boolean DEFAULT_HTTP_ENABLED = false;
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/http/CoreClientOverHttpTest.java 2008-12-03 13:56:24 UTC (rev 5455)
@@ -0,0 +1,98 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.messaging.tests.integration.http;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.jms.client.JBossTextMessage;
+import org.jboss.messaging.tests.util.UnitTestCase;
+import org.jboss.messaging.util.SimpleString;
+
+import java.util.HashMap;
+
+/**
+ * Integration test that starts a messaging server with an HTTP-enabled Netty acceptor and checks
+ * that a core client using an HTTP-enabled Netty connector can send and receive messages.
+ *
+ * @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ */
+public class CoreClientOverHttpTest extends UnitTestCase
+{
+   /** Upper bound, in milliseconds, on the wait for each message so a lost message fails the test instead of hanging it. */
+   private static final long RECEIVE_TIMEOUT = 5000;
+
+   /**
+    * Sends 100 text messages to a freshly created queue over HTTP and verifies that each one is
+    * received with the expected body.
+    *
+    * @throws Exception if the server, session or any messaging operation fails
+    */
+   public void testCoreHttpClient() throws Exception
+   {
+      final SimpleString QUEUE = new SimpleString("CoreClientOverHttpTestQueue");
+
+      Configuration conf = new ConfigurationImpl();
+
+      conf.setSecurityEnabled(false);
+
+      // The same parameters are used for the acceptor and the connector so both sides speak HTTP.
+      HashMap<String, Object> params = new HashMap<String, Object>();
+      params.put("jbm.remoting.netty.httpenabled", true);
+      conf.getAcceptorConfigurations().add(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params));
+
+      MessagingService messagingService = MessagingServiceImpl.newNullStorageMessagingServer(conf);
+
+      messagingService.start();
+
+      try
+      {
+         ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(NETTY_CONNECTOR_FACTORY, params));
+
+         ClientSession session = sf.createSession(false, true, true);
+
+         try
+         {
+            session.createQueue(QUEUE, QUEUE, null, false, false, true);
+
+            ClientProducer producer = session.createProducer(QUEUE);
+
+            final int numMessages = 100;
+
+            for (int i = 0; i < numMessages; i++)
+            {
+               ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
+                                                                   System.currentTimeMillis(), (byte) 1);
+               message.getBody().putString("CoreClientOverHttpTest");
+               message.getBody().flip();
+               producer.send(message);
+            }
+
+            ClientConsumer consumer = session.createConsumer(QUEUE);
+
+            session.start();
+
+            for (int i = 0; i < numMessages; i++)
+            {
+               ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
+
+               assertNotNull("message " + i + " not received within " + RECEIVE_TIMEOUT + " ms", message2);
+
+               assertEquals("CoreClientOverHttpTest", message2.getBody().getString());
+
+               message2.acknowledge();
+            }
+         }
+         finally
+         {
+            // Close the session even when an assertion fails.
+            session.close();
+         }
+      }
+      finally
+      {
+         // Always stop the server so a failure here does not leave the acceptor port bound.
+         messagingService.stop();
+      }
+   }
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-12-03 03:59:55 UTC (rev 5454)
+++ trunk/tests/src/org/jboss/messaging/tests/util/UnitTestCase.java 2008-12-03 13:56:24 UTC (rev 5455)
@@ -22,22 +22,7 @@
package org.jboss.messaging.tests.util;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-
-import javax.transaction.xa.Xid;
-
import junit.framework.TestCase;
-
import org.easymock.EasyMock;
import org.easymock.IArgumentMatcher;
import org.jboss.messaging.core.client.ClientMessage;
@@ -52,10 +37,23 @@
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
import org.jboss.messaging.jms.client.JBossTextMessage;
+import javax.transaction.xa.Xid;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
/**
- *
+ *
* Helper base class for our unit tests
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:csuconic at redhat.com">Clebert</a>
*
@@ -68,6 +66,9 @@
public static final String INVM_CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+ public static final String NETTY_ACCEPTOR_FACTORY = "org.jboss.messaging.integration.transports.netty.NettyAcceptorFactory";
+
+ public static final String NETTY_CONNECTOR_FACTORY = "org.jboss.messaging.integration.transports.netty.NettyConnectorFactory";
// Attributes ----------------------------------------------------
private String testDir = System.getProperty("java.io.tmpdir", "/tmp") + "/jbm-unit-test";
More information about the jboss-cvs-commits
mailing list