Author: alessio.soldano(a)jboss.com
Date: 2009-07-02 10:38:33 -0400 (Thu, 02 Jul 2009)
New Revision: 10293
Added:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/WSServerPipelineFactory.java
Modified:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMBackPortsInvocationHandler.java
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMBackPortsServer.java
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMCallbackHandler.java
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMCallbackHandlerImpl.java
Log:
Using Netty instead of Remoting for WS-RM back ports server
Added:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/WSServerPipelineFactory.java
===================================================================
---
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/WSServerPipelineFactory.java
(rev 0)
+++
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/WSServerPipelineFactory.java 2009-07-02
14:38:33 UTC (rev 10293)
@@ -0,0 +1,82 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2006, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ws.core.client;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+import org.jboss.netty.channel.ChannelHandler;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+
+/**
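+ * Pipeline factory for the Netty based server: HTTP request decoder, chunk aggregator, HTTP response encoder and the configured request handler.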
+ * @author alessio.soldano(a)jboss.com
+ * @since 01-Jul-2009
+ *
+ */
+public class WSServerPipelineFactory implements ChannelPipelineFactory
+{
+ private static final int MAX_CONTENT_SIZE = 1073741824;
+ private ChannelHandler requestHandler;
+ private ChannelHandler sshHandler;
+
+ public ChannelPipeline getPipeline() throws Exception
+ {
+ // Create a default pipeline implementation.
+ ChannelPipeline pipeline = pipeline();
+ // Uncomment the following lines if you want HTTPS
+ //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine();
+ //engine.setUseClientMode(false);
+ //pipeline.addLast("ssl", new SslHandler(engine));
+
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ // The aggregator spares the request handler from dealing with HttpChunks; remove the following line only if the handler processes chunks itself.
+ pipeline.addLast("aggregator", new
HttpChunkAggregator(MAX_CONTENT_SIZE));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("handler", requestHandler);
+ return pipeline;
+ }
+
+ public ChannelHandler getRequestHandler()
+ {
+ return requestHandler;
+ }
+
+ public void setRequestHandler(ChannelHandler requestHandler)
+ {
+ this.requestHandler = requestHandler;
+ }
+
+ public ChannelHandler getSshHandler()
+ {
+ return sshHandler;
+ }
+
+ public void setSshHandler(ChannelHandler sshHandler)
+ {
+ this.sshHandler = sshHandler;
+ }
+
+}
Property changes on:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/core/client/WSServerPipelineFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMBackPortsInvocationHandler.java
===================================================================
---
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMBackPortsInvocationHandler.java 2009-07-02
14:04:22 UTC (rev 10292)
+++
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMBackPortsInvocationHandler.java 2009-07-02
14:38:33 UTC (rev 10293)
@@ -21,76 +21,179 @@
*/
package org.jboss.ws.extensions.wsrm.transport.backchannel;
+import java.net.URL;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import javax.management.MBeanServer;
-
import org.jboss.logging.Logger;
-import org.jboss.remoting.InvocationRequest;
-import org.jboss.remoting.ServerInvocationHandler;
-import org.jboss.remoting.ServerInvoker;
-import org.jboss.remoting.callback.InvokerCallbackHandler;
-import org.jboss.remoting.transport.coyote.RequestMap;
-import org.jboss.remoting.transport.http.HTTPMetadataConstants;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferInputStream;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.codec.http.Cookie;
+import org.jboss.netty.handler.codec.http.CookieDecoder;
+import org.jboss.netty.handler.codec.http.CookieEncoder;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.jboss.ws.extensions.wsrm.transport.RMMessage;
+import org.jboss.ws.extensions.wsrm.transport.RMUnMarshaller;
/**
* TODO: Add comment
*
* @author richard.opalka(a)jboss.com
+ * @author alessio.soldano(a)jboss.com
*
* @since Nov 20, 2007
*/
-public final class RMBackPortsInvocationHandler implements ServerInvocationHandler
+public final class RMBackPortsInvocationHandler extends SimpleChannelUpstreamHandler
{
private static final Logger LOG =
Logger.getLogger(RMBackPortsInvocationHandler.class);
private final List<RMCallbackHandler> callbacks = new
LinkedList<RMCallbackHandler>();
private final Lock lock = new ReentrantLock();
-
+
+
public RMBackPortsInvocationHandler()
{
}
- public RMCallbackHandler getCallback(String requestPath)
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
{
+ // Add all accepted channels to the group
+ // so that they are closed properly on shutdown
+ // If the added channel is closed before shutdown,
+ // it will be removed from the group automatically.
+ RMBackPortsServer.channelGroup.add(ctx.getChannel());
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws
Exception
+ {
+ HttpRequest request = (HttpRequest)e.getMessage();
+ ChannelBuffer content = request.getContent();
+
+ Map<String, Object> requestHeaders = new HashMap<String, Object>();
+ for (String headerName : request.getHeaderNames())
+ {
+ requestHeaders.put(headerName, request.getHeaders(headerName));
+ }
+ boolean error = false;
+ try
+ {
+ String requestPath = new URL(request.getUri()).getPath();
+ RMMessage message =
(RMMessage)RMUnMarshaller.getInstance().read(content.readable() ? new
ChannelBufferInputStream(content) : null, requestHeaders);
+ handle(requestPath, message);
+ }
+ catch (Throwable t)
+ {
+ error = true;
+ LOG.error("Error decoding request to the backport", t);
+ }
+ finally
+ {
+ writeResponse(e, request, error);
+ }
+ }
+
+
+ private void handle(String requestPath, RMMessage message)
+ {
this.lock.lock();
try
{
+ boolean handlerExists = false;
for (RMCallbackHandler handler : this.callbacks)
{
if (handler.getHandledPath().equals(requestPath))
- return handler;
+ {
+ handlerExists = true;
+ LOG.debug("Handling request path: " + requestPath);
+ handler.handle(message);
+ break;
+ }
}
+ if (handlerExists == false)
+ LOG.warn("No callback handler registered for path: " +
requestPath);
}
finally
{
this.lock.unlock();
}
+ }
+
+ private void writeResponse(MessageEvent e, HttpRequest request, boolean error)
+ {
+ // Build the response object.
+ HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, error ?
HttpResponseStatus.INTERNAL_SERVER_ERROR : HttpResponseStatus.OK);
+ response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain;
charset=UTF-8");
- return null;
+ String cookieString = request.getHeader(HttpHeaders.Names.COOKIE);
+ if (cookieString != null)
+ {
+ CookieDecoder cookieDecoder = new CookieDecoder();
+ Set<Cookie> cookies = cookieDecoder.decode(cookieString);
+ if (!cookies.isEmpty())
+ {
+ // Reset the cookies if necessary.
+ CookieEncoder cookieEncoder = new CookieEncoder(true);
+ for (Cookie cookie : cookies)
+ {
+ cookieEncoder.addCookie(cookie);
+ }
+ response.addHeader(HttpHeaders.Names.SET_COOKIE, cookieEncoder.encode());
+ }
+ }
+
+ // Write the response.
+ e.getChannel().write(response);
+ e.getChannel().close();
}
- public void registerCallback(RMCallbackHandler callbackHandler)
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws
Exception
{
+ e.getCause().printStackTrace();
+ e.getChannel().close();
+ }
+
+ public RMCallbackHandler getCallback(String requestPath)
+ {
this.lock.lock();
try
{
- this.callbacks.add(callbackHandler);
+ for (RMCallbackHandler handler : this.callbacks)
+ {
+ if (handler.getHandledPath().equals(requestPath))
+ return handler;
+ }
}
finally
{
this.lock.unlock();
}
+
+ return null;
}
- public void unregisterCallback(RMCallbackHandler callbackHandler)
+ public void registerCallback(RMCallbackHandler callbackHandler)
{
this.lock.lock();
try
{
- this.callbacks.remove(callbackHandler);
+ this.callbacks.add(callbackHandler);
}
finally
{
@@ -98,53 +201,16 @@
}
}
- public Object invoke(InvocationRequest request) throws Throwable
+ public void unregisterCallback(RMCallbackHandler callbackHandler)
{
this.lock.lock();
try
{
- RequestMap rm = (RequestMap)request.getRequestPayload();
- String requestPath = (String)rm.get(HTTPMetadataConstants.PATH);
- boolean handlerExists = false;
- for (RMCallbackHandler handler : this.callbacks)
- {
- if (handler.getHandledPath().equals(requestPath))
- {
- handlerExists = true;
- LOG.debug("Handling request path: " + requestPath);
- handler.handle(request);
- break;
- }
- }
- if (handlerExists == false)
- LOG.warn("No callback handler registered for path: " +
requestPath);
-
- return null;
+ this.callbacks.remove(callbackHandler);
}
finally
{
this.lock.unlock();
}
}
-
- public void addListener(InvokerCallbackHandler callbackHandler)
- {
- // do nothing - we're using custom callback handlers
- }
-
- public void removeListener(InvokerCallbackHandler callbackHandler)
- {
- // do nothing - we're using custom callback handlers
- }
-
- public void setInvoker(ServerInvoker arg0)
- {
- // do nothing
- }
-
- public void setMBeanServer(MBeanServer arg0)
- {
- // do nothing
- }
-
}
Modified:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMBackPortsServer.java
===================================================================
---
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMBackPortsServer.java 2009-07-02
14:04:22 UTC (rev 10292)
+++
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMBackPortsServer.java 2009-07-02
14:38:33 UTC (rev 10293)
@@ -21,19 +21,26 @@
*/
package org.jboss.ws.extensions.wsrm.transport.backchannel;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.jboss.logging.Logger;
-import org.jboss.remoting.InvokerLocator;
-import org.jboss.remoting.transport.Connector;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.ws.core.client.WSServerPipelineFactory;
import org.jboss.ws.extensions.wsrm.api.RMException;
-import org.jboss.ws.extensions.wsrm.transport.RemotingRMUnMarshaller;
/**
* Back ports server used by addressable clients
*
* @author richard.opalka(a)jboss.com
+ * @author alessio.soldano(a)jboss.com
*
* @since Nov 20, 2007
*/
@@ -43,9 +50,9 @@
private static final Lock CLASS_LOCK = new ReentrantLock();
private static final long WAIT_PERIOD = 100;
private static RMBackPortsServer INSTANCE;
+ static final ChannelGroup channelGroup = new
DefaultChannelGroup("rmBackPortsServer");
private final Object instanceLock = new Object();
- private final Connector connector;
private final String scheme;
private final String host;
private final int port;
@@ -53,6 +60,7 @@
private boolean started;
private boolean stopped;
private boolean terminated;
+ private ChannelFactory factory;
public final void registerCallback(RMCallbackHandler callbackHandler)
{
@@ -69,8 +77,7 @@
return this.handler.getCallback(requestPath);
}
- private RMBackPortsServer(String scheme, String host, int port)
- throws RMException
+ private RMBackPortsServer(String scheme, String host, int port) throws RMException
{
super();
this.scheme = scheme;
@@ -78,17 +85,20 @@
this.port = port;
try
{
- // we have to use custom unmarshaller because default one removes CRNLs
- String customUnmarshaller = "/?unmarshaller=" +
RemotingRMUnMarshaller.class.getName();
- InvokerLocator il = new InvokerLocator(this.scheme + "://" + this.host
+ ":" + this.port + customUnmarshaller);
- this.connector = new Connector();
- this.connector.setInvokerLocator(il.getLocatorURI());
- this.connector.create();
-
+ factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
+
+ ServerBootstrap bootstrap = new ServerBootstrap(factory);
this.handler = new RMBackPortsInvocationHandler();
- this.connector.addInvocationHandler("wsrmBackPortsHandler",
this.handler);
- this.connector.start();
- LOG.debug("WS-RM Backports Server started on: " +
il.getLocatorURI());
+ WSServerPipelineFactory channelPipelineFactory = new WSServerPipelineFactory();
+ channelPipelineFactory.setRequestHandler(this.handler);
+ bootstrap.setPipelineFactory(channelPipelineFactory);
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.keepAlive", true);
+ // Bind and start to accept incoming connections.
+ Channel c = bootstrap.bind(new InetSocketAddress(this.port));
+ channelGroup.add(c);
+ LOG.debug("WS-RM Backports Server started on port: " + this.port);
+ System.out.println("WS-RM Backports Server started on port: " +
this.port);
}
catch (Exception e)
{
@@ -135,7 +145,13 @@
}
try
{
- connector.stop();
+ //Close all connections and server sockets.
+ channelGroup.close().awaitUninterruptibly();
+ //Shutdown the selector loop (boss and worker).
+ if (factory != null)
+ {
+ factory.releaseExternalResources();
+ }
}
finally
{
@@ -177,8 +193,7 @@
* @return WS-RM back ports server
* @throws RMException
*/
- public static RMBackPortsServer getInstance(String scheme, String host, int port)
- throws RMException
+ public static RMBackPortsServer getInstance(String scheme, String host, int port)
throws RMException
{
CLASS_LOCK.lock();
try
Modified:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMCallbackHandler.java
===================================================================
---
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMCallbackHandler.java 2009-07-02
14:04:22 UTC (rev 10292)
+++
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMCallbackHandler.java 2009-07-02
14:38:33 UTC (rev 10293)
@@ -21,7 +21,6 @@
*/
package org.jboss.ws.extensions.wsrm.transport.backchannel;
-import org.jboss.remoting.InvocationRequest;
import org.jboss.ws.extensions.wsrm.transport.RMMessage;
import org.jboss.ws.extensions.wsrm.transport.RMUnassignedMessageListener;
@@ -35,7 +34,7 @@
public interface RMCallbackHandler
{
String getHandledPath();
- void handle(InvocationRequest payload);
+ void handle(RMMessage message);
RMMessage getMessage(String messageId);
Throwable getFault(String messageId);
void addUnassignedMessageListener(RMUnassignedMessageListener listener);
Modified:
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMCallbackHandlerImpl.java
===================================================================
---
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMCallbackHandlerImpl.java 2009-07-02
14:04:22 UTC (rev 10292)
+++
stack/native/branches/netty/modules/core/src/main/java/org/jboss/ws/extensions/wsrm/transport/backchannel/RMCallbackHandlerImpl.java 2009-07-02
14:38:33 UTC (rev 10293)
@@ -27,7 +27,6 @@
import java.util.Map;
import org.jboss.logging.Logger;
-import org.jboss.remoting.InvocationRequest;
import org.jboss.ws.core.MessageTrace;
import org.jboss.ws.extensions.wsrm.transport.RMMessage;
import org.jboss.ws.extensions.wsrm.transport.RMUnassignedMessageListener;
@@ -70,9 +69,8 @@
return this.handledPath;
}
- public final void handle(InvocationRequest request)
+ public final void handle(RMMessage message)
{
- RMMessage message = (RMMessage)request.getParameter();
synchronized (instanceLock)
{
String requestMessage = new String(message.getPayload());