[jboss-remoting-commits] JBoss Remoting SVN: r4395 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi and 7 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Fri Jul 18 19:18:53 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-07-18 19:18:52 -0400 (Fri, 18 Jul 2008)
New Revision: 4395

Added:
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
   remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
   remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
RemoteClient/ServiceEndpoints need not and should not be generic

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -37,7 +37,7 @@
      * @return the client
      * @throws RemotingException if an error occurs
      */
-    <I, O> RemoteClientEndpoint<I, O> createClientEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
+    <I, O> RemoteClientEndpoint createClientEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
 
     /**
      * Create a client source that can be used to acquire clients associated with a request listener on this endpoint.
@@ -52,7 +52,7 @@
      * @return the context source
      * @throws RemotingException if an error occurs
      */
-    <I, O> RemoteServiceEndpoint<I, O> createServiceEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
+    <I, O> RemoteServiceEndpoint createServiceEndpoint(RequestListener<I, O> requestListener) throws RemotingException;
 
     /**
      * Create a client from a remote client endpoint.
@@ -63,7 +63,7 @@
      * @return the client
      * @throws RemotingException if an error occurs
      */
-    <I, O> Client<I, O> createClient(RemoteClientEndpoint<I, O> endpoint) throws RemotingException;
+    <I, O> Client<I, O> createClient(RemoteClientEndpoint endpoint) throws RemotingException;
 
     /**
      * Create a client source from a remote service endpoint.
@@ -74,5 +74,5 @@
      * @return the client source
      * @throws RemotingException if an error occurs
      */
-    <I, O> ClientSource<I, O> createClientSource(RemoteServiceEndpoint<I, O> endpoint) throws RemotingException;
+    <I, O> ClientSource<I, O> createClientSource(RemoteServiceEndpoint endpoint) throws RemotingException;
 }

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -48,7 +48,7 @@
      * @param msg the message
      * @param cause the cause
      */
