Author: david.lloyd(a)jboss.com
Date: 2008-07-03 22:50:34 -0400 (Thu, 03 Jul 2008)
New Revision: 4365
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientSourceWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractCloseable.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java
Log:
Have endpoints auto-close all resources on stop; fix declaration of close handlers to
allow more general handlers
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java 2008-07-04
02:21:32 UTC (rev 4364)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java 2008-07-04
02:50:34 UTC (rev 4365)
@@ -20,5 +20,5 @@
*
* @param handler the close handler
*/
- void addCloseHandler(CloseHandler<T> handler);
+ void addCloseHandler(CloseHandler<? super T> handler);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-04
02:21:32 UTC (rev 4364)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-04
02:50:34 UTC (rev 4365)
@@ -104,7 +104,7 @@
* @param handler the close handler
* @param closed the object that was closed
*/
- public static <T> void safeHandleClose(final CloseHandler<T> handler,
final T closed) {
+ public static <T> void safeHandleClose(final CloseHandler<? super T>
handler, final T closed) {
try {
handler.handleClose(closed);
} catch (Throwable t) {
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java 2008-07-04
02:21:32 UTC (rev 4364)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java 2008-07-04
02:50:34 UTC (rev 4365)
@@ -50,5 +50,5 @@
*
* @param handler the handler
*/
- void addCloseHandler(final CloseHandler<Handle<T>> handler);
+ void addCloseHandler(final CloseHandler<? super Handle<T>> handler);
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-04
02:21:32 UTC (rev 4364)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-04
02:50:34 UTC (rev 4365)
@@ -90,5 +90,5 @@
*
* @param handler the handler to be called
*/
- void addCloseHandler(final CloseHandler<RemoteClientEndpoint<I, O>>
handler);
+ void addCloseHandler(final CloseHandler<? super RemoteClientEndpoint<I,
O>> handler);
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-04
02:21:32 UTC (rev 4364)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-04
02:50:34 UTC (rev 4365)
@@ -75,5 +75,5 @@
*
* @param handler the handler to be called
*/
- void addCloseHandler(final CloseHandler<RemoteServiceEndpoint<I, O>>
handler);
+ void addCloseHandler(final CloseHandler<? super RemoteServiceEndpoint<I,
O>> handler);
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientSourceWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientSourceWrapper.java 2008-07-04
02:21:32 UTC (rev 4364)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientSourceWrapper.java 2008-07-04
02:50:34 UTC (rev 4365)
@@ -33,7 +33,7 @@
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public void addCloseHandler(final CloseHandler<ClientSource<I, O>>
closeHandler) {
+ public void addCloseHandler(final CloseHandler<? super ClientSource<I,
O>> closeHandler) {
delegate.addCloseHandler(new CloseHandler<ClientSource<I, O>>() {
public void handleClose(final ClientSource<I, O> closed) {
closeHandler.handleClose(ClientSourceWrapper.this);
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java 2008-07-04
02:21:32 UTC (rev 4364)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/ClientWrapper.java 2008-07-04
02:50:34 UTC (rev 4365)
@@ -35,7 +35,7 @@
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public void addCloseHandler(final CloseHandler<Client<I, O>>
closeHandler) {
+ public void addCloseHandler(final CloseHandler<? super Client<I, O>>
closeHandler) {
delegate.addCloseHandler(new CloseHandler<Client<I, O>>() {
public void handleClose(final Client<I, O> closed) {
closeHandler.handleClose(ClientWrapper.this);
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractCloseable.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractCloseable.java 2008-07-04
02:21:32 UTC (rev 4364)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractCloseable.java 2008-07-04
02:50:34 UTC (rev 4365)
@@ -43,7 +43,7 @@
protected final Executor executor;
private final Object closeLock = new Object();
private final AtomicBoolean closed = new AtomicBoolean();
- private Set<CloseHandler<T>> closeHandlers;
+ private Set<CloseHandler<? super T>> closeHandlers;
protected AbstractCloseable(final Executor executor) {
if (executor == null) {
@@ -60,7 +60,7 @@
if (! closed.getAndSet(true)) {
synchronized (closeLock) {
if (closeHandlers != null) {
- for (final CloseHandler<T> handler : closeHandlers) {
+ for (final CloseHandler<? super T> handler : closeHandlers) {
executor.execute(new Runnable() {
@SuppressWarnings({ "unchecked" })
public void run() {
@@ -74,10 +74,10 @@
}
}
- public void addCloseHandler(final CloseHandler<T> handler) {
+ public void addCloseHandler(final CloseHandler<? super T> handler) {
synchronized (closeLock) {
if (closeHandlers == null) {
- closeHandlers = new HashSet<CloseHandler<T>>();
+ closeHandlers = new HashSet<CloseHandler<? super T>>();
}
closeHandlers.add(handler);
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-04
02:21:32 UTC (rev 4364)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-04
02:50:34 UTC (rev 4365)
@@ -4,9 +4,13 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.Set;
+import java.io.Closeable;
import org.jboss.cx.remoting.Endpoint;
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.core.util.OrderedExecutorFactory;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
@@ -14,6 +18,7 @@
import org.jboss.cx.remoting.util.NamingThreadFactory;
import org.jboss.cx.remoting.version.Version;
import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.IoUtils;
/**
*
@@ -30,6 +35,7 @@
private OrderedExecutorFactory orderedExecutorFactory;
private ExecutorService executorService;
+ private final Set<Closeable> resources =
CollectionUtil.synchronizedWeakHashSet();
private final ConcurrentMap<Object, Object> endpointMap =
CollectionUtil.concurrentMap();
public EndpointImpl() {
@@ -43,7 +49,7 @@
return executor;
}
- public Executor getOrderedExecutor() {
+ Executor getOrderedExecutor() {
return orderedExecutorFactory.getOrderedExecutor();
}
@@ -74,12 +80,36 @@
public void stop() {
// todo security check
- if (executorService != null) {
- executorService.shutdown();
- executorService = null;
- executor = null;
+ boolean intr = false;
+ try {
+ for (Closeable resource : resources) {
+ IoUtils.safeClose(resource);
+ }
+ synchronized (resources) {
+ while (! resources.isEmpty()) {
+ try {
+ resources.wait();
+ } catch (InterruptedException e) {
+ intr = true;
+ }
+ }
+ }
+ if (executorService != null) {
+ executorService.shutdown();
+ boolean done = false;
+ do try {
+ done = executorService.awaitTermination(30L, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ intr = true;
+ } while (! done);
+ executorService = null;
+ executor = null;
+ }
+ } finally {
+ if (intr) {
+ Thread.currentThread().interrupt();
+ }
}
- // todo
}
// Endpoint implementation
@@ -90,13 +120,24 @@
public <I, O> RemoteClientEndpoint<I, O> createClient(final
RequestListener<I, O> requestListener) throws RemotingException {
final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new
RemoteClientEndpointLocalImpl<I, O>(executor, requestListener);
+ clientEndpoint.addCloseHandler(remover);
clientEndpoint.open();
return clientEndpoint;
}
public <I, O> RemoteServiceEndpoint<I, O> createService(final
RequestListener<I, O> requestListener) throws RemotingException {
final RemoteServiceEndpointLocalImpl<I, O> serviceEndpoint = new
RemoteServiceEndpointLocalImpl<I, O>(executor, requestListener);
+ serviceEndpoint.addCloseHandler(remover);
serviceEndpoint.open();
return serviceEndpoint;
}
+
+ private final ResourceRemover remover = new ResourceRemover();
+
+ private final class ResourceRemover implements CloseHandler<Closeable> {
+ public void handleClose(final Closeable closed) {
+ resources.remove(closed);
+ resources.notifyAll();
+ }
+ }
}
Modified:
remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java
===================================================================
---
remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java 2008-07-04
02:21:32 UTC (rev 4364)
+++
remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java 2008-07-04
02:50:34 UTC (rev 4365)
@@ -61,6 +61,7 @@
endpoint.start();
endpoint.stop();
executorService.shutdown();
+ assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
} finally {
executorService.shutdownNow();
}
@@ -117,7 +118,7 @@
safeStop(endpoint);
}
executorService.shutdown();
- executorService.awaitTermination(1L, TimeUnit.SECONDS);
+ assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
assertTrue(clientEndpointClosed.get());
assertTrue(clientClosed.get());
} finally {
@@ -176,7 +177,7 @@
safeStop(endpoint);
}
executorService.shutdown();
- executorService.awaitTermination(1L, TimeUnit.SECONDS);
+ assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
assertTrue(clientEndpointClosed.get());
assertTrue(clientClosed.get());
} finally {