JBoss Remoting SVN: r4644 - remoting2/branches/2.x.
by jboss-remoting-commits@lists.jboss.org
Author: trustin
Date: 2008-11-04 04:14:05 -0500 (Tue, 04 Nov 2008)
New Revision: 4644
Modified:
remoting2/branches/2.x/.classpath
Log:
Fixed build path errors in the Eclipse .classpath
Modified: remoting2/branches/2.x/.classpath
===================================================================
--- remoting2/branches/2.x/.classpath 2008-11-04 03:15:22 UTC (rev 4643)
+++ remoting2/branches/2.x/.classpath 2008-11-04 09:14:05 UTC (rev 4644)
@@ -3,14 +3,9 @@
<classpathentry kind="src" path="src/main"/>
<classpathentry kind="src" path="src/tests"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
- <classpathentry kind="lib" path="lib/apache-tomcat/tomcat-apr.jar"/>
- <classpathentry kind="lib" path="lib/jboss/jboss-common.jar"/>
<classpathentry kind="lib" path="lib/apache-commons/lib/commons-httpclient.jar"/>
<classpathentry kind="lib" path="lib/apache-commons/lib/commons-logging-api.jar"/>
<classpathentry kind="lib" path="lib/apache-log4j/lib/log4j.jar"/>
- <classpathentry kind="lib" path="lib/apache-tomcat/tomcat-coyote.jar"/>
- <classpathentry kind="lib" path="lib/apache-tomcat/tomcat-http.jar"/>
- <classpathentry kind="lib" path="lib/apache-tomcat/tomcat-util.jar"/>
<classpathentry kind="lib" path="lib/dom4j/lib/dom4j.jar"/>
<classpathentry kind="lib" path="lib/jboss/jboss-jmx.jar"/>
<classpathentry kind="lib" path="lib/jboss/jboss-serialization.jar"/>
@@ -32,7 +27,12 @@
<classpathentry kind="lib" path="lib/spring/spring-web.jar"/>
<classpathentry kind="lib" path="lib/spring/spring-webmvc.jar"/>
<classpathentry kind="lib" path="lib/trove/lib/trove.jar"/>
+ <classpathentry kind="lib" path="lib/jgroups/lib/jgroups-all.jar"/>
+ <classpathentry kind="lib" path="lib/jboss/jboss-common-core.jar"/>
+ <classpathentry kind="lib" path="lib/jboss/jboss-j2se.jar"/>
+ <classpathentry kind="lib" path="lib/jboss/jboss-logging-log4j.jar"/>
+ <classpathentry kind="lib" path="lib/jboss/jboss-logging-spi.jar"/>
+ <classpathentry kind="lib" path="lib/jbossweb/jbossweb.jar"/>
<classpathentry kind="var" path="ANT_HOME/lib/ant-junit.jar"/>
- <classpathentry kind="lib" path="lib/jgroups/lib/jgroups-all.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
16 years
JBoss Remoting SVN: r4643 - in remoting3/trunk: core/src/main/java/org/jboss/remoting/core and 4 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-11-03 22:15:22 -0500 (Mon, 03 Nov 2008)
New Revision: 4643
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java
remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
remoting3/trunk/testing-support/src/main/resources/testing.policy
Log:
Various test fixes
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java 2008-11-04 02:34:36 UTC (rev 4642)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java 2008-11-04 03:15:22 UTC (rev 4643)
@@ -99,6 +99,7 @@
/**
* {@inheritDoc}
*/
+ @SuppressWarnings({ "unchecked" })
public final void close() throws IOException {
if (! closed.getAndSet(true)) {
log.trace("Closed %s", this);
@@ -113,7 +114,7 @@
}
});
} catch (RejectedExecutionException ree) {
- log.warn("Unable to execute close handler (execution rejected) for %s (%s)", this, ree.getMessage());
+ SpiUtils.safeHandleClose(handler, (T) AbstractHandleableCloseable.this);
}
}
closeHandlers = null;
@@ -134,7 +135,12 @@
closeHandlers.add(handler);
return new Key() {
public void remove() {
- closeHandlers.remove(handler);
+ synchronized (closeLock) {
+ final Set<CloseHandler<? super T>> closeHandlers = AbstractHandleableCloseable.this.closeHandlers;
+ if (closeHandlers != null) {
+ closeHandlers.remove(handler);
+ }
+ }
}
};
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-11-04 02:34:36 UTC (rev 4642)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-11-04 03:15:22 UTC (rev 4643)
@@ -11,7 +11,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.security.AccessController;
import org.jboss.remoting.Client;
import org.jboss.remoting.ClientSource;
import org.jboss.remoting.CloseHandler;
@@ -149,7 +148,10 @@
}
public <I, O> Handle<RequestHandler> createRequestHandler(final RequestListener<I, O> requestListener, final Class<I> requestClass, final Class<O> replyClass) throws IOException {
- AccessController.checkPermission(CREATE_REQUEST_HANDLER_PERM);
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(CREATE_REQUEST_HANDLER_PERM);
+ }
LocalRequestHandler.Config<I, O> config = new LocalRequestHandler.Config<I,O>(requestClass, replyClass);
config.setExecutor(executor);
config.setRequestListener(requestListener);
@@ -161,7 +163,10 @@
}
public <I, O> Handle<RequestHandlerSource> registerService(final LocalServiceConfiguration<I, O> configuration) throws IOException {
- AccessController.checkPermission(REGISTER_SERVICE_PERM);
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(REGISTER_SERVICE_PERM);
+ }
final String serviceType = configuration.getServiceType();
final String groupName = configuration.getGroupName();
final int metric = configuration.getMetric();
@@ -214,7 +219,10 @@
}
public <I, O> Client<I, O> createClient(final RequestHandler requestHandler, final Class<I> requestType, final Class<O> replyType) throws IOException {
- AccessController.checkPermission(CREATE_CLIENT_PERM);
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(CREATE_CLIENT_PERM);
+ }
boolean ok = false;
final Handle<RequestHandler> handle = requestHandler.getHandle();
try {
@@ -234,7 +242,10 @@
}
public <I, O> ClientSource<I, O> createClientSource(final RequestHandlerSource requestHandlerSource, final Class<I> requestClass, final Class<O> replyClass) throws IOException {
- AccessController.checkPermission(CREATE_CLIENT_SOURCE_PERM);
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(CREATE_CLIENT_SOURCE_PERM);
+ }
boolean ok = false;
final Handle<RequestHandlerSource> handle = requestHandlerSource.getHandle();
try {
@@ -324,7 +335,10 @@
}
public SimpleCloseable registerRemoteService(final RemoteServiceConfiguration configuration) throws IllegalArgumentException, IOException {
- AccessController.checkPermission(REGISTER_REMOTE_SERVICE_PERM);
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(REGISTER_REMOTE_SERVICE_PERM);
+ }
final RequestHandlerSource handlerSource = configuration.getRequestHandlerSource();
final String serviceType = configuration.getServiceType();
final String groupName = configuration.getGroupName();
@@ -381,7 +395,10 @@
}
public SimpleCloseable addServiceListener(final ServiceListener serviceListener, final boolean onlyNew) {
- AccessController.checkPermission(ADD_SERVICE_LISTENER_PERM);
+ SecurityManager sm = System.getSecurityManager();
+ if (sm != null) {
+ sm.checkPermission(ADD_SERVICE_LISTENER_PERM);
+ }
final Object key = new Object();
synchronized (serviceLock) {
final ServiceListenerRegistration registration = new ServiceListenerRegistration(serviceListener);
Modified: remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java
===================================================================
--- remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java 2008-11-04 02:34:36 UTC (rev 4642)
+++ remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java 2008-11-04 03:15:22 UTC (rev 4643)
@@ -37,6 +37,7 @@
import org.jboss.remoting.spi.RequestHandler;
import org.jboss.remoting.spi.Handle;
import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
/**
*
@@ -46,6 +47,8 @@
LoggingHelper.init();
}
+ private static final Logger log = Logger.getLogger(EndpointTestCase.class);
+
private static void safeStop(EndpointImpl endpoint) {
try {
endpoint.stop();
@@ -92,7 +95,7 @@
}
}
}
- }, INIT_ME, INIT_ME2);
+ }, Object.class, Object.class);
final RequestHandler requestHandler = handle.getResource();
try {
requestHandler.addCloseHandler(new CloseHandler<RequestHandler>() {
@@ -100,7 +103,7 @@
clientEndpointClosed.set(true);
}
});
- final Client<Object,Object> client = endpoint.createClient(requestHandler, requestType, replyType);
+ final Client<Object,Object> client = endpoint.createClient(requestHandler, Object.class, Object.class);
try {
client.addCloseHandler(new CloseHandler<Client<Object, Object>>() {
public void handleClose(final Client<Object, Object> closed) {
@@ -144,14 +147,10 @@
try {
context.sendReply(replyObj);
} catch (IOException e) {
- try {
- context.sendFailure(e.getMessage(), e);
- } catch (IOException e1) {
- fail("double fault");
- }
+ log.error(e, "Error sending reply!");
}
}
- }, INIT_ME, INIT_ME2);
+ }, Object.class, Object.class);
final RequestHandler requestHandler = handle.getResource();
try {
requestHandler.addCloseHandler(new CloseHandler<RequestHandler>() {
@@ -159,7 +158,7 @@
clientEndpointClosed.set(true);
}
});
- final Client<Object,Object> client = endpoint.createClient(requestHandler, requestType, replyType);
+ final Client<Object,Object> client = endpoint.createClient(requestHandler, Object.class, Object.class);
try {
client.addCloseHandler(new CloseHandler<Client<Object, Object>>() {
public void handleClose(final Client<Object, Object> closed) {
Modified: remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java 2008-11-04 02:34:36 UTC (rev 4642)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java 2008-11-04 03:15:22 UTC (rev 4643)
@@ -77,7 +77,7 @@
}
}
}
- }, INIT_ME, INIT_ME2);
+ }, Object.class, Object.class);
final ChannelSource<StreamChannel> channelSource = xnio.createPipeServer(executor, IoUtils.singletonHandlerFactory(new IoHandler<StreamChannel>() {
public void handleOpened(final StreamChannel channel) {
try {
@@ -101,7 +101,7 @@
}));
final IoFuture<StreamChannel> futureChannel = channelSource.open(IoUtils.nullHandler());
final Handle<RequestHandler> clientHandlerHandle = BasicProtocol.createClient(futureChannel.get(), configuration);
- final Client<Object,Object> client = endpoint.createClient(clientHandlerHandle.getResource(), requestType, replyType);
+ final Client<Object,Object> client = endpoint.createClient(clientHandlerHandle.getResource(), Object.class, Object.class);
System.out.println("Reply is:" + client.invoke("GORBA!"));
}
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java 2008-11-04 02:34:36 UTC (rev 4642)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexProtocol.java 2008-11-04 03:15:22 UTC (rev 4643)
@@ -48,7 +48,9 @@
/**
* Create a request server for the multiplex protocol.
*
- * @return a handler factory for passing to an XNIO server @param executor the executor to use for invocations @param configuration
+ * @param endpoint the endpoint
+ * @param configuration the configuration
+ * @return a handler factory for passing to an XNIO server
*/
public static IoHandlerFactory<AllocatedMessageChannel> createServer(final Endpoint endpoint, final MultiplexConfiguration configuration) {
return new IoHandlerFactory<AllocatedMessageChannel>() {
@@ -61,8 +63,11 @@
/**
* Create a request client for the multiplex protocol.
*
+ * @param endpoint the endpoint
+ * @param configuration the configuration
+ * @param channelSource the XNIO channel source to use to establish the connection @param allocator the buffer allocator to use
* @return a handle which may be used to close the connection
- * @throws IOException if an error occurs @param executor the executor to use for invocations @param channelSource the XNIO channel source to use to establish the connection @param allocator the buffer allocator to use @param configuration
+ * @throws IOException if an error occurs
*/
public static IoFuture<SimpleCloseable> connect(final Endpoint endpoint, final MultiplexConfiguration configuration, final ChannelSource<AllocatedMessageChannel> channelSource) throws IOException {
final MultiplexHandler multiplexHandler = new MultiplexHandler(endpoint, configuration);
@@ -85,7 +90,7 @@
}
public String toString() {
- return "connection <" + Integer.toString(hashCode()) + ">";
+ return "Remoting multiplex connection <" + Integer.toString(hashCode()) + ">";
}
}
}
Modified: remoting3/trunk/testing-support/src/main/resources/testing.policy
===================================================================
--- remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-11-04 02:34:36 UTC (rev 4642)
+++ remoting3/trunk/testing-support/src/main/resources/testing.policy 2008-11-04 03:15:22 UTC (rev 4643)
@@ -9,12 +9,16 @@
grant codeBase "file:${build.home}/core/target/test/classes/-"
{
permission java.lang.RuntimePermission "modifyThread"; // for executor control
+ permission org.jboss.remoting.EndpointPermission "createRequestHandler";
+ permission org.jboss.remoting.EndpointPermission "createClient";
};
grant codeBase "file:${build.home}/protocol/basic/target/test/classes/-"
{
permission java.lang.RuntimePermission "modifyThread"; // for executor control
permission java.net.SocketPermission "*:*", "accept, connect, resolve";
+ permission org.jboss.remoting.EndpointPermission "createRequestHandler";
+ permission org.jboss.remoting.EndpointPermission "createClient";
};
grant codeBase "file:${build.home}/protocol/multiplex/target/test/classes/-"
@@ -33,6 +37,7 @@
grant codeBase "file:${build.home}/core/target/main/classes/-"
{
permission java.util.PropertyPermission "jboss.remoting.*", "read";
+ permission org.jboss.remoting.EndpointPermission "*";
};
grant codeBase "file:${build.home}/protocol/basic/target/main/classes/-"
16 years
JBoss Remoting SVN: r4642 - in remoting3/trunk: api/src/main/java/org/jboss/remoting and 12 other directories.
by jboss-remoting-commits@lists.jboss.org
Author: david.lloyd(a)jboss.com
Date: 2008-11-03 21:34:36 -0500 (Mon, 03 Nov 2008)
New Revision: 4642
Added:
remoting3/trunk/api/src/main/java/org/jboss/remoting/EndpointPermission.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/LocalServiceConfiguration.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteRequestException.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteServiceConfiguration.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AutoCloseable.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java
Removed:
remoting3/trunk/api/src/main/java/org/jboss/remoting/ClientSourceWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/ClientWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/EndpointWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/IOExceptionCarrier.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContextWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/stream/ObjectSinkWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/stream/ObjectSourceWrapper.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/ConcurrentIntegerMap.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/EmulatedConcurrentIntegerHashMap.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/HandleableCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/IndeterminateOutcomeException.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/ReplyException.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/Handle.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/RequestHandler.java
remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/RequestHandlerSource.java
remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java
remoting3/trunk/build.properties
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java
remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java
remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java
remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java
remoting3/trunk/transporter/src/main/java/org/jboss/remoting/transporter/Transporter.java
remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java
Log:
More massive cleanup; more marshaller integration work; type safety strengthening; exception cleanup; various bug fixes...
Deleted: remoting3/trunk/api/src/main/java/org/jboss/remoting/ClientSourceWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/ClientSourceWrapper.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/ClientSourceWrapper.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -1,50 +0,0 @@
-package org.jboss.remoting;
-
-import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.Client;
-import org.jboss.remoting.ClientSource;
-import java.io.IOException;
-
-/**
- * A simple delegating wrapper for client sources.
- *
- * @param <I> the request type
- * @param <O> the reply type
- */
-public class ClientSourceWrapper<I, O> implements ClientSource<I, O> {
- private final ClientSource<I, O> delegate;
-
- /**
- * Construct a new instance. Calls will be sent to the given {@code delegate} by default.
- *
- * @param delegate the delegate client instance
- */
- protected ClientSourceWrapper(ClientSource<I, O> delegate) {
- this.delegate = delegate;
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public void close() throws IOException {
- delegate.close();
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public void addCloseHandler(final CloseHandler<? super ClientSource<I, O>> closeHandler) {
- delegate.addCloseHandler(new CloseHandler<ClientSource<I, O>>() {
- public void handleClose(final ClientSource<I, O> closed) {
- closeHandler.handleClose(ClientSourceWrapper.this);
- }
- });
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public Client<I, O> createClient() throws IOException {
- return delegate.createClient();
- }
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/remoting/ClientWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/ClientWrapper.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/ClientWrapper.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -1,65 +0,0 @@
-package org.jboss.remoting;
-
-import java.util.concurrent.ConcurrentMap;
-import java.io.IOException;
-import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.Client;
-import org.jboss.xnio.IoFuture;
-
-/**
- * A simple delegating wrapper for clients.
- *
- * @param <I> the request type
- * @param <O> the reply type
- */
-public class ClientWrapper<I, O> implements Client<I, O> {
- protected final Client<I, O> delegate;
-
- /**
- * Construct a new instance. Calls will be sent to the given {@code delegate} by default.
- *
- * @param delegate the delegate client instance
- */
- protected ClientWrapper(final Client<I, O> delegate) {
- this.delegate = delegate;
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public void close() throws IOException {
- delegate.close();
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public void addCloseHandler(final CloseHandler<? super Client<I, O>> closeHandler) {
- delegate.addCloseHandler(new CloseHandler<Client<I, O>>() {
- public void handleClose(final Client<I, O> closed) {
- closeHandler.handleClose(ClientWrapper.this);
- }
- });
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public O invoke(final I request) throws IOException {
- return delegate.invoke(request);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public IoFuture<O> send(final I request) throws IOException {
- return delegate.send(request);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public ConcurrentMap<Object, Object> getAttributes() {
- return delegate.getAttributes();
- }
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/Endpoint.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -33,54 +33,61 @@
* Create a request handler that can be used to receive incoming requests on this endpoint. The client may be passed to a
* remote endpoint as part of a request or a reply, or it may be used locally.
*
- * You must have the TODO permission to invoke this method.
+ * You must have the {@link org.jboss.remoting.EndpointPermission createRequestHandler EndpointPermission} to invoke this method.
*
* @param <I> the request type
* @param <O> the reply type
* @param requestListener the request listener
+ * @param requestClass the class of requests sent to this request listener
+ * @param replyClass the class of replies received back from this request listener
* @return a handle for the client
* @throws IOException if an error occurs
*/
- <I, O> Handle<RequestHandler> createRequestHandler(RequestListener<I, O> requestListener) throws IOException;
+ <I, O> Handle<RequestHandler> createRequestHandler(RequestListener<I, O> requestListener, final Class<I> requestClass, final Class<O> replyClass) throws IOException;
/**
* Create a request handler source that can be used to acquire clients associated with a request listener on this endpoint.
- * The request handler source may be passed to a remote endpoint as part of a request or a reply, or it may be used locally.
+ * The request handler source may be ignored, passed to a remote endpoint as part of a request or a reply, or used locally.
* The objects that are produced by this method may be used to mass-produce {@code RequestHandler} instances.
*
- * You must have the TODO permission to invoke this method.
+ * You must have the {@link org.jboss.remoting.EndpointPermission registerService EndpointPermission} to invoke this method.
*
* @param <I> the request type
* @param <O> the reply type
- * @param requestListener the request listener
- * @param serviceType the type of service to advertise
- * @param groupName the name of the group of this type to be part of
- * @return a handle for the client source
+ * @param configuration the configuration to use
* @throws IOException if an error occurs
*/
- <I, O> Handle<RequestHandlerSource> createRequestHandlerSource(RequestListener<I, O> requestListener, String serviceType, String groupName) throws IOException;
+ <I, O> Handle<RequestHandlerSource> registerService(LocalServiceConfiguration<I, O> configuration) throws IOException;
/**
* Create a client that uses the given request handler to handle its requests.
*
+ * You must have the {@link org.jboss.remoting.EndpointPermission createClient EndpointPermission} to invoke this method.
+ *
* @param <I> the request type
* @param <O> the reply type
* @param handler the request handler
+ * @param requestClass the class of requests sent through this client
+ * @param replyClass the class of replies received back through this client
* @return the client
* @throws IOException if an error occurs
*/
- <I, O> Client<I, O> createClient(RequestHandler handler) throws IOException;
+ <I, O> Client<I, O> createClient(RequestHandler handler, Class<I> requestClass, Class<O> replyClass) throws IOException;
/**
* Create a client source that uses the given request handler source to generate clients.
*
+ * You must have the {@link org.jboss.remoting.EndpointPermission createClientSource EndpointPermission} to invoke this method.
+ *
* @param <I> the request type
* @param <O> the reply type
* @param handlerSource the request handler source
+ * @param requestClass the class of requests sent through this client source
+ * @param replyClass the class of replies received back through this client source
* @return the client source
* @throws IOException if an error occurs
*/
- <I, O> ClientSource<I, O> createClientSource(RequestHandlerSource handlerSource) throws IOException;
+ <I, O> ClientSource<I, O> createClientSource(RequestHandlerSource handlerSource, Class<I> requestClass, Class<O> replyClass) throws IOException;
/**
* Attempt to locate a service. The return value then be queried for the service's {@code ClientSource}.
@@ -88,32 +95,33 @@
* @param <I> the request type
* @param <O> the reply type
* @param serviceUri the URI of the service
+ * @param requestClass the class of requests sent through the client source
+ * @param replyClass the class of replies received back through the client source
* @return the future service
* @throws IllegalArgumentException if the given URI is not a valid Remoting service URI
*/
- <I, O> IoFuture<ClientSource<I, O>> locateService(URI serviceUri) throws IllegalArgumentException;
+ <I, O> IoFuture<ClientSource<I, O>> locateService(URI serviceUri, Class<I> requestClass, Class<O> replyClass) throws IllegalArgumentException;
/**
* Register a remotely available service.<p>
* The remote endpoint may not have the same name as this endpoint. The group name and service type must be
* non-{@code null} and non-empty. The metric must be greater than zero.
*
- * @param serviceType the service type string
- * @param groupName the group name
- * @param endpointName the name of the remote endpoint
- * @param handlerSource the remote handler source
- * @param metric the preference metric, lower is more preferred
- * @return a handle corresponding to the registration
+ * You must have the {@link org.jboss.remoting.EndpointPermission registerRemoteService EndpointPermission} to invoke this method.
+ *
+ * @param configuration the remote service configuration
* @throws IllegalArgumentException if one of the given arguments was not valid
* @throws IOException if an error occurs with the registration
*/
- SimpleCloseable registerRemoteService(String serviceType, String groupName, String endpointName, RequestHandlerSource handlerSource, int metric) throws IllegalArgumentException, IOException;
+ SimpleCloseable registerRemoteService(RemoteServiceConfiguration configuration) throws IllegalArgumentException, IOException;
/**
* Add a listener for observing when local and remote services are added. The caller may specify whether the listener
* should be notified of the complete list of currently registered services (set {@code onlyNew} to {@code false})
* or only services registered after the time of calling this method (set {@code onlyNew} to {@code true}).
*
+ * You must have the {@link org.jboss.remoting.EndpointPermission addServiceListener EndpointPermission} to invoke this method.
+ *
* @param serviceListener the listener
* @param onlyNew {@code true} if only new registrations should be sent to the listener
* @return a handle which may be used to unregister the listener
Added: remoting3/trunk/api/src/main/java/org/jboss/remoting/EndpointPermission.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/EndpointPermission.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/EndpointPermission.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -0,0 +1,61 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting;
+
+import java.security.BasicPermission;
+
+/**
+ * This class is for permissions relating to Remoting endpoints.
+ */
+public class EndpointPermission extends BasicPermission {
+
+ private static final long serialVersionUID = 4984517897378387571L;
+
+ /**
+ * Creates a new {@code EndpointPermission} object with the specified name.
+ * The name is the symbolic name of the {@code EndpointPermission}.
+ *
+ * @param name the name of the {@code EndpointPermission}
+ *
+ * @throws NullPointerException if {@code name} is {@code null}
+ * @throws IllegalArgumentException if {@code name} is empty
+ */
+ public EndpointPermission(String name) throws NullPointerException, IllegalArgumentException {
+ super(name);
+ }
+
+ /**
+ * Creates a new {@code EndpointPermission} object with the specified name.
+ * The name is the symbolic name of the {@code EndpointPermission}, and the
+ * actions string is currently unused.
+ *
+ * @param name the name of the {@code EndpointPermission}
+ * @param actions ignored
+ *
+ * @throws NullPointerException if {@code name} is {@code null}
+ * @throws IllegalArgumentException if {@code name} is empty
+ */
+ public EndpointPermission(String name, String actions) throws NullPointerException, IllegalArgumentException {
+ super(name, actions);
+ }
+}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/remoting/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/EndpointWrapper.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/EndpointWrapper.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -1,94 +0,0 @@
-package org.jboss.remoting;
-
-import java.util.concurrent.ConcurrentMap;
-import java.io.IOException;
-import java.net.URI;
-import org.jboss.remoting.Endpoint;
-import org.jboss.remoting.RequestListener;
-import org.jboss.remoting.Client;
-import org.jboss.remoting.ClientSource;
-import org.jboss.remoting.SimpleCloseable;
-import org.jboss.remoting.ServiceListener;
-import org.jboss.remoting.spi.RequestHandler;
-import org.jboss.remoting.spi.RequestHandlerSource;
-import org.jboss.remoting.spi.Handle;
-import org.jboss.xnio.IoFuture;
-
-/**
- * A simple delegating wrapper for endpoints.
- */
-public class EndpointWrapper implements Endpoint {
- protected final Endpoint delegate;
-
- /**
- * Construct a new instance. Calls will be sent to the given {@code delegate} by default.
- *
- * @param delegate the delegate client instance
- */
- protected EndpointWrapper(final Endpoint delegate) {
- this.delegate = delegate;
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public ConcurrentMap<Object, Object> getAttributes() {
- return delegate.getAttributes();
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public String getName() {
- return delegate.getName();
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public <I, O> Handle<RequestHandler> createRequestHandler(final RequestListener<I, O> requestListener) throws IOException {
- return delegate.createRequestHandler(requestListener);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public <I, O> Handle<RequestHandlerSource> createRequestHandlerSource(final RequestListener<I, O> requestListener, final String serviceType, final String groupName) throws IOException {
- return delegate.createRequestHandlerSource(requestListener, serviceType, groupName);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public <I, O> Client<I, O> createClient(final RequestHandler handler) throws IOException {
- return delegate.createClient(handler);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public <I, O> ClientSource<I, O> createClientSource(final RequestHandlerSource handlerSource) throws IOException {
- return delegate.createClientSource(handlerSource);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public <I, O> IoFuture<ClientSource<I, O>> locateService(final URI serviceUri) throws IllegalArgumentException {
- return delegate.locateService(serviceUri);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public SimpleCloseable registerRemoteService(final String serviceType, final String groupName, final String endpointName, final RequestHandlerSource handlerSource, final int metric) throws IllegalArgumentException, IOException {
- return delegate.registerRemoteService(serviceType, groupName, endpointName, handlerSource, metric);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public SimpleCloseable addServiceListener(final ServiceListener serviceListener, final boolean onlyNew) {
- return delegate.addServiceListener(serviceListener, true);
- }
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/HandleableCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/HandleableCloseable.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/HandleableCloseable.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -22,6 +22,18 @@
* takes place.
*
* @param handler the close handler
+ * @return a key which may be used to later remove this handler
*/
- void addCloseHandler(CloseHandler<? super T> handler);
+ Key addCloseHandler(CloseHandler<? super T> handler);
+
+ /**
+ * A key which may be used to remove this handler.
+ */
+ interface Key {
+
+ /**
+ * Remove the registered handler. Calling this method more than once has no additional effect.
+ */
+ void remove();
+ }
}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/remoting/IOExceptionCarrier.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/IOExceptionCarrier.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/IOExceptionCarrier.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -1,29 +0,0 @@
-package org.jboss.remoting;
-
-import java.io.IOException;
-
-/**
- * A runtime exception that carries an {@link java.io.IOException} as a cause.
- */
-public class IOExceptionCarrier extends RuntimeException {
-
- private static final long serialVersionUID = -1602940590696531671L;
-
- /**
- * Construct a new carrier.
- *
- * @param cause the nested cause
- */
- public IOExceptionCarrier(IOException cause) {
- super(cause);
- }
-
- /**
- * Get the cause.
- *
- * @return the cause
- */
- public IOException getCause() {
- return (IOException) super.getCause();
- }
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/IndeterminateOutcomeException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/IndeterminateOutcomeException.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/IndeterminateOutcomeException.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -3,7 +3,7 @@
/**
* An exception that is thrown when an operation terminates in such a way that the outcome cannot be known.
*/
-public class IndeterminateOutcomeException extends RemoteExecutionException {
+public class IndeterminateOutcomeException extends RemotingException {
private static final long serialVersionUID = 6304843915977033800L;
Added: remoting3/trunk/api/src/main/java/org/jboss/remoting/LocalServiceConfiguration.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/LocalServiceConfiguration.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/LocalServiceConfiguration.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -0,0 +1,77 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting;
+
+/**
+ * A configuration for a service to be deployed into the endpoint.
+ */
+public final class LocalServiceConfiguration<I, O> {
+ private final RequestListener<I, O> requestListener;
+ private final Class<I> requestClass;
+ private final Class<O> replyClass;
+ private String serviceType;
+ private String groupName;
+ private int metric;
+
+ public LocalServiceConfiguration(final RequestListener<I, O> requestListener, final Class<I> requestClass, final Class<O> replyClass) {
+ this.requestListener = requestListener;
+ this.requestClass = requestClass;
+ this.replyClass = replyClass;
+ }
+
+ public RequestListener<I, O> getRequestListener() {
+ return requestListener;
+ }
+
+ public Class<I> getRequestClass() {
+ return requestClass;
+ }
+
+ public Class<O> getReplyClass() {
+ return replyClass;
+ }
+
+ public String getServiceType() {
+ return serviceType;
+ }
+
+ public void setServiceType(final String serviceType) {
+ this.serviceType = serviceType;
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public void setGroupName(final String groupName) {
+ this.groupName = groupName;
+ }
+
+ public int getMetric() {
+ return metric;
+ }
+
+ public void setMetric(final int metric) {
+ this.metric = metric;
+ }
+}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteReplyException.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -23,7 +23,7 @@
package org.jboss.remoting;
/**
- * An exception indicating that a the remote side tried and failed to send a reply message; the remote side would be
+ * An exception indicating that a the remote side tried and failed to send a reply message. The remote side would be
* aware of this type of failure, so the outcome is determinate; thus it extends {@code RemoteExecutionException}.
*/
public class RemoteReplyException extends RemoteExecutionException {
Added: remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteRequestException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteRequestException.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteRequestException.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -0,0 +1,71 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting;
+
+/**
+ * Remote request exception. Indicates that the request arrived at the remote side, but the handler could not be
+ * executed for some reason (for example, the request sent might not be of the correct type).
+ */
+public final class RemoteRequestException extends RemotingException {
+
+ private static final long serialVersionUID = 5494334026096542700L;
+
+ /**
+ * Constructs a <tt>RemoteRequestException</tt> with no detail message. The cause is not initialized, and may
+ * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+ */
+ public RemoteRequestException() {
+ }
+
+ /**
+ * Constructs a <tt>RemoteRequestException</tt> with the specified detail message. The cause is not initialized, and may
+ * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+ *
+ * @param msg the detail message
+ */
+ public RemoteRequestException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Constructs a <tt>RemoteRequestException</tt> with the specified cause. The detail message is set to:
+ * <pre>
+ * (cause == null ? null : cause.toString())</pre>
+ * (which typically contains the class and detail message of <tt>cause</tt>).
+ *
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+ */
+ public RemoteRequestException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * Constructs a <tt>RemoteRequestException</tt> with the specified detail message and cause.
+ *
+ * @param msg the detail message
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+ */
+ public RemoteRequestException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteServiceConfiguration.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteServiceConfiguration.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/RemoteServiceConfiguration.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -0,0 +1,76 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting;
+
+import org.jboss.remoting.spi.RequestHandlerSource;
+
+/**
+ *
+ */
+public final class RemoteServiceConfiguration {
+ private String serviceType;
+ private String groupName;
+ private String endpointName;
+ private RequestHandlerSource requestHandlerSource;
+ private int metric;
+
+ public String getServiceType() {
+ return serviceType;
+ }
+
+ public void setServiceType(final String serviceType) {
+ this.serviceType = serviceType;
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public void setGroupName(final String groupName) {
+ this.groupName = groupName;
+ }
+
+ public String getEndpointName() {
+ return endpointName;
+ }
+
+ public void setEndpointName(final String endpointName) {
+ this.endpointName = endpointName;
+ }
+
+ public RequestHandlerSource getRequestHandlerSource() {
+ return requestHandlerSource;
+ }
+
+ public void setRequestHandlerSource(final RequestHandlerSource requestHandlerSource) {
+ this.requestHandlerSource = requestHandlerSource;
+ }
+
+ public int getMetric() {
+ return metric;
+ }
+
+ public void setMetric(final int metric) {
+ this.metric = metric;
+ }
+}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/ReplyException.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/ReplyException.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/ReplyException.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -23,24 +23,49 @@
package org.jboss.remoting;
/**
- *
+ * An exception type indicating that an operation completed and a reply was received, but it could not be delivered
+ * to the client. Possible causes include (but are not limited to) class cast problems and unmarshalling problems.
*/
public class ReplyException extends RemotingException {
private static final long serialVersionUID = 5562116026829381932L;
+ /**
+ * Constructs a <tt>ReplyException</tt> with no detail message. The cause is not initialized, and may subsequently be
+ * initialized by a call to {@link #initCause(Throwable) initCause}.
+ */
public ReplyException() {
}
- public ReplyException(final String msg) {
+ /**
+ * Constructs a <tt>ReplyException</tt> with the specified detail message. The cause is not initialized, and may
+ * subsequently be initialized by a call to {@link #initCause(Throwable) initCause}.
+ *
+ * @param msg the detail message
+ */
+ public ReplyException(String msg) {
super(msg);
}
- public ReplyException(final Throwable cause) {
+ /**
+ * Constructs a <tt>ReplyException</tt> with the specified cause. The detail message is set to:
+ * <pre>
+ * (cause == null ? null : cause.toString())</pre>
+ * (which typically contains the class and detail message of <tt>cause</tt>).
+ *
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+ */
+ public ReplyException(Throwable cause) {
super(cause);
}
- public ReplyException(final String msg, final Throwable cause) {
+ /**
+ * Constructs a <tt>ReplyException</tt> with the specified detail message and cause.
+ *
+ * @param msg the detail message
+ * @param cause the cause (which is saved for later retrieval by the {@link #getCause()} method)
+ */
+ public ReplyException(String msg, Throwable cause) {
super(msg, cause);
}
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContext.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -1,6 +1,7 @@
package org.jboss.remoting;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.io.IOException;
/**
@@ -74,5 +75,5 @@
*
* @param command the task to execute
*/
- void execute(Runnable command);
+ void execute(Runnable command) throws RejectedExecutionException;
}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContextWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContextWrapper.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/RequestContextWrapper.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -1,73 +0,0 @@
-package org.jboss.remoting;
-
-import org.jboss.remoting.RequestCancelHandler;
-import org.jboss.remoting.RequestContext;
-import org.jboss.remoting.ClientContext;
-import java.io.IOException;
-
-/**
- * A simple delegating wrapper for request context instances.
- *
- * @param <O> the reply type
- */
-public class RequestContextWrapper<O> implements RequestContext<O> {
- protected final RequestContext<O> delegate;
-
- /**
- * Construct a new instance. Calls will be sent to the given {@code delegate} by default.
- *
- * @param delegate the delegate client instance
- */
- protected RequestContextWrapper(final RequestContext<O> delegate) {
- this.delegate = delegate;
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public ClientContext getContext() {
- return delegate.getContext();
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public boolean isCancelled() {
- return delegate.isCancelled();
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public void sendReply(O reply) throws IOException, IllegalStateException {
- delegate.sendReply(reply);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public void sendFailure(String msg, Throwable cause) throws IOException, IllegalStateException {
- delegate.sendFailure(msg, cause);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public void sendCancelled() throws IOException, IllegalStateException {
- delegate.sendCancelled();
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public void addCancelHandler(final RequestCancelHandler<O> requestCancelHandler) {
- delegate.addCancelHandler(requestCancelHandler);
- }
-
- /**
- * {@inheritDoc} This implementation calls the same method on the delegate object.
- */
- public void execute(final Runnable command) {
- delegate.execute(command);
- }
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractAutoCloseable.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -26,13 +26,15 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.remoting.RemotingException;
+import org.jboss.remoting.CloseHandler;
import org.jboss.xnio.log.Logger;
+import org.jboss.xnio.IoUtils;
/**
* A closeable implementation that supports reference counting. Since the initial reference count is zero, implementors
* must be careful to ensure that the first operation invoked is a call to {@link #getHandle()}.
*/
-public abstract class AbstractAutoCloseable<T> extends AbstractHandleableCloseable<T> {
+public abstract class AbstractAutoCloseable<T> extends AbstractHandleableCloseable<T> implements AutoCloseable<T> {
private final AtomicInteger refcount = new AtomicInteger(0);
private final Executor executor;
@@ -100,12 +102,20 @@
}
private final class HandleImpl extends AbstractHandleableCloseable<Handle<T>> implements Handle<T> {
+ private final Key key;
+
private HandleImpl() throws IOException {
super(AbstractAutoCloseable.this.executor);
+ key = AbstractAutoCloseable.this.addCloseHandler(new CloseHandler<T>() {
+ public void handleClose(final T closed) {
+ IoUtils.safeClose(HandleImpl.this);
+ }
+ });
inc();
}
protected void closeAction() throws IOException {
+ key.remove();
dec();
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AbstractHandleableCloseable.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -126,12 +126,17 @@
/**
* {@inheritDoc}
*/
- public void addCloseHandler(final CloseHandler<? super T> handler) {
+ public Key addCloseHandler(final CloseHandler<? super T> handler) {
synchronized (closeLock) {
if (closeHandlers == null) {
closeHandlers = new HashSet<CloseHandler<? super T>>();
}
closeHandlers.add(handler);
+ return new Key() {
+ public void remove() {
+ closeHandlers.remove(handler);
+ }
+ };
}
}
Added: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AutoCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AutoCloseable.java (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/AutoCloseable.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -0,0 +1,33 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.spi;
+
+import org.jboss.remoting.HandleableCloseable;
+import java.io.IOException;
+
+/**
+ *
+ */
+public interface AutoCloseable<T> extends HandleableCloseable<T> {
+ Handle<T> getHandle() throws IOException;
+}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/Handle.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/Handle.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/Handle.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -50,5 +50,5 @@
*
* @param handler the handler
*/
- void addCloseHandler(final CloseHandler<? super Handle<T>> handler);
+ Key addCloseHandler(final CloseHandler<? super Handle<T>> handler);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/RequestHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/RequestHandler.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/RequestHandler.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -24,14 +24,12 @@
import java.io.IOException;
import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.HandleableCloseable;
-import org.jboss.remoting.RemotingException;
/**
* A request handler, which can be passed to remote endpoints. Remote systems can then use the handler
* to make invocations, or they may forward a handler on to other remote systems.
*/
-public interface RequestHandler extends HandleableCloseable<RequestHandler> {
+public interface RequestHandler extends AutoCloseable<RequestHandler> {
/**
* Receive a request from a remote system. This method is intended to be called by protocol handlers. If the
@@ -60,7 +58,7 @@
* Close this request handler. The outcome of any outstanding requests is not defined, though implementations
* should make an effort to cancel any outstanding requests.
*
- * @throws RemotingException if the client endpoint could not be closed
+ * @throws java.io.IOException if the client endpoint could not be closed
*/
void close() throws IOException;
@@ -69,5 +67,5 @@
*
* @param handler the handler to be called
*/
- void addCloseHandler(final CloseHandler<? super RequestHandler> handler);
+ Key addCloseHandler(final CloseHandler<? super RequestHandler> handler);
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/RequestHandlerSource.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/RequestHandlerSource.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/spi/RequestHandlerSource.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -24,7 +24,6 @@
import java.io.IOException;
import org.jboss.remoting.CloseHandler;
-import org.jboss.remoting.HandleableCloseable;
import org.jboss.remoting.RemotingException;
/**
@@ -33,7 +32,7 @@
* has the advantage that a round trip to the remote side is not necessary; the local side can spawn a request handler
* and simply notify the remote side of the change.
*/
-public interface RequestHandlerSource extends HandleableCloseable<RequestHandlerSource> {
+public interface RequestHandlerSource extends AutoCloseable<RequestHandlerSource> {
/**
* Create a request handler for the service corresponding to this request handler source.
@@ -64,5 +63,5 @@
*
* @param handler the handler to be called
*/
- void addCloseHandler(final CloseHandler<? super RequestHandlerSource> handler);
+ Key addCloseHandler(final CloseHandler<? super RequestHandlerSource> handler);
}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/remoting/stream/ObjectSinkWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/stream/ObjectSinkWrapper.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/stream/ObjectSinkWrapper.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -1,26 +0,0 @@
-package org.jboss.remoting.stream;
-
-import java.io.IOException;
-
-/**
- *
- */
-public class ObjectSinkWrapper<T> implements ObjectSink<T> {
- private final ObjectSink<T> target;
-
- public ObjectSinkWrapper(final ObjectSink<T> target) {
- this.target = target;
- }
-
- public void accept(final T instance) throws IOException {
- target.accept(instance);
- }
-
- public void flush() throws IOException {
- target.flush();
- }
-
- public void close() throws IOException {
- target.close();
- }
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/remoting/stream/ObjectSourceWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/remoting/stream/ObjectSourceWrapper.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/main/java/org/jboss/remoting/stream/ObjectSourceWrapper.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -1,26 +0,0 @@
-package org.jboss.remoting.stream;
-
-import java.io.IOException;
-
-/**
- *
- */
-public class ObjectSourceWrapper<T> implements ObjectSource<T> {
- private final ObjectSource<T> target;
-
- public ObjectSourceWrapper(final ObjectSource<T> target) {
- this.target = target;
- }
-
- public boolean hasNext() throws IOException {
- return target.hasNext();
- }
-
- public T next() throws IOException {
- return target.next();
- }
-
- public void close() throws IOException {
- target.close();
- }
-}
Modified: remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java
===================================================================
--- remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/api/src/test/java/org/jboss/remoting/spi/CloseableTestCase.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -27,11 +27,11 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.xnio.IoUtils;
import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.test.support.LoggingHelper;
-import org.jboss.remoting.spi.Handle;
/**
*
@@ -179,4 +179,17 @@
executorService.shutdownNow();
}
}
+
+ public void testHandlerRemoval() throws Throwable {
+ final Executor executor = IoUtils.directExecutor();
+ final AbstractAutoCloseable<Object> closeable = new AbstractAutoCloseable<Object>(executor) {
+ // empty
+ };
+ final Handle<Object> rootHandle = closeable.getHandle();
+ try {
+
+ } finally {
+ IoUtils.safeClose(closeable);
+ }
+ }
}
Modified: remoting3/trunk/build.properties
===================================================================
--- remoting3/trunk/build.properties 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/build.properties 2008-11-04 02:34:36 UTC (rev 4642)
@@ -115,7 +115,7 @@
lib.jboss-managed.local=${local.repository}/${lib.jboss-managed.local-path}
lib.jboss-managed.remote=${remote.repository}/${lib.jboss-managed.remote-path}
-lib.marshalling-api.version=1.0.0.Beta2
+lib.marshalling-api.version=1.0.0.CR1
lib.marshalling-api.name=marshalling-api.jar
lib.marshalling-api.license=lgpl
lib.marshalling-api.dir=jboss/marshalling/${lib.marshalling-api.version}/lib
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientExternalizer.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -42,15 +42,25 @@
this.endpoint = endpoint;
}
- @SuppressWarnings({ "unchecked" })
+ private static <I, O> void doWriteExternal(final ClientImpl<I, O> client, final ObjectOutput output) throws IOException {
+ output.writeObject(client.getRequestClass());
+ output.writeObject(client.getReplyClass());
+ output.writeObject(client.getRequestHandlerHandle().getResource());
+ }
+
public void writeExternal(final Object o, final ObjectOutput output) throws IOException {
- output.writeObject(((ClientImpl)o).getRequestHandlerHandle().getResource());
+ doWriteExternal((ClientImpl<?, ?>) o, output);
}
- @SuppressWarnings({ "unchecked" })
+ private <I, O> ClientImpl<I, O> doCreateExternal(Class<I> requestClass, Class<O> replyClass, RequestHandler handler) throws IOException {
+ return new ClientImpl<I, O>(handler.getHandle(), endpoint.getExecutor(), requestClass, replyClass);
+ }
+
public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {
+ final Class<?> requestClass = (Class<?>) input.readObject();
+ final Class<?> replyClass = (Class<?>) input.readObject();
final RequestHandler handler = (RequestHandler) input.readObject();
- return new ClientImpl(handler.getHandle(), endpoint.getExecutor());
+ return doCreateExternal(requestClass, replyClass, handler);
}
public void readExternal(final Object o, final ObjectInput input) throws IOException, ClassNotFoundException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientImpl.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -39,10 +39,14 @@
public final class ClientImpl<I, O> extends AbstractContextImpl<Client<I, O>> implements Client<I, O> {
private final Handle<RequestHandler> handle;
+ private final Class<I> requestClass;
+ private final Class<O> replyClass;
- ClientImpl(final Handle<RequestHandler> handle, final Executor executor) {
+ ClientImpl(final Handle<RequestHandler> handle, final Executor executor, final Class<I> requestClass, final Class<O> replyClass) {
super(executor);
this.handle = handle;
+ this.requestClass = requestClass;
+ this.replyClass = replyClass;
}
protected void closeAction() throws IOException {
@@ -53,10 +57,11 @@
if (! isOpen()) {
throw new IOException("Client is not open");
}
+ final I actualRequest = requestClass.cast(request);
final QueueExecutor executor = new QueueExecutor();
- final FutureReplyImpl<O> futureReply = new FutureReplyImpl<O>(executor);
+ final FutureReplyImpl<O> futureReply = new FutureReplyImpl<O>(executor, replyClass);
final ReplyHandler replyHandler = futureReply.getReplyHandler();
- final RemoteRequestContext requestContext = handle.getResource().receiveRequest(request, replyHandler);
+ final RemoteRequestContext requestContext = handle.getResource().receiveRequest(actualRequest, replyHandler);
futureReply.setRemoteRequestContext(requestContext);
futureReply.addNotifier(new IoFuture.Notifier<O>() {
public void notify(final IoFuture<O> future) {
@@ -80,7 +85,7 @@
if (! isOpen()) {
throw new IOException("Client is not open");
}
- final FutureReplyImpl<O> futureReply = new FutureReplyImpl<O>(executor);
+ final FutureReplyImpl<O> futureReply = new FutureReplyImpl<O>(executor, replyClass);
final ReplyHandler replyHandler = futureReply.getReplyHandler();
final RemoteRequestContext requestContext = handle.getResource().receiveRequest(request, replyHandler);
futureReply.setRemoteRequestContext(requestContext);
@@ -94,4 +99,12 @@
Handle<RequestHandler> getRequestHandlerHandle() {
return handle;
}
+
+ Class<I> getRequestClass() {
+ return requestClass;
+ }
+
+ Class<O> getReplyClass() {
+ return replyClass;
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceExternalizer.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -42,15 +42,25 @@
this.endpoint = endpoint;
}
- @SuppressWarnings({ "unchecked" })
+ private static <I, O> void doWriteExternal(final ClientSourceImpl<I, O> clientSource, final ObjectOutput output) throws IOException {
+ output.writeObject(clientSource.getRequestClass());
+ output.writeObject(clientSource.getReplyClass());
+ output.writeObject(clientSource.getRequestHandlerSourceHandle().getResource());
+ }
+
public void writeExternal(final Object o, final ObjectOutput output) throws IOException {
- output.writeObject(((ClientSourceImpl) o).getRequestHandlerSourceHandle().getResource());
+ doWriteExternal((ClientSourceImpl<?, ?>) o, output);
}
- @SuppressWarnings({ "unchecked" })
+ private <I, O> ClientSourceImpl<I, O> doCreateExternal(Class<I> requestClass, Class<O> replyClass, RequestHandlerSource handlerSource) throws IOException {
+ return new ClientSourceImpl<I, O>(handlerSource.getHandle(), endpoint, requestClass, replyClass);
+ }
+
public Object createExternal(final Class<?> aClass, final ObjectInput input, final Creator creator) throws IOException, ClassNotFoundException {
- final RequestHandlerSource handler = (RequestHandlerSource) input.readObject();
- return new ClientSourceImpl(handler.getHandle(), endpoint);
+ final Class<?> requestClass = (Class<?>) input.readObject();
+ final Class<?> replyClass = (Class<?>) input.readObject();
+ final RequestHandlerSource handlerSource = (RequestHandlerSource) input.readObject();
+ return doCreateExternal(requestClass, replyClass, handlerSource);
}
public void readExternal(final Object o, final ObjectInput input) throws IOException, ClassNotFoundException {
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/ClientSourceImpl.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -39,11 +39,15 @@
private final Handle<RequestHandlerSource> handle;
private final Endpoint endpoint;
+ private final Class<I> requestClass;
+ private final Class<O> replyClass;
- ClientSourceImpl(final Handle<RequestHandlerSource> handle, final EndpointImpl endpoint) {
+ ClientSourceImpl(final Handle<RequestHandlerSource> handle, final EndpointImpl endpoint, final Class<I> requestClass, final Class<O> replyClass) {
super(endpoint.getExecutor());
this.handle = handle;
this.endpoint = endpoint;
+ this.requestClass = requestClass;
+ this.replyClass = replyClass;
}
protected void closeAction() throws IOException {
@@ -56,7 +60,7 @@
}
final Handle<RequestHandler> clientHandle = handle.getResource().createRequestHandler();
try {
- return endpoint.createClient(clientHandle.getResource());
+ return endpoint.createClient(clientHandle.getResource(), requestClass, replyClass);
} finally {
IoUtils.safeClose(clientHandle);
}
@@ -69,4 +73,12 @@
Handle<RequestHandlerSource> getRequestHandlerSourceHandle() {
return handle;
}
+
+ Class<I> getRequestClass() {
+ return requestClass;
+ }
+
+ Class<O> getReplyClass() {
+ return replyClass;
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/EndpointImpl.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -11,6 +11,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.security.AccessController;
import org.jboss.remoting.Client;
import org.jboss.remoting.ClientSource;
import org.jboss.remoting.CloseHandler;
@@ -18,6 +19,9 @@
import org.jboss.remoting.RequestListener;
import org.jboss.remoting.ServiceListener;
import org.jboss.remoting.SimpleCloseable;
+import org.jboss.remoting.LocalServiceConfiguration;
+import org.jboss.remoting.EndpointPermission;
+import org.jboss.remoting.RemoteServiceConfiguration;
import org.jboss.remoting.spi.AbstractSimpleCloseable;
import org.jboss.remoting.spi.Handle;
import org.jboss.remoting.spi.RequestHandler;
@@ -36,13 +40,15 @@
/**
*
*/
-public class EndpointImpl implements Endpoint {
+public final class EndpointImpl implements Endpoint {
static {
// Print Remoting "greeting" message
Logger.getLogger("org.jboss.remoting").info("JBoss Remoting version %s", Version.VERSION);
}
+ private static final Logger log = Logger.getLogger(Endpoint.class);
+
private String name;
private OrderedExecutorFactory orderedExecutorFactory;
@@ -55,6 +61,13 @@
private final Map<Object, ServiceListenerRegistration> serviceListenerMap = CollectionUtil.hashMap();
private final Set<ServiceRegistration> serviceRegistrations = CollectionUtil.hashSet();
+ private static final EndpointPermission CREATE_REQUEST_HANDLER_PERM = new EndpointPermission("createRequestHandler");
+ private static final EndpointPermission REGISTER_SERVICE_PERM = new EndpointPermission("registerService");
+ private static final EndpointPermission CREATE_CLIENT_PERM = new EndpointPermission("createClient");
+ private static final EndpointPermission CREATE_CLIENT_SOURCE_PERM = new EndpointPermission("createClientSource");
+ private static final EndpointPermission REGISTER_REMOTE_SERVICE_PERM = new EndpointPermission("registerRemoteService");
+ private static final EndpointPermission ADD_SERVICE_LISTENER_PERM = new EndpointPermission("addServiceListener");
+
public EndpointImpl() {
}
@@ -135,14 +148,23 @@
return endpointMap;
}
- public <I, O> Handle<RequestHandler> createRequestHandler(final RequestListener<I, O> requestListener) throws IOException {
- final LocalRequestHandler<I, O> localRequestHandler = new LocalRequestHandler<I, O>(executor, requestListener);
+ public <I, O> Handle<RequestHandler> createRequestHandler(final RequestListener<I, O> requestListener, final Class<I> requestClass, final Class<O> replyClass) throws IOException {
+ AccessController.checkPermission(CREATE_REQUEST_HANDLER_PERM);
+ LocalRequestHandler.Config<I, O> config = new LocalRequestHandler.Config<I,O>(requestClass, replyClass);
+ config.setExecutor(executor);
+ config.setRequestListener(requestListener);
+ config.setClientContext(new ClientContextImpl(executor));
+ final LocalRequestHandler<I, O> localRequestHandler = new LocalRequestHandler<I, O>(config);
localRequestHandler.addCloseHandler(remover);
localRequestHandler.open();
return localRequestHandler.getHandle();
}
- public <I, O> Handle<RequestHandlerSource> createRequestHandlerSource(final RequestListener<I, O> requestListener, final String serviceType, final String groupName) throws IOException {
+ public <I, O> Handle<RequestHandlerSource> registerService(final LocalServiceConfiguration<I, O> configuration) throws IOException {
+ AccessController.checkPermission(REGISTER_SERVICE_PERM);
+ final String serviceType = configuration.getServiceType();
+ final String groupName = configuration.getGroupName();
+ final int metric = configuration.getMetric();
if (serviceType == null) {
throw new NullPointerException("serviceType is null");
}
@@ -155,7 +177,13 @@
if (groupName.length() == 0) {
throw new IllegalArgumentException("groupName is empty");
}
- final LocalRequestHandlerSource<I, O> localRequestHandlerSource = new LocalRequestHandlerSource<I, O>(executor, requestListener);
+ if (metric < 0) {
+ throw new IllegalArgumentException("metric must be greater than or equal to zero");
+ }
+ final LocalRequestHandlerSource.Config<I, O> config = new LocalRequestHandlerSource.Config<I,O>(configuration.getRequestClass(), configuration.getReplyClass());
+ config.setRequestListener(configuration.getRequestListener());
+ config.setExecutor(executor);
+ final LocalRequestHandlerSource<I, O> localRequestHandlerSource = new LocalRequestHandlerSource<I, O>(config);
final ServiceRegistration registration = new ServiceRegistration(serviceType, groupName, name, localRequestHandlerSource);
final AbstractSimpleCloseable newHandle = new AbstractSimpleCloseable(executor) {
protected void closeAction() throws IOException {
@@ -169,11 +197,11 @@
serviceRegistrations.add(registration);
for (final ServiceListenerRegistration slr : serviceListenerMap.values()) {
final ServiceListener listener = slr.getServiceListener();
- slr.getExecutor().execute(new Runnable() {
- public void run() {
- listener.localServiceCreated(slr.handle, serviceType, groupName, localRequestHandlerSource);
- }
- });
+ try {
+ listener.localServiceCreated(slr.handle, serviceType, groupName, localRequestHandlerSource);
+ } catch (Throwable t) {
+ logListenerError(t);
+ }
}
}
localRequestHandlerSource.addCloseHandler(remover);
@@ -181,11 +209,16 @@
return localRequestHandlerSource.getHandle();
}
- public <I, O> Client<I, O> createClient(final RequestHandler requestHandler) throws IOException {
+ private static void logListenerError(final Throwable t) {
+ log.error(t, "Service listener threw an exception");
+ }
+
+ public <I, O> Client<I, O> createClient(final RequestHandler requestHandler, final Class<I> requestType, final Class<O> replyType) throws IOException {
+ AccessController.checkPermission(CREATE_CLIENT_PERM);
boolean ok = false;
final Handle<RequestHandler> handle = requestHandler.getHandle();
try {
- final ClientImpl<I, O> client = new ClientImpl<I, O>(handle, executor);
+ final ClientImpl<I, O> client = new ClientImpl<I, O>(handle, executor, requestType, replyType);
client.addCloseHandler(new CloseHandler<Client<I, O>>() {
public void handleClose(final Client<I, O> closed) {
IoUtils.safeClose(handle);
@@ -200,11 +233,12 @@
}
}
- public <I, O> ClientSource<I, O> createClientSource(final RequestHandlerSource requestHandlerSource) throws IOException {
+ public <I, O> ClientSource<I, O> createClientSource(final RequestHandlerSource requestHandlerSource, final Class<I> requestClass, final Class<O> replyClass) throws IOException {
+ AccessController.checkPermission(CREATE_CLIENT_SOURCE_PERM);
boolean ok = false;
final Handle<RequestHandlerSource> handle = requestHandlerSource.getHandle();
try {
- final ClientSourceImpl<I, O> clientSource = new ClientSourceImpl<I, O>(handle, this);
+ final ClientSourceImpl<I, O> clientSource = new ClientSourceImpl<I, O>(handle, this, requestClass, replyClass);
ok = true;
return clientSource;
} finally {
@@ -214,7 +248,7 @@
}
}
- public <I, O> IoFuture<ClientSource<I, O>> locateService(final URI serviceUri) throws IllegalArgumentException {
+ public <I, O> IoFuture<ClientSource<I, O>> locateService(final URI serviceUri, final Class<I> requestClass, final Class<O> replyClass) throws IllegalArgumentException {
if (serviceUri == null) {
throw new NullPointerException("serviceUri is null");
}
@@ -262,7 +296,7 @@
}
try {
// match!
- final ClientSource<I, O> clientSource = createClientSource(requestHandlerSource);
+ final ClientSource<I, O> clientSource = createClientSource(requestHandlerSource, requestClass, replyClass);
futureClientSource.setResult(clientSource);
} catch (IOException e) {
futureClientSource.setException(e);
@@ -282,14 +316,23 @@
handlerSource = candidates.get(idx).getHandlerSource();
}
try {
- return new FinishedIoFuture<ClientSource<I,O>>(EndpointImpl.this.<I, O>createClientSource(handlerSource));
+ return new FinishedIoFuture<ClientSource<I,O>>(createClientSource(handlerSource, requestClass, replyClass));
} catch (IOException e) {
return new FailedIoFuture<ClientSource<I,O>>(e);
}
}
}
- public SimpleCloseable registerRemoteService(final String serviceType, final String groupName, final String endpointName, final RequestHandlerSource handlerSource, final int metric) throws IllegalArgumentException, IOException {
+ public SimpleCloseable registerRemoteService(final RemoteServiceConfiguration configuration) throws IllegalArgumentException, IOException {
+ AccessController.checkPermission(REGISTER_REMOTE_SERVICE_PERM);
+ final RequestHandlerSource handlerSource = configuration.getRequestHandlerSource();
+ final String serviceType = configuration.getServiceType();
+ final String groupName = configuration.getGroupName();
+ final String endpointName = configuration.getEndpointName();
+ final int metric = configuration.getMetric();
+ if (handlerSource == null) {
+ throw new NullPointerException("handlerSource is null");
+ }
if (serviceType == null) {
throw new NullPointerException("serviceType is null");
}
@@ -327,21 +370,21 @@
serviceRegistrations.add(registration);
for (final ServiceListenerRegistration slr : serviceListenerMap.values()) {
final ServiceListener listener = slr.getServiceListener();
- slr.getExecutor().execute(new Runnable() {
- public void run() {
- listener.remoteServiceRegistered(slr.handle, endpointName, serviceType, groupName, metric, handlerSource, newHandle);
- }
- });
+ try {
+ listener.remoteServiceRegistered(slr.handle, endpointName, serviceType, groupName, metric, handlerSource, newHandle);
+ } catch (Throwable t) {
+ logListenerError(t);
+ }
}
}
return newHandle;
}
public SimpleCloseable addServiceListener(final ServiceListener serviceListener, final boolean onlyNew) {
+ AccessController.checkPermission(ADD_SERVICE_LISTENER_PERM);
final Object key = new Object();
synchronized (serviceLock) {
- final Executor orderedExecutor = getOrderedExecutor();
- final ServiceListenerRegistration registration = new ServiceListenerRegistration(serviceListener, orderedExecutor);
+ final ServiceListenerRegistration registration = new ServiceListenerRegistration(serviceListener);
serviceListenerMap.put(key, registration);
final AbstractSimpleCloseable handle = new AbstractSimpleCloseable(executor) {
protected void closeAction() throws IOException {
@@ -353,18 +396,14 @@
registration.setHandle(handle);
if (! onlyNew) {
for (final ServiceRegistration reg : serviceRegistrations) {
- if (reg.isRemote()) { // x is remote
- orderedExecutor.execute(new Runnable() {
- public void run() {
- serviceListener.remoteServiceRegistered(handle, reg.getEndpointName(), reg.getServiceType(), reg.getGroupName(), reg.getMetric(), reg.getHandlerSource(), reg.getHandle());
- }
- });
- } else { // x is local
- orderedExecutor.execute(new Runnable() {
- public void run() {
- serviceListener.localServiceCreated(handle, reg.getServiceType(), reg.getGroupName(), reg.getHandlerSource());
- }
- });
+ try {
+ if (reg.isRemote()) { // x is remote
+ serviceListener.remoteServiceRegistered(handle, reg.getEndpointName(), reg.getServiceType(), reg.getGroupName(), reg.getMetric(), reg.getHandlerSource(), reg.getHandle());
+ } else { // x is local
+ serviceListener.localServiceCreated(handle, reg.getServiceType(), reg.getGroupName(), reg.getHandlerSource());
+ }
+ } catch (Throwable t) {
+ logListenerError(t);
}
}
}
@@ -374,22 +413,16 @@
private static final class ServiceListenerRegistration {
private final ServiceListener serviceListener;
- private final Executor executor;
private volatile SimpleCloseable handle;
- private ServiceListenerRegistration(final ServiceListener serviceListener, final Executor executor) {
+ private ServiceListenerRegistration(final ServiceListener serviceListener) {
this.serviceListener = serviceListener;
- this.executor = executor;
}
ServiceListener getServiceListener() {
return serviceListener;
}
- Executor getExecutor() {
- return executor;
- }
-
void setHandle(final SimpleCloseable handle) {
this.handle = handle;
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/FutureReplyImpl.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -26,6 +26,7 @@
import java.util.concurrent.Executor;
import org.jboss.remoting.spi.RemoteRequestContext;
import org.jboss.remoting.spi.ReplyHandler;
+import org.jboss.remoting.ReplyException;
import org.jboss.xnio.AbstractIoFuture;
import org.jboss.xnio.IoFuture;
@@ -35,11 +36,13 @@
public final class FutureReplyImpl<O> extends AbstractIoFuture<O> {
private final Executor executor;
+ private final Class<O> replyType;
private final ReplyHandler replyHandler = new Handler();
private volatile RemoteRequestContext remoteRequestContext;
- public FutureReplyImpl(final Executor executor) {
+ public FutureReplyImpl(final Executor executor, final Class<O> replyType) {
this.executor = executor;
+ this.replyType = replyType;
}
void setRemoteRequestContext(final RemoteRequestContext remoteRequestContext) {
@@ -62,9 +65,15 @@
private final class Handler implements ReplyHandler {
- @SuppressWarnings({ "unchecked" })
public void handleReply(final Object reply) {
- setResult((O) reply);
+ final O actualReply;
+ try {
+ actualReply = replyType.cast(reply);
+ } catch (ClassCastException e) {
+ setException(new ReplyException("Reply was of the wrong type (got <" + reply.getClass().getName() + ">; expected <? extends " + replyType.getName() + ">"));
+ return;
+ }
+ setResult(actualReply);
}
public void handleException(final IOException exception) {
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandler.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -24,9 +24,11 @@
import java.io.IOException;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.RemoteExecutionException;
import org.jboss.remoting.RequestListener;
+import org.jboss.remoting.RemoteRequestException;
import org.jboss.remoting.spi.AbstractAutoCloseable;
import org.jboss.remoting.spi.RemoteRequestContext;
import org.jboss.remoting.spi.ReplyHandler;
@@ -41,37 +43,44 @@
private final RequestListener<I, O> requestListener;
private final ClientContextImpl clientContext;
+ private final Class<I> requestClass;
+ private final Class<O> replyClass;
private static final Logger log = Logger.getLogger(LocalRequestHandler.class);
- private LocalRequestHandler(final Executor executor, final RequestListener<I, O> requestListener, final ClientContextImpl clientContext) {
- super(executor);
- this.requestListener = requestListener;
- this.clientContext = clientContext;
+ LocalRequestHandler(Config<I, O> config) {
+ super(config.getExecutor());
+ requestListener = config.getRequestListener();
+ clientContext = config.getClientContext();
+ requestClass = config.getRequestClass();
+ replyClass = config.getReplyClass();
}
- LocalRequestHandler(final Executor executor, final LocalRequestHandlerSource<I, O> service, final RequestListener<I, O> requestListener) {
- this(executor, requestListener, new ClientContextImpl(service.getServiceContext()));
- }
-
- LocalRequestHandler(final Executor executor, final RequestListener<I, O> requestListener) {
- this(executor, requestListener, new ClientContextImpl(executor));
- }
-
public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler replyHandler) {
- final RequestContextImpl<O> context = new RequestContextImpl<O>(replyHandler, clientContext);
- context.execute(new Runnable() {
- @SuppressWarnings({ "unchecked" })
- public void run() {
- try {
- requestListener.handleRequest(context, (I) request);
- } catch (RemoteExecutionException e) {
- SpiUtils.safeHandleException(replyHandler, e);
- } catch (Throwable t) {
- SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Request handler threw an exception", t));
+ final RequestContextImpl<O> context = new RequestContextImpl<O>(replyHandler, clientContext, replyClass);
+ try {
+ final I castRequest;
+ try {
+ castRequest = requestClass.cast(request);
+ } catch (ClassCastException e) {
+ SpiUtils.safeHandleException(replyHandler, new RemoteRequestException("Request is the wrong type; expected " + requestClass + " but got " + request.getClass()));
+ return SpiUtils.getBlankRemoteRequestContext();
+ }
+ context.execute(new Runnable() {
+ public void run() {
+ try {
+ requestListener.handleRequest(context, castRequest);
+ } catch (RemoteExecutionException e) {
+ SpiUtils.safeHandleException(replyHandler, e);
+ } catch (Throwable t) {
+ SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Request handler threw an exception", t));
+ }
}
- }
- });
+ });
+ } catch (RejectedExecutionException e) {
+ SpiUtils.safeHandleException(replyHandler, new RemoteRequestException("Execution was rejected (server may be too busy)", e));
+ return SpiUtils.getBlankRemoteRequestContext();
+ }
return new RemoteRequestContext() {
public void cancel() {
context.cancel();
@@ -101,4 +110,50 @@
public String toString() {
return "local request handler <" + Integer.toString(hashCode(), 16) + "> (request listener = " + String.valueOf(requestListener) + ")";
}
+
+ static class Config<I, O> {
+ private final Class<I> requestClass;
+ private final Class<O> replyClass;
+
+ private Executor executor;
+ private RequestListener<I, O> requestListener;
+ private ClientContextImpl clientContext;
+
+ Config(final Class<I> requestClass, final Class<O> replyClass) {
+ this.requestClass = requestClass;
+ this.replyClass = replyClass;
+ }
+
+ public Class<I> getRequestClass() {
+ return requestClass;
+ }
+
+ public Class<O> getReplyClass() {
+ return replyClass;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(final Executor executor) {
+ this.executor = executor;
+ }
+
+ public RequestListener<I, O> getRequestListener() {
+ return requestListener;
+ }
+
+ public void setRequestListener(final RequestListener<I, O> requestListener) {
+ this.requestListener = requestListener;
+ }
+
+ public ClientContextImpl getClientContext() {
+ return clientContext;
+ }
+
+ public void setClientContext(final ClientContextImpl clientContext) {
+ this.clientContext = clientContext;
+ }
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/LocalRequestHandlerSource.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -40,19 +40,27 @@
private final RequestListener<I, O> requestListener;
private final ServiceContextImpl serviceContext;
private final Executor executor;
+ private final Class<I> requestClass;
+ private final Class<O> replyClass;
private static final Logger log = Logger.getLogger(LocalRequestHandlerSource.class);
- LocalRequestHandlerSource(final Executor executor, final RequestListener<I, O> requestListener) {
- super(executor);
- this.requestListener = requestListener;
- this.executor = executor;
+ LocalRequestHandlerSource(final Config<I, O> config) {
+ super(config.getExecutor());
+ requestClass = config.getRequestClass();
+ replyClass = config.getReplyClass();
+ requestListener = config.getRequestListener();
+ executor = config.getExecutor();
serviceContext = new ServiceContextImpl(executor);
}
public Handle<RequestHandler> createRequestHandler() throws IOException {
if (isOpen()) {
- final LocalRequestHandler<I, O> localRequestHandler = new LocalRequestHandler<I, O>(executor, this, requestListener);
+ final LocalRequestHandler.Config<I, O> config = new LocalRequestHandler.Config<I, O>(requestClass, replyClass);
+ config.setExecutor(executor);
+ config.setRequestListener(requestListener);
+ config.setClientContext(new ClientContextImpl(serviceContext));
+ final LocalRequestHandler<I, O> localRequestHandler = new LocalRequestHandler<I, O>(config);
localRequestHandler.open();
return localRequestHandler.getHandle();
} else {
@@ -86,4 +94,40 @@
public String toString() {
return "local request handler source <" + Integer.toString(hashCode(), 16) + "> (request listener = " + String.valueOf(requestListener) + ")";
}
+
+ static class Config<I, O> {
+ private final Class<I> requestClass;
+ private final Class<O> replyClass;
+ private Executor executor;
+ private RequestListener<I, O> requestListener;
+
+ Config(final Class<I> requestClass, final Class<O> replyClass) {
+ this.requestClass = requestClass;
+ this.replyClass = replyClass;
+ }
+
+ public Class<I> getRequestClass() {
+ return requestClass;
+ }
+
+ public Class<O> getReplyClass() {
+ return replyClass;
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public void setExecutor(final Executor executor) {
+ this.executor = executor;
+ }
+
+ public RequestListener<I, O> getRequestListener() {
+ return requestListener;
+ }
+
+ public void setRequestListener(final RequestListener<I, O> requestListener) {
+ this.requestListener = requestListener;
+ }
+ }
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/core/src/main/java/org/jboss/remoting/core/RequestContextImpl.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -27,6 +27,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Executor;
import org.jboss.remoting.ClientContext;
import org.jboss.remoting.IndeterminateOutcomeException;
import org.jboss.remoting.RemoteExecutionException;
@@ -51,13 +52,16 @@
private boolean cancelled;
// @protectedby cancelLock
private Set<RequestCancelHandler<O>> cancelHandlers;
- private final RequestListenerExecutor executor;
+ private final RequestListenerExecutor interruptingExecutor;
+ private final Class<O> replyClass;
- RequestContextImpl(final ReplyHandler replyHandler, final ClientContextImpl clientContext) {
+ RequestContextImpl(final ReplyHandler replyHandler, final ClientContextImpl clientContext, final Class<O> replyClass) {
this.replyHandler = replyHandler;
this.clientContext = clientContext;
+ this.replyClass = replyClass;
+ final Executor executor = clientContext.getExecutor();
//noinspection ThisEscapedInObjectConstruction
- executor = new RequestListenerExecutor(clientContext.getExecutor(), this);
+ interruptingExecutor = new RequestListenerExecutor(executor, this);
}
public ClientContext getContext() {
@@ -72,8 +76,15 @@
public void sendReply(final O reply) throws IOException, IllegalStateException {
if (! closed.getAndSet(true)) {
+ final O actualReply;
try {
- replyHandler.handleReply(reply);
+ actualReply = replyClass.cast(reply);
+ } catch (ClassCastException e) {
+ SpiUtils.safeHandleException(replyHandler, new RemoteReplyException("Remote reply was the wrong type", e));
+ throw e;
+ }
+ try {
+ replyHandler.handleReply(actualReply);
} catch (IOException e) {
SpiUtils.safeHandleException(replyHandler, new RemoteReplyException("Remote reply failed", e));
throw e;
@@ -118,7 +129,7 @@
}
public void execute(final Runnable command) {
- executor.execute(command);
+ interruptingExecutor.execute(command);
}
protected void cancel() {
@@ -127,7 +138,7 @@
cancelled = true;
if (cancelHandlers != null) {
for (final RequestCancelHandler<O> handler : cancelHandlers) {
- executor.execute(new Runnable() {
+ interruptingExecutor.execute(new Runnable() {
public void run() {
SpiUtils.safeNotifyCancellation(handler, RequestContextImpl.this);
}
@@ -135,7 +146,7 @@
}
cancelHandlers = null;
}
- executor.interruptAll();
+ interruptingExecutor.interruptAll();
}
}
}
Modified: remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java
===================================================================
--- remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/core/src/test/java/org/jboss/remoting/core/EndpointTestCase.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -92,7 +92,7 @@
}
}
}
- });
+ }, INIT_ME, INIT_ME2);
final RequestHandler requestHandler = handle.getResource();
try {
requestHandler.addCloseHandler(new CloseHandler<RequestHandler>() {
@@ -100,7 +100,7 @@
clientEndpointClosed.set(true);
}
});
- final Client<Object,Object> client = endpoint.createClient(requestHandler);
+ final Client<Object,Object> client = endpoint.createClient(requestHandler, requestType, replyType);
try {
client.addCloseHandler(new CloseHandler<Client<Object, Object>>() {
public void handleClose(final Client<Object, Object> closed) {
@@ -151,7 +151,7 @@
}
}
}
- });
+ }, INIT_ME, INIT_ME2);
final RequestHandler requestHandler = handle.getResource();
try {
requestHandler.addCloseHandler(new CloseHandler<RequestHandler>() {
@@ -159,7 +159,7 @@
clientEndpointClosed.set(true);
}
});
- final Client<Object,Object> client = endpoint.createClient(requestHandler);
+ final Client<Object,Object> client = endpoint.createClient(requestHandler, requestType, replyType);
try {
client.addCloseHandler(new CloseHandler<Client<Object, Object>>() {
public void handleClose(final Client<Object, Object> closed) {
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicConfiguration.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -23,7 +23,7 @@
package org.jboss.remoting.protocol.basic;
import org.jboss.marshalling.MarshallerFactory;
-import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.MarshallingConfiguration;
import org.jboss.xnio.BufferAllocator;
import java.util.concurrent.Executor;
import java.nio.ByteBuffer;
@@ -33,7 +33,7 @@
*/
public final class BasicConfiguration {
private MarshallerFactory marshallerFactory;
- private Configuration marshallingConfiguration;
+ private MarshallingConfiguration marshallingConfiguration;
private int linkMetric;
private Executor executor;
private BufferAllocator<ByteBuffer> allocator;
@@ -46,11 +46,11 @@
this.marshallerFactory = marshallerFactory;
}
- public Configuration getMarshallingConfiguration() {
+ public MarshallingConfiguration getMarshallingConfiguration() {
return marshallingConfiguration;
}
- public void setMarshallingConfiguration(final Configuration marshallingConfiguration) {
+ public void setMarshallingConfiguration(final MarshallingConfiguration marshallingConfiguration) {
this.marshallingConfiguration = marshallingConfiguration;
}
Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/remoting/protocol/basic/BasicProtocol.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -28,7 +28,7 @@
import org.jboss.xnio.channels.StreamChannel;
import org.jboss.xnio.channels.ChannelOutputStream;
import org.jboss.xnio.channels.ChannelInputStream;
-import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.MarshallingConfiguration;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Unmarshaller;
import org.jboss.marshalling.Marshaller;
@@ -52,7 +52,7 @@
public static final void createServer(final Handle<RequestHandler> requestHandlerHandle, final StreamChannel streamChannel, final BasicConfiguration configuration) throws IOException {
final RequestHandler requestHandler = requestHandlerHandle.getResource();
- final Configuration marshallerConfiguration = configuration.getMarshallingConfiguration();
+ final MarshallingConfiguration marshallerConfiguration = configuration.getMarshallingConfiguration();
final MarshallerFactory marshallerFactory = configuration.getMarshallerFactory();
final Marshaller marshaller = marshallerFactory.createMarshaller(marshallerConfiguration);
final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallerConfiguration);
@@ -60,12 +60,14 @@
marshaller.start(Marshalling.createByteOutput(new ChannelOutputStream(streamChannel)));
unmarshaller.start(Marshalling.createByteInput(new ChannelInputStream(streamChannel)));
final BlockingQueue<FutureBasicReply> replyQueue = new LinkedBlockingQueue<FutureBasicReply>();
+ // todo - handle rejected execution...
executor.execute(new BasicServerReplyTransmitter(replyQueue, marshaller, streamChannel, requestHandlerHandle));
+ // todo - handle rejected execution...
executor.execute(new BasicServerRequestConsumer(unmarshaller, requestHandler, replyQueue, streamChannel, requestHandlerHandle));
}
public static final Handle<RequestHandler> createClient(final StreamChannel streamChannel, final BasicConfiguration configuration) throws IOException {
- final Configuration marshallerConfiguration = configuration.getMarshallingConfiguration();
+ final MarshallingConfiguration marshallerConfiguration = configuration.getMarshallingConfiguration();
final MarshallerFactory marshallerFactory = configuration.getMarshallerFactory();
final Marshaller marshaller = marshallerFactory.createMarshaller(marshallerConfiguration);
final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallerConfiguration);
@@ -74,6 +76,7 @@
unmarshaller.start(Marshalling.createByteInput(new ChannelInputStream(streamChannel)));
final Lock reqLock = new ReentrantLock();
final Queue<ReplyHandler> replyQueue = new LinkedList<ReplyHandler>();
+ // todo - handle rejected execution...
executor.execute(new BasicHandlerReplyConsumer(unmarshaller, streamChannel, reqLock, replyQueue));
return new BasicRequestHandler(reqLock, marshaller, replyQueue, streamChannel, executor).getHandle();
}
Modified: remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/remoting/protocol/basic/BasicTestCase.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -40,7 +40,7 @@
import org.jboss.remoting.test.support.LoggingHelper;
import org.jboss.remoting.spi.RequestHandler;
import org.jboss.remoting.spi.Handle;
-import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.MarshallingConfiguration;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
@@ -61,7 +61,7 @@
final BasicConfiguration configuration = new BasicConfiguration();
configuration.setExecutor(executor);
configuration.setMarshallerFactory(new RiverMarshallerFactory());
- final Configuration marshallingConfiguration = new Configuration();
+ final MarshallingConfiguration marshallingConfiguration = new MarshallingConfiguration();
configuration.setMarshallingConfiguration(marshallingConfiguration);
final Endpoint endpoint = Remoting.createEndpoint("test");
final Handle<RequestHandler> requestHandlerHandle = endpoint.createRequestHandler(new AbstractRequestListener<Object, Object>() {
@@ -77,7 +77,7 @@
}
}
}
- });
+ }, INIT_ME, INIT_ME2);
final ChannelSource<StreamChannel> channelSource = xnio.createPipeServer(executor, IoUtils.singletonHandlerFactory(new IoHandler<StreamChannel>() {
public void handleOpened(final StreamChannel channel) {
try {
@@ -101,7 +101,7 @@
}));
final IoFuture<StreamChannel> futureChannel = channelSource.open(IoUtils.nullHandler());
final Handle<RequestHandler> clientHandlerHandle = BasicProtocol.createClient(futureChannel.get(), configuration);
- final Client<Object,Object> client = endpoint.createClient(clientHandlerHandle.getResource());
+ final Client<Object,Object> client = endpoint.createClient(clientHandlerHandle.getResource(), requestType, replyType);
System.out.println("Reply is:" + client.invoke("GORBA!"));
}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerBiMap.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -0,0 +1,79 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+
+/**
+ *
+ */
+final class IdentityHashIntegerBiMap<T> implements IntegerBiMap<T> {
+
+ private final HashMap<Integer, T> leftMap;
+ private final IdentityHashMap<T, Integer> rightMap;
+
+ public IdentityHashIntegerBiMap(int initialCapacity, float loadFactor) {
+ leftMap = new HashMap<Integer, T>(initialCapacity, loadFactor);
+ rightMap = new IdentityHashMap<T, Integer>((int) (initialCapacity / loadFactor));
+ }
+
+ public IdentityHashIntegerBiMap() {
+ this(256, 0.4f);
+ }
+
+ public int get(final T key, final int defValue) {
+ final Integer v = rightMap.get(key);
+ return v == null ? defValue : v.intValue();
+ }
+
+ public T get(final int key) {
+ return leftMap.get(Integer.valueOf(key));
+ }
+
+ public void put(final int key1, final T key2) {
+ final Integer key1Obj = Integer.valueOf(key1);
+ final T oldKey2 = leftMap.put(key1Obj, key2);
+ final Integer oldKey1Obj = rightMap.put(key2, key1Obj);
+ rightMap.remove(oldKey2);
+ leftMap.remove(oldKey1Obj);
+ }
+
+ public T remove(final int key) {
+ final T oldRightKey = leftMap.remove(Integer.valueOf(key));
+ rightMap.remove(oldRightKey);
+ return oldRightKey;
+ }
+
+ public void remove(final T key) {
+ leftMap.remove(rightMap.remove(key));
+ }
+
+ public static <T> IntegerBiMap<T> create() {
+ return new IdentityHashIntegerBiMap<T>();
+ }
+
+ public static <T> IntegerBiMap<T> createSynchronizing() {
+ return IntegerBiMap.Util.synchronizing(new IdentityHashIntegerBiMap<T>());
+ }
+}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityHashIntegerResourceBiMap.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -0,0 +1,104 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import org.jboss.remoting.spi.AutoCloseable;
+import org.jboss.remoting.spi.Handle;
+
+/**
+ *
+ */
+final class IdentityHashIntegerResourceBiMap<T extends AutoCloseable<T>> implements IntegerResourceBiMap<T> {
+
+ private final HashMap<Integer, Handle<T>> leftMap;
+ private final IdentityHashMap<T, Integer> rightMap;
+
+ public IdentityHashIntegerResourceBiMap(int initialCapacity, float loadFactor) {
+ leftMap = new HashMap<Integer, Handle<T>>(initialCapacity, loadFactor);
+ rightMap = new IdentityHashMap<T, Integer>((int) (initialCapacity / loadFactor));
+ }
+
+ public IdentityHashIntegerResourceBiMap() {
+ this(256, 0.4f);
+ }
+
+ public int get(final T key, final int defValue) {
+ final Integer v = rightMap.get(key);
+ return v == null ? defValue : v.intValue();
+ }
+
+ public Handle<T> get(final int key) {
+ return leftMap.get(Integer.valueOf(key));
+ }
+
+ public void put(final int key1, final Handle<T> key2) {
+ final Integer key1Obj = Integer.valueOf(key1);
+ final Handle<T> oldKey2 = leftMap.put(key1Obj, key2);
+ final Integer oldKey1Obj = rightMap.put(key2.getResource(), key1Obj);
+ if (oldKey2 != null) rightMap.remove(oldKey2.getResource());
+ if (oldKey1Obj != null) leftMap.remove(oldKey1Obj);
+ }
+
+ public Handle<T> remove(final int key) {
+ final Handle<T> oldRightKey = leftMap.remove(Integer.valueOf(key));
+ if (oldRightKey != null) rightMap.remove(oldRightKey.getResource());
+ return oldRightKey;
+ }
+
+ public void remove(final T key) {
+ leftMap.remove(rightMap.remove(key));
+ }
+
+ public static <T extends AutoCloseable<T>> IntegerResourceBiMap<T> create() {
+ return new IdentityHashIntegerResourceBiMap<T>();
+ }
+
+ public static <T extends AutoCloseable<T>> IntegerResourceBiMap<T> createSynchronizing() {
+ return Util.synchronizing(new IdentityHashIntegerResourceBiMap<T>());
+ }
+
+ public Iterator<Handle<T>> iterator() {
+ final Iterator<Map.Entry<Integer, Handle<T>>> delegate = leftMap.entrySet().iterator();
+ return new Iterator<Handle<T>>() {
+ private Map.Entry<Integer, Handle<T>> current;
+
+ public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ public Handle<T> next() {
+ current = delegate.next();
+ return current.getValue();
+ }
+
+ public void remove() {
+ delegate.remove();
+ rightMap.remove(current.getValue().getResource());
+ }
+ };
+ }
+}
\ No newline at end of file
Deleted: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IdentityIntMap.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -1,153 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.protocol.multiplex;
-
-import java.util.Arrays;
-
-/**
- *
- */
-final class IdentityIntMap<T> {
-
- private int[] values;
- private Object[] keys;
- private int count;
- private int resizeCount;
-
- public IdentityIntMap(int initialCapacity, final float loadFactor) {
- if (initialCapacity < 1) {
- throw new IllegalArgumentException("initialCapacity must be > 0");
- }
- if (loadFactor <= 0.0f || loadFactor >= 1.0f) {
- throw new IllegalArgumentException("loadFactor must be > 0.0 and < 1.0");
- }
- if (initialCapacity < 16) {
- initialCapacity = 16;
- } else {
- // round up
- final int c = Integer.highestOneBit(initialCapacity) - 1;
- initialCapacity = Integer.highestOneBit(initialCapacity + c);
- }
- keys = new Object[initialCapacity];
- values = new int[initialCapacity];
- resizeCount = (int) ((double) initialCapacity * (double) loadFactor);
- }
-
- public IdentityIntMap(final float loadFactor) {
- this(64, loadFactor);
- }
-
- public IdentityIntMap(final int initialCapacity) {
- this(initialCapacity, 0.5f);
- }
-
- public IdentityIntMap() {
- this(0.5f);
- }
-
- public int get(T key, int defVal) {
- if (key == null) {
- throw new NullPointerException("key is null");
- }
- final Object[] keys = this.keys;
- final int mask = keys.length - 1;
- int hc = System.identityHashCode(key) & mask;
- Object v;
- for (;;) {
- v = keys[hc];
- if (v == key) {
- return values[hc];
- }
- if (v == null) {
- // not found
- return defVal;
- }
- hc = (hc + 1) & mask;
- }
- }
-
- public void put(T key, int value) {
- if (key == null) {
- throw new NullPointerException("key is null");
- }
- final Object[] keys = this.keys;
- final int mask = keys.length - 1;
- final int[] values = this.values;
- Object v;
- int hc = System.identityHashCode(key) & mask;
- for (int idx = hc;; idx = hc++ & mask) {
- v = keys[idx];
- if (v == null) {
- keys[idx] = key;
- values[idx] = value;
- if (++count > resizeCount) {
- resize();
- }
- return;
- }
- if (v == key) {
- values[idx] = value;
- return;
- }
- }
- }
-
- private final void resize() {
- final Object[] oldKeys = keys;
- final int oldsize = oldKeys.length;
- final int[] oldValues = values;
- if (oldsize >= 0x40000000) {
- throw new IllegalStateException("Table full");
- }
- final int newsize = oldsize << 1;
- final int mask = newsize - 1;
- final Object[] newKeys = new Object[newsize];
- final int[] newValues = new int[newsize];
- keys = newKeys;
- values = newValues;
- if ((resizeCount <<= 1) == 0) {
- resizeCount = Integer.MAX_VALUE;
- }
- for (int oi = 0; oi < oldsize; oi ++) {
- final Object key = oldKeys[oi];
- if (key != null) {
- int ni = System.identityHashCode(key) & mask;
- for (;;) {
- final Object v = newKeys[ni];
- if (v == null) {
- // found
- newKeys[ni] = key;
- newValues[ni] = oldValues[oi];
- break;
- }
- ni = (ni + 1) & mask;
- }
- }
- }
- }
-
- public void clear() {
- Arrays.fill(keys, null);
- count = 0;
- }
-}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerBiMap.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -0,0 +1,89 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+/**
+ *
+ */
+interface IntegerBiMap<T> {
+ int get(T key, int defValue);
+
+ T get(int key);
+
+ void put(int key1, T key2);
+
+ T remove(int key);
+
+ void remove(T key);
+
+ class Util {
+
+ private Util() {
+ }
+
+ private static class SyncWrapper<T> implements IntegerBiMap<T> {
+
+ private final IntegerBiMap<T> orig;
+ private final Object lock;
+
+ private SyncWrapper(IntegerBiMap<T> orig, Object lock) {
+ this.orig = orig;
+ this.lock = lock;
+ }
+
+ public int get(final T key, final int defValue) {
+ synchronized (lock) {
+ return orig.get(key, defValue);
+ }
+ }
+
+ public T get(final int key) {
+ synchronized (lock) {
+ return orig.get(key);
+ }
+ }
+
+ public void put(final int key1, final T key2) {
+ synchronized (lock) {
+ orig.put(key1, key2);
+ }
+ }
+
+ public T remove(final int key) {
+ synchronized (lock) {
+ return orig.remove(key);
+ }
+ }
+
+ public void remove(final T key) {
+ synchronized (lock) {
+ orig.remove(key);
+ }
+ }
+ }
+
+ public static <T> IntegerBiMap<T> synchronizing(IntegerBiMap<T> orig) {
+ return new SyncWrapper<T>(orig, new Object());
+ }
+ }
+}
Added: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java (rev 0)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/IntegerResourceBiMap.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -0,0 +1,97 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.remoting.protocol.multiplex;
+
+import org.jboss.remoting.spi.AutoCloseable;
+import org.jboss.remoting.spi.Handle;
+import java.util.Iterator;
+
+/**
+ *
+ */
+interface IntegerResourceBiMap<T extends AutoCloseable<T>> extends Iterable<Handle<T>> {
+ int get(T key, int defValue);
+
+ Handle<T> get(int key);
+
+ void put(int key1, Handle<T> key2);
+
+ Handle<T> remove(int key);
+
+ void remove(T key);
+
+ class Util {
+
+ private Util() {
+ }
+
+ private static class SyncWrapper<T extends AutoCloseable<T>> implements IntegerResourceBiMap<T> {
+
+ private final IntegerResourceBiMap<T> orig;
+ private final Object lock;
+
+ private SyncWrapper(IntegerResourceBiMap<T> orig, Object lock) {
+ this.orig = orig;
+ this.lock = lock;
+ }
+
+ public int get(final T key, final int defValue) {
+ synchronized (lock) {
+ return orig.get(key, defValue);
+ }
+ }
+
+ public Handle<T> get(final int key) {
+ synchronized (lock) {
+ return orig.get(key);
+ }
+ }
+
+ public void put(final int key1, final Handle<T> key2) {
+ synchronized (lock) {
+ orig.put(key1, key2);
+ }
+ }
+
+ public Handle<T> remove(final int key) {
+ synchronized (lock) {
+ return orig.remove(key);
+ }
+ }
+
+ public void remove(final T key) {
+ synchronized (lock) {
+ orig.remove(key);
+ }
+ }
+
+ public Iterator<Handle<T>> iterator() {
+ return null;
+ }
+ }
+
+ public static <T extends AutoCloseable<T>> IntegerResourceBiMap<T> synchronizing(IntegerResourceBiMap<T> orig) {
+ return new SyncWrapper<T>(orig, new Object());
+ }
+ }
+}
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MessageType.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -27,16 +27,26 @@
*/
enum MessageType {
- // One-way request, no return value may be sent
- // Two-way request, return value is expected
+ /**
+ * The request part of a request-response sequence, sent from the Client to the RequestListener.
+ */
REQUEST(2),
- // Reply
+ /**
+ * The reply part of a request-response sequence, sent from the RequestListener to the Client.
+ */
REPLY(3),
- // Attempt to cancel a request
+ /**
+ * A cancellation request for an outstanding request, sent from the Client to the RequestListener.
+ */
CANCEL_REQUEST(4),
- // Acknowledge that a request was cancelled
+ /**
+ * Acknowlegement that a request was cancelled, sent from the RequestListener to the Client.
+ */
CANCEL_ACK(5),
- // Request failed due to protocol or unmarshalling problem
+ /**
+ * Message that the request could not be received on the remote end, sent from to the Client from the
+ * protocol handler as a
+ */
REQUEST_RECEIVE_FAILED(6),
// Request failed due to exception
REQUEST_FAILED(7),
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexConfiguration.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -26,14 +26,14 @@
import java.nio.ByteBuffer;
import org.jboss.xnio.BufferAllocator;
import org.jboss.marshalling.MarshallerFactory;
-import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.MarshallingConfiguration;
/**
* A configuration object for the multiplex protocol.
*/
public final class MultiplexConfiguration {
private MarshallerFactory marshallerFactory;
- private Configuration marshallingConfiguration;
+ private MarshallingConfiguration marshallingConfiguration;
private int linkMetric;
private Executor executor;
private BufferAllocator<ByteBuffer> allocator;
@@ -67,7 +67,7 @@
*
* @return the configuration
*/
- public Configuration getMarshallingConfiguration() {
+ public MarshallingConfiguration getMarshallingConfiguration() {
return marshallingConfiguration;
}
@@ -76,7 +76,7 @@
*
* @param marshallingConfiguration the configuration
*/
- public void setMarshallingConfiguration(final Configuration marshallingConfiguration) {
+ public void setMarshallingConfiguration(final MarshallingConfiguration marshallingConfiguration) {
this.marshallingConfiguration = marshallingConfiguration;
}
Modified: remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java
===================================================================
--- remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/protocol/multiplex/src/main/java/org/jboss/remoting/protocol/multiplex/MultiplexHandler.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -34,19 +34,19 @@
import org.jboss.remoting.spi.Handle;
import org.jboss.remoting.spi.SpiUtils;
import org.jboss.remoting.spi.AbstractAutoCloseable;
-import static org.jboss.remoting.util.CollectionUtil.concurrentIntegerMap;
import org.jboss.remoting.util.CollectionUtil;
-import org.jboss.remoting.util.ConcurrentIntegerMap;
import org.jboss.remoting.CloseHandler;
import org.jboss.remoting.Endpoint;
import org.jboss.remoting.SimpleCloseable;
import org.jboss.remoting.RemoteExecutionException;
import org.jboss.remoting.IndeterminateOutcomeException;
+import org.jboss.remoting.ReplyException;
+import org.jboss.remoting.RemoteServiceConfiguration;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Unmarshaller;
import org.jboss.marshalling.ByteOutput;
import org.jboss.marshalling.Marshaller;
-import org.jboss.marshalling.Configuration;
+import org.jboss.marshalling.MarshallingConfiguration;
import org.jboss.marshalling.ObjectTable;
import org.jboss.marshalling.Marshalling;
import java.util.concurrent.BlockingQueue;
@@ -60,6 +60,7 @@
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.InterruptedIOException;
+import java.io.InvalidObjectException;
/**
* Protocol handler for the basic message-oriented Remoting protocol.
@@ -70,40 +71,41 @@
//--== Connection configuration items ==--
private final MarshallerFactory marshallerFactory;
- private final Configuration marshallingConfiguration;
+ private final MarshallingConfiguration marshallingConfiguration;
private final int linkMetric;
private final Executor executor;
// buffer allocator for outbound message assembly
private final BufferAllocator<ByteBuffer> allocator;
// running on remote node
- private final ConcurrentIntegerMap<ReplyHandler> remoteRequests = concurrentIntegerMap();
+ private final IntegerBiMap<ReplyHandler> remoteRequests = IdentityHashIntegerBiMap.createSynchronizing();
// running on local node
- private final ConcurrentIntegerMap<RemoteRequestContext> localRequests = concurrentIntegerMap();
+ private final IntegerBiMap<RemoteRequestContext> localRequests = IdentityHashIntegerBiMap.createSynchronizing();
// sequence for remote requests
private final AtomicInteger requestSequence = new AtomicInteger();
// clients whose requests get forwarded to the remote side
// even #s were opened from services forwarded to us (our sequence)
// odd #s were forwarded directly to us (remote sequence)
- private final ConcurrentIntegerMap<RequestHandler> remoteClients = concurrentIntegerMap();
+ private final IntegerBiMap<RequestHandler> remoteClients = IdentityHashIntegerBiMap.createSynchronizing();
// forwarded to remote side (handled on this side)
- private final ConcurrentIntegerMap<Handle<RequestHandler>> forwardedClients = concurrentIntegerMap();
- // sequence for forwarded clients (unsigned; shift left one bit, add one)
+ private final IntegerResourceBiMap<RequestHandler> forwardedClients = IdentityHashIntegerResourceBiMap.createSynchronizing();
+ // sequence for forwarded clients (shift left one bit, add one, limit is 2^30)
private final AtomicInteger forwardedClientSequence = new AtomicInteger();
- // sequence for clients created from services forwarded to us (unsigned; shift left one bit)
+ // sequence for clients created from services forwarded to us (shift left one bit, limit is 2^30)
private final AtomicInteger remoteClientSequence = new AtomicInteger();
// services forwarded to us
- private final ConcurrentIntegerMap<RequestHandlerSource> remoteServices = concurrentIntegerMap();
+ private final IntegerBiMap<RequestHandlerSource> remoteServices = IdentityHashIntegerBiMap.createSynchronizing();
// forwarded to remote side (handled on this side)
- private final ConcurrentIntegerMap<Handle<RequestHandlerSource>> forwardedServices = concurrentIntegerMap();
+ private final IntegerResourceBiMap<RequestHandlerSource> forwardedServices = IdentityHashIntegerResourceBiMap.createSynchronizing();
// sequence for forwarded services
- private final AtomicInteger serviceSequence = new AtomicInteger();
+ private final AtomicInteger forwardedServiceSequence = new AtomicInteger();
private final Endpoint endpoint;
private volatile AllocatedMessageChannel channel;
+ private static final StackTraceElement[] emptyStackTraceElements = new StackTraceElement[0];
public MultiplexHandler(final Endpoint endpoint, final MultiplexConfiguration configuration) {
this.endpoint = endpoint;
@@ -114,6 +116,28 @@
linkMetric = configuration.getLinkMetric();
}
+ // sequence methods
+
+ int nextRequest() {
+ return requestSequence.getAndIncrement() & 0x7fffffff;
+ }
+
+ int nextForwardedClient() {
+ return (forwardedClientSequence.getAndIncrement() << 1 | 1) & 0x7fffffff;
+ }
+
+ int nextRemoteClient() {
+ return remoteClientSequence.getAndIncrement() << 1 & 0x7fffffff;
+ }
+
+ int nextForwardedService() {
+ return forwardedServiceSequence.getAndIncrement() & 0x7fffffff;
+ }
+
+ void setChannel(final AllocatedMessageChannel channel) {
+ this.channel = channel;
+ }
+
public void handleOpened(final AllocatedMessageChannel channel) {
channel.resumeReads();
}
@@ -165,18 +189,40 @@
payload = unmarshaller.readObject();
unmarshaller.finish();
} catch (ClassNotFoundException e) {
- log.trace("Class not found in request ID %d for client ID %d", Integer.valueOf(requestId), Integer.valueOf(clientId));
- // todo - send request receive failed message
break;
}
} finally {
IoUtils.safeClose(unmarshaller);
}
- } catch (IOException ex) {
+ } catch (Exception ex) {
+ // IOException | ClassNotFoundException
log.trace("Failed to unmarshal a request (%s), sending %s", ex, MessageType.REQUEST_RECEIVE_FAILED);
- // todo send a request failure message
+ try {
+ final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
+ try {
+ List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+ marshaller.start(createByteOutput(allocator, buffers));
+ marshaller.write(MessageType.REQUEST_RECEIVE_FAILED.getId());
+ final IOException ioe = new IOException("Request receive failed");
+ ex.setStackTrace(emptyStackTraceElements);
+ ioe.initCause(ex);
+ ioe.setStackTrace(emptyStackTraceElements);
+ marshaller.writeObject(ioe);
+ marshaller.finish();
+ registerWriter(channel, new SimpleWriteHandler(allocator, buffers));
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ log.debug("Remoting channel handler thread interrupted; closing channel");
+ IoUtils.safeClose(channel);
+ } finally {
+ IoUtils.safeClose(marshaller);
+ }
+ } catch (IOException ioe) {
+ log.warn("Failed to send notification of failure to unmarshal a request: %s", ioe);
+ }
break;
}
+ // request received OK
final RequestHandler requestHandler = handle.getResource();
requestHandler.receiveRequest(payload, new ReplyHandlerImpl(channel, requestId, allocator));
break;
@@ -207,7 +253,7 @@
} catch (IOException ex) {
log.trace("Failed to unmarshal a reply (%s), sending a ReplyException", ex);
// todo
- SpiUtils.safeHandleException(replyHandler, ex);
+ SpiUtils.safeHandleException(replyHandler, new ReplyException("Unmarshal failed", ex));
break;
}
SpiUtils.safeHandleReply(replyHandler, payload);
@@ -238,9 +284,24 @@
log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
break;
}
- final String reason = readUTFZ(buffer);
-
- // todo - throw a new ReplyException
+ final IOException cause;
+ try {
+ final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
+ try {
+ unmarshaller.start(Marshalling.createByteInput(buffer));
+ cause = (IOException) unmarshaller.readObject();
+ unmarshaller.finish();
+ } finally {
+ IoUtils.safeClose(unmarshaller);
+ }
+ } catch (IOException e) {
+ SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
+ break;
+ } catch (ClassNotFoundException e) {
+ SpiUtils.safeHandleException(replyHandler, new RemoteExecutionException("Remote operation failed; the remote exception could not be read", e));
+ break;
+ }
+ SpiUtils.safeHandleException(replyHandler, cause);
break;
}
case REQUEST_FAILED: {
@@ -250,13 +311,13 @@
log.trace("Got reply to unknown request %d", Integer.valueOf(requestId));
break;
}
- final Throwable cause;
+ final IOException cause;
try {
final Unmarshaller unmarshaller = marshallerFactory.createUnmarshaller(marshallingConfiguration);
try {
unmarshaller.start(Marshalling.createByteInput(buffer));
try {
- cause = (Throwable) unmarshaller.readObject();
+ cause = (IOException) unmarshaller.readObject();
} catch (ClassNotFoundException e) {
replyHandler.handleException(new InvalidClassException("Class not found: " + e.toString()));
log.trace("Class not found in exception reply to request ID %d", Integer.valueOf(requestId));
@@ -339,7 +400,11 @@
final int calcMetric = baseMetric + linkMetric;
if (calcMetric > 0) {
try {
- final SimpleCloseable closeable = endpoint.registerRemoteService(serviceType, groupName, endpointName, handlerSource, calcMetric);
+ final RemoteServiceConfiguration config = new RemoteServiceConfiguration();
+ config.setServiceType(serviceType);
+ config.setGroupName(groupName);
+ config.setEndpointName(endpointName);
+ final SimpleCloseable closeable = endpoint.registerRemoteService(config);
// todo - something with that closeable
} catch (IOException e) {
log.error(e, "Unable to register remote service");
@@ -436,15 +501,14 @@
}
public void handleException(final IOException exception) throws IOException {
- ByteBuffer buffer = allocator.allocate();
- buffer.put((byte) MessageType.REQUEST_FAILED.getId());
- buffer.putInt(requestId);
try {
final Marshaller marshaller = marshallerFactory.createMarshaller(marshallingConfiguration);
try {
final List<ByteBuffer> bufferList = new ArrayList<ByteBuffer>();
final ByteOutput output = createByteOutput(allocator, bufferList);
try {
+ marshaller.write(MessageType.REQUEST_FAILED.getId());
+ marshaller.writeInt(requestId);
marshaller.start(output);
marshaller.writeObject(exception);
marshaller.close();
@@ -561,7 +625,7 @@
this.allocator = allocator;
addCloseHandler(new CloseHandler<RequestHandler>() {
public void handleClose(final RequestHandler closed) {
- remoteClients.remove(identifier, this);
+ remoteClients.remove(identifier);
ByteBuffer buffer = allocator.allocate();
buffer.put((byte) MessageType.CLIENT_CLOSE.getId());
buffer.putInt(identifier);
@@ -587,10 +651,8 @@
marshaller.write(MessageType.REQUEST.getId());
marshaller.writeInt(identifier);
- int id;
- do {
- id = requestSequence.getAndIncrement();
- } while (remoteRequests.putIfAbsent(id, handler) != null);
+ final int id = nextRequest();
+ remoteRequests.put(id, handler);
marshaller.writeInt(id);
marshaller.writeObject(request);
marshaller.close();
@@ -683,15 +745,13 @@
}
public Handle<RequestHandler> createRequestHandler() throws IOException {
- int id;
- do {
- id = remoteClientSequence.getAndIncrement() << 1;
- } while (remoteClients.putIfAbsent(id, new RequestHandlerImpl(id, MultiplexHandler.this.allocator)) != null);
- final int clientId = id;
+ final int id = nextRemoteClient();
+ final RequestHandler requestHandler = new RequestHandlerImpl(id, MultiplexHandler.this.allocator);
+ remoteClients.put(id, requestHandler);
final ByteBuffer buffer = allocator.allocate();
buffer.put((byte) MessageType.CLIENT_OPEN.getId());
buffer.putInt(identifier);
- buffer.putInt(clientId);
+ buffer.putInt(id);
buffer.flip();
// todo - probably should bail out if we're interrupted?
boolean intr = false;
@@ -699,7 +759,7 @@
try {
registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
try {
- return new RequestHandlerImpl(clientId, allocator).getHandle();
+ return new RequestHandlerImpl(id, allocator).getHandle();
} finally {
if (intr) {
Thread.currentThread().interrupt();
@@ -769,25 +829,32 @@
};
}
+ private final ProtocolObjectTableWriter protocolObjectTableWriter = new ProtocolObjectTableWriter();
+
public class ProtocolObjectTableWriter implements ObjectTable.Writer {
public void writeObject(final Marshaller marshaller, final Object o) throws IOException {
-
+ final RequestHandler requestHandler = (RequestHandler) o;
+ final int existingId = forwardedClients.get(requestHandler, -1);
+ marshaller.write(1);
+ if (existingId == -1) {
+ final int newId = nextForwardedClient();
+ forwardedClients.put(newId, requestHandler.getHandle());
+ marshaller.writeInt(newId);
+ } else {
+ marshaller.writeInt(existingId);
+ }
}
}
public class ProtocolObjectTable implements ObjectTable {
- public Writer getObjectWriter(final Object o) throws IOException /* fixed in beta2 */ {
+ public Writer getObjectWriter(final Object o) throws IOException {
if (o instanceof RequestHandler) {
- final RequestHandler requestHandler = (RequestHandler) o;
-
- } else if (o instanceof RequestHandlerSource) {
- final RequestHandlerSource requestHandlerSource = (RequestHandlerSource) o;
-
+ return protocolObjectTableWriter;
} else {
+ return null;
}
- return null;
}
public Object readObject(final Unmarshaller unmarshaller) throws IOException, ClassNotFoundException {
@@ -795,18 +862,18 @@
case 1: {
// remote client
final int id = unmarshaller.readInt();
+ return remoteClients.get(id);
}
case 2: {
// remote client source
+ final int id = unmarshaller.readInt();
+ return remoteServices.get(id);
}
- case 3: {
- // stream
- }
default: {
// invalid
+ throw new InvalidObjectException("Invalid ID sent for protocol object table");
}
}
- return null;
}
}
}
Modified: remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalBasicExampleMain.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -18,7 +18,7 @@
final StringRot13RequestListener listener = new StringRot13RequestListener();
final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
- final Client<String,String> client = Remoting.createLocalClient(endpoint, listener);
+ final Client<String,String> client = Remoting.createLocalClient(endpoint, listener, null, null);
try {
final String original = "The Secret Message\n";
final String result = client.invoke(original);
Modified: remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/samples/src/main/java/org/jboss/remoting/samples/simple/LocalStreamExampleMain.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -21,7 +21,7 @@
final StreamingRot13RequestListener listener = new StreamingRot13RequestListener();
final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
- final Client<Reader,Reader> client = Remoting.createLocalClient(endpoint, listener);
+ final Client<Reader,Reader> client = Remoting.createLocalClient(endpoint, listener, null, null);
try {
final String original = "The Secret Message\n";
final StringReader originalReader = new StringReader(original);
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/remoting/Remoting.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -32,19 +32,19 @@
}
}
- public static <I, O> Client<I, O> createLocalClient(Endpoint endpoint, RequestListener<I, O> requestListener) throws IOException {
- final Handle<RequestHandler> handle = endpoint.createRequestHandler(requestListener);
+ public static <I, O> Client<I, O> createLocalClient(final Endpoint endpoint, final RequestListener<I, O> requestListener, final Class<I> requestClass, final Class<O> replyClass) throws IOException {
+ final Handle<RequestHandler> handle = endpoint.createRequestHandler(requestListener, requestClass, replyClass);
try {
- return endpoint.createClient(handle.getResource());
+ return endpoint.createClient(handle.getResource(), requestClass, replyClass);
} finally {
IoUtils.safeClose(handle);
}
}
- public static <I, O> ClientSource<I, O> createLocalClientSource(Endpoint endpoint, RequestListener<I, O> requestListener, final String serviceType, final String groupName) throws IOException {
- final Handle<RequestHandlerSource> handle = endpoint.createRequestHandlerSource(requestListener, serviceType, groupName);
+ public static <I, O> ClientSource<I, O> createLocalClientSource(final Endpoint endpoint, final LocalServiceConfiguration<I, O> config) throws IOException {
+ final Handle<RequestHandlerSource> handle = endpoint.registerService(config);
try {
- return endpoint.createClientSource(handle.getResource());
+ return endpoint.createClientSource(handle.getResource(), config.getRequestClass(), config.getReplyClass());
} finally {
IoUtils.safeClose(handle);
}
Modified: remoting3/trunk/transporter/src/main/java/org/jboss/remoting/transporter/Transporter.java
===================================================================
--- remoting3/trunk/transporter/src/main/java/org/jboss/remoting/transporter/Transporter.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/transporter/src/main/java/org/jboss/remoting/transporter/Transporter.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -50,9 +50,9 @@
*/
public static <T> T createTransporter(Endpoint endpoint, Class<T> interfaceType, T instance) throws IOException {
boolean ok = false;
- final Handle<RequestHandler> requestHandlerHandle = endpoint.createRequestHandler(new TransporterRequestListener<T>(instance));
+ final Handle<RequestHandler> requestHandlerHandle = endpoint.createRequestHandler(new TransporterRequestListener<T>(instance), TransporterInvocation.class, Object.class);
try {
- final Client<TransporterInvocation,Object> client = endpoint.createClient(requestHandlerHandle.getResource());
+ final Client<TransporterInvocation,Object> client = endpoint.createClient(requestHandlerHandle.getResource(), TransporterInvocation.class, Object.class);
try {
requestHandlerHandle.close();
final T proxy = createProxy(interfaceType, client);
@@ -68,8 +68,7 @@
}
}
- @SuppressWarnings({ "unchecked" })
private static <T> T createProxy(final Class<T> interfaceType, final Client<TransporterInvocation, Object> client) {
- return (T) Proxy.newProxyInstance(interfaceType.getClassLoader(), new Class<?>[] { interfaceType }, new TransporterInvocationHandler(client));
+ return interfaceType.cast(Proxy.newProxyInstance(interfaceType.getClassLoader(), new Class<?>[] { interfaceType }, new TransporterInvocationHandler(client)));
}
}
Modified: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/CollectionUtil.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -69,15 +69,6 @@
}
/**
- * Create a concurrent integer-keyed map.
- *
- * @return a concurrent integer-keyed map
- */
- public static <V> ConcurrentIntegerMap<V> concurrentIntegerMap() {
- return new EmulatedConcurrentIntegerHashMap<V>(CollectionUtil.<Integer, V>concurrentMap());
- }
-
- /**
* Create a synchronized map that obeys the contract for {@code ConcurrentMap}.
*
* @param original the map to be wrapped
Deleted: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/ConcurrentIntegerMap.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/ConcurrentIntegerMap.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/ConcurrentIntegerMap.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -1,51 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.util;
-
-/**
- * A concurrent map that is keyed by integer.
- */
-public interface ConcurrentIntegerMap<V> {
-
- V get(int key);
-
- V put(int key, V value);
-
- V putIfAbsent(int key, V value);
-
- V remove(int key);
-
- boolean remove(int key, Object oldValue);
-
- V replace(int key, V value);
-
- boolean replace(int key, V oldValue, V newValue);
-
- void clear();
-
- boolean isEmpty();
-
- boolean equals(Object other);
-
- int hashCode();
-}
Deleted: remoting3/trunk/util/src/main/java/org/jboss/remoting/util/EmulatedConcurrentIntegerHashMap.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/remoting/util/EmulatedConcurrentIntegerHashMap.java 2008-10-30 05:00:22 UTC (rev 4641)
+++ remoting3/trunk/util/src/main/java/org/jboss/remoting/util/EmulatedConcurrentIntegerHashMap.java 2008-11-04 02:34:36 UTC (rev 4642)
@@ -1,81 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.remoting.util;
-
-import java.util.concurrent.ConcurrentMap;
-
-/**
- *
- */
-public final class EmulatedConcurrentIntegerHashMap<V> implements ConcurrentIntegerMap<V> {
-
- private final ConcurrentMap<Integer, V> delegate;
-
- public EmulatedConcurrentIntegerHashMap(final ConcurrentMap<Integer, V> delegate) {
- this.delegate = delegate;
- }
-
- public V get(final int key) {
- return delegate.get(Integer.valueOf(key));
- }
-
- public V put(final int key, final V value) {
- return delegate.put(Integer.valueOf(key), value);
- }
-
- public V putIfAbsent(final int key, final V value) {
- return delegate.putIfAbsent(Integer.valueOf(key), value);
- }
-
- public V remove(final int key) {
- return delegate.remove(Integer.valueOf(key));
- }
-
- public boolean remove(final int key, final Object oldValue) {
- return delegate.remove(Integer.valueOf(key), oldValue);
- }
-
- public V replace(final int key, final V value) {
- return delegate.replace(Integer.valueOf(key), value);
- }
-
- public boolean replace(final int key, final V oldValue, final V newValue) {
- return delegate.replace(Integer.valueOf(key), oldValue, newValue);
- }
-
- public void clear() {
- delegate.clear();
- }
-
- public boolean isEmpty() {
- return delegate.isEmpty();
- }
-
- public boolean equals(final Object obj) {
- return super.equals(obj);
- }
-
- public int hashCode() {
- return super.hashCode();
- }
-}
16 years