Author: ron.sigal(a)jboss.com
Date: 2009-11-12 22:36:46 -0500 (Thu, 12 Nov 2009)
New Revision: 5590
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/RequestHandlerFuture.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketConnectionProvider.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketHandleableCloseable.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketObjectTable.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketProtocol.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketRequestListenerProxy.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketServiceConfiguration.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketUsageExamples.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientConnectionHandler.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientRequestHandler.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketRequestHandlerConnector.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketClientListener.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerConnectionHandler.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerReplyHandler.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerRequestHandler.java
Log:
JBREM-1167: Adding a simple socket based protocol.
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/RequestHandlerFuture.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/RequestHandlerFuture.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/RequestHandlerFuture.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,63 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.remoting3.samples.socket;
+
+import java.io.IOException;
+
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.xnio.Result;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 24, 2009
+ * </p>
+ */
+public class RequestHandlerFuture implements Result<RequestHandler> {
+ private RequestHandler requestHandler;
+ private IOException exception;
+
+ @Override
+ public boolean setCancelled() {return true;}
+
+ @Override
+ public boolean setException(IOException exception) {
+ this.exception = exception;
+ return true;
+ }
+
+ @Override
+ public boolean setResult(RequestHandler result) {
+ this.requestHandler = result;
+ return true;
+ }
+
+ public RequestHandler get() {
+ return requestHandler;
+ }
+
+ public IOException getException() {
+ return exception;
+ }
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketConnectionProvider.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketConnectionProvider.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketConnectionProvider.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,107 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.Executor;
+
+import org.jboss.remoting3.CloseHandler;
+import org.jboss.remoting3.Endpoint;
+import org.jboss.remoting3.samples.socket.client.SocketClientConnectionHandler;
+import org.jboss.remoting3.samples.socket.server.SocketServerConnectionHandler;
+import org.jboss.remoting3.spi.AbstractHandleableCloseable;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.remoting3.spi.ConnectionHandlerContext;
+import org.jboss.remoting3.spi.ConnectionHandlerFactory;
+import org.jboss.remoting3.spi.ConnectionProvider;
+import org.jboss.remoting3.spi.ConnectionProviderContext;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.Result;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 14, 2009
+ * </p>
+ */
+public class SocketConnectionProvider<T, I, O> extends
AbstractHandleableCloseable<SocketHandleableCloseable> implements
ConnectionProvider<T> {
+ private Endpoint endpoint;
+ private String host;
+ private int port;
+ private SocketServerConnectionHandler<I, O> connectionHandler;
+
+
+ public SocketConnectionProvider(Endpoint endpoint, Executor executor, String host) {
+ super(executor);
+ this.endpoint = endpoint;
+ this.host = host;
+ SocketProtocol.initializeMarshalling(endpoint, executor);
+ }
+
+ public Cancellable connect(final URI uri, final OptionMap connectOptions,
Result<ConnectionHandlerFactory> result) throws IllegalArgumentException {
+ result.setResult(new ConnectionHandlerFactory() {
+ public ConnectionHandler createInstance(ConnectionHandlerContext
connectionContext) {
+ final ConnectionHandler connectionHandler = new
SocketClientConnectionHandler(uri, connectOptions, getExecutor(), host, port);
+ registerCloseHandler(connectionHandler);
+ return connectionHandler;
+ }
+ });
+ return null;
+ }
+
+ public T getProviderInterface() {
+ return null;
+ }
+
+ public void start(final ConnectionProviderContext context, final int port) throws
IOException {
+ this.port = port;
+ context.accept(new ConnectionHandlerFactory() {
+ public ConnectionHandler createInstance(ConnectionHandlerContext
connectionContext) {
+ connectionHandler = new SocketServerConnectionHandler<I, O>(endpoint,
getExecutor(), connectionContext, host, port);
+ registerCloseHandler(connectionHandler);
+ try {
+ connectionHandler.start();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return connectionHandler;
+ }});
+ }
+
+ public void stop() {
+ if (connectionHandler != null) {
+ connectionHandler.stop();
+ }
+ }
+
+ protected void registerCloseHandler(final ConnectionHandler connectionHandler) {
+ addCloseHandler(new CloseHandler<SocketHandleableCloseable>() {
+ public void handleClose(SocketHandleableCloseable closed) {
+ IoUtils.safeClose(connectionHandler);
+ }
+ });
+ }
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketHandleableCloseable.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketHandleableCloseable.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketHandleableCloseable.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,34 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket;
+
+import org.jboss.remoting3.HandleableCloseable;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 31, 2009
+ * </p>
+ */
+public interface SocketHandleableCloseable extends
HandleableCloseable<SocketHandleableCloseable>{
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketObjectTable.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketObjectTable.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketObjectTable.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.concurrent.Executor;
+
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.Endpoint;
+import org.jboss.remoting3.RequestListener;
+import org.jboss.remoting3.samples.socket.server.SocketClientListener;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Nov 9, 2009
+ * </p>
+ */
+public class SocketObjectTable<I, O> implements org.jboss.marshalling.ObjectTable
{
+ private static final Logger log = Logger.getLogger(SocketObjectTable.class);
+ private static final EndpointToken ENDPOINT_TOKEN = new EndpointToken();
+ private static final ExecutorToken EXECUTOR_TOKEN = new ExecutorToken();
+
+ private Endpoint endpoint;
+ private Executor executor;
+ private SocketObjectTableWriter socketObjectTableWriter;
+
+ public SocketObjectTable(Endpoint endpoint, Executor executor) {
+ this.endpoint = endpoint;
+ this.executor = executor;
+ socketObjectTableWriter = new SocketObjectTableWriter();
+ }
+
+ @Override
+ public Writer getObjectWriter(Object object) throws IOException {
+ if (object instanceof Endpoint || object instanceof Executor || object instanceof
RequestListener<?, ?>) {
+ return socketObjectTableWriter;
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object readObject(Unmarshaller unmarshaller) throws IOException,
ClassNotFoundException {
+ Object o = unmarshaller.readObject();
+ if (o instanceof EndpointToken) {
+ return endpoint;
+ }
+ if (o instanceof ExecutorToken) {
+ return executor;
+ }
+ if (o instanceof SocketServiceConfiguration<?, ?>) {
+ return new SocketRequestListenerProxy<I, O>(endpoint,
(SocketServiceConfiguration<I, O>) o);
+ }
+ return o;
+ }
+
+
+ private static class SocketObjectTableWriter implements Writer {
+ public void writeObject(Marshaller marshaller, Object object) throws IOException {
+ if (object instanceof Endpoint) {
+ log.info(this + " got Endpoint: " + object);
+ marshaller.writeObject(ENDPOINT_TOKEN);
+ }
+ else if (object instanceof Executor) {
+ log.info(this + " got Executor: " + object);
+ marshaller.writeObject(EXECUTOR_TOKEN);
+ }
+ else if (object instanceof RequestListener<?, ?>) {
+ log.info(this + " got RequestListener: " + object);
+ SocketServiceConfiguration<?, ?> config =
SocketClientListener.getRequestListenerInfo((RequestListener<?, ?>) object);
+ if (config == null) {
+ throw new RuntimeException(object + " is not registered with
SocketClientListener");
+ }
+ marshaller.writeObject(config);
+ }
+ else {
+ throw new RuntimeException("expecting Endpoint or
RequestListener");
+ }
+ }
+ }
+
+
+ private static class EndpointToken implements Serializable {
+ private static final long serialVersionUID = -7307241847641193094L;
+ }
+
+ private static class ExecutorToken implements Serializable {
+ private static final long serialVersionUID = -8687614439586428163L;
+ }
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketProtocol.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketProtocol.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketProtocol.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,135 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.reflect.SunReflectiveCreator;
+import org.jboss.marshalling.river.RiverMarshallerFactory;
+import org.jboss.remoting3.ClientListener;
+import org.jboss.remoting3.Endpoint;
+import org.jboss.remoting3.RequestListener;
+import org.jboss.remoting3.Endpoint.ServiceBuilder;
+import org.jboss.remoting3.samples.socket.server.SocketClientListener;
+import org.jboss.remoting3.spi.ConnectionProvider;
+import org.jboss.remoting3.spi.ConnectionProviderContext;
+import org.jboss.remoting3.spi.ConnectionProviderFactory;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.IoUtils;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 14, 2009
+ * </p>
+ */
+public class SocketProtocol
+{
+ private static MarshallerFactory marshallerFactory;
+ private static MarshallingConfiguration marshallingConfiguration;
+
+
+ /**
+ * Register ConnectionProvider.
+ * This endpoint can be a socket transport client.
+ */
+ static public <T, I, O> void registerClientTransport(final Endpoint endpoint,
final Executor executor, final String host) {
+ endpoint.addConnectionProvider("socket", new
ConnectionProviderFactory<T>() {
+ public ConnectionProvider<T> createInstance(ConnectionProviderContext
context) {
+ return new SocketConnectionProvider<T, I, O>(endpoint, executor,
host);
+ }});
+ }
+
+
+ /**
+ * Register ConnectionProvider and start its listening facility.
+ * This endpoint can be both a client and server for the socket transport.
+ */
+ static public <T, I, O> Cancellable registerServerTransport(Endpoint endpoint,
Executor executor, final String host, final int port) {
+ final SocketConnectionProvider<T, I, O> connectionProvider = new
SocketConnectionProvider<T, I, O>(endpoint, executor, host);
+ endpoint.addConnectionProvider("socket", new
ConnectionProviderFactory<T>() {
+ public ConnectionProvider<T> createInstance(ConnectionProviderContext
context) {
+ try {
+ connectionProvider.start(context, port);
+ return connectionProvider;
+ } catch (IOException e) {
+ throw new RuntimeException("unable to start
SocketServerConnectionProvider", e);
+ }
+ }
+ });
+
+ return new Cancellable() {
+ public Cancellable cancel() {
+ connectionProvider.stop();
+ return IoUtils.nullCancellable();
+ }
+ };
+ }
+
+
+ /**
+ * Register a service with an endpoint.
+ * This endpoint must be acting as a socket transport server.
+ */
+ @SuppressWarnings("unchecked")
+ static public <I, O> void startService(Endpoint endpoint, Executor executor,
SocketServiceConfiguration<I, O> socketConfig, final RequestListener<I, O>
requestListener) throws IOException {
+ String serviceType = socketConfig.getServiceType();
+ String groupName = socketConfig.getGroupName();
+ ClientListener<I, O> clientListener = new SocketClientListener<I,
O>(endpoint, socketConfig, requestListener);
+ ServiceBuilder<I, O> sb = (ServiceBuilder<I, O>)
endpoint.serviceBuilder();
+ sb.setRequestType(socketConfig.getRequestClass());
+ sb.setReplyType(socketConfig.getResponseClass());
+ sb.setClientListener(clientListener);
+ sb.setServiceType(serviceType);
+ sb.setGroupName(groupName);
+ sb.register();
+ }
+
+
+ static public <I, O> void initializeMarshalling(Endpoint endpoint, Executor
executor) {
+ marshallerFactory = new RiverMarshallerFactory();
+ marshallingConfiguration = new MarshallingConfiguration();
+ marshallingConfiguration.setCreator(new SunReflectiveCreator());
+ marshallingConfiguration.setObjectTable(new SocketObjectTable<I, O>(endpoint,
executor));
+ }
+
+
+ static public MarshallerFactory getMarshallerFactory() throws IllegalStateException {
+ if (marshallerFactory == null) {
+ throw new IllegalStateException("marshalling has not been
initialized");
+ }
+ return marshallerFactory;
+ }
+
+
+ static public MarshallingConfiguration getMarshallingConfiguration() throws
IllegalStateException {
+ if (marshallingConfiguration == null) {
+ throw new IllegalStateException("marshalling has not been
initialized");
+ }
+ return marshallingConfiguration;
+ }
+}
+
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketRequestListenerProxy.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketRequestListenerProxy.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketRequestListenerProxy.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,121 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.remoting3.Client;
+import org.jboss.remoting3.Connection;
+import org.jboss.remoting3.Endpoint;
+import org.jboss.remoting3.RemoteExecutionException;
+import org.jboss.remoting3.RequestContext;
+import org.jboss.remoting3.RequestListener;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Nov 9, 2009
+ * </p>
+ */
+public class SocketRequestListenerProxy<I, O> implements RequestListener<I,
O>, Serializable {
+ private static final long serialVersionUID = -5260475991325355302L;
+ private static Logger log = Logger.getLogger(SocketRequestListenerProxy.class);
+
+ private Connection connection;
+ private Client<I, O> client;
+
+ public SocketRequestListenerProxy(Endpoint endpoint, SocketServiceConfiguration<I,
O> config) throws IOException {
+ URI uri;
+ try {
+ uri = new URI("socket://" + config.getHost() + ":" +
config.getPort());
+ } catch (URISyntaxException e) {
+ throw new IOException(e.getCause());
+ }
+ connection = getFutureResult(endpoint.connect(uri, OptionMap.EMPTY),
"couldn't create Connection");
+ client = getFutureResult(connection.openClient(config.getServiceType(),
config.getGroupName(), config.getRequestClass(), config.getResponseClass()),
"couldn't create Client");
+ }
+
+ @Override
+ public void handleClose() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException e) {
+ log.info(this + " unable to close connection " + connection);
+ }
+ }
+ if (client != null) {
+ try {
+ client.close();
+ } catch (IOException e) {
+ log.info(this + " unable to close client " + client);
+ }
+ }
+ }
+
+ @Override
+ public void handleRequest(RequestContext<O> context, I request) throws
RemoteExecutionException {
+ try {
+ O reply = client.invoke(request);
+ context.sendReply(reply);
+ } catch (CancellationException e) {
+ try {
+ context.sendCancelled();
+ } catch (IllegalStateException e1) {
+ throw new RemoteExecutionException(e1);
+ } catch (IOException e1) {
+ throw new RemoteExecutionException(e1);
+ }
+ } catch (IOException e) {
+ try {
+ context.sendFailure(e.getMessage(), e);
+ } catch (IllegalStateException e1) {
+ throw new RemoteExecutionException(e1);
+ } catch (IOException e1) {
+ throw new RemoteExecutionException(e1);
+ }
+ }
+ }
+
+ static <T> T getFutureResult(IoFuture<T> future, String errorMessage)
throws IOException {
+ switch (future.await(5000, TimeUnit.MILLISECONDS)) {
+ case DONE: {
+ return future.get();
+ }
+ case FAILED: {
+ throw future.getException();
+ }
+ default: {
+ throw new IOException("unexpeced future state: " + future);
+ }
+ }
+ }
+}
\ No newline at end of file
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketServiceConfiguration.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketServiceConfiguration.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketServiceConfiguration.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,100 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 16, 2009
+ * </p>
+ */
+public class SocketServiceConfiguration<I, O> {
+ private String serviceType;
+ private String groupName;
+ private Class<I> requestClass;
+ private Class<O> responseClass;
+ private String host;
+ private int port;
+
+ public SocketServiceConfiguration(String serviceType, String groupName, Class<I>
requestClass, Class<O> responseClass, String host, int port) {
+ this.serviceType = serviceType;
+ this.groupName = groupName;
+ this.requestClass = requestClass;
+ this.responseClass = responseClass;
+ this.host = host;
+ this.port = port;
+ }
+
+ public String getServiceType() {
+ return serviceType;
+ }
+
+ public void setServiceType(String serviceType) {
+ this.serviceType = serviceType;
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public void setGroupName(String groupName) {
+ this.groupName = groupName;
+ }
+
+ public Class<I> getRequestClass() {
+ return requestClass;
+ }
+
+ public void setRequestClass(Class<I> requestClass) {
+ this.requestClass = requestClass;
+ }
+
+ public Class<O> getResponseClass() {
+ return responseClass;
+ }
+
+ public void setResponseClass(Class<O> responseClass) {
+ this.responseClass = responseClass;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String toString() {
+ return "[serviceType=" + serviceType + ", groupName=" +
groupName + "requestClass=" + requestClass + ", responseClass= " +
responseClass + ", host=" + host + ", port=" + port + "]";
+ }
+}
+
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketUsageExamples.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketUsageExamples.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/SocketUsageExamples.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,602 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.jboss.remoting3.Client;
+import org.jboss.remoting3.ClientConnector;
+import org.jboss.remoting3.Connection;
+import org.jboss.remoting3.Endpoint;
+import org.jboss.remoting3.RemoteExecutionException;
+import org.jboss.remoting3.Remoting;
+import org.jboss.remoting3.RequestContext;
+import org.jboss.remoting3.RequestListener;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 14, 2009
+ * </p>
+ */
+public class SocketUsageExamples extends TestCase {
+ private static final Logger log = Logger.getLogger(SocketUsageExamples.class);
+ private static final String DR_NICK_REQUEST = "Hi everybody!";
+ private static final String DR_NICK_RESPONSE = "Hi Dr. Nick!";
+ private static final String DR_FRANKENSTEIN_REQUEST = "Dr. Frankenstein?";
+ private static final String DR_FRANKENSTEIN_RESPONSE = "It's
Frankenshteen!";
+ private static final String SERVICE_TYPE = "testservice";
+ private static final String GROUP_NAME = "testgroup";
+ private static final String HOST = "localhost";
+ private static final int PORT = 6789;
+ private static int portCounter;
+
+
+ /**
+ * Sends a message and gets a result, using Client.send().
+ *
+ * @throws Exception
+ */
+ public void testSocketTransportSend() throws Exception {
+ // Start server service.
+ log.info("entering " + getName());
+ ExecutorService serverExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint serverEndpoint = Remoting.createEndpoint(serverExecutor,
"server");
+ int serverPort = PORT + portCounter++;
+ Cancellable socketServer = SocketProtocol.registerServerTransport(serverEndpoint,
serverExecutor, HOST, serverPort);
+ SocketServiceConfiguration<String, String> socketServiceConfiguration = new
SocketServiceConfiguration<String, String>(SERVICE_TYPE, GROUP_NAME, String.class,
String.class, HOST, serverPort);
+ SocketProtocol.startService(serverEndpoint, serverExecutor,
socketServiceConfiguration, new DrNickRequestListener());
+
+ // Create client and connect to server.
+ ExecutorService clientExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint clientEndpoint = Remoting.createEndpoint(serverExecutor,
"client");
+ SocketProtocol.registerClientTransport(clientEndpoint, clientExecutor, HOST);
+ Connection connection = getFutureResult(clientEndpoint.connect(new
URI("socket://" + HOST + ":" + serverPort), OptionMap.EMPTY),
"couldn't create Connection");
+ Client<String, String> client =
getFutureResult(connection.openClient(SERVICE_TYPE, GROUP_NAME, String.class,
String.class), "couldn't create Client");
+
+ // Send message and get response.
+ String response = getFutureResult(client.send(DR_NICK_REQUEST), "couldn't
get response");
+ assertEquals(DR_NICK_RESPONSE, response);
+
+ // Shut down.
+ client.close();
+ connection.close();
+ clientEndpoint.close();
+ socketServer.cancel();
+ serverEndpoint.close();
+ log.info(getName() + " PASSES");
+ }
+
+
+ /**
+ * Sends a message and gets a result, using Client.invoke().
+ *
+ * @throws Exception
+ */
+ public void testSocketTransportInvoke() throws Exception {
+ // Start server service.
+ log.info("entering " + getName());
+ ExecutorService serverExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint serverEndpoint = Remoting.createEndpoint(serverExecutor,
"server");
+ int serverPort = PORT + portCounter++;
+ Cancellable socketServer = SocketProtocol.registerServerTransport(serverEndpoint,
serverExecutor, HOST, serverPort);
+ SocketServiceConfiguration<String, String> socketServiceConfiguration = new
SocketServiceConfiguration<String, String>(SERVICE_TYPE, GROUP_NAME, String.class,
String.class, HOST, serverPort);
+ SocketProtocol.startService(serverEndpoint, serverExecutor,
socketServiceConfiguration, new DrNickRequestListener());
+
+ // Create client and connect to server.
+ ExecutorService clientExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint clientEndpoint = Remoting.createEndpoint(serverExecutor,
"client");
+ SocketProtocol.registerClientTransport(clientEndpoint, clientExecutor, HOST);
+ Connection connection = getFutureResult(clientEndpoint.connect(new
URI("socket://" + HOST + ":" + serverPort), OptionMap.EMPTY),
"couldn't create Connection");
+ Client<String, String> client =
getFutureResult(connection.openClient(SERVICE_TYPE, GROUP_NAME, String.class,
String.class), "couldn't create Client");
+
+ // Send message and get response.
+ String response = client.invoke(DR_NICK_REQUEST);
+ assertEquals(DR_NICK_RESPONSE, response);
+
+ // Shut down.
+ client.close();
+ connection.close();
+ clientEndpoint.close();
+ socketServer.cancel();
+ serverEndpoint.close();
+ log.info(getName() + " PASSES");
+ }
+
+
+ /**
+ * Creates two Endpoints in server mode and sends messages in both directions.
+ *
+ * @throws Exception
+ */
+ public void testSocketTwoWayTransport() throws Exception {
+ // Start west coast service.
+ log.info("entering " + getName());
+ ExecutorService westernExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint westernEndpoint = Remoting.createEndpoint(westernExecutor, "west
coast server");
+ int westernPort = PORT + portCounter++;
+ Cancellable westernServer = SocketProtocol.registerServerTransport(westernEndpoint,
westernExecutor, HOST, westernPort);
+ SocketServiceConfiguration<String, String> westernServiceConfiguration = new
SocketServiceConfiguration<String, String>(SERVICE_TYPE, GROUP_NAME, String.class,
String.class, HOST, westernPort);
+ SocketProtocol.startService(westernEndpoint, westernExecutor,
westernServiceConfiguration, new DrNickRequestListener());
+
+ // Start east coast service.
+ ExecutorService easternExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint easternEndpoint = Remoting.createEndpoint(easternExecutor, "west
coast server");
+ int easternPort = PORT + portCounter++;
+ Cancellable easternServer = SocketProtocol.registerServerTransport(easternEndpoint,
easternExecutor, HOST, easternPort);
+ SocketServiceConfiguration<String, String> easternServiceConfiguration = new
SocketServiceConfiguration<String, String>(SERVICE_TYPE, GROUP_NAME, String.class,
String.class, HOST, easternPort);
+ SocketProtocol.startService(easternEndpoint, easternExecutor,
easternServiceConfiguration, new DrNickRequestListener());
+
+ // Send message east to west and get response.
+ Connection eastWestConnection = getFutureResult(easternEndpoint.connect(new
URI("socket://" + HOST + ":" + westernPort), OptionMap.EMPTY),
"couldn't create Connection");
+ Client<String, String> eastWestClient =
getFutureResult(eastWestConnection.openClient(SERVICE_TYPE, GROUP_NAME, String.class,
String.class), "couldn't create Client");
+ String response = getFutureResult(eastWestClient.send(DR_NICK_REQUEST),
"couldn't get response from west coast");
+ assertEquals(DR_NICK_RESPONSE, response);
+ log.info("EAST to WEST PASSES");
+ response = getFutureResult(eastWestClient.send(DR_NICK_REQUEST), "couldn't
get response from west coast");
+ assertEquals(DR_NICK_RESPONSE, response);
+ log.info("EAST to WEST PASSES AGAIN");
+
+ // Send message east to west and get response.
+ Connection westEastConnection = getFutureResult(westernEndpoint.connect(new
URI("socket://" + HOST + ":" + easternPort), OptionMap.EMPTY),
"couldn't create Connection");
+ Client<String, String> westEastClient =
getFutureResult(westEastConnection.openClient(SERVICE_TYPE, GROUP_NAME, String.class,
String.class), "couldn't create Client");
+ response = getFutureResult(westEastClient.send(DR_NICK_REQUEST), "couldn't
get response from east coast");
+ assertEquals(DR_NICK_RESPONSE, response);
+ log.info("WEST to EAST PASSES");
+ response = getFutureResult(westEastClient.send(DR_NICK_REQUEST), "couldn't
get response from east coast");
+ assertEquals(DR_NICK_RESPONSE, response);
+ log.info("WEST to EAST PASSES AGAIN");
+
+ // Shut down.
+ eastWestClient.close();
+ eastWestConnection.close();
+ westEastClient.close();
+ westEastConnection.close();
+ easternServer.cancel();
+ westernServer.cancel();
+ westernEndpoint.close();
+ easternEndpoint.close();
+ log.info(getName() + " PASSES");
+ }
+
+
+ /**
+ * Registers two services on a remote Endpoint and sends a message to both of them.
+ *
+ * @throws Exception
+ */
+ public void testSocketMultipleServices() throws Exception {
+ // Create server.
+ log.info("entering " + getName());
+ ExecutorService serverExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint serverEndpoint = Remoting.createEndpoint(serverExecutor,
"server");
+ int serverPort = PORT + portCounter++;
+ Cancellable socketServer = SocketProtocol.registerServerTransport(serverEndpoint,
serverExecutor, HOST, serverPort);
+
+ // Start first service.
+ SocketServiceConfiguration<String, String> socketServiceConfiguration = new
SocketServiceConfiguration<String, String>(SERVICE_TYPE + "1", GROUP_NAME,
String.class, String.class, HOST, serverPort);
+ SocketProtocol.startService(serverEndpoint, serverExecutor,
socketServiceConfiguration, new DrNickRequestListener());
+
+ // Start second service.
+ socketServiceConfiguration = new SocketServiceConfiguration<String,
String>(SERVICE_TYPE + "2", GROUP_NAME, String.class, String.class, HOST,
serverPort);
+ SocketProtocol.startService(serverEndpoint, serverExecutor,
socketServiceConfiguration, new DrFrankensteinRequestListener());
+
+ // Create client endpoint and get connection.
+ ExecutorService clientExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint clientEndpoint = Remoting.createEndpoint(serverExecutor,
"client");
+ SocketProtocol.registerClientTransport(clientEndpoint, clientExecutor, HOST);
+ Connection connection = getFutureResult(clientEndpoint.connect(new
URI("socket://" + HOST + ":" + serverPort), OptionMap.EMPTY),
"couldn't create Connection");
+
+ // Send message to first service and get response.
+ Client<String, String> client1 =
getFutureResult(connection.openClient(SERVICE_TYPE + "1", GROUP_NAME,
String.class, String.class), "couldn't create Client");
+ String response = getFutureResult(client1.send(DR_NICK_REQUEST), "couldn't
get response");
+ assertEquals(DR_NICK_RESPONSE, response);
+
+ // Send message to second service and get response.
+ Client<String, String> client2 =
getFutureResult(connection.openClient(SERVICE_TYPE + "2", GROUP_NAME,
String.class, String.class), "couldn't create Client");
+ response = getFutureResult(client2.send(DR_FRANKENSTEIN_REQUEST),
"couldn't get response");
+ assertEquals(DR_FRANKENSTEIN_RESPONSE, response);
+
+ // Shut down.
+ client1.close();
+ client2.close();
+ connection.close();
+ clientEndpoint.close();
+ socketServer.cancel();
+ serverEndpoint.close();
+ log.info(getName() + " PASSES");
+ }
+
+
+ /**
+ * Sends url for local service and gets callbacks from a remote service.
+ *
+ * @throws Exception
+ */
+ @SuppressWarnings("unchecked")
+ public void testSocketTransportCallbackWithURL() throws Exception {
+ // Start remote service.
+ log.info("entering " + getName());
+ ExecutorService remoteExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint remoteEndpoint = Remoting.createEndpoint(remoteExecutor, "remote
endpoint");
+ int remotePort = PORT + portCounter++;
+ Cancellable remoteServer = SocketProtocol.registerServerTransport(remoteEndpoint,
remoteExecutor, HOST, remotePort);
+ SocketServiceConfiguration<RequestWrapper, Object> remoteServiceConfiguration
= new SocketServiceConfiguration<RequestWrapper, Object>(SERVICE_TYPE +
"remote", GROUP_NAME, RequestWrapper.class, Object.class, HOST, remotePort);
+ SocketProtocol.startService(remoteEndpoint, remoteExecutor,
remoteServiceConfiguration, new CallbackSenderRequestListenerURI(remoteEndpoint));
+
+ // Start local service.
+ ExecutorService localExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint localEndpoint = Remoting.createEndpoint(localExecutor, "local
server");
+ int localPort = PORT + portCounter++;
+ Cancellable localServer = SocketProtocol.registerServerTransport(localEndpoint,
localExecutor, HOST, localPort);
+ SocketServiceConfiguration<Object, Object> localServiceConfiguration = new
SocketServiceConfiguration<Object, Object>(SERVICE_TYPE + "local",
GROUP_NAME, Object.class, Object.class, HOST, localPort);
+ CallbackReceiverRequestListener callbackReceiver = new
CallbackReceiverRequestListener();
+ SocketProtocol.startService(localEndpoint, localExecutor,
localServiceConfiguration, callbackReceiver);
+
+ // Send message to remote server and get callbacks.
+ Connection connection = getFutureResult(localEndpoint.connect(new
URI("socket://" + HOST + ":" + remotePort), OptionMap.EMPTY),
"couldn't create Connection");
+ Client<Object, RequestWrapper> client =
getFutureResult(connection.openClient(SERVICE_TYPE + "remote", GROUP_NAME,
Object.class, RequestWrapper.class), "couldn't create Client");
+ RequestWrapper wrapper = new RequestWrapper();
+ wrapper.setUrl("socket://" + HOST + ":" + localPort);
+ wrapper.setServiceType(SERVICE_TYPE + "local");
+ wrapper.setGroupName(GROUP_NAME);
+ wrapper.setPayload("callback");
+ client.send(wrapper);
+ Object callback = callbackReceiver.getNext();
+ assertEquals("callback", callback);
+ callback = callbackReceiver.getNext();
+ assertEquals("callback", callback);
+ callback = callbackReceiver.getNext();
+ assertEquals("callback", callback);
+
+ // Shut down.
+ client.close();
+ connection.close();
+ localServer.cancel();
+ remoteServer.cancel();
+ remoteEndpoint.close();
+ localEndpoint.close();
+ log.info(getName() + " PASSES");
+ }
+
+
+ /**
+ * Sends a ClientConnector and gets callbacks.
+ *
+ * @throws Exception
+ */
+ public void testSocketTransportCallbackWithClientConnector() throws Exception {
+ // Start remote service.
+ log.info("entering " + getName());
+ ExecutorService remoteExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint remoteEndpoint = Remoting.createEndpoint(remoteExecutor, "remote
endpoint");
+ int remotePort = PORT + portCounter++;
+ Cancellable remoteServer = SocketProtocol.registerServerTransport(remoteEndpoint,
remoteExecutor, HOST, remotePort);
+ SocketServiceConfiguration<Object, Object> remoteServiceConfiguration = new
SocketServiceConfiguration<Object, Object>(SERVICE_TYPE + "remote",
GROUP_NAME, Object.class, Object.class, HOST, remotePort);
+ SocketProtocol.startService(remoteEndpoint, remoteExecutor,
remoteServiceConfiguration, new CallbackSenderRequestListenerClientConnector());
+
+ // Create local endpoint.
+ ExecutorService localExecutor = new ThreadPoolExecutor(10, 10, 0L,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ Endpoint localEndpoint = Remoting.createEndpoint(localExecutor, "local
server");
+ SocketProtocol.registerClientTransport(localEndpoint, localExecutor, HOST);
+
+ // Send ClientConnector to remote server and get callbacks.
+ Connection connection = getFutureResult(localEndpoint.connect(new
URI("socket://" + HOST + ":" + remotePort), OptionMap.EMPTY),
"couldn't create Connection");
+ Client<Object, Object> client =
getFutureResult(connection.openClient(SERVICE_TYPE + "remote", GROUP_NAME,
Object.class, Object.class), "couldn't create Client");
+ CallbackReceiverRequestListener callbackReceiver = new
CallbackReceiverRequestListener();
+ ClientConnector<Object, Object> clientConnector =
connection.createClientConnector(callbackReceiver, Object.class, Object.class);
+ assertEquals("OK", client.invoke(clientConnector));
+ assertEquals("sent callback", client.invoke("send first
callback"));
+ Object callback = callbackReceiver.getNext();
+ assertEquals("callback0", callback);
+ assertEquals("sent callback", client.invoke("send second
callback"));
+ callback = callbackReceiver.getNext();
+ assertEquals("callback1", callback);
+
+ // Shut down.
+ client.close();
+ connection.close();
+ remoteServer.cancel();
+ remoteEndpoint.close();
+ localEndpoint.close();
+ log.info(getName() + " PASSES");
+ }
+
+
+ static <T> T getFutureResult(IoFuture<T> future, String errorMessage)
throws Exception {
+ switch (future.await(5000, TimeUnit.MILLISECONDS)) {
+ case DONE: {
+ return future.get();
+ }
+ case FAILED: {
+ log.error(errorMessage);
+ throw future.getException();
+ }
+ default: {
+ throw new Exception("unexpeced future state: " + future);
+ }
+ }
+ }
+
+
+ public static class DrNickRequestListener implements RequestListener<String,
String> {
+ public void handleClose() {
+ }
+
+ public void handleRequest(RequestContext<String> context, String request)
throws RemoteExecutionException {
+ try {
+ log.info(this + ": got request: " + request);
+ if (SocketUsageExamples.DR_NICK_REQUEST.equalsIgnoreCase(request))
+ context.sendReply(SocketUsageExamples.DR_NICK_RESPONSE);
+ else
+ context.sendReply(request);
+ log.info(this + ": sent response");
+ } catch (IllegalStateException e) {
+ throw new RemoteExecutionException("Dr. Nick has left the state",
e);
+ }
+ catch (IOException e){
+ throw new RemoteExecutionException("Dr. Nick has left the
building", e);
+ }
+ }
+ }
+
+
+ public static class DrFrankensteinRequestListener implements
RequestListener<String, String> {
+ public void handleClose() {
+ }
+
+ public void handleRequest(RequestContext<String> context, String request)
throws RemoteExecutionException {
+ try {
+ log.info(this + ": got request: " + request);
+ if (SocketUsageExamples.DR_FRANKENSTEIN_REQUEST.equalsIgnoreCase(request))
+ context.sendReply(SocketUsageExamples.DR_FRANKENSTEIN_RESPONSE);
+ else
+ context.sendReply(request);
+ log.info(this + ": sent response");
+ } catch (IllegalStateException e) {
+ throw new RemoteExecutionException("Dr. Nick has left the state",
e);
+ }
+ catch (IOException e){
+ throw new RemoteExecutionException("Dr. Nick has left the
building", e);
+ }
+ }
+ }
+
+
+ public static class RequestWrapper implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private String url;
+ private String serviceType;
+ private String groupName;
+ private Object payload;
+ public String getUrl() {
+ return url;
+ }
+ public void setUrl(String url) {
+ this.url = url;
+ }
+ public String getServiceType() {
+ return serviceType;
+ }
+ public void setServiceType(String serviceType) {
+ this.serviceType = serviceType;
+ }
+ public String getGroupName() {
+ return groupName;
+ }
+ public void setGroupName(String groupName) {
+ this.groupName = groupName;
+ }
+ public Object getPayload() {
+ return payload;
+ }
+ public void setPayload(Object payload) {
+ this.payload = payload;
+ }
+ }
+
+
+ static class CallbackSenderRequestListenerURI<I, O> implements
RequestListener<RequestWrapper, RequestWrapper> {
+ private Endpoint endpoint;
+
+ public CallbackSenderRequestListenerURI(Endpoint endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public void handleClose() {
+ }
+
+ public void handleRequest(RequestContext<RequestWrapper> context,
RequestWrapper request) throws RemoteExecutionException {
+ log.info(this + ": got request: " + request);
+ Connection connection = null;
+ Client<Object, Object> client = null;
+ try {
+ connection = getFutureResult(endpoint.connect(new URI(request.getUrl()),
OptionMap.EMPTY), "couldn't create Connection");
+ client = getFutureResult(connection.openClient(request.getServiceType(),
request.getGroupName(), Object.class, Object.class), "couldn't create
Client");
+ log.info(this + " got client: " + client);
+ client.send(request.payload);
+ log.info(this + ": sent callback");
+ client.send(request.payload);
+ log.info(this + ": sent callback");
+ client.send(request.payload);
+ log.info(this + ": sent callback");
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (URISyntaxException e) {
+ e.printStackTrace();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ finally {
+ try {
+ client.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ try {
+ connection.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+
+ static class CallbackSenderRequestListenerClientConnector implements
RequestListener<Object, Object> {
+ private ClientConnector<Object, Object> clientConnector;
+ private Client<Object, Object> client;
+ private int counter;
+
+ public void handleClose() {
+ if (client != null) {
+ try {
+ client.close();
+ client = null;
+ } catch (IOException e) {
+ log.warn(this + " unable to close Client " + client);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void handleRequest(RequestContext<Object> context, Object request)
throws RemoteExecutionException {
+ log.info(this + ": got request: " + request);
+
+ if (request instanceof ClientConnector<?, ?>) {
+ clientConnector = (ClientConnector<Object, Object>) request;
+ try {
+ client = clientConnector.getFutureClient().get();
+ } catch (Exception e) {
+ log.error("unable to create Client", e);
+ fail(context, "unable to create Client", e);
+ return;
+ }
+ answer(context, "OK");
+ return;
+ }
+
+ if (clientConnector == null) {
+ fail(context, "ClientConnector has not been received", null);
+ return;
+ }
+
+ try {
+ client.send("callback" + counter++);
+ answer(context, "sent callback");
+ } catch (IOException e) {
+ fail(context, "unable to send response", e);
+ }
+ }
+
+ void answer(RequestContext<Object> context, Object response) {
+ try {
+ context.sendReply(response);
+ return;
+ } catch (Exception e) {
+ try {
+ context.sendFailure("unable to respond", e);
+ return;
+ } catch (Exception e1) {
+ log.error(this + " unable to return exception", e1);
+ return;
+ }
+ }
+ }
+
+ void fail(RequestContext<Object> context, String response, Throwable t) {
+ log.error(response, t);
+ try {
+ context.sendFailure(response, t);
+ return;
+ } catch (Exception e) {
+ try {
+ context.sendFailure("unable to respond", e);
+ return;
+ } catch (Exception e1) {
+ log.error(this + " unable to return exception", e1);
+ return;
+ }
+ }
+ }
+ }
+
+
+ public static class CallbackReceiverRequestListener implements
RequestListener<Object, Object> {
+ private static Logger log =
Logger.getLogger(CallbackReceiverRequestListener.class);
+ private ArrayList<Object> callbacks = new ArrayList<Object>();
+
+ public void handleClose() {
+ }
+
+ public void handleRequest(RequestContext<Object> context, Object callback)
throws RemoteExecutionException {
+ log.info(this + ": got callback: " + callback);
+ callbacks.add(callback);
+ synchronized (callbacks) {
+ callbacks.notify();
+ }
+ try {
+ context.sendReply("got callback");
+ } catch (Exception e) {
+ try {
+ context.sendFailure("unable to send reply", e);
+ } catch (Exception e1) {
+ log.warn("unable to send failure message", e1);
+ }
+ }
+ }
+
+ public Object getNext() {
+ if (callbacks.size() == 0) {
+ synchronized (callbacks) {
+ while (true) {
+ try {
+ log.info(this + " waiting for a callback to return");
+ callbacks.wait();
+ break;
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+ return callbacks.remove(0);
+ }
+ }
+}
+
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientConnectionHandler.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientConnectionHandler.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientConnectionHandler.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,136 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket.client;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.util.concurrent.Executor;
+
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Marshalling;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.CloseHandler;
+import org.jboss.remoting3.samples.socket.SocketHandleableCloseable;
+import org.jboss.remoting3.samples.socket.SocketProtocol;
+import org.jboss.remoting3.spi.AbstractHandleableCloseable;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.Result;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 14, 2009
+ * </p>
+ */
+public class SocketClientConnectionHandler extends
AbstractHandleableCloseable<SocketHandleableCloseable> implements ConnectionHandler,
SocketHandleableCloseable {
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.samples.socket.client.SocketClientConnectionHandler");
+
+ private String remoteHost;
+ private int remotePort;
+ private String localHost;
+
+ public SocketClientConnectionHandler(URI uri, OptionMap connectOptions, Executor
executor, String localHost, int localPort) {
+ super(executor);
+ this.remoteHost = uri.getHost();
+ this.remotePort = uri.getPort();
+ this.localHost = localHost;
+ }
+
+ public RequestHandlerConnector createConnector(RequestHandler localHandler) {
+ try {
+ SocketRequestHandlerConnector<?, ?> connector = new
SocketRequestHandlerConnector<Object, Object>(getExecutor(), localHandler,
localHost);
+ registerCloseHandler(connector);
+ return connector;
+ } catch (IOException e) {
+ log.error(this + " unable to create SocketRequestHandlerConnector",
e);
+ return null;
+ }
+ }
+
+ public Cancellable open(String serviceName, String groupName,
Result<RequestHandler> result) {
+ try
+ {
+ final Socket socket = new Socket(remoteHost, remotePort);
+ log.info("client created socket");
+ MarshallerFactory factory = SocketProtocol.getMarshallerFactory();
+ MarshallingConfiguration configuration =
SocketProtocol.getMarshallingConfiguration();
+ final Marshaller marshaller = factory.createMarshaller(configuration);
+ final Unmarshaller unmarshaller = factory.createUnmarshaller(configuration);
+ marshaller.start(Marshalling.createByteOutput(socket.getOutputStream()));
+ marshaller.writeUTF(serviceName);
+ marshaller.writeUTF(groupName);
+ marshaller.flush();
+ unmarshaller.start(Marshalling.createByteInput(socket.getInputStream()));
+ result.setResult(new SocketClientRequestHandler(getExecutor(), marshaller,
unmarshaller));
+ registerCloseHandler(socket, marshaller, unmarshaller);
+ } catch (IOException e) {
+ result.setException(e);
+ }
+ return IoUtils.nullCancellable();
+ }
+
+ protected void registerCloseHandler(final Socket socket, final Marshaller marshaller,
final Unmarshaller unmarshaller) {
+ addCloseHandler(new CloseHandler<Object>() {
+ public void handleClose(Object closed) {
+ try {
+ marshaller.close();
+ } catch (IOException e) {
+ log.warn(this + " unable to close marshaller: " + marshaller);
+ } finally {
+ try {
+ unmarshaller.close();
+ } catch (IOException e) {
+ log.warn(this + " unable to close unmarshaller: " +
unmarshaller);
+ } finally {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ log.warn(this + " unable to close socket: " + socket);
+ }
+ }
+ }
+ }
+ });
+ }
+
+ protected void registerCloseHandler(final SocketRequestHandlerConnector<?, ?>
requestHandlerConnector) {
+ addCloseHandler(new CloseHandler<Object>() {
+ public void handleClose(Object closed) {
+ try {
+ requestHandlerConnector.close();
+ } catch (IOException e) {
+ log.warn(this + " unable to close SocketRequestHandlerConnector:
" + requestHandlerConnector);
+ }
+ }
+ });
+ }
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientRequestHandler.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientRequestHandler.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketClientRequestHandler.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,90 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket.client;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.spi.AbstractHandleableCloseable;
+import org.jboss.remoting3.spi.RemoteRequestContext;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.SpiUtils;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 14, 2009
+ * </p>
+ */
+public class SocketClientRequestHandler extends
AbstractHandleableCloseable<RequestHandler> implements RequestHandler {
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.samples.socket.SocketRequestHandler");
+
+ private Marshaller marshaller;
+ private Unmarshaller unmarshaller;
+
+ public SocketClientRequestHandler(Executor executor, Marshaller marshaller,
Unmarshaller unmarshaller) {
+ super(executor);
+ this.marshaller = marshaller;
+ this.unmarshaller = unmarshaller;
+ }
+
+ public RemoteRequestContext receiveRequest(Object request, final ReplyHandler
replyHandler) {
+ try {
+ marshaller.writeObject(request);
+ marshaller.flush();
+ log.info(this + ": sent request: " + request);
+ }
+ catch (IOException e) {
+ SpiUtils.safeHandleException(replyHandler, e);
+ }
+
+ getExecutor().execute(new Runnable() {
+ public void run() {
+ try {
+ log.info(SocketClientRequestHandler.this + ": waiting for
reply");
+ Object reply = unmarshaller.readObject();
+ log.info(this + ": reply: " + reply);
+ SpiUtils.safeHandleReply(replyHandler, reply);
+ } catch (ClassNotFoundException e) {
+ SpiUtils.safeHandleException(replyHandler, new IOException("Cannot
find class: " + e.getMessage(), e));
+ } catch (IOException e) {
+ SpiUtils.safeHandleException(replyHandler, e);
+ }
+ }
+ });
+
+ return new RemoteRequestContext() {
+ public RemoteRequestContext cancel() {
+ log.debug("Closing " + SocketClientRequestHandler.this);
+ IoUtils.safeClose(SocketClientRequestHandler.this);
+ return this;
+ }
+ };
+ }
+}
+
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketRequestHandlerConnector.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketRequestHandlerConnector.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/client/SocketRequestHandlerConnector.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,188 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket.client;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Marshalling;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.CloseHandler;
+import org.jboss.remoting3.samples.socket.SocketHandleableCloseable;
+import org.jboss.remoting3.samples.socket.SocketProtocol;
+import org.jboss.remoting3.samples.socket.server.SocketServerRequestHandler;
+import org.jboss.remoting3.spi.AbstractHandleableCloseable;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Result;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Nov 10, 2009
+ * </p>
+ */
+public class SocketRequestHandlerConnector<I, O> extends
AbstractHandleableCloseable<SocketHandleableCloseable> implements
RequestHandlerConnector, SocketHandleableCloseable, Serializable {
+ private static final long serialVersionUID = 37933691697892626L;
+ private static final Logger log =
Logger.getLogger(SocketRequestHandlerConnector.class);
+
+ private String callbackHost;
+ private transient RequestHandlerServer requestHandlerServer;
+ private int callbackPort;
+ private SocketClientRequestHandler socketClientRequestHandler;
+
+ public SocketRequestHandlerConnector() {
+ // ???
+ super(new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>()));
+ }
+
+ public SocketRequestHandlerConnector(Executor executor, RequestHandler
localRequestHandler, String callbackHost) throws IOException {
+ super(executor);
+ this.callbackHost = callbackHost;
+ requestHandlerServer = new RequestHandlerServer(localRequestHandler,
callbackHost);
+ callbackPort = requestHandlerServer.getLocalPort();
+ requestHandlerServer.start();
+ }
+
+ public Cancellable createRequestHandler(Result<RequestHandler> result) throws
SecurityException {
+ if (socketClientRequestHandler != null) {
+ throw new SecurityException(this + ": a SocketClientRequestHandler has
already been created");
+ }
+
+ try
+ {
+ Socket socket = new Socket(callbackHost, callbackPort);
+ log.info("server created callback Socket");
+ MarshallerFactory factory = SocketProtocol.getMarshallerFactory();
+ MarshallingConfiguration configuration =
SocketProtocol.getMarshallingConfiguration();
+ final Marshaller marshaller = factory.createMarshaller(configuration);
+ final Unmarshaller unmarshaller = factory.createUnmarshaller(configuration);
+ marshaller.start(Marshalling.createByteOutput(socket.getOutputStream()));
+ marshaller.flush();
+ unmarshaller.start(Marshalling.createByteInput(socket.getInputStream()));
+ socketClientRequestHandler = new SocketClientRequestHandler(getExecutor(),
marshaller, unmarshaller);
+ result.setResult(socketClientRequestHandler);
+ registerCloseHandler(socketClientRequestHandler);
+ registerCloseHandler(marshaller);
+ registerCloseHandler(unmarshaller);
+ registerCloseHandler(socket);
+ } catch (IOException e) {
+ result.setException(e);
+ }
+
+ return IoUtils.nullCancellable();
+ }
+
+ void registerCloseHandler(final Object o) {
+ try {
+ final Method close = o.getClass().getMethod("close");
+ addCloseHandler(new CloseHandler<Object>() {
+ public void handleClose(Object closed) {
+ try {
+ close.invoke(o);
+ } catch (Exception e) {
+ log.warn(this + " unable to close " + o, e);
+ }
+ }
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(this + " got object without close method: "
+ o);
+ }
+ }
+
+ static class RequestHandlerServer extends Thread {
+ private RequestHandler localRequestHandler;
+ private ServerSocket serverSocket;
+ private Socket socket;
+ private SocketServerRequestHandler socketServerRequestHandler;
+
+ RequestHandlerServer(RequestHandler localRequestHandler, String localHost) throws
IOException {
+ this.localRequestHandler = localRequestHandler;
+ serverSocket = new ServerSocket();
+ serverSocket.bind(new InetSocketAddress(localHost, 0));
+ }
+
+ public void run() {
+ try
+ {
+ socket = serverSocket.accept();
+ log.info("client created callback Socket");
+ socketServerRequestHandler = new
SocketServerRequestHandler(localRequestHandler, socket);
+ socketServerRequestHandler.start();
+ } catch (IOException e) {
+ log.error(this + " unable to accept a new Socket", e);
+ } finally {
+ try {
+ serverSocket.close();
+ } catch (IOException e) {
+ log.warn(this + " unable to close ServerSocket " + serverSocket,
e);
+ }
+ serverSocket = null;
+ }
+ }
+
+ int getLocalPort() {
+ return serverSocket.getLocalPort();
+ }
+
+ void close() {
+ try {
+ if (serverSocket != null) {
+ serverSocket.close();
+ }
+ } catch (IOException e) {
+ log.warn(this + " unable to close ServerSocket " + serverSocket,
e);
+ } finally {
+ try {
+ if (socket != null) {
+ socket.close();
+ }
+ } catch (IOException e) {
+ log.warn(this + " unable to close Socket " + socket, e);
+ } finally {
+ try {
+ if (socketServerRequestHandler != null) {
+ socketServerRequestHandler.close();
+ }
+ } catch (IOException e) {
+ log.warn(this + " unable to close SocketServerRequestHandler
" + socketServerRequestHandler, e);
+ }
+ }
+ }
+ }
+ }
+}
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketClientListener.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketClientListener.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketClientListener.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket.server;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.jboss.remoting3.ClientContext;
+import org.jboss.remoting3.ClientListener;
+import org.jboss.remoting3.CloseHandler;
+import org.jboss.remoting3.Endpoint;
+import org.jboss.remoting3.RequestListener;
+import org.jboss.remoting3.samples.socket.SocketServiceConfiguration;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 16, 2009
+ * </p>
+ */
+public class SocketClientListener<I, O> implements ClientListener<I, O> {
+ private static ConcurrentHashMap<RequestListener<?, ?>,
SocketServiceConfiguration<?, ?>> requestListeners = new
ConcurrentHashMap<RequestListener<?, ?>, SocketServiceConfiguration<?,
?>>();
+ private RequestListener<I, O> requestListener;
+
+ public static SocketServiceConfiguration<?, ?>
getRequestListenerInfo(RequestListener<?, ?> requestListener) {
+ return requestListeners.get(requestListener);
+ }
+
+ public SocketClientListener(Endpoint endpoint, SocketServiceConfiguration<I, O>
socketConfig, final RequestListener<I, O> requestListener) throws IOException {
+ if (requestListeners.containsKey(requestListener)) {
+ throw new IOException(requestListener + " is already registered");
+ }
+ if (requestListeners.values().contains(socketConfig)) {
+ throw new IOException("RequestListener with characterized by " +
socketConfig + " is already registered");
+ }
+ requestListeners.put(requestListener, socketConfig);
+ endpoint.addCloseHandler(new CloseHandler<Object>() {
+ public void handleClose(Object closed) {
+ requestListeners.remove(requestListener);
+ }
+ });
+ this.requestListener = requestListener;
+ }
+
+ public RequestListener<I, O> handleClientOpen(ClientContext clientContext) {
+ return requestListener;
+ }
+}
+
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerConnectionHandler.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerConnectionHandler.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerConnectionHandler.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,129 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket.server;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.Executor;
+
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.reflect.SunReflectiveCreator;
+import org.jboss.remoting3.CloseHandler;
+import org.jboss.remoting3.Endpoint;
+import org.jboss.remoting3.samples.socket.SocketHandleableCloseable;
+import org.jboss.remoting3.spi.AbstractHandleableCloseable;
+import org.jboss.remoting3.spi.ConnectionHandler;
+import org.jboss.remoting3.spi.ConnectionHandlerContext;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.remoting3.spi.RequestHandlerConnector;
+import org.jboss.xnio.Cancellable;
+import org.jboss.xnio.Result;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 16, 2009
+ * </p>
+ */
+public class SocketServerConnectionHandler<I, O> extends
AbstractHandleableCloseable<SocketHandleableCloseable> implements ConnectionHandler,
SocketHandleableCloseable {
+ private static final Logger log =
Logger.getLogger("org.jboss.remoting.samples.socket.server.SocketServerAcceptor");
+
+ private Endpoint endpoint;
+ private ConnectionHandlerContext connectionHandlerContext;
+ private String host;
+ private int port;
+ private MarshallingConfiguration marshallingConfig;
+ private boolean running;
+
+ public SocketServerConnectionHandler(Endpoint endpoint, Executor executor,
ConnectionHandlerContext connectionHandlerContext, String host, int port) {
+ super(executor);
+ this.endpoint = endpoint;
+ this.host = host;
+ this.port = port;
+ this.connectionHandlerContext = connectionHandlerContext;
+ marshallingConfig = new MarshallingConfiguration();
+ marshallingConfig.setCreator(new SunReflectiveCreator());
+ }
+
+ public void start() throws IOException {
+ running = true;
+ final ServerSocket ss = new ServerSocket(port, 200, InetAddress.getByName(host));
+ new Thread() {
+ public void run() {
+ while (running) {
+ try {
+ Socket socket = ss.accept();
+ log.info("server created socket");
+ SocketServerRequestHandler requestHandler = new
SocketServerRequestHandler(endpoint, socket, connectionHandlerContext);
+ registerServerSocketCloseHandler(requestHandler);
+ requestHandler.start();
+ }
+ catch (IOException e) {
+ log.error("Error handling new connection", e);
+ }
+ }
+ try {
+ ss.close();
+ } catch (IOException e) {
+ log.warn("Error closing ServerSocket: " + ss);
+ }
+ }
+ }.start();
+ }
+
+
+ public void stop() {
+ running = false;
+ try {
+ close();
+ } catch (IOException e) {
+ log.warn(this + " unable to close");
+ }
+ }
+
+ @Override
+ public RequestHandlerConnector createConnector(RequestHandler localHandler) {
+ return null;
+ }
+
+ @Override
+ public Cancellable open(String serviceName, String groupName,
Result<RequestHandler> result) {
+ return null;
+ }
+
+ protected void registerServerSocketCloseHandler(final SocketServerRequestHandler
requestHandler) {
+ addCloseHandler(new CloseHandler<Object>() {
+ public void handleClose(Object closed) {
+ try {
+ requestHandler.close();
+ } catch (IOException e) {
+ log.warn("unable to close SocketServerRequestHandler: " +
requestHandler);
+ }
+ }
+ });
+ }
+}
+
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerReplyHandler.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerReplyHandler.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerReplyHandler.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket.server;
+
+import java.io.IOException;
+
+import org.jboss.marshalling.Marshaller;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 16, 2009
+ * </p>
+ */
+public class SocketServerReplyHandler implements ReplyHandler
+{
+ private static final Logger log = Logger.getLogger(SocketServerReplyHandler.class);
+ private Marshaller marshaller;
+
+ public SocketServerReplyHandler(Marshaller marshaller) {
+ this.marshaller = marshaller;
+ }
+
+ public void handleCancellation() throws IOException {
+ }
+
+ public void handleException(IOException exception) throws IOException {
+ marshaller.writeObject(exception);
+ marshaller.flush();
+ }
+
+ public void handleReply(Object reply) throws IOException {
+ log.info(this + " handling reply: " + reply);
+ marshaller.writeObject(reply);
+ marshaller.flush();
+ log.info(this + " handled reply: " + reply);
+ }
+}
+
Added:
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerRequestHandler.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerRequestHandler.java
(rev 0)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting3/samples/socket/server/SocketServerRequestHandler.java 2009-11-13
03:36:46 UTC (rev 5590)
@@ -0,0 +1,178 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2009, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.remoting3.samples.socket.server;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.jboss.marshalling.Marshaller;
+import org.jboss.marshalling.MarshallerFactory;
+import org.jboss.marshalling.Marshalling;
+import org.jboss.marshalling.MarshallingConfiguration;
+import org.jboss.marshalling.Unmarshaller;
+import org.jboss.remoting3.CloseHandler;
+import org.jboss.remoting3.Endpoint;
+import org.jboss.remoting3.ServiceNotFoundException;
+import org.jboss.remoting3.ServiceURI;
+import org.jboss.remoting3.samples.socket.RequestHandlerFuture;
+import org.jboss.remoting3.samples.socket.SocketProtocol;
+import org.jboss.remoting3.spi.ConnectionHandlerContext;
+import org.jboss.remoting3.spi.RemoteRequestContext;
+import org.jboss.remoting3.spi.ReplyHandler;
+import org.jboss.remoting3.spi.RequestHandler;
+import org.jboss.xnio.OptionMap;
+import org.jboss.xnio.log.Logger;
+
+/**
+ * @author <a href="ron.sigal(a)jboss.com">Ron Sigal</a>
+ * @version
+ * <p>
+ * Copyright Oct 14, 2009
+ * </p>
+ */
+public class SocketServerRequestHandler extends Thread implements RequestHandler {
+ private static final Logger log = Logger.getLogger(SocketServerRequestHandler.class);
+ private Socket socket;
+ private Marshaller marshaller;
+ private Unmarshaller unmarshaller;
+ private RequestHandler requestHandler;
+ private ReplyHandler replyHandler;
+ private boolean running;
+
+ /**
+ * Calling this constructor creates a
+ *
+ */
+ public <I, O> SocketServerRequestHandler(final Endpoint endpoint, Socket socket,
ConnectionHandlerContext connectionHandlerContext) {
+ try {
+ this.socket = socket;
+ MarshallerFactory factory = SocketProtocol.getMarshallerFactory();
+ MarshallingConfiguration configuration =
SocketProtocol.getMarshallingConfiguration();
+ marshaller = factory.createMarshaller(configuration);
+ marshaller.start(Marshalling.createByteOutput(socket.getOutputStream()));
+ marshaller.flush();
+ unmarshaller = factory.createUnmarshaller(configuration);
+ unmarshaller.start(Marshalling.createByteInput(socket.getInputStream()));
+ final String serviceType = unmarshaller.readUTF();
+ final String groupName = unmarshaller.readUTF();
+ final RequestHandlerFuture requestHandlerFuture = new RequestHandlerFuture();
+
+ ConnectionHandlerContext.ServiceResult serviceResult = new
ConnectionHandlerContext.ServiceResult() {
+ public void opened(final RequestHandler requestHandler, final OptionMap
optionMap) {
+ requestHandlerFuture.setResult(requestHandler);
+ }
+ public void notFound() {
+ requestHandlerFuture.setException(new
ServiceNotFoundException(ServiceURI.create(serviceType, groupName, endpoint.getName()),
"No such service located"));
+ }
+ };
+
+ connectionHandlerContext.openService(serviceType, groupName, OptionMap.EMPTY,
serviceResult);
+ requestHandler = requestHandlerFuture.get();
+ if (requestHandler == null) {
+ throw requestHandlerFuture.getException();
+ }
+ replyHandler = new SocketServerReplyHandler(marshaller);
+ } catch (Exception e) {
+ throw new RuntimeException("unable to process socket: " + socket, e);
+ }
+ }
+
+
+ public <I, O> SocketServerRequestHandler(RequestHandler localRequestHandler,
Socket socket) {
+ try {
+ this.requestHandler = localRequestHandler;
+ this.socket = socket;
+ MarshallerFactory factory = SocketProtocol.getMarshallerFactory();
+ MarshallingConfiguration configuration =
SocketProtocol.getMarshallingConfiguration();
+ marshaller = factory.createMarshaller(configuration);
+ marshaller.start(Marshalling.createByteOutput(socket.getOutputStream()));
+ marshaller.flush();
+ unmarshaller = factory.createUnmarshaller(configuration);
+ unmarshaller.start(Marshalling.createByteInput(socket.getInputStream()));
+ replyHandler = new SocketServerReplyHandler(marshaller);
+ } catch (Exception e) {
+ throw new RuntimeException("unable to process socket: " + socket, e);
+ }
+ }
+
+
+ @Override
+ public void run() {
+ running = true;
+ while (running) {
+ Object request;
+ try {
+ log.info(SocketServerRequestHandler.this + " waiting for next
request");
+ request = unmarshaller.readObject();
+ log.info(SocketServerRequestHandler.this + " got request: " +
request);
+ requestHandler.receiveRequest(request, replyHandler);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ finishClose();
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ running = false;
+ }
+
+
+ @Override
+ public RemoteRequestContext receiveRequest(Object request, ReplyHandler replyHandler)
{
+ return null;
+ }
+
+
+ @Override
+ public org.jboss.remoting3.HandleableCloseable.Key addCloseHandler( CloseHandler<?
super RequestHandler> handler) {
+ return null;
+ }
+
+ public String toString() {
+ return "SocketServerRequestHandler[" + super.toString() + "]";
+ }
+
+ protected void finishClose() {
+ try {
+ marshaller.close();
+ } catch (IOException e) {
+ log.error(this + " unable to close Marshaller " + marshaller, e);
+ } finally {
+ try {
+ unmarshaller.close();
+ } catch (IOException e) {
+ log.error(this + " unable to close Unmarshaller " + unmarshaller,
e);
+ } finally {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ log.error(this + " unable to close Socket " + socket, e);
+ }
+ }
+ }
+ }
+}