Author: david.lloyd(a)jboss.com
Date: 2008-11-19 14:15:56 -0500 (Wed, 19 Nov 2008)
New Revision: 4706
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java
remoting3/trunk/build.xml
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java
remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java
remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java
Log:
JBREM-1066; make Endpoint closeable
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java 2008-11-19 16:22:04
UTC (rev 4705)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java 2008-11-19 19:15:56
UTC (rev 4706)
@@ -11,7 +11,7 @@
/**
* A potential participant in a JBoss Remoting communications relationship.
*/
-public interface Endpoint {
+public interface Endpoint extends HandleableCloseable<Endpoint> {
/**
* Get the endpoint attribute map. This is a storage area for any data associated
with this endpoint, including
* (but not limited to) connection and protocol information, and application
information.
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-11-19 16:22:04 UTC (rev 4705)
+++ remoting3/trunk/build.xml 2008-11-19 19:15:56 UTC (rev 4706)
@@ -338,6 +338,8 @@
<target name="api" description="Build the API module"
depends="lib.marshalling-api,lib.xnio-api,api.compile">
<path id="api.classpath">
<pathelement location="api/target/main/classes"/>
+ <pathelement location="${lib.xnio-api.local}"/>
+ <pathelement location="${lib.marshalling-api.local}"/>
</path>
</target>
@@ -404,8 +406,6 @@
<classpath>
<path refid="api.classpath"/>
<path refid="version.classpath"/>
- <pathelement location="${lib.marshalling-api.local}"/>
- <pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
<touch file="core/target/main/.lastcompile"
verbose="false"/>
@@ -435,9 +435,7 @@
<path refid="api.classpath"/>
<path refid="core.classpath"/>
<path refid="testing-support.classpath"/>
- <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.junit.local}"/>
- <pathelement location="${lib.xnio-api.local}"/>
</classpath>
</javac>
<touch file="core/target/test/.lastcompile"
verbose="false"/>
@@ -462,8 +460,6 @@
<path refid="testing-support.classpath"/>
<pathelement location="core/target/test/classes"/>
<pathelement location="${lib.junit.local}"/>
- <pathelement location="${lib.marshalling-api.local}"/>
- <pathelement location="${lib.xnio-api.local}"/>
</classpath>
<batchtest fork="yes"
todir="core/target/test-results"
haltonfailure="no">
@@ -605,9 +601,7 @@
<path refid="protocol.multiplex.classpath"/>
<path refid="testing-support.classpath"/>
<pathelement location="${lib.junit.local}"/>
- <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.river.local}"/>
- <pathelement location="${lib.xnio-api.local}"/>
<pathelement location="${lib.xnio-nio.local}"/>
</classpath>
</javac>
@@ -636,9 +630,7 @@
<path refid="testing-support.classpath"/>
<pathelement
location="protocol/multiplex/target/test/classes"/>
<pathelement location="${lib.junit.local}"/>
- <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.river.local}"/>
- <pathelement location="${lib.xnio-api.local}"/>
<pathelement location="${lib.xnio-nio.local}"/>
</classpath>
<batchtest fork="yes"
todir="protocol/multiplex/target/test-results"
@@ -732,9 +724,7 @@
<path refid="standalone.classpath"/>
<path refid="testing-support.classpath"/>
<pathelement location="${lib.junit.local}"/>
- <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.river.local}"/>
- <pathelement location="${lib.xnio-api.local}"/>
<pathelement location="${lib.xnio-nio.local}"/>
</classpath>
</javac>
@@ -764,9 +754,7 @@
<path refid="testing-support.classpath"/>
<pathelement
location="protocol/basic/target/test/classes"/>
<pathelement location="${lib.junit.local}"/>
- <pathelement location="${lib.marshalling-api.local}"/>
<pathelement location="${lib.river.local}"/>
- <pathelement location="${lib.xnio-api.local}"/>
<pathelement location="${lib.xnio-nio.local}"/>
</classpath>
<batchtest fork="yes"
todir="protocol/basic/target/test-results"
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-11-19
16:22:04 UTC (rev 4705)
+++
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-11-19
19:15:56 UTC (rev 4706)
@@ -1,19 +1,16 @@
package org.jboss.remoting.core;
-import java.io.Closeable;
import java.io.IOException;
+import java.io.Closeable;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.lang.ref.WeakReference;
import org.jboss.remoting.Client;
import org.jboss.remoting.ClientSource;
-import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.Endpoint;
import org.jboss.remoting.RequestListener;
import org.jboss.remoting.ServiceListener;
@@ -22,24 +19,26 @@
import org.jboss.remoting.EndpointPermission;
import org.jboss.remoting.RemoteServiceConfiguration;
import org.jboss.remoting.ServiceURI;
+import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.core.util.OrderedExecutorFactory;
import org.jboss.remoting.core.util.CollectionUtil;
-import org.jboss.remoting.core.util.NamingThreadFactory;
-import org.jboss.remoting.spi.AbstractSimpleCloseable;
import org.jboss.remoting.spi.Handle;
import org.jboss.remoting.spi.RequestHandler;
import org.jboss.remoting.spi.RequestHandlerSource;
+import org.jboss.remoting.spi.AbstractHandleableCloseable;
+import org.jboss.remoting.spi.AbstractSimpleCloseable;
import org.jboss.remoting.version.Version;
import org.jboss.xnio.FailedIoFuture;
import org.jboss.xnio.FinishedIoFuture;
import org.jboss.xnio.IoFuture;
import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.WeakCloseable;
import org.jboss.xnio.log.Logger;
/**
*
*/
-public final class EndpointImpl implements Endpoint {
+public final class EndpointImpl extends AbstractHandleableCloseable<Endpoint>
implements Endpoint {
static {
// Print Remoting "greeting" message
@@ -48,12 +47,10 @@
private static final Logger log =
Logger.getLogger("org.jboss.remoting.endpoint");
- private String name;
+ private final String name;
- private OrderedExecutorFactory orderedExecutorFactory;
- private ExecutorService executorService;
+ private final OrderedExecutorFactory orderedExecutorFactory;
- private final Set<Closeable> resources =
CollectionUtil.synchronizedWeakHashSet();
private final ConcurrentMap<Object, Object> endpointMap =
CollectionUtil.concurrentMap();
private final Object serviceLock = new Object();
@@ -67,82 +64,29 @@
private static final EndpointPermission REGISTER_REMOTE_SERVICE_PERM = new
EndpointPermission("registerRemoteService");
private static final EndpointPermission ADD_SERVICE_LISTENER_PERM = new
EndpointPermission("addServiceListener");
- public EndpointImpl() {
+ public EndpointImpl(final Executor executor, final String name) {
+ super(executor);
+ this.executor = executor;
+ this.name = name;
+ orderedExecutorFactory = new OrderedExecutorFactory(executor);
}
- // Dependencies
+ private final Executor executor;
- private Executor executor;
-
- public Executor getExecutor() {
- return executor;
- }
-
- Executor getOrderedExecutor() {
+ protected Executor getOrderedExecutor() {
return orderedExecutorFactory.getOrderedExecutor();
}
- public void setExecutor(final Executor executor) {
- this.executor = executor;
- orderedExecutorFactory = new OrderedExecutorFactory(executor);
+ protected Executor getExecutor() {
+ return executor;
}
- // Configuration
+ // Endpoint implementation
- public void setName(final String name) {
- this.name = name;
- }
-
public String getName() {
return name;
}
- // Lifecycle
-
- public void start() {
- // todo security check
- if (executor == null) {
- executor = executorService = Executors.newCachedThreadPool(new
NamingThreadFactory(Executors.defaultThreadFactory(), "Remoting endpoint %s"));
- setExecutor(executorService);
- }
- }
-
- public void stop() {
- // todo security check
- boolean intr = false;
- try {
- for (Closeable resource : resources) {
- IoUtils.safeClose(resource);
- }
- synchronized (resources) {
- while (! resources.isEmpty()) {
- try {
- resources.wait();
- } catch (InterruptedException e) {
- intr = true;
- }
- }
- }
- if (executorService != null) {
- executorService.shutdown();
- boolean done = false;
- do try {
- done = executorService.awaitTermination(30L, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- intr = true;
- } while (! done);
- executorService = null;
- executor = null;
- }
- } finally {
- if (intr) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- // Endpoint implementation
-
public ConcurrentMap<Object, Object> getAttributes() {
return endpointMap;
}
@@ -152,12 +96,22 @@
if (sm != null) {
sm.checkPermission(CREATE_REQUEST_HANDLER_PERM);
}
- LocalRequestHandler.Config<I, O> config = new
LocalRequestHandler.Config<I,O>(requestClass, replyClass);
+ LocalRequestHandler.Config<I, O> config = new
LocalRequestHandler.Config<I, O>(requestClass, replyClass);
config.setExecutor(executor);
config.setRequestListener(requestListener);
config.setClientContext(new ClientContextImpl(executor));
final LocalRequestHandler<I, O> localRequestHandler = new
LocalRequestHandler<I, O>(config);
- localRequestHandler.addCloseHandler(remover);
+ final WeakCloseable lrhCloseable = new WeakCloseable(new
WeakReference<Closeable>(localRequestHandler));
+ final Key key = addCloseHandler(new CloseHandler<Endpoint>() {
+ public void handleClose(final Endpoint closed) {
+ IoUtils.safeClose(lrhCloseable);
+ }
+ });
+ localRequestHandler.addCloseHandler(new CloseHandler<RequestHandler>() {
+ public void handleClose(final RequestHandler closed) {
+ key.remove();
+ }
+ });
localRequestHandler.open();
return localRequestHandler.getHandle();
}
@@ -217,7 +171,17 @@
}
}
}
- localRequestHandlerSource.addCloseHandler(remover);
+ final WeakCloseable lrhCloseable = new WeakCloseable(new
WeakReference<Closeable>(localRequestHandlerSource));
+ final Key key = addCloseHandler(new CloseHandler<Endpoint>() {
+ public void handleClose(final Endpoint closed) {
+ IoUtils.safeClose(lrhCloseable);
+ }
+ });
+ localRequestHandlerSource.addCloseHandler(new
CloseHandler<RequestHandlerSource>() {
+ public void handleClose(final RequestHandlerSource closed) {
+ key.remove();
+ }
+ });
localRequestHandlerSource.open();
return localRequestHandlerSource.getHandle();
}
@@ -235,6 +199,17 @@
final Handle<RequestHandler> handle = requestHandler.getHandle();
try {
final ClientImpl<I, O> client = ClientImpl.create(handle, executor,
requestType, replyType);
+ final WeakCloseable lrhCloseable = new WeakCloseable(new
WeakReference<Closeable>(client));
+ final Key key = addCloseHandler(new CloseHandler<Endpoint>() {
+ public void handleClose(final Endpoint closed) {
+ IoUtils.safeClose(lrhCloseable);
+ }
+ });
+ client.addCloseHandler(new CloseHandler<Client>() {
+ public void handleClose(final Client closed) {
+ key.remove();
+ }
+ });
ok = true;
return client;
} finally {
@@ -253,6 +228,17 @@
final Handle<RequestHandlerSource> handle =
requestHandlerSource.getHandle();
try {
final ClientSourceImpl<I, O> clientSource =
ClientSourceImpl.create(handle, this, requestClass, replyClass);
+ final WeakCloseable lrhCloseable = new WeakCloseable(new
WeakReference<Closeable>(clientSource));
+ final Key key = addCloseHandler(new CloseHandler<Endpoint>() {
+ public void handleClose(final Endpoint closed) {
+ IoUtils.safeClose(lrhCloseable);
+ }
+ });
+ clientSource.addCloseHandler(new CloseHandler<ClientSource>() {
+ public void handleClose(final ClientSource closed) {
+ key.remove();
+ }
+ });
ok = true;
return clientSource;
} finally {
@@ -459,20 +445,6 @@
}
}
- private final ResourceRemover remover = new ResourceRemover();
-
- private final class ResourceRemover implements CloseHandler<Closeable> {
- public void handleClose(final Closeable closed) {
- synchronized (resources)
- {
- resources.remove(closed);
- if (resources.isEmpty()) {
- resources.notifyAll();
- }
- }
- }
- }
-
public String toString() {
return "endpoint \"" + name + "\" <" +
Integer.toString(hashCode()) + ">";
}
Modified:
remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java
===================================================================
---
remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java 2008-11-19
16:22:04 UTC (rev 4705)
+++
remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java 2008-11-19
19:15:56 UTC (rev 4706)
@@ -51,21 +51,11 @@
private static final Logger log = Logger.getLogger(EndpointTestCase.class);
- private static void safeStop(EndpointImpl endpoint) {
- try {
- endpoint.stop();
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
-
public void testCreate() throws Throwable {
- final EndpointImpl endpoint = new EndpointImpl();
final ExecutorService executorService = Executors.newCachedThreadPool();
+ final EndpointImpl endpoint = new EndpointImpl(executorService,
"foo");
try {
- endpoint.setExecutor(executorService);
- endpoint.start();
- endpoint.stop();
+ endpoint.close();
executorService.shutdown();
assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
} finally {
@@ -76,13 +66,11 @@
public void testLocalClientInvoke() throws Throwable {
final AtomicBoolean clientEndpointClosed = new AtomicBoolean(false);
final AtomicBoolean clientClosed = new AtomicBoolean(false);
- final EndpointImpl endpoint = new EndpointImpl();
final ExecutorService executorService = Executors.newCachedThreadPool();
- final Object requestObj = new Object();
- final Object replyObj = new Object();
try {
- endpoint.setExecutor(executorService);
- endpoint.start();
+ final EndpointImpl endpoint = new EndpointImpl(executorService,
"test-endpoint");
+ final Object requestObj = new Object();
+ final Object replyObj = new Object();
try {
final Handle<RequestHandler> handle =
endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
public void handleRequest(final RequestContext<Object> context,
final Object request) throws RemoteExecutionException {
@@ -90,43 +78,44 @@
try {
context.sendReply(replyObj);
} catch (IOException e) {
- try {
- context.sendFailure(e.getMessage(), e);
- } catch (IOException e1) {
- fail("double fault");
- }
+ log.error(e, "Error sending reply!");
}
}
}, Object.class, Object.class);
- final RequestHandler requestHandler = handle.getResource();
try {
- requestHandler.addCloseHandler(new
CloseHandler<RequestHandler>() {
- public void handleClose(final RequestHandler closed) {
- clientEndpointClosed.set(true);
- }
- });
- final Client<Object,Object> client =
endpoint.createClient(requestHandler, Object.class, Object.class);
+ final RequestHandler requestHandler = handle.getResource();
try {
- client.addCloseHandler(new CloseHandler<Client<Object,
Object>>() {
- public void handleClose(final Client<Object, Object>
closed) {
- clientClosed.set(true);
+ requestHandler.addCloseHandler(new
CloseHandler<RequestHandler>() {
+ public void handleClose(final RequestHandler closed) {
+ clientEndpointClosed.set(true);
}
});
- assertEquals(replyObj, client.invoke(requestObj));
- client.close();
+ final Client<Object,Object> client =
endpoint.createClient(requestHandler, Object.class, Object.class);
+ try {
+ client.addCloseHandler(new CloseHandler<Client<Object,
Object>>() {
+ public void handleClose(final Client<Object,
Object> closed) {
+ clientClosed.set(true);
+ }
+ });
+ handle.close();
+ assertEquals(replyObj, client.invoke(requestObj));
+ client.close();
+ executorService.shutdown();
+ assertTrue(executorService.awaitTermination(1L,
TimeUnit.SECONDS));
+ assertTrue(clientEndpointClosed.get());
+ assertTrue(clientClosed.get());
+ } finally {
+ IoUtils.safeClose(client);
+ }
} finally {
- IoUtils.safeClose(client);
+ IoUtils.safeClose(requestHandler);
}
} finally {
- IoUtils.safeClose(requestHandler);
+ IoUtils.safeClose(handle);
}
} finally {
- safeStop(endpoint);
+ IoUtils.safeClose(endpoint);
}
- executorService.shutdown();
- assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
- assertTrue(clientEndpointClosed.get());
- assertTrue(clientClosed.get());
} finally {
executorService.shutdownNow();
}
@@ -135,14 +124,12 @@
public void testLocalClientSend() throws Throwable {
final AtomicBoolean clientEndpointClosed = new AtomicBoolean(false);
final AtomicBoolean clientClosed = new AtomicBoolean(false);
- final EndpointImpl endpoint = new EndpointImpl();
final ExecutorService executorService = Executors.newCachedThreadPool();
- final Object requestObj = new Object();
- final Object replyObj = new Object();
try {
- endpoint.setExecutor(executorService);
- endpoint.start();
+ final EndpointImpl endpoint = new EndpointImpl(executorService,
"test-endpoint");
try {
+ final Object requestObj = new Object();
+ final Object replyObj = new Object();
final Handle<RequestHandler> handle =
endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
public void handleRequest(final RequestContext<Object> context,
final Object request) throws RemoteExecutionException {
assertEquals(request, requestObj);
@@ -153,65 +140,78 @@
}
}
}, Object.class, Object.class);
- final RequestHandler requestHandler = handle.getResource();
try {
- requestHandler.addCloseHandler(new
CloseHandler<RequestHandler>() {
- public void handleClose(final RequestHandler closed) {
- clientEndpointClosed.set(true);
- }
- });
- final Client<Object,Object> client =
endpoint.createClient(requestHandler, Object.class, Object.class);
+ final RequestHandler requestHandler = handle.getResource();
try {
- client.addCloseHandler(new CloseHandler<Client<Object,
Object>>() {
- public void handleClose(final Client<Object, Object>
closed) {
- clientClosed.set(true);
+ requestHandler.addCloseHandler(new
CloseHandler<RequestHandler>() {
+ public void handleClose(final RequestHandler closed) {
+ clientEndpointClosed.set(true);
}
});
- assertEquals(replyObj, client.send(requestObj).get());
- client.close();
+ final Client<Object,Object> client =
endpoint.createClient(requestHandler, Object.class, Object.class);
+ try {
+ client.addCloseHandler(new CloseHandler<Client<Object,
Object>>() {
+ public void handleClose(final Client<Object,
Object> closed) {
+ clientClosed.set(true);
+ }
+ });
+ handle.close();
+ final IoFuture<Object> futureReply =
client.send(requestObj);
+ assertEquals(IoFuture.Status.DONE, futureReply.await(1L,
TimeUnit.SECONDS));
+ assertEquals(replyObj, futureReply.get());
+ client.close();
+ executorService.shutdown();
+ assertTrue(executorService.awaitTermination(1L,
TimeUnit.SECONDS));
+ assertTrue(clientEndpointClosed.get());
+ assertTrue(clientClosed.get());
+ } finally {
+ IoUtils.safeClose(client);
+ }
} finally {
- IoUtils.safeClose(client);
+ IoUtils.safeClose(requestHandler);
}
} finally {
- IoUtils.safeClose(requestHandler);
+ IoUtils.safeClose(handle);
}
} finally {
- safeStop(endpoint);
+ IoUtils.safeClose(endpoint);
}
- executorService.shutdown();
- assertTrue(executorService.awaitTermination(1L, TimeUnit.SECONDS));
- assertTrue(clientEndpointClosed.get());
- assertTrue(clientClosed.get());
} finally {
executorService.shutdownNow();
}
}
public void testUnsentReply() throws Throwable {
- final EndpointImpl endpoint = new EndpointImpl();
final ExecutorService executorService = Executors.newCachedThreadPool();
- final Object requestObj = new Object();
try {
- endpoint.setExecutor(executorService);
- endpoint.start();
+ final EndpointImpl endpoint = new EndpointImpl(executorService,
"test-endpoint");
try {
+ final Object requestObj = new Object();
final Handle<RequestHandler> handle =
endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
public void handleRequest(final RequestContext<Object> context,
final Object request) throws RemoteExecutionException {
assertEquals(request, requestObj);
// don't send a reply!!
}
}, Object.class, Object.class);
- final RequestHandler requestHandler = handle.getResource();
try {
- final Client<Object,Object> client =
endpoint.createClient(requestHandler, Object.class, Object.class);
- final IoFuture<Object> futureReply = client.send(requestObj);
- assertEquals(IoFuture.Status.FAILED, futureReply.await(500L,
TimeUnit.MILLISECONDS));
- assertTrue(futureReply.getException() instanceof
IndeterminateOutcomeException);
+ final RequestHandler requestHandler = handle.getResource();
+ try {
+ final Client<Object,Object> client =
endpoint.createClient(requestHandler, Object.class, Object.class);
+ try {
+ final IoFuture<Object> futureReply =
client.send(requestObj);
+ assertEquals(IoFuture.Status.FAILED, futureReply.await(500L,
TimeUnit.MILLISECONDS));
+ assertTrue(futureReply.getException() instanceof
IndeterminateOutcomeException);
+ } finally {
+ IoUtils.safeClose(client);
+ }
+ } finally {
+ IoUtils.safeClose(requestHandler);
+ }
} finally {
- IoUtils.safeClose(requestHandler);
+ IoUtils.safeClose(handle);
}
} finally {
- safeStop(endpoint);
+ IoUtils.safeClose(endpoint);
}
} finally {
executorService.shutdownNow();
@@ -219,13 +219,11 @@
}
public void testUnsentReply2() throws Throwable {
- final EndpointImpl endpoint = new EndpointImpl();
final ExecutorService executorService = Executors.newCachedThreadPool();
- final Object requestObj = new Object();
try {
- endpoint.setExecutor(executorService);
- endpoint.start();
+ final EndpointImpl endpoint = new EndpointImpl(executorService,
"test-endpoint");
try {
+ final Object requestObj = new Object();
final Handle<RequestHandler> handle =
endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
public void handleRequest(final RequestContext<Object> context,
final Object request) throws RemoteExecutionException {
assertEquals(request, requestObj);
@@ -249,19 +247,28 @@
public void run() {
}
});
+ // don't send a reply!!
}
}, Object.class, Object.class);
- final RequestHandler requestHandler = handle.getResource();
try {
- final Client<Object,Object> client =
endpoint.createClient(requestHandler, Object.class, Object.class);
- final IoFuture<Object> futureReply = client.send(requestObj);
- assertEquals(IoFuture.Status.FAILED, futureReply.await(500L,
TimeUnit.MILLISECONDS));
- assertTrue(futureReply.getException() instanceof
IndeterminateOutcomeException);
+ final RequestHandler requestHandler = handle.getResource();
+ try {
+ final Client<Object,Object> client =
endpoint.createClient(requestHandler, Object.class, Object.class);
+ try {
+ final IoFuture<Object> futureReply =
client.send(requestObj);
+ assertEquals(IoFuture.Status.FAILED, futureReply.await(500L,
TimeUnit.MILLISECONDS));
+ assertTrue(futureReply.getException() instanceof
IndeterminateOutcomeException);
+ } finally {
+ IoUtils.safeClose(client);
+ }
+ } finally {
+ IoUtils.safeClose(requestHandler);
+ }
} finally {
- IoUtils.safeClose(requestHandler);
+ IoUtils.safeClose(handle);
}
} finally {
- safeStop(endpoint);
+ IoUtils.safeClose(endpoint);
}
} finally {
executorService.shutdownNow();
Modified:
remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java
===================================================================
---
remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java 2008-11-19
16:22:04 UTC (rev 4705)
+++
remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java 2008-11-19
19:15:56 UTC (rev 4706)
@@ -57,52 +57,83 @@
public static void testConnect() throws Throwable {
ExecutorService executor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
- Xnio xnio = NioXnio.create(executor, 2, 2, 2);
- final BasicConfiguration configuration = new BasicConfiguration();
- configuration.setExecutor(executor);
- configuration.setMarshallerFactory(new RiverMarshallerFactory());
- final MarshallingConfiguration marshallingConfiguration = new
MarshallingConfiguration();
- configuration.setMarshallingConfiguration(marshallingConfiguration);
- final Endpoint endpoint = Remoting.createEndpoint("test");
- final Handle<RequestHandler> requestHandlerHandle =
endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
- public void handleRequest(final RequestContext<Object> context, final
Object request) throws RemoteExecutionException {
- System.out.println("Got a request! " + request.toString());
+ try {
+ Xnio xnio = NioXnio.create(executor, 2, 2, 2);
+ try {
+ final BasicConfiguration configuration = new BasicConfiguration();
+ configuration.setExecutor(executor);
+ configuration.setMarshallerFactory(new RiverMarshallerFactory());
+ final MarshallingConfiguration marshallingConfiguration = new
MarshallingConfiguration();
+ configuration.setMarshallingConfiguration(marshallingConfiguration);
+ final Endpoint endpoint = Remoting.createEndpoint(executor,
"test");
try {
- context.sendReply("GOOMBA");
- } catch (IOException e) {
+ final Handle<RequestHandler> requestHandlerHandle =
endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
+ public void handleRequest(final RequestContext<Object>
context, final Object request) throws RemoteExecutionException {
+ System.out.println("Got a request! " +
request.toString());
+ try {
+ context.sendReply("GOOMBA");
+ } catch (IOException e) {
+ try {
+ context.sendFailure("Failed", e);
+ } catch (IOException e1) {
+ // buh
+ }
+ }
+ }
+ }, Object.class, Object.class);
try {
- context.sendFailure("Failed", e);
- } catch (IOException e1) {
- // buh
- }
- }
- }
- }, Object.class, Object.class);
- final ChannelSource<StreamChannel> channelSource =
xnio.createPipeServer(executor, IoUtils.singletonHandlerFactory(new
IoHandler<StreamChannel>() {
- public void handleOpened(final StreamChannel channel) {
- try {
- System.out.println("Opening channel");
- BasicProtocol.createServer(requestHandlerHandle, channel,
configuration);
- } catch (IOException e) {
- e.printStackTrace();
- IoUtils.safeClose(channel);
- }
- }
+ final ChannelSource<StreamChannel> channelSource =
xnio.createPipeServer(executor, IoUtils.singletonHandlerFactory(new
IoHandler<StreamChannel>() {
+ public void handleOpened(final StreamChannel channel) {
+ try {
+ System.out.println("Opening channel");
+ BasicProtocol.createServer(requestHandlerHandle,
channel, configuration);
+ } catch (IOException e) {
+ e.printStackTrace();
+ IoUtils.safeClose(channel);
+ }
+ }
- public void handleReadable(final StreamChannel channel) {
- }
+ public void handleReadable(final StreamChannel channel) {
+ }
- public void handleWritable(final StreamChannel channel) {
- }
+ public void handleWritable(final StreamChannel channel) {
+ }
- public void handleClosed(final StreamChannel channel) {
- System.out.println("Closing channel");
+ public void handleClosed(final StreamChannel channel) {
+ System.out.println("Closing channel");
+ }
+ }));
+ final IoFuture<StreamChannel> futureChannel =
channelSource.open(IoUtils.nullHandler());
+ assertEquals(IoFuture.Status.DONE, futureChannel.await(1L,
TimeUnit.SECONDS));
+ final StreamChannel channel = futureChannel.get();
+ try {
+ final Handle<RequestHandler> clientHandlerHandle =
BasicProtocol.createClient(channel, configuration);
+ try {
+ final Client<Object,Object> client =
endpoint.createClient(clientHandlerHandle.getResource(), Object.class, Object.class);
+ try {
+ final IoFuture<Object> futureReply =
client.send("GORBA!");
+ assertEquals(IoFuture.Status.DONE,
futureReply.await(500L, TimeUnit.MILLISECONDS));
+ System.out.println("Reply is:" +
futureReply.get());
+ } finally {
+ IoUtils.safeClose(client);
+ }
+ } finally {
+ IoUtils.safeClose(clientHandlerHandle);
+ }
+ } finally {
+ IoUtils.safeClose(channel);
+ }
+ } finally {
+ IoUtils.safeClose(requestHandlerHandle);
+ }
+ } finally {
+ IoUtils.safeClose(endpoint);
+ }
+ } finally {
+ IoUtils.safeClose(xnio);
}
- }));
- final IoFuture<StreamChannel> futureChannel =
channelSource.open(IoUtils.nullHandler());
- final Handle<RequestHandler> clientHandlerHandle =
BasicProtocol.createClient(futureChannel.get(), configuration);
- final Client<Object,Object> client =
endpoint.createClient(clientHandlerHandle.getResource(), Object.class, Object.class);
- System.out.println("Reply is:" + client.invoke("GORBA!"));
-
+ } finally {
+ executor.shutdownNow();
+ }
}
}
Modified:
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java
===================================================================
---
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-11-19
16:22:04 UTC (rev 4705)
+++
remoting3/trunk/protocol/multiplex/src/test/java/org/jboss/remoting/protocol/multiplex/ConnectionTestCase.java 2008-11-19
19:15:56 UTC (rev 4706)
@@ -79,13 +79,9 @@
final BufferAllocator<ByteBuffer> allocator =
Buffers.createHeapByteBufferAllocator(1024);
final Xnio xnio = NioXnio.create();
try {
- final EndpointImpl remoteEndpoint = new EndpointImpl();
- remoteEndpoint.setExecutor(closeableExecutor);
- remoteEndpoint.start();
+ final EndpointImpl remoteEndpoint = new EndpointImpl(closeableExecutor,
"left-side");
try {
- final EndpointImpl endpoint = new EndpointImpl();
- endpoint.setExecutor(closeableExecutor);
- endpoint.start();
+ final EndpointImpl endpoint = new EndpointImpl(closeableExecutor,
"right-side");
try {
final CountDownLatch latch = new CountDownLatch(1);
final MultiplexConfiguration configuration = new
MultiplexConfiguration();
@@ -167,10 +163,10 @@
IoUtils.safeClose(requestHandlerSourceHandle);
}
} finally {
- endpoint.stop();
+ IoUtils.safeClose(endpoint);
}
} finally {
- remoteEndpoint.stop();
+ IoUtils.safeClose(remoteEndpoint);
}
} finally {
IoUtils.safeClose(xnio);
Modified:
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java 2008-11-19
16:22:04 UTC (rev 4705)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java 2008-11-19
19:15:56 UTC (rev 4706)
@@ -27,7 +27,7 @@
IoUtils.safeClose(client);
}
} finally {
- Remoting.closeEndpoint(endpoint);
+ IoUtils.safeClose(endpoint);
}
}
}
\ No newline at end of file
Modified:
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java
===================================================================
---
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java 2008-11-19
16:22:04 UTC (rev 4705)
+++
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java 2008-11-19
19:15:56 UTC (rev 4706)
@@ -45,7 +45,7 @@
IoUtils.safeClose(client);
}
} finally {
- Remoting.closeEndpoint(endpoint);
+ IoUtils.safeClose(endpoint);
}
}
}
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java 2008-11-19
16:22:04 UTC (rev 4705)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java 2008-11-19
19:15:56 UTC (rev 4706)
@@ -1,6 +1,13 @@
package org.jboss.remoting;
import java.io.IOException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.Collection;
+import java.util.Iterator;
import org.jboss.remoting.core.EndpointImpl;
import org.jboss.remoting.spi.RequestHandler;
import org.jboss.remoting.spi.RequestHandlerSource;
@@ -11,27 +18,31 @@
*
*/
public final class Remoting {
- // lifecycle lock
- private static final Object lifecycle = new Object();
- public static Endpoint createEndpoint(String name) throws IOException {
- synchronized (lifecycle) {
- final EndpointImpl endpointImpl = new EndpointImpl();
- endpointImpl.setName(name);
- endpointImpl.start();
- return endpointImpl;
- }
+ public static Endpoint createEndpoint(final String name) {
+ return createEndpoint(name, 10);
}
- public static void closeEndpoint(Endpoint endpoint) {
- synchronized (lifecycle) {
- if (endpoint instanceof EndpointImpl) {
- final EndpointImpl endpointImpl = (EndpointImpl) endpoint;
- endpointImpl.stop();
+ public static Endpoint createEndpoint(final String name, final int maxThreads) {
+ final ThreadPoolExecutor executor = new ThreadPoolExecutor(0, maxThreads,
Long.MAX_VALUE, TimeUnit.NANOSECONDS, new AlwaysBlockingQueue<Runnable>(new
SynchronousQueue<Runnable>()), new ThreadPoolExecutor.AbortPolicy());
+ final EndpointImpl endpoint = new EndpointImpl(executor, name);
+ endpoint.addCloseHandler(new CloseHandler<Endpoint>() {
+ public void handleClose(final Endpoint closed) {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
- }
+ });
+ return endpoint;
}
+ public static Endpoint createEndpoint(final Executor executor, final String name) {
+ return new EndpointImpl(executor, name);
+ }
+
public static <I, O> Client<I, O> createLocalClient(final Endpoint
endpoint, final RequestListener<I, O> requestListener, final Class<I>
requestClass, final Class<O> replyClass) throws IOException {
final Handle<RequestHandler> handle =
endpoint.createRequestHandler(requestListener, requestClass, replyClass);
try {
@@ -50,6 +61,128 @@
}
}
+ private static class AlwaysBlockingQueue<T> implements BlockingQueue<T>
{
+ private final BlockingQueue<T> delegate;
+
+ public AlwaysBlockingQueue(final BlockingQueue<T> delegate) {
+ this.delegate = delegate;
+ }
+
+ public boolean offer(final T o) {
+ try {
+ delegate.put(o);
+ return true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ }
+
+ public boolean offer(final T o, final long timeout, final TimeUnit unit) throws
InterruptedException {
+ return delegate.offer(o, timeout, unit);
+ }
+
+ public T poll(final long timeout, final TimeUnit unit) throws
InterruptedException {
+ return delegate.poll(timeout, unit);
+ }
+
+ public T take() throws InterruptedException {
+ return delegate.take();
+ }
+
+ public void put(final T o) throws InterruptedException {
+ delegate.put(o);
+ }
+
+ public int remainingCapacity() {
+ return delegate.remainingCapacity();
+ }
+
+ public boolean add(final T o) {
+ return delegate.add(o);
+ }
+
+ public int drainTo(final Collection<? super T> c) {
+ return delegate.drainTo(c);
+ }
+
+ public int drainTo(final Collection<? super T> c, final int maxElements) {
+ return delegate.drainTo(c, maxElements);
+ }
+
+ public T poll() {
+ return delegate.poll();
+ }
+
+ public T remove() {
+ return delegate.remove();
+ }
+
+ public T peek() {
+ return delegate.peek();
+ }
+
+ public T element() {
+ return delegate.element();
+ }
+
+ public int size() {
+ return delegate.size();
+ }
+
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ public boolean contains(final Object o) {
+ return delegate.contains(o);
+ }
+
+ public Iterator<T> iterator() {
+ return delegate.iterator();
+ }
+
+ public Object[] toArray() {
+ return delegate.toArray();
+ }
+
+ public <T> T[] toArray(final T[] a) {
+ return delegate.toArray(a);
+ }
+
+ public boolean remove(final Object o) {
+ return delegate.remove(o);
+ }
+
+ public boolean containsAll(final Collection<?> c) {
+ return delegate.containsAll(c);
+ }
+
+ public boolean addAll(final Collection<? extends T> c) {
+ return delegate.addAll(c);
+ }
+
+ public boolean removeAll(final Collection<?> c) {
+ return delegate.removeAll(c);
+ }
+
+ public boolean retainAll(final Collection<?> c) {
+ return delegate.retainAll(c);
+ }
+
+ public void clear() {
+ delegate.clear();
+ }
+
+ public boolean equals(final Object o) {
+ return delegate.equals(o);
+ }
+
+ public int hashCode() {
+ return delegate.hashCode();
+ }
+ }
+
// privates
private Remoting() { /* empty */ }