[jboss-remoting-commits] JBoss Remoting SVN: r4706 - in remoting3/trunk: api/src/main/java/org/jboss/remoting and 6 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Wed Nov 19 14:15:56 EST 2008


Author: david.lloyd at jboss.com
Date: 2008-11-19 14:15:56 -0500 (Wed, 19 Nov 2008)
New Revision: 4706

Modified:
   remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java
   remoting3/trunk/build.xml
   remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
   remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java
   remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
   remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java
   remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java
   remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java
Log:
JBREM-1066; make Endpoint closeable

Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java	2008-11-19 16:22:04 UTC (rev 4705)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java	2008-11-19 19:15:56 UTC (rev 4706)
@@ -11,7 +11,7 @@
 /**
  * A potential participant in a JBoss Remoting communications relationship.
  */
-public interface Endpoint {
+public interface Endpoint extends HandleableCloseable<Endpoint> {
     /**
      * Get the endpoint attribute map.  This is a storage area for any data associated with this endpoint, including
      * (but not limited to) connection and protocol information, and application information.

Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml	2008-11-19 16:22:04 UTC (rev 4705)
+++ remoting3/trunk/build.xml	2008-11-19 19:15:56 UTC (rev 4706)
@@ -338,6 +338,8 @@
     <target name="api" description="Build the API module" depends="lib.marshalling-api,lib.xnio-api,api.compile">
         <path id="api.classpath">
             <pathelement location="api/target/main/classes"/>
+            <pathelement location="${lib.xnio-api.local}"/>
+            <pathelement location="${lib.marshalling-api.local}"/>
         </path>
     </target>
 
@@ -404,8 +406,6 @@
             <classpath>
                 <path refid="api.classpath"/>
                 <path refid="version.classpath"/>
-                <pathelement location="${lib.marshalling-api.local}"/>
-                <pathelement location="${lib.xnio-api.local}"/>
             </classpath>
         </javac>
         <touch file="core/target/main/.lastcompile" verbose="false"/>
@@ -435,9 +435,7 @@
                 <path refid="api.classpath"/>
                 <path refid="core.classpath"/>
                 <path refid="testing-support.classpath"/>
-                <pathelement location="${lib.marshalling-api.local}"/>
                 <pathelement location="${lib.junit.local}"/>
-                <pathelement location="${lib.xnio-api.local}"/>
             </classpath>
         </javac>
         <touch file="core/target/test/.lastcompile" verbose="false"/>
@@ -462,8 +460,6 @@
                 <path refid="testing-support.classpath"/>
                 <pathelement location="core/target/test/classes"/>
                 <pathelement location="${lib.junit.local}"/>
-                <pathelement location="${lib.marshalling-api.local}"/>
-                <pathelement location="${lib.xnio-api.local}"/>
             </classpath>
             <batchtest fork="yes" todir="core/target/test-results"
                        haltonfailure="no">
@@ -605,9 +601,7 @@
                 <path refid="protocol.multiplex.classpath"/>
                 <path refid="testing-support.classpath"/>
                 <pathelement location="${lib.junit.local}"/>
-                <pathelement location="${lib.marshalling-api.local}"/>
                 <pathelement location="${lib.river.local}"/>
-                <pathelement location="${lib.xnio-api.local}"/>
                 <pathelement location="${lib.xnio-nio.local}"/>
             </classpath>
         </javac>
@@ -636,9 +630,7 @@
                 <path refid="testing-support.classpath"/>
                 <pathelement location="protocol/multiplex/target/test/classes"/>
                 <pathelement location="${lib.junit.local}"/>
-                <pathelement location="${lib.marshalling-api.local}"/>
                 <pathelement location="${lib.river.local}"/>
-                <pathelement location="${lib.xnio-api.local}"/>
                 <pathelement location="${lib.xnio-nio.local}"/>
             </classpath>
             <batchtest fork="yes" todir="protocol/multiplex/target/test-results"
@@ -732,9 +724,7 @@
                 <path refid="standalone.classpath"/>
                 <path refid="testing-support.classpath"/>
                 <pathelement location="${lib.junit.local}"/>
-                <pathelement location="${lib.marshalling-api.local}"/>
                 <pathelement location="${lib.river.local}"/>
-                <pathelement location="${lib.xnio-api.local}"/>
                 <pathelement location="${lib.xnio-nio.local}"/>
             </classpath>
         </javac>
@@ -764,9 +754,7 @@
                 <path refid="testing-support.classpath"/>
                 <pathelement location="protocol/basic/target/test/classes"/>
                 <pathelement location="${lib.junit.local}"/>
-                <pathelement location="${lib.marshalling-api.local}"/>
                 <pathelement location="${lib.river.local}"/>
-                <pathelement location="${lib.xnio-api.local}"/>
                 <pathelement location="${lib.xnio-nio.local}"/>
             </classpath>
             <batchtest fork="yes" todir="protocol/basic/target/test-results"

Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java	2008-11-19 16:22:04 UTC (rev 4705)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java	2008-11-19 19:15:56 UTC (rev 4706)
@@ -1,19 +1,16 @@
 package org.jboss.remoting.core;
 
-import java.io.Closeable;
 import java.io.IOException;
+import java.io.Closeable;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.lang.ref.WeakReference;
 import org.jboss.remoting.Client;
 import org.jboss.remoting.ClientSource;
-import org.jboss.remoting.CloseHandler;
 import org.jboss.remoting.Endpoint;
 import org.jboss.remoting.RequestListener;
 import org.jboss.remoting.ServiceListener;
@@ -22,24 +19,26 @@
 import org.jboss.remoting.EndpointPermission;
 import org.jboss.remoting.RemoteServiceConfiguration;
 import org.jboss.remoting.ServiceURI;
+import org.jboss.remoting.CloseHandler;
 import org.jboss.remoting.core.util.OrderedExecutorFactory;
 import org.jboss.remoting.core.util.CollectionUtil;
-import org.jboss.remoting.core.util.NamingThreadFactory;
-import org.jboss.remoting.spi.AbstractSimpleCloseable;
 import org.jboss.remoting.spi.Handle;
 import org.jboss.remoting.spi.RequestHandler;
 import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.spi.AbstractHandleableCloseable;
+import org.jboss.remoting.spi.AbstractSimpleCloseable;
 import org.jboss.remoting.version.Version;
 import org.jboss.xnio.FailedIoFuture;
 import org.jboss.xnio.FinishedIoFuture;
 import org.jboss.xnio.IoFuture;
 import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.WeakCloseable;
 import org.jboss.xnio.log.Logger;
 
 /**
  *
  */
-public final class EndpointImpl implements Endpoint {
+public final class EndpointImpl extends AbstractHandleableCloseable<Endpoint> implements Endpoint {
 
     static {
         // Print Remoting "greeting" message
@@ -48,12 +47,10 @@
 
     private static final Logger log = Logger.getLogger("org.jboss.remoting.endpoint");
 
-    private String name;
+    private final String name;
 
-    private OrderedExecutorFactory orderedExecutorFactory;
-    private ExecutorService executorService;
+    private final OrderedExecutorFactory orderedExecutorFactory;
 
-    private final Set<Closeable> resources = CollectionUtil.synchronizedWeakHashSet();
     private final ConcurrentMap<Object, Object> endpointMap = CollectionUtil.concurrentMap();
 
     private final Object serviceLock = new Object();
@@ -67,82 +64,29 @@
     private static final EndpointPermission REGISTER_REMOTE_SERVICE_PERM = new EndpointPermission("registerRemoteService");
     private static final EndpointPermission ADD_SERVICE_LISTENER_PERM = new EndpointPermission("addServiceListener");
 
-    public EndpointImpl() {
+    public EndpointImpl(final Executor executor, final String name) {
+        super(executor);
+        this.executor = executor;
+        this.name = name;
+        orderedExecutorFactory = new OrderedExecutorFactory(executor);
     }
 
-    // Dependencies
+    private final Executor executor;
 
-    private Executor executor;
-
-    public Executor getExecutor() {
-        return executor;
-    }
-
-    Executor getOrderedExecutor() {
+    protected Executor getOrderedExecutor() {
         return orderedExecutorFactory.getOrderedExecutor();
     }
 
-    public void setExecutor(final Executor executor) {
-        this.executor = executor;
-        orderedExecutorFactory = new OrderedExecutorFactory(executor);
+    protected Executor getExecutor() {
+        return executor;
     }
 
-    // Configuration
+    // Endpoint implementation
 
-    public void setName(final String name) {
-        this.name = name;
-    }
-
     public String getName() {
         return name;
     }
 
-    // Lifecycle
-
-    public void start() {
-        // todo security check
-        if (executor == null) {
-            executor = executorService = Executors.newCachedThreadPool(new NamingThreadFactory(Executors.defaultThreadFactory(), "Remoting endpoint %s"));
-            setExecutor(executorService);
-        }
-    }
-
-    public void stop() {
-        // todo security check
-        boolean intr = false;
-        try {
-            for (Closeable resource : resources) {
-                IoUtils.safeClose(resource);
-            }
-            synchronized (resources) {
-                while (! resources.isEmpty()) {
-                    try {
-                        resources.wait();
-                    } catch (InterruptedException e) {
-                        intr = true;
-                    }
-                }
-            }
-            if (executorService != null) {
-                executorService.shutdown();
-                boolean done = false;
-                do try {
-                    done = executorService.awaitTermination(30L, TimeUnit.SECONDS);
-                } catch (InterruptedException e) {
-                    intr = true;
-                } while (! done);
-                executorService = null;
-                executor = null;
-            }
-        } finally {
-            if (intr) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
-
-    // Endpoint implementation
-
     public ConcurrentMap<Object, Object> getAttributes() {
         return endpointMap;
     }
@@ -152,12 +96,22 @@
         if (sm != null) {
             sm.checkPermission(CREATE_REQUEST_HANDLER_PERM);
         }
-        LocalRequestHandler.Config<I, O> config = new LocalRequestHandler.Config<I,O>(requestClass, replyClass);
+        LocalRequestHandler.Config<I, O> config = new LocalRequestHandler.Config<I, O>(requestClass, replyClass);
         config.setExecutor(executor);
         config.setRequestListener(requestListener);
         config.setClientContext(new ClientContextImpl(executor));
         final LocalRequestHandler<I, O> localRequestHandler = new LocalRequestHandler<I, O>(config);
-        localRequestHandler.addCloseHandler(remover);
+        final WeakCloseable lrhCloseable = new WeakCloseable(new WeakReference<Closeable>(localRequestHandler));
+        final Key key = addCloseHandler(new CloseHandler<Endpoint>() {
+            public void handleClose(final Endpoint closed) {
+                IoUtils.safeClose(lrhCloseable);
+            }
+        });
+        localRequestHandler.addCloseHandler(new CloseHandler<RequestHandler>() {
+            public void handleClose(final RequestHandler closed) {
+                key.remove();
+            }
+        });
         localRequestHandler.open();
         return localRequestHandler.getHandle();
     }
@@ -217,7 +171,17 @@
                 }
             }
         }
-        localRequestHandlerSource.addCloseHandler(remover);
+        final WeakCloseable lrhCloseable = new WeakCloseable(new WeakReference<Closeable>(localRequestHandlerSource));
+        final Key key = addCloseHandler(new CloseHandler<Endpoint>() {
+            public void handleClose(final Endpoint closed) {
+                IoUtils.safeClose(lrhCloseable);
+            }
+        });
+        localRequestHandlerSource.addCloseHandler(new CloseHandler<RequestHandlerSource>() {
+            public void handleClose(final RequestHandlerSource closed) {
+                key.remove();
+            }
+        });
         localRequestHandlerSource.open();
         return localRequestHandlerSource.getHandle();
     }
@@ -235,6 +199,17 @@
         final Handle<RequestHandler> handle = requestHandler.getHandle();
         try {
             final ClientImpl<I, O> client = ClientImpl.create(handle, executor, requestType, replyType);
+            final WeakCloseable lrhCloseable = new WeakCloseable(new WeakReference<Closeable>(client));
+            final Key key = addCloseHandler(new CloseHandler<Endpoint>() {
+                public void handleClose(final Endpoint closed) {
+                    IoUtils.safeClose(lrhCloseable);
+                }
+            });
+            client.addCloseHandler(new CloseHandler<Client>() {
+                public void handleClose(final Client closed) {
+                    key.remove();
+                }
+            });
             ok = true;
             return client;
         } finally {
@@ -253,6 +228,17 @@
         final Handle<RequestHandlerSource> handle = requestHandlerSource.getHandle();
         try {
             final ClientSourceImpl<I, O> clientSource = ClientSourceImpl.create(handle, this, requestClass, replyClass);
+            final WeakCloseable lrhCloseable = new WeakCloseable(new WeakReference<Closeable>(clientSource));
+            final Key key = addCloseHandler(new CloseHandler<Endpoint>() {
+                public void handleClose(final Endpoint closed) {
+                    IoUtils.safeClose(lrhCloseable);
+                }
+            });
+            clientSource.addCloseHandler(new CloseHandler<ClientSource>() {
+                public void handleClose(final ClientSource closed) {
+                    key.remove();
+                }
+            });
             ok = true;
             return clientSource;
         } finally {
@@ -459,20 +445,6 @@
         }
     }
 
-    private final ResourceRemover remover = new ResourceRemover();
-
-    private final class ResourceRemover implements CloseHandler<Closeable> {
-        public void handleClose(final Closeable closed) {
-            synchronized (resources)
-            {
-                resources.remove(closed);
-                if (resources.isEmpty()) {
-                    resources.notifyAll();
-                }
-            }
-        }
-    }
-
     public String toString() {
         return "endpoint \"" + name + "\" <" + Integer.toString(hashCode()) + ">";
     }

Modified: remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java
===================================================================
--- remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java	2008-11-19 16:22:04 UTC (rev 4705)
+++ remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java	2008-11-19 19:15:56 UTC (rev 4706)
@@ -51,21 +51,11 @@
 
     private static final Logger log = Logger.getLogger(EndpointTestCase.class);
 
-    private static void safeStop(EndpointImpl endpoint) {
-        try {
-            endpoint.stop();
-        } catch (Throwable t) {
-            t.printStackTrace();
-        }
-    }
-
     public void testCreate() throws Throwable {
-        final EndpointImpl endpoint = new EndpointImpl();
         final ExecutorService executorService = Executors.newCachedThreadPool();
+        final EndpointImpl endpoint = new EndpointImpl(executorService, "foo");
         try {
-            endpoint.setExecutor(executorService);
-            endpoint.start();
-            endpoint.stop();
+            endpoint.close();
             executorService.shutdown();
             assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
         } finally {
@@ -76,13 +66,11 @@
     public void testLocalClientInvoke() throws Throwable {
         final AtomicBoolean clientEndpointClosed = new AtomicBoolean(false);
         final AtomicBoolean clientClosed = new AtomicBoolean(false);
-        final EndpointImpl endpoint = new EndpointImpl();
         final ExecutorService executorService = Executors.newCachedThreadPool();
-        final Object requestObj = new Object();
-        final Object replyObj = new Object();
         try {
-            endpoint.setExecutor(executorService);
-            endpoint.start();
+            final EndpointImpl endpoint = new EndpointImpl(executorService, "test-endpoint");
+            final Object requestObj = new Object();
+            final Object replyObj = new Object();
             try {
                 final Handle<RequestHandler> handle = endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
                     public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
@@ -90,43 +78,44 @@
                         try {
                             context.sendReply(replyObj);
                         } catch (IOException e) {
-                            try {
-                                context.sendFailure(e.getMessage(), e);
-                            } catch (IOException e1) {
-                                fail("double fault");
-                            }
+                            log.error(e, "Error sending reply!");
                         }
                     }
                 }, Object.class, Object.class);
-                final RequestHandler requestHandler = handle.getResource();
                 try {
-                    requestHandler.addCloseHandler(new CloseHandler<RequestHandler>() {
-                        public void handleClose(final RequestHandler closed) {
-                            clientEndpointClosed.set(true);
-                        }
-                    });
-                    final Client<Object,Object> client = endpoint.createClient(requestHandler, Object.class, Object.class);
+                    final RequestHandler requestHandler = handle.getResource();
                     try {
-                        client.addCloseHandler(new CloseHandler<Client<Object, Object>>() {
-                            public void handleClose(final Client<Object, Object> closed) {
-                                clientClosed.set(true);
+                        requestHandler.addCloseHandler(new CloseHandler<RequestHandler>() {
+                            public void handleClose(final RequestHandler closed) {
+                                clientEndpointClosed.set(true);
                             }
                         });
-                        assertEquals(replyObj, client.invoke(requestObj));
-                        client.close();
+                        final Client<Object,Object> client = endpoint.createClient(requestHandler, Object.class, Object.class);
+                        try {
+                            client.addCloseHandler(new CloseHandler<Client<Object, Object>>() {
+                                public void handleClose(final Client<Object, Object> closed) {
+                                    clientClosed.set(true);
+                                }
+                            });
+                            handle.close();
+                            assertEquals(replyObj, client.invoke(requestObj));
+                            client.close();
+                            executorService.shutdown();
+                            assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
+                            assertTrue(clientEndpointClosed.get());
+                            assertTrue(clientClosed.get());
+                        } finally {
+                            IoUtils.safeClose(client);
+                        }
                     } finally {
-                        IoUtils.safeClose(client);
+                        IoUtils.safeClose(requestHandler);
                     }
                 } finally {
-                    IoUtils.safeClose(requestHandler);
+                    IoUtils.safeClose(handle);
                 }
             } finally {
-                safeStop(endpoint);
+                IoUtils.safeClose(endpoint);
             }
-            executorService.shutdown();
-            assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
-            assertTrue(clientEndpointClosed.get());
-            assertTrue(clientClosed.get());
         } finally {
             executorService.shutdownNow();
         }
@@ -135,14 +124,12 @@
     public void testLocalClientSend() throws Throwable {
         final AtomicBoolean clientEndpointClosed = new AtomicBoolean(false);
         final AtomicBoolean clientClosed = new AtomicBoolean(false);
-        final EndpointImpl endpoint = new EndpointImpl();
         final ExecutorService executorService = Executors.newCachedThreadPool();
-        final Object requestObj = new Object();
-        final Object replyObj = new Object();
         try {
-            endpoint.setExecutor(executorService);
-            endpoint.start();
+            final EndpointImpl endpoint = new EndpointImpl(executorService, "test-endpoint");
             try {
+                final Object requestObj = new Object();
+                final Object replyObj = new Object();
                 final Handle<RequestHandler> handle = endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
                     public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
                         assertEquals(request, requestObj);
@@ -153,65 +140,78 @@
                         }
                     }
                 }, Object.class, Object.class);
-                final RequestHandler requestHandler = handle.getResource();
                 try {
-                    requestHandler.addCloseHandler(new CloseHandler<RequestHandler>() {
-                        public void handleClose(final RequestHandler closed) {
-                            clientEndpointClosed.set(true);
-                        }
-                    });
-                    final Client<Object,Object> client = endpoint.createClient(requestHandler, Object.class, Object.class);
+                    final RequestHandler requestHandler = handle.getResource();
                     try {
-                        client.addCloseHandler(new CloseHandler<Client<Object, Object>>() {
-                            public void handleClose(final Client<Object, Object> closed) {
-                                clientClosed.set(true);
+                        requestHandler.addCloseHandler(new CloseHandler<RequestHandler>() {
+                            public void handleClose(final RequestHandler closed) {
+                                clientEndpointClosed.set(true);
                             }
                         });
-                        assertEquals(replyObj, client.send(requestObj).get());
-                        client.close();
+                        final Client<Object,Object> client = endpoint.createClient(requestHandler, Object.class, Object.class);
+                        try {
+                            client.addCloseHandler(new CloseHandler<Client<Object, Object>>() {
+                                public void handleClose(final Client<Object, Object> closed) {
+                                    clientClosed.set(true);
+                                }
+                            });
+                            handle.close();
+                            final IoFuture<Object> futureReply = client.send(requestObj);
+                            assertEquals(IoFuture.Status.DONE, futureReply.await(1L, TimeUnit.SECONDS));
+                            assertEquals(replyObj, futureReply.get());
+                            client.close();
+                            executorService.shutdown();
+                            assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
+                            assertTrue(clientEndpointClosed.get());
+                            assertTrue(clientClosed.get());
+                        } finally {
+                            IoUtils.safeClose(client);
+                        }
                     } finally {
-                        IoUtils.safeClose(client);
+                        IoUtils.safeClose(requestHandler);
                     }
                 } finally {
-                    IoUtils.safeClose(requestHandler);
+                    IoUtils.safeClose(handle);
                 }
             } finally {
-                safeStop(endpoint);
+                IoUtils.safeClose(endpoint);
             }
-            executorService.shutdown();
-            assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
-            assertTrue(clientEndpointClosed.get());
-            assertTrue(clientClosed.get());
         } finally {
             executorService.shutdownNow();
         }
     }
 
     public void testUnsentReply() throws Throwable {
-        final EndpointImpl endpoint = new EndpointImpl();
         final ExecutorService executorService = Executors.newCachedThreadPool();
-        final Object requestObj = new Object();
         try {
-            endpoint.setExecutor(executorService);
-            endpoint.start();
+            final EndpointImpl endpoint = new EndpointImpl(executorService, "test-endpoint");
             try {
+                final Object requestObj = new Object();
                 final Handle<RequestHandler> handle = endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
                     public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
                         assertEquals(request, requestObj);
                         // don't send a reply!!
                     }
                 }, Object.class, Object.class);
-                final RequestHandler requestHandler = handle.getResource();
                 try {
-                    final Client<Object,Object> client = endpoint.createClient(requestHandler, Object.class, Object.class);
-                    final IoFuture<Object> futureReply = client.send(requestObj);
-                    assertEquals(IoFuture.Status.FAILED, futureReply.await(500L, TimeUnit.MILLISECONDS));
-                    assertTrue(futureReply.getException() instanceof IndeterminateOutcomeException);
+                    final RequestHandler requestHandler = handle.getResource();
+                    try {
+                        final Client<Object,Object> client = endpoint.createClient(requestHandler, Object.class, Object.class);
+                        try {
+                            final IoFuture<Object> futureReply = client.send(requestObj);
+                            assertEquals(IoFuture.Status.FAILED, futureReply.await(500L, TimeUnit.MILLISECONDS));
+                            assertTrue(futureReply.getException() instanceof IndeterminateOutcomeException);
+                        } finally {
+                            IoUtils.safeClose(client);
+                        }
+                    } finally {
+                        IoUtils.safeClose(requestHandler);
+                    }
                 } finally {
-                    IoUtils.safeClose(requestHandler);
+                    IoUtils.safeClose(handle);
                 }
             } finally {
-                safeStop(endpoint);
+                IoUtils.safeClose(endpoint);
             }
         } finally {
             executorService.shutdownNow();
@@ -219,13 +219,11 @@
     }
 
     public void testUnsentReply2() throws Throwable {
-        final EndpointImpl endpoint = new EndpointImpl();
         final ExecutorService executorService = Executors.newCachedThreadPool();
-        final Object requestObj = new Object();
         try {
-            endpoint.setExecutor(executorService);
-            endpoint.start();
+            final EndpointImpl endpoint = new EndpointImpl(executorService, "test-endpoint");
             try {
+                final Object requestObj = new Object();
                 final Handle<RequestHandler> handle = endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
                     public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
                         assertEquals(request, requestObj);
@@ -249,19 +247,28 @@
                             public void run() {
                             }
                         });
+                        // don't send a reply!!
                     }
                 }, Object.class, Object.class);
-                final RequestHandler requestHandler = handle.getResource();
                 try {
-                    final Client<Object,Object> client = endpoint.createClient(requestHandler, Object.class, Object.class);
-                    final IoFuture<Object> futureReply = client.send(requestObj);
-                    assertEquals(IoFuture.Status.FAILED, futureReply.await(500L, TimeUnit.MILLISECONDS));
-                    assertTrue(futureReply.getException() instanceof IndeterminateOutcomeException);
+                    final RequestHandler requestHandler = handle.getResource();
+                    try {
+                        final Client<Object,Object> client = endpoint.createClient(requestHandler, Object.class, Object.class);
+                        try {
+                            final IoFuture<Object> futureReply = client.send(requestObj);
+                            assertEquals(IoFuture.Status.FAILED, futureReply.await(500L, TimeUnit.MILLISECONDS));
+                            assertTrue(futureReply.getException() instanceof IndeterminateOutcomeException);
+                        } finally {
+                            IoUtils.safeClose(client);
+                        }
+                    } finally {
+                        IoUtils.safeClose(requestHandler);
+                    }
                 } finally {
-                    IoUtils.safeClose(requestHandler);
+                    IoUtils.safeClose(handle);
                 }
             } finally {
-                safeStop(endpoint);
+                IoUtils.safeClose(endpoint);
             }
         } finally {
             executorService.shutdownNow();

Modified: remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java	2008-11-19 16:22:04 UTC (rev 4705)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java	2008-11-19 19:15:56 UTC (rev 4706)
@@ -57,52 +57,83 @@
 
     public static void testConnect() throws Throwable {
         ExecutorService executor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-        Xnio xnio = NioXnio.create(executor, 2, 2, 2);
-        final BasicConfiguration configuration = new BasicConfiguration();
-        configuration.setExecutor(executor);
-        configuration.setMarshallerFactory(new RiverMarshallerFactory());
-        final MarshallingConfiguration marshallingConfiguration = new MarshallingConfiguration();
-        configuration.setMarshallingConfiguration(marshallingConfiguration);
-        final Endpoint endpoint = Remoting.createEndpoint("test");
-        final Handle<RequestHandler> requestHandlerHandle = endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
-            public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
-                System.out.println("Got a request! " + request.toString());
+        try {
+            Xnio xnio = NioXnio.create(executor, 2, 2, 2);
+            try {
+                final BasicConfiguration configuration = new BasicConfiguration();
+                configuration.setExecutor(executor);
+                configuration.setMarshallerFactory(new RiverMarshallerFactory());
+                final MarshallingConfiguration marshallingConfiguration = new MarshallingConfiguration();
+                configuration.setMarshallingConfiguration(marshallingConfiguration);
+                final Endpoint endpoint = Remoting.createEndpoint(executor, "test");
                 try {
-                    context.sendReply("GOOMBA");
-                } catch (IOException e) {
+                    final Handle<RequestHandler> requestHandlerHandle = endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
+                        public void handleRequest(final RequestContext<Object> context, final Object request) throws RemoteExecutionException {
+                            System.out.println("Got a request! " + request.toString());
+                            try {
+                                context.sendReply("GOOMBA");
+                            } catch (IOException e) {
+                                try {
+                                    context.sendFailure("Failed", e);
+                                } catch (IOException e1) {
+                                    // buh
+                                }
+                            }
+                        }
+                    }, Object.class, Object.class);
                     try {
-                        context.sendFailure("Failed", e);
-                    } catch (IOException e1) {
-                        // buh
-                    }
-                }
-            }
-        }, Object.class, Object.class);
-        final ChannelSource<StreamChannel> channelSource = xnio.createPipeServer(executor, IoUtils.singletonHandlerFactory(new IoHandler<StreamChannel>() {
-            public void handleOpened(final StreamChannel channel) {
-                try {
-                    System.out.println("Opening channel");
-                    BasicProtocol.createServer(requestHandlerHandle, channel, configuration);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    IoUtils.safeClose(channel);
-                }
-            }
+                        final ChannelSource<StreamChannel> channelSource = xnio.createPipeServer(executor, IoUtils.singletonHandlerFactory(new IoHandler<StreamChannel>() {
+                            public void handleOpened(final StreamChannel channel) {
+                                try {
+                                    System.out.println("Opening channel");
+                                    BasicProtocol.createServer(requestHandlerHandle, channel, configuration);
+                                } catch (IOException e) {
+                                    e.printStackTrace();
+                                    IoUtils.safeClose(channel);
+                                }
+                            }
 
-            public void handleReadable(final StreamChannel channel) {
-            }
+                            public void handleReadable(final StreamChannel channel) {
+                            }
 
-            public void handleWritable(final StreamChannel channel) {
-            }
+                            public void handleWritable(final StreamChannel channel) {
+                            }
 
-            public void handleClosed(final StreamChannel channel) {
-                System.out.println("Closing channel");
+                            public void handleClosed(final StreamChannel channel) {
+                                System.out.println("Closing channel");
+                            }
+                        }));
+                        final IoFuture<StreamChannel> futureChannel = channelSource.open(IoUtils.nullHandler());
+                        assertEquals(IoFuture.Status.DONE, futureChannel.await(1L, TimeUnit.SECONDS));
+                        final StreamChannel channel = futureChannel.get();
+                        try {
+                            final Handle<RequestHandler> clientHandlerHandle = BasicProtocol.createClient(channel, configuration);
+                            try {
+                                final Client<Object,Object> client = endpoint.createClient(clientHandlerHandle.getResource(), Object.class, Object.class);
+                                try {
+                                    final IoFuture<Object> futureReply = client.send("GORBA!");
+                                    assertEquals(IoFuture.Status.DONE, futureReply.await(500L, TimeUnit.MILLISECONDS));
+                                    System.out.println("Reply is:" + futureReply.get());
+                                } finally {
+                                    IoUtils.safeClose(client);
+                                }
+                            } finally {
+                                IoUtils.safeClose(clientHandlerHandle);
+                            }
+                        } finally {
+                            IoUtils.safeClose(channel);
+                        }
+                    } finally {
+                        IoUtils.safeClose(requestHandlerHandle);
+                    }
+                } finally {
+                    IoUtils.safeClose(endpoint);
+                }
+            } finally {
+                IoUtils.safeClose(xnio);
             }
-        }));
-        final IoFuture<StreamChannel> futureChannel = channelSource.open(IoUtils.nullHandler());
-        final Handle<RequestHandler> clientHandlerHandle = BasicProtocol.createClient(futureChannel.get(), configuration);
-        final Client<Object,Object> client = endpoint.createClient(clientHandlerHandle.getResource(), Object.class, Object.class);
-        System.out.println("Reply is:" + client.invoke("GORBA!"));
-
+        } finally {
+            executor.shutdownNow();
+        }
     }
 }