-    public static void safeHandleException(final ReplyHandler<?> replyHandler, final String msg, final Throwable cause) {
+    public static void safeHandleException(final ReplyHandler replyHandler, final String msg, final Throwable cause) {
         try {
             replyHandler.handleException(msg, cause);
         } catch (Throwable t) {
@@ -63,7 +63,7 @@
      * @param replyHandler the reply handler
      * @param reply the reply
      */
-    public static <O> void safeHandleReply(final ReplyHandler<O> replyHandler, final O reply) {
+    public static <O> void safeHandleReply(final ReplyHandler replyHandler, final O reply) {
         try {
             replyHandler.handleReply(reply);
         } catch (Throwable t) {
@@ -76,7 +76,7 @@
      *
      * @param replyHandler the reply handler
      */
-    public static void safeHandleCancellation(final ReplyHandler<?> replyHandler) {
+    public static void safeHandleCancellation(final ReplyHandler replyHandler) {
         try {
             replyHandler.handleCancellation();
         } catch (Throwable t) {
@@ -136,7 +136,7 @@
 
     private static final RemoteRequestContext BLANK_REMOTE_REQUEST_CONTEXT = new BlankRemoteRequestContext();
 
-    public static void safeAutoClose(final RemoteClientEndpoint<?, ?> remoteClientEndpoint) {
+    public static void safeAutoClose(final RemoteClientEndpoint remoteClientEndpoint) {
         try {
             remoteClientEndpoint.autoClose();
         } catch (Throwable t) {
@@ -144,7 +144,7 @@
         }
     }
 
-    public static void safeAutoClose(final RemoteServiceEndpoint<Object, Object> remoteServiceEndpoint) {
+    public static void safeAutoClose(final RemoteServiceEndpoint remoteServiceEndpoint) {
         try {
             remoteServiceEndpoint.autoClose();
         } catch (Throwable t) {

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -25,13 +25,12 @@
 import org.jboss.cx.remoting.Closeable;
 import org.jboss.cx.remoting.RemotingException;
 import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.Client;
 
 /**
  * A remote client endpoint, which can be passed to remote endpoints.  Remote systems can then use the client endpoint
  * to make invocations, or they may pass the client endpoint on to other remote systems.
  */
-public interface RemoteClientEndpoint<I, O> extends Closeable<RemoteClientEndpoint<I, O>> {
+public interface RemoteClientEndpoint extends Closeable<RemoteClientEndpoint> {
 
     /**
      * Receive a one-way request from a remote system.  This method is intended to be called by protocol handlers.  No
@@ -39,7 +38,7 @@
      *
      * @param request the request
      */
-    void receiveRequest(I request);
+    void receiveRequest(Object request);
 
     /**
      * Receive a request from a remote system.  This method is intended to be called by protocol handlers.  If the
@@ -51,7 +50,7 @@
      * @param replyHandler a handler for the reply
      * @return a context which may be used to cancel the request
      */
-    RemoteRequestContext receiveRequest(I request, ReplyHandler<O> replyHandler);
+    RemoteRequestContext receiveRequest(Object request, ReplyHandler replyHandler);
 
     /**
      * Get a handle to this client endpoint.  The client endpoint will not auto-close as long as there is at least
@@ -62,7 +61,7 @@
      * @return the handle
      * @throws RemotingException if a handle could not be acquired
      */
-    Handle<RemoteClientEndpoint<I, O>> getHandle() throws RemotingException;
+    Handle<RemoteClientEndpoint> getHandle() throws RemotingException;
 
     /**
      * Automatically close this client endpoint when all handles and local client instances are closed.
@@ -82,5 +81,5 @@
      *
      * @param handler the handler to be called
      */
-    void addCloseHandler(final CloseHandler<? super RemoteClientEndpoint<I, O>> handler);
+    void addCloseHandler(final CloseHandler<? super RemoteClientEndpoint> handler);
 }

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -34,5 +34,5 @@
      * @param <O> the reply type
      * @param endpoint the endpoint that was created
      */
-    <I, O> void notifyCreated(RemoteClientEndpoint<I, O> endpoint);
+    <I, O> void notifyCreated(RemoteClientEndpoint endpoint);
 }

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -25,7 +25,6 @@
 import org.jboss.cx.remoting.Closeable;
 import org.jboss.cx.remoting.RemotingException;
 import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.ClientSource;
 
 /**
  * A remote service endpoint, which can be passed to remote endpoints.  Remote systems can then use the service endpoint
@@ -33,7 +32,7 @@
  * has the advantage that a round trip to the remote side is not necessary; the local side can spawn a client endpoint
  * and simply notify the remote side of the change.
  */
-public interface RemoteServiceEndpoint<I, O> extends Closeable<RemoteServiceEndpoint<I, O>> {
+public interface RemoteServiceEndpoint extends Closeable<RemoteServiceEndpoint> {
 
     /**
      * Create a client endpoint for the service corresponding to this service endpoint.
@@ -41,7 +40,7 @@
      * @return a client endpoint
      * @throws RemotingException if a client could not be opened
      */
-    RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException;
+    RemoteClientEndpoint createClientEndpoint() throws RemotingException;
 
     /**
      * Get a handle to this service endpoint.  The service endpoint will not auto-close as long as there is at least
@@ -52,7 +51,7 @@
      * @return the handle
      * @throws RemotingException if a handle could not be acquired
      */
-    Handle<RemoteServiceEndpoint<I, O>> getHandle() throws RemotingException;
+    Handle<RemoteServiceEndpoint> getHandle() throws RemotingException;
 
     /**
      * Automatically close this service endpoint when all handles and local client source instances
@@ -70,5 +69,5 @@
      *
      * @param handler the handler to be called
      */
-    void addCloseHandler(final CloseHandler<? super RemoteServiceEndpoint<I, O>> handler);
+    void addCloseHandler(final CloseHandler<? super RemoteServiceEndpoint> handler);
 }

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -26,14 +26,14 @@
  * A handler for replies from a request.  The handler should respect the first invocation made on it, and ignore
  * any subsequent invocations.
  */
-public interface ReplyHandler<O> {
+public interface ReplyHandler {
 
     /**
      * Handle a successful reply.
      *
      * @param reply the reply
      */
-    void handleReply(O reply);
+    void handleReply(Object reply);
 
     /**
      * Handle a remote exception.

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -41,28 +41,28 @@
     /**
      * {@inheritDoc}  This implementation calls the same method on the delegate object.
      */
-    public <I, O> RemoteClientEndpoint<I, O> createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+    public <I, O> RemoteClientEndpoint createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
         return delegate.createClientEndpoint(requestListener);
     }
 
     /**
      * {@inheritDoc}  This implementation calls the same method on the delegate object.
      */
-    public <I, O> RemoteServiceEndpoint<I, O> createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+    public <I, O> RemoteServiceEndpoint createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
         return delegate.createServiceEndpoint(requestListener);
     }
 
     /**
      * {@inheritDoc}  This implementation calls the same method on the delegate object.
      */
-    public <I, O> Client<I, O> createClient(final RemoteClientEndpoint<I, O> endpoint) throws RemotingException {
+    public <I, O> Client<I, O> createClient(final RemoteClientEndpoint endpoint) throws RemotingException {
         return delegate.createClient(endpoint);
     }
 
     /**
      * {@inheritDoc}  This implementation calls the same method on the delegate object.
      */
-    public <I, O> ClientSource<I, O> createClientSource(final RemoteServiceEndpoint<I, O> endpoint) throws RemotingException {
+    public <I, O> ClientSource<I, O> createClientSource(final RemoteServiceEndpoint endpoint) throws RemotingException {
         return delegate.createClientSource(endpoint);
     }
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -38,9 +38,9 @@
  */
 public final class ClientImpl<I, O> extends AbstractContextImpl<Client<I, O>> implements Client<I, O> {
 
-    private final RemoteClientEndpoint<I, O> remoteClientEndpoint;
+    private final RemoteClientEndpoint remoteClientEndpoint;
 
-    ClientImpl(final RemoteClientEndpoint<I, O> remoteClientEndpoint, final Executor executor) {
+    ClientImpl(final RemoteClientEndpoint remoteClientEndpoint, final Executor executor) {
         super(executor);
         this.remoteClientEndpoint = remoteClientEndpoint;
     }
@@ -51,7 +51,7 @@
         }
         final QueueExecutor executor = new QueueExecutor();
         final FutureReplyImpl<O> futureReply = new FutureReplyImpl<O>(executor);
-        final ReplyHandler<O> replyHandler = futureReply.getReplyHandler();
+        final ReplyHandler replyHandler = futureReply.getReplyHandler();
         final RemoteRequestContext requestContext = remoteClientEndpoint.receiveRequest(request, replyHandler);
         futureReply.setRemoteRequestContext(requestContext);
         futureReply.addCompletionHandler(new RequestCompletionHandler<O>() {
@@ -68,7 +68,7 @@
             throw new RemotingException("Client is not open");
         }
         final FutureReplyImpl<O> futureReply = new FutureReplyImpl<O>(executor);
-        final ReplyHandler<O> replyHandler = futureReply.getReplyHandler();
+        final ReplyHandler replyHandler = futureReply.getReplyHandler();
         final RemoteRequestContext requestContext = remoteClientEndpoint.receiveRequest(request, replyHandler);
         futureReply.setRemoteRequestContext(requestContext);
         return futureReply;

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -35,10 +35,10 @@
  */
 public final class ClientSourceImpl<I, O> extends AbstractCloseable<ClientSource<I, O>> implements ClientSource<I, O> {
 
-    private final RemoteServiceEndpoint<I, O> serviceEndpoint;
+    private final RemoteServiceEndpoint serviceEndpoint;
     private final Endpoint endpoint;
 
-    ClientSourceImpl(final RemoteServiceEndpoint<I, O> serviceEndpoint, final EndpointImpl endpoint) {
+    ClientSourceImpl(final RemoteServiceEndpoint serviceEndpoint, final EndpointImpl endpoint) {
         super(endpoint.getExecutor());
         this.serviceEndpoint = serviceEndpoint;
         this.endpoint = endpoint;
@@ -48,7 +48,7 @@
         if (! isOpen()) {
             throw new RemotingException("Client source is not open");
         }
-        final RemoteClientEndpoint<I,O> clientEndpoint = serviceEndpoint.createClientEndpoint();
+        final RemoteClientEndpoint clientEndpoint = serviceEndpoint.createClientEndpoint();
         final Client<I, O> client = endpoint.createClient(clientEndpoint);
         clientEndpoint.autoClose();
         return client;

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -121,23 +121,23 @@
         return endpointMap;
     }
 
-    public <I, O> RemoteClientEndpoint<I, O> createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+    public <I, O> RemoteClientEndpoint createClientEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
         final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new RemoteClientEndpointLocalImpl<I, O>(executor, requestListener);
         clientEndpoint.addCloseHandler(remover);
         clientEndpoint.open();
         return clientEndpoint;
     }
 
-    public <I, O> RemoteServiceEndpoint<I, O> createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
+    public <I, O> RemoteServiceEndpoint createServiceEndpoint(final RequestListener<I, O> requestListener) throws RemotingException {
         final RemoteServiceEndpointLocalImpl<I, O> serviceEndpoint = new RemoteServiceEndpointLocalImpl<I, O>(executor, requestListener);
         serviceEndpoint.addCloseHandler(remover);
         serviceEndpoint.open();
         return serviceEndpoint;
     }
 
-    public <I, O> Client<I, O> createClient(final RemoteClientEndpoint<I, O> endpoint) throws RemotingException {
+    public <I, O> Client<I, O> createClient(final RemoteClientEndpoint endpoint) throws RemotingException {
         boolean ok = false;
-        final Handle<RemoteClientEndpoint<I,O>> handle = endpoint.getHandle();
+        final Handle<RemoteClientEndpoint> handle = endpoint.getHandle();
         try {
             final ClientImpl<I, O> client = new ClientImpl<I, O>(endpoint, executor);
             client.addCloseHandler(new CloseHandler<Client<I, O>>() {
@@ -154,9 +154,9 @@
         }
     }
 
-    public <I, O> ClientSource<I, O> createClientSource(final RemoteServiceEndpoint<I, O> endpoint) throws RemotingException {
+    public <I, O> ClientSource<I, O> createClientSource(final RemoteServiceEndpoint endpoint) throws RemotingException {
         boolean ok = false;
-        final Handle<RemoteServiceEndpoint<I,O>> handle = endpoint.getHandle();
+        final Handle<RemoteServiceEndpoint> handle = endpoint.getHandle();
         try {
             final ClientSourceImpl<I, O> client = new ClientSourceImpl<I, O>(endpoint, this);
             client.addCloseHandler(new CloseHandler<ClientSource<I, O>>() {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -40,7 +40,7 @@
 public final class FutureReplyImpl<O> implements FutureReply<O> {
 
     private final Executor executor;
-    private final ReplyHandler<O> replyHandler = new Handler();
+    private final ReplyHandler replyHandler = new Handler();
     private final Object lock = new Object();
     // @protectedby lock
     private State state = State.NEW;
@@ -264,7 +264,7 @@
         return this;
     }
 
-    ReplyHandler<O> getReplyHandler() {
+    ReplyHandler getReplyHandler() {
         return replyHandler;
     }
 
@@ -284,9 +284,10 @@
         }
     }
 
-    private final class Handler implements ReplyHandler<O> {
+    private final class Handler implements ReplyHandler {
 
-        public void handleReply(final O reply) {
+        @SuppressWarnings({ "unchecked" })
+        public void handleReply(final Object reply) {
             synchronized (lock) {
                 while (state == State.NEW) {
                     boolean intr = false;
@@ -304,7 +305,7 @@
                 }
                 if (state == State.WAITING) {
                     state = State.DONE;
-                    result = reply;
+                    result = (O) reply;
                     runCompletionHandlers();
                     lock.notifyAll();
                 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -37,7 +37,7 @@
 /**
  *
  */
-public final class RemoteClientEndpointLocalImpl<I, O> extends AbstractAutoCloseable<RemoteClientEndpoint<I, O>> implements RemoteClientEndpoint<I, O> {
+public final class RemoteClientEndpointLocalImpl<I, O> extends AbstractAutoCloseable<RemoteClientEndpoint> implements RemoteClientEndpoint {
 
     private final RequestListener<I, O> requestListener;
     private final Executor executor;
@@ -60,12 +60,13 @@
         this(executor, requestListener, new ClientContextImpl(executor));
     }
 
-    public void receiveRequest(final I request) {
+    public void receiveRequest(final Object request) {
         final RequestContextImpl<O> context = new RequestContextImpl<O>(clientContext);
         executor.execute(new Runnable() {
+            @SuppressWarnings({ "unchecked" })
             public void run() {
                 try {
-                    requestListener.handleRequest(context, request);
+                    requestListener.handleRequest(context, (I) request);
                 } catch (Throwable t) {
                     log.error(t, "Unexpected exception in request listener");
                 }
@@ -73,12 +74,13 @@
         });
     }
 
-    public RemoteRequestContext receiveRequest(final I request, final ReplyHandler<O> replyHandler) {
+    public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler replyHandler) {
         final RequestContextImpl<O> context = new RequestContextImpl<O>(replyHandler, clientContext);
         executor.execute(new Runnable() {
+            @SuppressWarnings({ "unchecked" })
             public void run() {
                 try {
-                    requestListener.handleRequest(context, request);
+                    requestListener.handleRequest(context, (I) request);
                 } catch (RemoteExecutionException e) {
                     SpiUtils.safeHandleException(replyHandler, e.getMessage(), e.getCause());
                 } catch (Throwable t) {
@@ -96,8 +98,8 @@
     void open() throws RemotingException {
         try {
             requestListener.handleClientOpen(clientContext);
-            addCloseHandler(new CloseHandler<RemoteClientEndpoint<I, O>>() {
-                public void handleClose(final RemoteClientEndpoint<I, O> closed) {
+            addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
+                public void handleClose(final RemoteClientEndpoint closed) {
                     try {
                         requestListener.handleClientClose(clientContext);
                     } catch (Throwable t) {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -34,7 +34,7 @@
 /**
  *
  */
-public final class RemoteServiceEndpointLocalImpl<I, O> extends AbstractAutoCloseable<RemoteServiceEndpoint<I, O>> implements RemoteServiceEndpoint<I, O> {
+public final class RemoteServiceEndpointLocalImpl<I, O> extends AbstractAutoCloseable<RemoteServiceEndpoint> implements RemoteServiceEndpoint {
 
     private final RequestListener<I, O> requestListener;
     private final ServiceContextImpl serviceContext;
@@ -49,7 +49,7 @@
         serviceContext = new ServiceContextImpl(executor);
     }
 
-    public RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException {
+    public RemoteClientEndpoint createClientEndpoint() throws RemotingException {
         if (isOpen()) {
             final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new RemoteClientEndpointLocalImpl<I, O>(executor, this, requestListener);
             clientEndpoint.open();
@@ -62,8 +62,8 @@
     void open() throws RemotingException {
         try {
             requestListener.handleServiceOpen(serviceContext);
-            addCloseHandler(new CloseHandler<RemoteServiceEndpoint<I, O>>() {
-                public void handleClose(final RemoteServiceEndpoint<I, O> closed) {
+            addCloseHandler(new CloseHandler<RemoteServiceEndpoint>() {
+                public void handleClose(final RemoteServiceEndpoint closed) {
                     try {
                         requestListener.handleServiceClose(serviceContext);
                     } catch (Throwable t) {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -40,7 +40,7 @@
 
     private final AtomicBoolean closed = new AtomicBoolean();
     private final Object cancelLock = new Object();
-    private final ReplyHandler<O> replyHandler;
+    private final ReplyHandler replyHandler;
     private final ClientContextImpl clientContext;
 
     private final AtomicBoolean cancelled = new AtomicBoolean();
@@ -48,7 +48,7 @@
     private Set<RequestCancelHandler<O>> cancelHandlers;
     private final TaggingExecutor executor;
 
-    RequestContextImpl(final ReplyHandler<O> replyHandler, final ClientContextImpl clientContext) {
+    RequestContextImpl(final ReplyHandler replyHandler, final ClientContextImpl clientContext) {
         this.replyHandler = replyHandler;
         this.clientContext = clientContext;
         executor = new TaggingExecutor(clientContext.getExecutor());

Modified: remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java
===================================================================
--- remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -78,7 +78,7 @@
             endpoint.setExecutor(executorService);
             endpoint.start();
             try {
-                final RemoteClientEndpoint<Object,Object> clientEndpoint = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
+                final RemoteClientEndpoint clientEndpoint = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
                     public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
                         assertEquals(request, requestObj);
                         try {
@@ -93,8 +93,8 @@
                     }
                 });
                 try {
-                    clientEndpoint.addCloseHandler(new CloseHandler<RemoteClientEndpoint<Object, Object>>() {
-                        public void handleClose(final RemoteClientEndpoint<Object, Object> closed) {
+                    clientEndpoint.addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
+                        public void handleClose(final RemoteClientEndpoint closed) {
                             clientEndpointClosed.set(true);
                         }
                     });
@@ -137,7 +137,7 @@
             endpoint.setExecutor(executorService);
             endpoint.start();
             try {
-                final RemoteClientEndpoint<Object,Object> clientEndpoint = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
+                final RemoteClientEndpoint clientEndpoint = endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
                     public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
                         assertEquals(request, requestObj);
                         try {
@@ -152,8 +152,8 @@
                     }
                 });
                 try {
-                    clientEndpoint.addCloseHandler(new CloseHandler<RemoteClientEndpoint<Object, Object>>() {
-                        public void handleClose(final RemoteClientEndpoint<Object, Object> closed) {
+                    clientEndpoint.addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
+                        public void handleClose(final RemoteClientEndpoint closed) {
                             clientEndpointClosed.set(true);
                         }
                     });

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -52,6 +52,7 @@
 import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_FAILED;
 import static org.jboss.cx.remoting.protocol.basic.MessageType.CANCEL_ACK;
 import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executor;
@@ -71,13 +72,13 @@
     private static final Logger log = Logger.getLogger(BasicHandler.class);
 
     // clients whose requests get forwarded to the remote side
-    private final ConcurrentMap<Integer, RemoteClientEndpoint<?, ?>> remoteClients = concurrentMap();
+    private final ConcurrentMap<Integer, RemoteClientEndpoint> remoteClients = concurrentMap();
     // running on remote node
-    private final ConcurrentMap<Integer, ReplyHandler<?>> outstandingRequests = concurrentMap();
+    private final ConcurrentMap<Integer, ReplyHandler> outstandingRequests = concurrentMap();
     // forwarded to remote side
-    private final ConcurrentMap<Integer, Handle<RemoteClientEndpoint<?, ?>>> forwardedClients = concurrentMap();
+    private final ConcurrentMap<Integer, Handle<RemoteClientEndpoint>> forwardedClients = concurrentMap();
     // forwarded to remote side
-    private final ConcurrentMap<Integer, Handle<RemoteServiceEndpoint<?, ?>>> forwardedServices = concurrentMap();
+    private final ConcurrentMap<Integer, Handle<RemoteServiceEndpoint>> forwardedServices = concurrentMap();
 
     private final boolean server;
     private final BufferAllocator<ByteBuffer> allocator;
@@ -90,12 +91,12 @@
     private final ClassLoader classLoader;
 
     @SuppressWarnings({ "unchecked" })
-    public <I, O> BasicHandler(final boolean server, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpoint<I, O> root, final Executor executor, final RemoteClientEndpointListener remoteListener, final MarshallerFactory<ByteBuffer> marshallerFactory) throws RemotingException {
+    public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpoint root, final Executor executor, final RemoteClientEndpointListener remoteListener, final MarshallerFactory<ByteBuffer> marshallerFactory) throws RemotingException {
         this.server = server;
         this.allocator = allocator;
         this.executor = executor;
         forwardedClients.put(Integer.valueOf(0), ((RemoteClientEndpoint)root).getHandle());
-        final RemoteClientEndpointImpl<Object, Object> endpoint = new RemoteClientEndpointImpl<Object, Object>(0, marshallerFactory, allocator);
+        final RemoteClientEndpointImpl endpoint = new RemoteClientEndpointImpl(0, marshallerFactory, allocator);
         remoteClients.put(Integer.valueOf(0), endpoint);
         if (remoteListener != null) {
             remoteListener.notifyCreated(endpoint);
@@ -148,7 +149,7 @@
             switch (msgType) {
                 case REQUEST_ONEWAY: {
                     final int clientId = buffer.getInt();
-                    final Handle<RemoteClientEndpoint<?, ?>> handle = getForwardedClient(clientId);
+                    final Handle<RemoteClientEndpoint> handle = getForwardedClient(clientId);
                     if (handle == null) {
                         log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
                         return;
@@ -165,13 +166,13 @@
                         log.trace("Class not found in one-way request for client ID %d", Integer.valueOf(clientId));
                         break;
                     }
-                    final RemoteClientEndpoint<?, ?> clientEndpoint = handle.getResource();
-                    receiveRequest(clientEndpoint, payload);
+                    final RemoteClientEndpoint clientEndpoint = handle.getResource();
+                    clientEndpoint.receiveRequest(payload);
                     break;
                 }
                 case REQUEST: {
                     final int clientId = buffer.getInt();
-                    final Handle<RemoteClientEndpoint<?, ?>> handle = getForwardedClient(clientId);
+                    final Handle<RemoteClientEndpoint> handle = getForwardedClient(clientId);
                     if (handle == null) {
                         log.trace("Request on invalid client ID %d", Integer.valueOf(clientId));
                         break;
@@ -190,13 +191,13 @@
                         log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
                         break;
                     }
-                    final RemoteClientEndpoint<?, ?> clientEndpoint = handle.getResource();
-                    receiveRequest(clientEndpoint, new ReplyHandlerImpl(channel, requestId, allocator), payload);
+                    final RemoteClientEndpoint clientEndpoint = handle.getResource();
+                    clientEndpoint.receiveRequest(payload, (ReplyHandler) new ReplyHandlerImpl(channel, requestId, allocator));
                     break;
                 }
                 case REPLY: {
                     final int requestId = buffer.getInt();
-                    final ReplyHandler<?> replyHandler = takeOutstandingReqeust(requestId);
+                    final ReplyHandler replyHandler = takeOutstandingReqeust(requestId);
                     if (replyHandler == null) {
                         log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
                         break;
@@ -215,12 +216,12 @@
                         log.trace("Class not found in reply to request ID %d", Integer.valueOf(requestId));
                         break;
                     }
-                    handleReply(replyHandler, payload);
+                    SpiUtils.safeHandleReply(replyHandler, payload);
                     break;
                 }
                 case REQUEST_FAILED: {
                     final int requestId = buffer.getInt();
-                    final ReplyHandler<?> replyHandler = takeOutstandingReqeust(requestId);
+                    final ReplyHandler replyHandler = takeOutstandingReqeust(requestId);
                     if (replyHandler == null) {
                         log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
                         break;
@@ -247,26 +248,40 @@
                         log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
                         break;
                     }
-                    handleException(replyHandler, message, cause);
+                    SpiUtils.safeHandleException(replyHandler, message == null ? null : message.toString(), cause instanceof Throwable ? (Throwable) cause : null);
                     break;
                 }
                 case CLIENT_CLOSE: {
+                    final int clientId = buffer.getInt();
+                    final Handle<RemoteClientEndpoint> handle = takeForwardedClient(clientId);
+                    if (handle == null) {
+                        log.warn("Got client close message for unknown client %d", Integer.valueOf(clientId));
+                        break;
+                    }
+                    IoUtils.safeClose(handle);
                     break;
                 }
                 case CLIENT_OPEN: {
                     final int serviceId = buffer.getInt();
                     final int clientId = buffer.getInt();
-                    final Handle<RemoteServiceEndpoint<?, ?>> handle = getForwardedService(serviceId);
+                    final Handle<RemoteServiceEndpoint> handle = getForwardedService(serviceId);
                     if (handle == null) {
                         // todo log invalid request
                         break;
                     }
-                    final RemoteServiceEndpoint<?, ?> serviceEndpoint = handle.getResource();
-                    final RemoteClientEndpoint<?, ?> clientEndpoint = serviceEndpoint.createClientEndpoint();
+                    final RemoteServiceEndpoint serviceEndpoint = handle.getResource();
+                    final RemoteClientEndpoint clientEndpoint = serviceEndpoint.createClientEndpoint();
 
                     break;
                 }
                 case SERVICE_CLOSE: {
+                    final int serviceId = buffer.getInt();
+                    final Handle<RemoteServiceEndpoint> handle = takeForwardedService(serviceId);
+                    if (handle == null) {
+                        log.warn("Got client close message for unknown service %d", Integer.valueOf(serviceId));
+                        break;
+                    }
+                    IoUtils.safeClose(handle);
                     break;
                 }
                 default: {
@@ -309,15 +324,11 @@
     public void handleClosed(final AllocatedMessageChannel channel) {
     }
 
-    private <I, O> ReplyHandler<O> createReplyHandler(final AllocatedMessageChannel channel, final int requestId) {
-        return new ReplyHandlerImpl<O>(channel, requestId, allocator);
-    }
-
-    RemoteClientEndpoint<?, ?> getRemoteClient(final int i) {
+    RemoteClientEndpoint getRemoteClient(final int i) {
         return remoteClients.get(Integer.valueOf(i));
     }
 
-    private final class ReplyHandlerImpl<O> implements ReplyHandler<O> {
+    private final class ReplyHandlerImpl implements ReplyHandler {
 
         private final AllocatedMessageChannel channel;
         private final int requestId;
@@ -335,7 +346,7 @@
             this.allocator = allocator;
         }
 
-        public void handleReply(final O reply) {
+        public void handleReply(final Object reply) {
             ByteBuffer buffer = allocator.allocate();
             buffer.put((byte) REPLY);
             buffer.putInt(requestId);
@@ -400,7 +411,7 @@
 
     // Session mgmt
 
-    public int openRequest(ReplyHandler<?> handler) {
+    public int openRequest(ReplyHandler handler) {
         int id;
         do {
             id = localRequestIdSeq.getAndIncrement();
@@ -412,12 +423,12 @@
         int id;
         do {
             id = remoteClientIdSeq.getAndIncrement() << 1 | (server ? 1 : 0);
-        } while (remoteClients.putIfAbsent(Integer.valueOf(id), new RemoteClientEndpointImpl<Object, Object>(id, null, allocator)) != null);
+        } while (remoteClients.putIfAbsent(Integer.valueOf(id), new RemoteClientEndpointImpl(id, null, allocator)) != null);
         return id;
     }
 
     @SuppressWarnings({ "unchecked" })
-    public void openClientForForwardedService(int id, RemoteClientEndpoint<?, ?> clientEndpoint) {
+    public void openClientForForwardedService(int id, RemoteClientEndpoint clientEndpoint) {
         try {
             forwardedClients.put(Integer.valueOf(id), ((RemoteClientEndpoint)clientEndpoint).getHandle());
         } catch (RemotingException e) {
@@ -425,37 +436,26 @@
         }
     }
 
-    public Handle<RemoteClientEndpoint<?, ?>> getForwardedClient(int id) {
+    public Handle<RemoteClientEndpoint> getForwardedClient(int id) {
         return forwardedClients.get(Integer.valueOf(id));
     }
 
-    public ReplyHandler<?> takeOutstandingReqeust(int id) {
+    private Handle<RemoteClientEndpoint> takeForwardedClient(final int id) { // removing counterpart of getForwardedClient(int)
+        return forwardedClients.remove(Integer.valueOf(id)); // ConcurrentMap.remove: yields the removed handle, or null if nothing is mapped to id
+    }
+
+    public ReplyHandler takeOutstandingReqeust(int id) {
         return outstandingRequests.remove(Integer.valueOf(id));
     }
 
-    public Handle<RemoteServiceEndpoint<?, ?>> getForwardedService(final int id) {
+    public Handle<RemoteServiceEndpoint> getForwardedService(final int id) {
         return forwardedServices.get(Integer.valueOf(id));
     }
 
-    @SuppressWarnings({ "unchecked" })
-    private static <I, O> void receiveRequest(RemoteClientEndpoint<I, O> clientEndpoint, Object request) {
-        clientEndpoint.receiveRequest((I) request);
+    public Handle<RemoteServiceEndpoint> takeForwardedService(final int id) { // removing counterpart of getForwardedService(int)
+        return forwardedServices.remove(Integer.valueOf(id)); // ConcurrentMap.remove: yields the removed handle, or null if nothing is mapped to id
     }
 
-    @SuppressWarnings({ "unchecked" })
-    private static <I, O> RemoteRequestContext receiveRequest(RemoteClientEndpoint<I, O> clientEndpoint, ReplyHandler<O> replyHandler, Object request) {
-        return clientEndpoint.receiveRequest((I) request, replyHandler);
-    }
-
-    @SuppressWarnings({ "unchecked" })
-    private static <O> void handleReply(final ReplyHandler<O> replyHandler, final Object reply) {
-        SpiUtils.safeHandleReply(replyHandler, (O) reply);
-    }
-
-    private static void handleException(final ReplyHandler<?> handler, final Object message, final Object cause) {
-        SpiUtils.safeHandleException(handler, message == null ? null : message.toString(), cause instanceof Throwable ? (Throwable) cause : null);
-    }
-
     // Writer members
 
     private final BlockingQueue<WriteHandler> outputQueue = CollectionUtil.blockingQueue(64);
@@ -470,7 +470,7 @@
 
     // client endpoint
 
-    private final class RemoteClientEndpointImpl<I, O> extends AbstractAutoCloseable<RemoteClientEndpoint<I, O>> implements RemoteClientEndpoint<I, O> {
+    private final class RemoteClientEndpointImpl extends AbstractAutoCloseable<RemoteClientEndpoint> implements RemoteClientEndpoint {
 
         private final int identifier;
         private final MarshallerFactory<ByteBuffer> marshallerFactory;
@@ -487,9 +487,22 @@
             this.identifier = identifier;
             this.marshallerFactory = marshallerFactory;
             this.allocator = allocator;
+            addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
+                public void handleClose(final RemoteClientEndpoint closed) {
+                    ByteBuffer buffer = allocator.allocate();
+                    buffer.put((byte) MessageType.CLIENT_CLOSE);
+                    buffer.putInt(identifier);
+                    buffer.flip();
+                    try {
+                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+                    } catch (InterruptedException e) {
+                        log.warn("Client close notification was interrupted before it could be sent");
+                    }
+                }
+            });
         }
 
-        public void receiveRequest(final I request) {
+        public void receiveRequest(final Object request) {
             log.trace("Received one-way request of type %s", request == null ? "null" : request.getClass());
             try {
                 final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(null);
@@ -517,7 +530,7 @@
             }
         }
 
-        public RemoteRequestContext receiveRequest(final I request, final ReplyHandler<O> handler) {
+        public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler handler) {
             log.trace("Received request of type %s", request == null ? "null" : request.getClass());
             try {
                 final Marshaller<ByteBuffer> marshaller = marshallerFactory.createMarshaller(null);
@@ -579,7 +592,7 @@
         }
     }
 
-    public final class RemoteServiceEndpointImpl<I, O> extends AbstractAutoCloseable<RemoteServiceEndpoint<I, O>> implements RemoteServiceEndpoint<I, O> {
+    public final class RemoteServiceEndpointImpl extends AbstractAutoCloseable<RemoteServiceEndpoint> implements RemoteServiceEndpoint {
 
         private final MarshallerFactory<ByteBuffer> marshallerFactory;
         private final BufferAllocator<ByteBuffer> allocator;
@@ -590,20 +603,34 @@
             this.marshallerFactory = marshallerFactory;
             this.allocator = allocator;
             this.identifier = identifier;
+            addCloseHandler(new CloseHandler<RemoteServiceEndpoint>() {
+                public void handleClose(final RemoteServiceEndpoint closed) {
+                    ByteBuffer buffer = allocator.allocate();
+                    buffer.put((byte) MessageType.SERVICE_CLOSE);
+                    buffer.putInt(identifier);
+                    buffer.flip();
+                    try {
+                        registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
+                    } catch (InterruptedException e) {
+                        log.warn("Service close notification was interrupted before it could be sent");
+                    }
+                }
+            });
         }
 
-        public RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException {
+        public RemoteClientEndpoint createClientEndpoint() throws RemotingException {
             final int id = openClientFromService();
             final ByteBuffer buffer = allocator.allocate();
             buffer.putInt(identifier);
             buffer.putInt(openClientFromService());
             buffer.flip();
+            // todo - probably should bail out if we're interrupted?
             boolean intr = false;
             for (;;) {
                 try {
                     registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
                     try {
-                        return new RemoteClientEndpointImpl<I,O>(id, marshallerFactory, allocator);
+                        return new RemoteClientEndpointImpl(id, marshallerFactory, allocator);
                     } finally {
                         if (intr) {
                             Thread.currentThread().interrupt();

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -26,7 +26,6 @@
 import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
 import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
 import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.core.marshal.JBossSerializationMarshallerFactory;
 import org.jboss.cx.remoting.core.marshal.JavaSerializationMarshallerFactory;
 import org.jboss.xnio.IoHandlerFactory;
 import org.jboss.xnio.ChannelSource;
@@ -59,11 +58,11 @@
      * @param remoteListener a listener which receives notification of the remote root client of the incoming connection
      * @return a handler factory for passing to an XNIO server
      */
-    public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final RemoteServiceEndpoint<?, ?> localRootSource, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpointListener remoteListener) {
+    public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Executor executor, final RemoteServiceEndpoint localRootSource, final BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpointListener remoteListener) {
         return new IoHandlerFactory<AllocatedMessageChannel>() {
             public IoHandler<? super AllocatedMessageChannel> createHandler() {
                 try {
-                    final RemoteClientEndpoint<?, ?> remoteClientEndpoint = localRootSource.createClientEndpoint();
+                    final RemoteClientEndpoint remoteClientEndpoint = localRootSource.createClientEndpoint();
                     try {
                         return new BasicHandler(true, allocator, remoteClientEndpoint, executor, remoteListener, new JavaSerializationMarshallerFactory(executor));
                     } finally {
@@ -92,13 +91,13 @@
      * @return the future client endpoint of the remote side's root client
      * @throws IOException if an error occurs
      */
-    public static <I, O> IoFuture<RemoteClientEndpoint<I, O>> connect(final Executor executor, final RemoteClientEndpoint<?, ?> localRoot, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
+    public static <I, O> IoFuture<RemoteClientEndpoint> connect(final Executor executor, final RemoteClientEndpoint localRoot, final ChannelSource<AllocatedMessageChannel> channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
         final BasicHandler basicHandler = new BasicHandler(false, allocator, localRoot, executor, null, new JavaSerializationMarshallerFactory(executor));
         final IoFuture<AllocatedMessageChannel> futureChannel = channelSource.open(basicHandler);
-        return new AbstractConvertingIoFuture<RemoteClientEndpoint<I, O>, AllocatedMessageChannel>(futureChannel) {
+        return new AbstractConvertingIoFuture<RemoteClientEndpoint, AllocatedMessageChannel>(futureChannel) {
             @SuppressWarnings({ "unchecked" })
-            protected RemoteClientEndpoint<I, O> convert(final AllocatedMessageChannel channel) throws RemotingException {
-                final RemoteClientEndpoint<?, ?> remoteClientEndpoint = basicHandler.getRemoteClient(0);
+            protected RemoteClientEndpoint convert(final AllocatedMessageChannel channel) throws RemotingException {
+                final RemoteClientEndpoint remoteClientEndpoint = basicHandler.getRemoteClient(0);
                 return (RemoteClientEndpoint) remoteClientEndpoint;
             }
         };

Added: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java	                        (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.RemotingException;
+
+/**
+ * Registry in which {@link RemoteServiceEndpoint}s are bound to, and unbound from, integer IDs (as far as the signatures show).
+ */
+public interface ServiceRegistry {
+    int bind(RemoteServiceEndpoint remoteServiceEndpoint) throws RemotingException; // presumably returns the ID chosen for the endpoint -- TODO confirm
+
+    void bind(RemoteServiceEndpoint remoteServiceEndpoint, int id) throws RemotingException; // presumably binds under the caller-supplied ID -- TODO confirm
+
+    void unbind(int id) throws RemotingException; // presumably removes the binding held under the given ID -- TODO confirm
+
+    RemoteServiceEndpoint // NOTE(review): unfinished declaration (no method name, parameters or ';'); the file does not compile as committed
+}

Modified: remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -50,6 +50,9 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.Collections;
+import java.util.List;
+import java.util.LinkedList;
 import java.nio.ByteBuffer;
 import java.net.InetSocketAddress;
 import java.io.Closeable;
@@ -63,11 +66,13 @@
     }
 
     public void testConnection() throws Throwable {
+        final List<Throwable> problems = Collections.synchronizedList(new LinkedList<Throwable>());
         final AtomicBoolean clientOpened = new AtomicBoolean(false);
+        final AtomicBoolean client2Opened = new AtomicBoolean(false);
         final AtomicBoolean serviceOpened = new AtomicBoolean(false);
         final AtomicBoolean clientClosed = new AtomicBoolean(false);
         final AtomicBoolean serviceClosed = new AtomicBoolean(false);
-        final CountDownLatch clientCloseLatch = new CountDownLatch(1);
+        final CountDownLatch cleanupLatch = new CountDownLatch(2);
         final ExecutorService executorService = Executors.newCachedThreadPool();
         try {
             final BufferAllocator<ByteBuffer> allocator = new BufferAllocator<ByteBuffer>() {
@@ -84,13 +89,19 @@
                 endpoint.setExecutor(executorService);
                 endpoint.start();
                 try {
-                    final RemoteServiceEndpoint<Object,Object> serverServiceEndpoint = endpoint.createServiceEndpoint(new RequestListener<Object, Object>() {
+                    final RemoteServiceEndpoint serverServiceEndpoint = endpoint.createServiceEndpoint(new RequestListener<Object, Object>() {
                         public void handleClientOpen(final ClientContext context) {
-                            clientOpened.set(true);
+                            if (clientOpened.getAndSet(true)) {
+                                if (client2Opened.getAndSet(true)) {
+                                    problems.add(new IllegalStateException("Too many client opens"));
+                                }
+                            }
                         }
 
                         public void handleServiceOpen(final ServiceContext context) {
-                            serviceOpened.set(true);
+                            if (serviceOpened.getAndSet(true)) {
+                                problems.add(new IllegalStateException("Multiple service opens"));
+                            }
                         }
 
                         public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
@@ -101,61 +112,78 @@
                                 try {
                                     context.sendFailure("failed", e);
                                 } catch (RemotingException e1) {
-                                    System.out.println("Double fault!");
+                                    problems.add(e1);
                                 }
                             }
                         }
 
                         public void handleServiceClose(final ServiceContext context) {
-                            serviceClosed.set(true);
+                            if (serviceClosed.getAndSet(true)) {
+                                problems.add(new IllegalStateException("Multiple service closes"));
+                            }
+                            cleanupLatch.countDown();
                         }
 
                         public void handleClientClose(final ClientContext context) {
-                            clientClosed.set(true);
-                            clientCloseLatch.countDown();
+                            if (clientClosed.getAndSet(true)) {
+                                problems.add(new IllegalStateException("Multiple client closes"));
+                            }
+                            cleanupLatch.countDown();
                         }
                     });
                     try {
-                        final Handle<RemoteServiceEndpoint<Object,Object>> handle = serverServiceEndpoint.getHandle();
+                        final Handle<RemoteServiceEndpoint> serviceHandle = serverServiceEndpoint.getHandle();
                         serverServiceEndpoint.autoClose();
                         try {
                             final RemoteClientEndpointListener remoteListener = new RemoteClientEndpointListener() {
-
-                                public <I, O> void notifyCreated(final RemoteClientEndpoint<I, O> endpoint) {
-
+                                public <I, O> void notifyCreated(final RemoteClientEndpoint endpoint) {
                                 }
                             };
                             final ConfigurableFactory<Closeable> tcpServer = xnio.createTcpServer(executorService, Channels.convertStreamToAllocatedMessage(BasicProtocol.createServer(executorService, serverServiceEndpoint, allocator, remoteListener), 32768, 32768), new InetSocketAddress(12345));
                             final Closeable tcpServerCloseable = tcpServer.create();
                             try {
                                 // now create a client to connect to it
-                                final RemoteClientEndpoint<?,?> localRoot = serverServiceEndpoint.createClientEndpoint();
-                                final InetSocketAddress destAddr = new InetSocketAddress("localhost", 12345);
-                                final TcpClient tcpClient = xnio.createTcpConnector().create().createChannelSource(destAddr);
-                                final ChannelSource<AllocatedMessageChannel> messageChannelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
-                                final IoFuture<RemoteClientEndpoint<Object,Object>> futureClient = BasicProtocol.connect(executorService, localRoot, messageChannelSource, allocator);
-                                final RemoteClientEndpoint<Object, Object> clientEndpoint = futureClient.get();
+                                final RemoteClientEndpoint localRoot = ;
                                 try {
-                                    final Client<Object,Object> client = endpoint.createClient(clientEndpoint);
+                                    serverServiceEndpoint.autoClose();
+                                    final InetSocketAddress destAddr = new InetSocketAddress("localhost", 12345);
+                                    final TcpClient tcpClient = xnio.createTcpConnector().create().createChannelSource(destAddr);
                                     try {
-                                        clientEndpoint.autoClose();
-                                        final Object result = client.send("Test").get();
-                                        assertEquals("response", result);
-                                        client.close();
-                                        tcpServerCloseable.close();
-                                        handle.close();
+                                        final ChannelSource<AllocatedMessageChannel> messageChannelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
+                                        final IoFuture<RemoteClientEndpoint> futureClient = BasicProtocol.connect(executorService, localRoot, messageChannelSource, allocator);
+                                        final RemoteClientEndpoint clientEndpoint = futureClient.get();
+                                        try {
+                                            final Client<Object,Object> client = endpoint.createClient(clientEndpoint);
+                                            try {
+                                                clientEndpoint.autoClose();
+                                                final Object result = client.send("Test").get();
+                                                assertEquals("response", result);
+                                                client.close();
+                                                cleanupLatch.await(500L, TimeUnit.MILLISECONDS);
+                                                tcpServerCloseable.close();
+                                                serviceHandle.close();
+                                                assertTrue(serviceOpened.get());
+                                                assertTrue(clientOpened.get());
+                                                assertTrue(client2Opened.get());
+                                                assertTrue(clientClosed.get());
+                                                assertTrue(serviceClosed.get());
+                                            } finally {
+                                                IoUtils.safeClose(client);
+                                            }
+                                        } finally {
+                                            IoUtils.safeClose(clientEndpoint);
+                                        }
                                     } finally {
-                                        IoUtils.safeClose(client);
-                                        clientCloseLatch.await(500L, TimeUnit.MILLISECONDS);
+                                        // todo close tcpClient
                                     }
                                 } finally {
-                                    IoUtils.safeClose(clientEndpoint);
+                                    IoUtils.safeClose(localRoot);
                                 }
                             } finally {
                                 IoUtils.safeClose(tcpServerCloseable);
                             }
                         } finally {
-                            IoUtils.safeClose(handle);
+                            IoUtils.safeClose(serviceHandle);
                         }
                     } finally {
                         IoUtils.safeClose(serverServiceEndpoint);
@@ -169,9 +197,8 @@
         } finally {
             executorService.shutdownNow();
         }
-        assertTrue(serviceOpened.get());
-        assertTrue(clientOpened.get());
-        assertTrue(clientClosed.get());
-        assertTrue(serviceClosed.get());
+        for (Throwable t : problems) {
+            throw t;
+        }
     }
 }

Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-07-18 23:18:15 UTC (rev 4394)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-07-18 23:18:52 UTC (rev 4395)
@@ -31,7 +31,7 @@
     }
 
     public static <I, O> Client<I, O> createLocalClient(Endpoint endpoint, RequestListener<I, O> requestListener) throws RemotingException {
-        final RemoteClientEndpoint<I, O> clientEndpoint = endpoint.createClientEndpoint(requestListener);
+        final RemoteClientEndpoint clientEndpoint = endpoint.createClientEndpoint(requestListener);
         try {
             return endpoint.createClient(clientEndpoint);
         } finally {
@@ -40,7 +40,7 @@
     }
 
     public static <I, O> ClientSource<I, O> createLocalClientSource(Endpoint endpoint, RequestListener<I, O> requestListener) throws RemotingException {
-        final RemoteServiceEndpoint<I, O> serviceEndpoint = endpoint.createServiceEndpoint(requestListener);
+        final RemoteServiceEndpoint serviceEndpoint = endpoint.createServiceEndpoint(requestListener);
         try {
             return endpoint.createClientSource(serviceEndpoint);
         } finally {




More information about the jboss-remoting-commits mailing list