Modified: remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java	2008-11-19 16:22:04 UTC (rev 4705)
+++ remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java	2008-11-19 19:15:56 UTC (rev 4706)
@@ -79,13 +79,9 @@
             final BufferAllocator<ByteBuffer> allocator = Buffers.createHeapByteBufferAllocator(1024);
             final Xnio xnio = NioXnio.create();
             try {
-                final EndpointImpl remoteEndpoint = new EndpointImpl();
-                remoteEndpoint.setExecutor(closeableExecutor);
-                remoteEndpoint.start();
+                final EndpointImpl remoteEndpoint = new EndpointImpl(closeableExecutor, "left-side");
                 try {
-                    final EndpointImpl endpoint = new EndpointImpl();
-                    endpoint.setExecutor(closeableExecutor);
-                    endpoint.start();
+                    final EndpointImpl endpoint = new EndpointImpl(closeableExecutor, "right-side");
                     try {
                         final CountDownLatch latch = new CountDownLatch(1);
                         final MultiplexConfiguration configuration = new MultiplexConfiguration();
@@ -167,10 +163,10 @@
                             IoUtils.safeClose(requestHandlerSourceHandle);
                         }
                     } finally {
-                        endpoint.stop();
+                        IoUtils.safeClose(endpoint);
                     }
                 } finally {
-                    remoteEndpoint.stop();
+                    IoUtils.safeClose(remoteEndpoint);
                 }
             } finally {
                 IoUtils.safeClose(xnio);

Modified: remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java	2008-11-19 16:22:04 UTC (rev 4705)
+++ remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java	2008-11-19 19:15:56 UTC (rev 4706)
@@ -27,7 +27,7 @@
                 IoUtils.safeClose(client);
             }
         } finally {
-            Remoting.closeEndpoint(endpoint);
+            IoUtils.safeClose(endpoint);
         }
     }
 }
\ No newline at end of file

Modified: remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java	2008-11-19 16:22:04 UTC (rev 4705)
+++ remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java	2008-11-19 19:15:56 UTC (rev 4706)
@@ -45,7 +45,7 @@
                 IoUtils.safeClose(client);
             }
         } finally {
-            Remoting.closeEndpoint(endpoint);
+            IoUtils.safeClose(endpoint);
         }
     }
 }

Modified: remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java	2008-11-19 16:22:04 UTC (rev 4705)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java	2008-11-19 19:15:56 UTC (rev 4706)
@@ -1,6 +1,13 @@
 package org.jboss.remoting;
 
 import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.Collection;
+import java.util.Iterator;
 import org.jboss.remoting.core.EndpointImpl;
 import org.jboss.remoting.spi.RequestHandler;
 import org.jboss.remoting.spi.RequestHandlerSource;
@@ -11,27 +18,31 @@
  *
  */
 public final class Remoting {
-    // lifecycle lock
-    private static final Object lifecycle = new Object();
 
-    public static Endpoint createEndpoint(String name) throws IOException {
-        synchronized (lifecycle) {
-            final EndpointImpl endpointImpl = new EndpointImpl();
-            endpointImpl.setName(name);
-            endpointImpl.start();
-            return endpointImpl;
-        }
+    public static Endpoint createEndpoint(final String name) {
+        return createEndpoint(name, 10);
     }
 
-    public static void closeEndpoint(Endpoint endpoint) {
-        synchronized (lifecycle) {
-            if (endpoint instanceof EndpointImpl) {
-                final EndpointImpl endpointImpl = (EndpointImpl) endpoint;
-                endpointImpl.stop();
+    public static Endpoint createEndpoint(final String name, final int maxThreads) {
+        final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, maxThreads, Long.MAX_VALUE, TimeUnit.NANOSECONDS, new AlwaysBlockingQueue<Runnable>(new SynchronousQueue<Runnable>()), new ThreadPoolExecutor.AbortPolicy());
+        final EndpointImpl endpoint = new EndpointImpl(executor, name);
+        endpoint.addCloseHandler(new CloseHandler<Endpoint>() {
+            public void handleClose(final Endpoint closed) {
+                executor.shutdown();
+                try {
+                    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
             }
-        }
+        });
+        return endpoint;
     }
 
+    public static Endpoint createEndpoint(final Executor executor, final String name) {
+        return new EndpointImpl(executor, name);
+    }
+
     public static <I, O> Client<I, O> createLocalClient(final Endpoint endpoint, final RequestListener<I, O> requestListener, final Class<I> requestClass, final Class<O> replyClass) throws IOException {
         final Handle<RequestHandler> handle = endpoint.createRequestHandler(requestListener, requestClass, replyClass);
         try {
@@ -50,6 +61,128 @@
         }
     }
 
+    private static class AlwaysBlockingQueue<T> implements BlockingQueue<T> {
+        private final BlockingQueue<T> delegate;
+
+        public AlwaysBlockingQueue(final BlockingQueue<T> delegate) {
+            this.delegate = delegate;
+        }
+
+        public boolean offer(final T o) {
+            try {
+                delegate.put(o);
+                return true;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+        }
+
+        public boolean offer(final T o, final long timeout, final TimeUnit unit) throws InterruptedException {
+            return delegate.offer(o, timeout, unit);
+        }
+
+        public T poll(final long timeout, final TimeUnit unit) throws InterruptedException {
+            return delegate.poll(timeout, unit);
+        }
+
+        public T take() throws InterruptedException {
+            return delegate.take();
+        }
+
+        public void put(final T o) throws InterruptedException {
+            delegate.put(o);
+        }
+
+        public int remainingCapacity() {
+            return delegate.remainingCapacity();
+        }
+
+        public boolean add(final T o) {
+            return delegate.add(o);
+        }
+
+        public int drainTo(final Collection<? super T> c) {
+            return delegate.drainTo(c);
+        }
+
+        public int drainTo(final Collection<? super T> c, final int maxElements) {
+            return delegate.drainTo(c, maxElements);
+        }
+
+        public T poll() {
+            return delegate.poll();
+        }
+
+        public T remove() {
+            return delegate.remove();
+        }
+
+        public T peek() {
+            return delegate.peek();
+        }
+
+        public T element() {
+            return delegate.element();
+        }
+
+        public int size() {
+            return delegate.size();
+        }
+
+        public boolean isEmpty() {
+            return delegate.isEmpty();
+        }
+
+        public boolean contains(final Object o) {
+            return delegate.contains(o);
+        }
+
+        public Iterator<T> iterator() {
+            return delegate.iterator();
+        }
+
+        public Object[] toArray() {
+            return delegate.toArray();
+        }
+
+        public <T> T[] toArray(final T[] a) {
+            return delegate.toArray(a);
+        }
+
+        public boolean remove(final Object o) {
+            return delegate.remove(o);
+        }
+
+        public boolean containsAll(final Collection<?> c) {
+            return delegate.containsAll(c);
+        }
+
+        public boolean addAll(final Collection<? extends T> c) {
+            return delegate.addAll(c);
+        }
+
+        public boolean removeAll(final Collection<?> c) {
+            return delegate.removeAll(c);
+        }
+
+        public boolean retainAll(final Collection<?> c) {
+            return delegate.retainAll(c);
+        }
+
+        public void clear() {
+            delegate.clear();
+        }
+
+        public boolean equals(final Object o) {
+            return delegate.equals(o);
+        }
+
+        public int hashCode() {
+            return delegate.hashCode();
+        }
+    }
+
     // privates
 
     private Remoting() { /* empty */ }




More information about the jboss-remoting-commits mailing list