Author: david.lloyd(a)jboss.com
Date: 2008-07-18 19:18:52 -0400 (Fri, 18 Jul 2008)
New Revision: 4395
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
RemoteClient/ServiceEndpoints need and should not be generic
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-18
23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -37,7 +37,7 @@
* @return the client
* @throws RemotingException if an error occurs
*/
- <I, O> RemoteClientEndpoint<I, O>
createClientEndpoint(RequestListener<I, O> requestListener) throws
RemotingException;
+ <I, O> RemoteClientEndpoint createClientEndpoint(RequestListener<I, O>
requestListener) throws RemotingException;
/**
* Create a client source that can be used to acquire clients associated with a
request listener on this endpoint.
@@ -52,7 +52,7 @@
* @return the context source
* @throws RemotingException if an error occurs
*/
- <I, O> RemoteServiceEndpoint<I, O>
createServiceEndpoint(RequestListener<I, O> requestListener) throws
RemotingException;
+ <I, O> RemoteServiceEndpoint createServiceEndpoint(RequestListener<I, O>
requestListener) throws RemotingException;
/**
* Create a client from a remote client endpoint.
@@ -63,7 +63,7 @@
* @return the client
* @throws RemotingException if an error occurs
*/
- <I, O> Client<I, O> createClient(RemoteClientEndpoint<I, O>
endpoint) throws RemotingException;
+ <I, O> Client<I, O> createClient(RemoteClientEndpoint endpoint) throws
RemotingException;
/**
* Create a client source from a remote service endpoint.
@@ -74,5 +74,5 @@
* @return the client source
* @throws RemotingException if an error occurs
*/
- <I, O> ClientSource<I, O> createClientSource(RemoteServiceEndpoint<I,
O> endpoint) throws RemotingException;
+ <I, O> ClientSource<I, O> createClientSource(RemoteServiceEndpoint
endpoint) throws RemotingException;
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-18
23:18:15 UTC (rev 4394)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/SpiUtils.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -48,7 +48,7 @@
* @param msg the message
* @param cause the cause
*/
- public static void safeHandleException(final ReplyHandler<?> replyHandler,
final String msg, final Throwable cause) {
+ public static void safeHandleException(final ReplyHandler replyHandler, final String
msg, final Throwable cause) {
try {
replyHandler.handleException(msg, cause);
} catch (Throwable t) {
@@ -63,7 +63,7 @@
* @param replyHandler the reply handler
* @param reply the reply
*/
- public static <O> void safeHandleReply(final ReplyHandler<O>
replyHandler, final O reply) {
+ public static <O> void safeHandleReply(final ReplyHandler replyHandler, final O
reply) {
try {
replyHandler.handleReply(reply);
} catch (Throwable t) {
@@ -76,7 +76,7 @@
*
* @param replyHandler the reply handler
*/
- public static void safeHandleCancellation(final ReplyHandler<?> replyHandler)
{
+ public static void safeHandleCancellation(final ReplyHandler replyHandler) {
try {
replyHandler.handleCancellation();
} catch (Throwable t) {
@@ -136,7 +136,7 @@
private static final RemoteRequestContext BLANK_REMOTE_REQUEST_CONTEXT = new
BlankRemoteRequestContext();
- public static void safeAutoClose(final RemoteClientEndpoint<?, ?>
remoteClientEndpoint) {
+ public static void safeAutoClose(final RemoteClientEndpoint remoteClientEndpoint) {
try {
remoteClientEndpoint.autoClose();
} catch (Throwable t) {
@@ -144,7 +144,7 @@
}
}
- public static void safeAutoClose(final RemoteServiceEndpoint<Object, Object>
remoteServiceEndpoint) {
+ public static void safeAutoClose(final RemoteServiceEndpoint remoteServiceEndpoint)
{
try {
remoteServiceEndpoint.autoClose();
} catch (Throwable t) {
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpoint.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -25,13 +25,12 @@
import org.jboss.cx.remoting.Closeable;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.Client;
/**
* A remote client endpoint, which can be passed to remote endpoints. Remote systems can
then use the client endpoint
* to make invocations, or they may pass the client endpoint on to other remote systems.
*/
-public interface RemoteClientEndpoint<I, O> extends
Closeable<RemoteClientEndpoint<I, O>> {
+public interface RemoteClientEndpoint extends Closeable<RemoteClientEndpoint> {
/**
* Receive a one-way request from a remote system. This method is intended to be
called by protocol handlers. No
@@ -39,7 +38,7 @@
*
* @param request the request
*/
- void receiveRequest(I request);
+ void receiveRequest(Object request);
/**
* Receive a request from a remote system. This method is intended to be called by
protocol handlers. If the
@@ -51,7 +50,7 @@
* @param replyHandler a handler for the reply
* @return a context which may be used to cancel the request
*/
- RemoteRequestContext receiveRequest(I request, ReplyHandler<O> replyHandler);
+ RemoteRequestContext receiveRequest(Object request, ReplyHandler replyHandler);
/**
* Get a handle to this client endpoint. The client endpoint will not auto-close as
long as there is at least
@@ -62,7 +61,7 @@
* @return the handle
* @throws RemotingException if a handle could not be acquired
*/
- Handle<RemoteClientEndpoint<I, O>> getHandle() throws RemotingException;
+ Handle<RemoteClientEndpoint> getHandle() throws RemotingException;
/**
* Automatically close this client endpoint when all handles and local client
instances are closed.
@@ -82,5 +81,5 @@
*
* @param handler the handler to be called
*/
- void addCloseHandler(final CloseHandler<? super RemoteClientEndpoint<I,
O>> handler);
+ void addCloseHandler(final CloseHandler<? super RemoteClientEndpoint>
handler);
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -34,5 +34,5 @@
* @param <O> the reply type
* @param endpoint the endpoint that was created
*/
- <I, O> void notifyCreated(RemoteClientEndpoint<I, O> endpoint);
+ <I, O> void notifyCreated(RemoteClientEndpoint endpoint);
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteServiceEndpoint.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -25,7 +25,6 @@
import org.jboss.cx.remoting.Closeable;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.ClientSource;
/**
* A remote service endpoint, which can be passed to remote endpoints. Remote systems
can then use the service endpoint
@@ -33,7 +32,7 @@
* has the advantage that a round trip to the remote side is not necessary; the local
side can spawn a client endpoint
* and simply notify the remote side of the change.
*/
-public interface RemoteServiceEndpoint<I, O> extends
Closeable<RemoteServiceEndpoint<I, O>> {
+public interface RemoteServiceEndpoint extends Closeable<RemoteServiceEndpoint> {
/**
* Create a client endpoint for the service corresponding to this service endpoint.
@@ -41,7 +40,7 @@
* @return a client endpoint
* @throws RemotingException if a client could not be opened
*/
- RemoteClientEndpoint<I, O> createClientEndpoint() throws RemotingException;
+ RemoteClientEndpoint createClientEndpoint() throws RemotingException;
/**
* Get a handle to this service endpoint. The service endpoint will not auto-close
as long as there is at least
@@ -52,7 +51,7 @@
* @return the handle
* @throws RemotingException if a handle could not be acquired
*/
- Handle<RemoteServiceEndpoint<I, O>> getHandle() throws
RemotingException;
+ Handle<RemoteServiceEndpoint> getHandle() throws RemotingException;
/**
* Automatically close this service endpoint when all handles and local client source
instances
@@ -70,5 +69,5 @@
*
* @param handler the handler to be called
*/
- void addCloseHandler(final CloseHandler<? super RemoteServiceEndpoint<I,
O>> handler);
+ void addCloseHandler(final CloseHandler<? super RemoteServiceEndpoint>
handler);
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/ReplyHandler.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -26,14 +26,14 @@
* A handler for replies from a request. The handler should respect the first invocation
made on it, and ignore
* any subsequent invocations.
*/
-public interface ReplyHandler<O> {
+public interface ReplyHandler {
/**
* Handle a successful reply.
*
* @param reply the reply
*/
- void handleReply(O reply);
+ void handleReply(Object reply);
/**
* Handle a remote exception.
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -41,28 +41,28 @@
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> RemoteClientEndpoint<I, O> createClientEndpoint(final
RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> RemoteClientEndpoint createClientEndpoint(final
RequestListener<I, O> requestListener) throws RemotingException {
return delegate.createClientEndpoint(requestListener);
}
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> RemoteServiceEndpoint<I, O> createServiceEndpoint(final
RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> RemoteServiceEndpoint createServiceEndpoint(final
RequestListener<I, O> requestListener) throws RemotingException {
return delegate.createServiceEndpoint(requestListener);
}
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> Client<I, O> createClient(final RemoteClientEndpoint<I,
O> endpoint) throws RemotingException {
+ public <I, O> Client<I, O> createClient(final RemoteClientEndpoint
endpoint) throws RemotingException {
return delegate.createClient(endpoint);
}
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> ClientSource<I, O> createClientSource(final
RemoteServiceEndpoint<I, O> endpoint) throws RemotingException {
+ public <I, O> ClientSource<I, O> createClientSource(final
RemoteServiceEndpoint endpoint) throws RemotingException {
return delegate.createClientSource(endpoint);
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -38,9 +38,9 @@
*/
public final class ClientImpl<I, O> extends AbstractContextImpl<Client<I,
O>> implements Client<I, O> {
- private final RemoteClientEndpoint<I, O> remoteClientEndpoint;
+ private final RemoteClientEndpoint remoteClientEndpoint;
- ClientImpl(final RemoteClientEndpoint<I, O> remoteClientEndpoint, final
Executor executor) {
+ ClientImpl(final RemoteClientEndpoint remoteClientEndpoint, final Executor executor)
{
super(executor);
this.remoteClientEndpoint = remoteClientEndpoint;
}
@@ -51,7 +51,7 @@
}
final QueueExecutor executor = new QueueExecutor();
final FutureReplyImpl<O> futureReply = new
FutureReplyImpl<O>(executor);
- final ReplyHandler<O> replyHandler = futureReply.getReplyHandler();
+ final ReplyHandler replyHandler = futureReply.getReplyHandler();
final RemoteRequestContext requestContext =
remoteClientEndpoint.receiveRequest(request, replyHandler);
futureReply.setRemoteRequestContext(requestContext);
futureReply.addCompletionHandler(new RequestCompletionHandler<O>() {
@@ -68,7 +68,7 @@
throw new RemotingException("Client is not open");
}
final FutureReplyImpl<O> futureReply = new
FutureReplyImpl<O>(executor);
- final ReplyHandler<O> replyHandler = futureReply.getReplyHandler();
+ final ReplyHandler replyHandler = futureReply.getReplyHandler();
final RemoteRequestContext requestContext =
remoteClientEndpoint.receiveRequest(request, replyHandler);
futureReply.setRemoteRequestContext(requestContext);
return futureReply;
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -35,10 +35,10 @@
*/
public final class ClientSourceImpl<I, O> extends
AbstractCloseable<ClientSource<I, O>> implements ClientSource<I, O> {
- private final RemoteServiceEndpoint<I, O> serviceEndpoint;
+ private final RemoteServiceEndpoint serviceEndpoint;
private final Endpoint endpoint;
- ClientSourceImpl(final RemoteServiceEndpoint<I, O> serviceEndpoint, final
EndpointImpl endpoint) {
+ ClientSourceImpl(final RemoteServiceEndpoint serviceEndpoint, final EndpointImpl
endpoint) {
super(endpoint.getExecutor());
this.serviceEndpoint = serviceEndpoint;
this.endpoint = endpoint;
@@ -48,7 +48,7 @@
if (! isOpen()) {
throw new RemotingException("Client source is not open");
}
- final RemoteClientEndpoint<I,O> clientEndpoint =
serviceEndpoint.createClientEndpoint();
+ final RemoteClientEndpoint clientEndpoint =
serviceEndpoint.createClientEndpoint();
final Client<I, O> client = endpoint.createClient(clientEndpoint);
clientEndpoint.autoClose();
return client;
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -121,23 +121,23 @@
return endpointMap;
}
- public <I, O> RemoteClientEndpoint<I, O> createClientEndpoint(final
RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> RemoteClientEndpoint createClientEndpoint(final
RequestListener<I, O> requestListener) throws RemotingException {
final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new
RemoteClientEndpointLocalImpl<I, O>(executor, requestListener);
clientEndpoint.addCloseHandler(remover);
clientEndpoint.open();
return clientEndpoint;
}
- public <I, O> RemoteServiceEndpoint<I, O> createServiceEndpoint(final
RequestListener<I, O> requestListener) throws RemotingException {
+ public <I, O> RemoteServiceEndpoint createServiceEndpoint(final
RequestListener<I, O> requestListener) throws RemotingException {
final RemoteServiceEndpointLocalImpl<I, O> serviceEndpoint = new
RemoteServiceEndpointLocalImpl<I, O>(executor, requestListener);
serviceEndpoint.addCloseHandler(remover);
serviceEndpoint.open();
return serviceEndpoint;
}
- public <I, O> Client<I, O> createClient(final RemoteClientEndpoint<I,
O> endpoint) throws RemotingException {
+ public <I, O> Client<I, O> createClient(final RemoteClientEndpoint
endpoint) throws RemotingException {
boolean ok = false;
- final Handle<RemoteClientEndpoint<I,O>> handle =
endpoint.getHandle();
+ final Handle<RemoteClientEndpoint> handle = endpoint.getHandle();
try {
final ClientImpl<I, O> client = new ClientImpl<I, O>(endpoint,
executor);
client.addCloseHandler(new CloseHandler<Client<I, O>>() {
@@ -154,9 +154,9 @@
}
}
- public <I, O> ClientSource<I, O> createClientSource(final
RemoteServiceEndpoint<I, O> endpoint) throws RemotingException {
+ public <I, O> ClientSource<I, O> createClientSource(final
RemoteServiceEndpoint endpoint) throws RemotingException {
boolean ok = false;
- final Handle<RemoteServiceEndpoint<I,O>> handle =
endpoint.getHandle();
+ final Handle<RemoteServiceEndpoint> handle = endpoint.getHandle();
try {
final ClientSourceImpl<I, O> client = new ClientSourceImpl<I,
O>(endpoint, this);
client.addCloseHandler(new CloseHandler<ClientSource<I, O>>() {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -40,7 +40,7 @@
public final class FutureReplyImpl<O> implements FutureReply<O> {
private final Executor executor;
- private final ReplyHandler<O> replyHandler = new Handler();
+ private final ReplyHandler replyHandler = new Handler();
private final Object lock = new Object();
// @protectedby lock
private State state = State.NEW;
@@ -264,7 +264,7 @@
return this;
}
- ReplyHandler<O> getReplyHandler() {
+ ReplyHandler getReplyHandler() {
return replyHandler;
}
@@ -284,9 +284,10 @@
}
}
- private final class Handler implements ReplyHandler<O> {
+ private final class Handler implements ReplyHandler {
- public void handleReply(final O reply) {
+ @SuppressWarnings({ "unchecked" })
+ public void handleReply(final Object reply) {
synchronized (lock) {
while (state == State.NEW) {
boolean intr = false;
@@ -304,7 +305,7 @@
}
if (state == State.WAITING) {
state = State.DONE;
- result = reply;
+ result = (O) reply;
runCompletionHandlers();
lock.notifyAll();
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteClientEndpointLocalImpl.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -37,7 +37,7 @@
/**
*
*/
-public final class RemoteClientEndpointLocalImpl<I, O> extends
AbstractAutoCloseable<RemoteClientEndpoint<I, O>> implements
RemoteClientEndpoint<I, O> {
+public final class RemoteClientEndpointLocalImpl<I, O> extends
AbstractAutoCloseable<RemoteClientEndpoint> implements RemoteClientEndpoint {
private final RequestListener<I, O> requestListener;
private final Executor executor;
@@ -60,12 +60,13 @@
this(executor, requestListener, new ClientContextImpl(executor));
}
- public void receiveRequest(final I request) {
+ public void receiveRequest(final Object request) {
final RequestContextImpl<O> context = new
RequestContextImpl<O>(clientContext);
executor.execute(new Runnable() {
+ @SuppressWarnings({ "unchecked" })
public void run() {
try {
- requestListener.handleRequest(context, request);
+ requestListener.handleRequest(context, (I) request);
} catch (Throwable t) {
log.error(t, "Unexpected exception in request listener");
}
@@ -73,12 +74,13 @@
});
}
- public RemoteRequestContext receiveRequest(final I request, final
ReplyHandler<O> replyHandler) {
+ public RemoteRequestContext receiveRequest(final Object request, final ReplyHandler
replyHandler) {
final RequestContextImpl<O> context = new
RequestContextImpl<O>(replyHandler, clientContext);
executor.execute(new Runnable() {
+ @SuppressWarnings({ "unchecked" })
public void run() {
try {
- requestListener.handleRequest(context, request);
+ requestListener.handleRequest(context, (I) request);
} catch (RemoteExecutionException e) {
SpiUtils.safeHandleException(replyHandler, e.getMessage(),
e.getCause());
} catch (Throwable t) {
@@ -96,8 +98,8 @@
void open() throws RemotingException {
try {
requestListener.handleClientOpen(clientContext);
- addCloseHandler(new CloseHandler<RemoteClientEndpoint<I, O>>() {
- public void handleClose(final RemoteClientEndpoint<I, O> closed) {
+ addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
+ public void handleClose(final RemoteClientEndpoint closed) {
try {
requestListener.handleClientClose(clientContext);
} catch (Throwable t) {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RemoteServiceEndpointLocalImpl.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -34,7 +34,7 @@
/**
*
*/
-public final class RemoteServiceEndpointLocalImpl<I, O> extends
AbstractAutoCloseable<RemoteServiceEndpoint<I, O>> implements
RemoteServiceEndpoint<I, O> {
+public final class RemoteServiceEndpointLocalImpl<I, O> extends
AbstractAutoCloseable<RemoteServiceEndpoint> implements RemoteServiceEndpoint {
private final RequestListener<I, O> requestListener;
private final ServiceContextImpl serviceContext;
@@ -49,7 +49,7 @@
serviceContext = new ServiceContextImpl(executor);
}
- public RemoteClientEndpoint<I, O> createClientEndpoint() throws
RemotingException {
+ public RemoteClientEndpoint createClientEndpoint() throws RemotingException {
if (isOpen()) {
final RemoteClientEndpointLocalImpl<I, O> clientEndpoint = new
RemoteClientEndpointLocalImpl<I, O>(executor, this, requestListener);
clientEndpoint.open();
@@ -62,8 +62,8 @@
void open() throws RemotingException {
try {
requestListener.handleServiceOpen(serviceContext);
- addCloseHandler(new CloseHandler<RemoteServiceEndpoint<I, O>>()
{
- public void handleClose(final RemoteServiceEndpoint<I, O> closed)
{
+ addCloseHandler(new CloseHandler<RemoteServiceEndpoint>() {
+ public void handleClose(final RemoteServiceEndpoint closed) {
try {
requestListener.handleServiceClose(serviceContext);
} catch (Throwable t) {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/RequestContextImpl.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -40,7 +40,7 @@
private final AtomicBoolean closed = new AtomicBoolean();
private final Object cancelLock = new Object();
- private final ReplyHandler<O> replyHandler;
+ private final ReplyHandler replyHandler;
private final ClientContextImpl clientContext;
private final AtomicBoolean cancelled = new AtomicBoolean();
@@ -48,7 +48,7 @@
private Set<RequestCancelHandler<O>> cancelHandlers;
private final TaggingExecutor executor;
- RequestContextImpl(final ReplyHandler<O> replyHandler, final ClientContextImpl
clientContext) {
+ RequestContextImpl(final ReplyHandler replyHandler, final ClientContextImpl
clientContext) {
this.replyHandler = replyHandler;
this.clientContext = clientContext;
executor = new TaggingExecutor(clientContext.getExecutor());
Modified:
remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java
===================================================================
---
remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/core/src/test/java/org/jboss/cx/remoting/core/EndpointTestCase.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -78,7 +78,7 @@
endpoint.setExecutor(executorService);
endpoint.start();
try {
- final RemoteClientEndpoint<Object,Object> clientEndpoint =
endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
+ final RemoteClientEndpoint clientEndpoint =
endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
public void handleRequest(final RequestContext<Object> context,
final Object request) throws RemoteExecutionException {
assertEquals(request, requestObj);
try {
@@ -93,8 +93,8 @@
}
});
try {
- clientEndpoint.addCloseHandler(new
CloseHandler<RemoteClientEndpoint<Object, Object>>() {
- public void handleClose(final RemoteClientEndpoint<Object,
Object> closed) {
+ clientEndpoint.addCloseHandler(new
CloseHandler<RemoteClientEndpoint>() {
+ public void handleClose(final RemoteClientEndpoint closed) {
clientEndpointClosed.set(true);
}
});
@@ -137,7 +137,7 @@
endpoint.setExecutor(executorService);
endpoint.start();
try {
- final RemoteClientEndpoint<Object,Object> clientEndpoint =
endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
+ final RemoteClientEndpoint clientEndpoint =
endpoint.createClientEndpoint(new AbstractRequestListener<Object, Object>() {
public void handleRequest(final RequestContext<Object> context,
final Object request) throws RemoteExecutionException {
assertEquals(request, requestObj);
try {
@@ -152,8 +152,8 @@
}
});
try {
- clientEndpoint.addCloseHandler(new
CloseHandler<RemoteClientEndpoint<Object, Object>>() {
- public void handleClose(final RemoteClientEndpoint<Object,
Object> closed) {
+ clientEndpoint.addCloseHandler(new
CloseHandler<RemoteClientEndpoint>() {
+ public void handleClose(final RemoteClientEndpoint closed) {
clientEndpointClosed.set(true);
}
});
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -52,6 +52,7 @@
import static org.jboss.cx.remoting.protocol.basic.MessageType.REQUEST_FAILED;
import static org.jboss.cx.remoting.protocol.basic.MessageType.CANCEL_ACK;
import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
@@ -71,13 +72,13 @@
private static final Logger log = Logger.getLogger(BasicHandler.class);
// clients whose requests get forwarded to the remote side
- private final ConcurrentMap<Integer, RemoteClientEndpoint<?, ?>>
remoteClients = concurrentMap();
+ private final ConcurrentMap<Integer, RemoteClientEndpoint> remoteClients =
concurrentMap();
// running on remote node
- private final ConcurrentMap<Integer, ReplyHandler<?>> outstandingRequests
= concurrentMap();
+ private final ConcurrentMap<Integer, ReplyHandler> outstandingRequests =
concurrentMap();
// forwarded to remote side
- private final ConcurrentMap<Integer, Handle<RemoteClientEndpoint<?,
?>>> forwardedClients = concurrentMap();
+ private final ConcurrentMap<Integer, Handle<RemoteClientEndpoint>>
forwardedClients = concurrentMap();
// forwarded to remote side
- private final ConcurrentMap<Integer, Handle<RemoteServiceEndpoint<?,
?>>> forwardedServices = concurrentMap();
+ private final ConcurrentMap<Integer, Handle<RemoteServiceEndpoint>>
forwardedServices = concurrentMap();
private final boolean server;
private final BufferAllocator<ByteBuffer> allocator;
@@ -90,12 +91,12 @@
private final ClassLoader classLoader;
@SuppressWarnings({ "unchecked" })
- public <I, O> BasicHandler(final boolean server, final
BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpoint<I, O> root,
final Executor executor, final RemoteClientEndpointListener remoteListener, final
MarshallerFactory<ByteBuffer> marshallerFactory) throws RemotingException {
+ public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer>
allocator, final RemoteClientEndpoint root, final Executor executor, final
RemoteClientEndpointListener remoteListener, final MarshallerFactory<ByteBuffer>
marshallerFactory) throws RemotingException {
this.server = server;
this.allocator = allocator;
this.executor = executor;
forwardedClients.put(Integer.valueOf(0),
((RemoteClientEndpoint)root).getHandle());
- final RemoteClientEndpointImpl<Object, Object> endpoint = new
RemoteClientEndpointImpl<Object, Object>(0, marshallerFactory, allocator);
+ final RemoteClientEndpointImpl endpoint = new RemoteClientEndpointImpl(0,
marshallerFactory, allocator);
remoteClients.put(Integer.valueOf(0), endpoint);
if (remoteListener != null) {
remoteListener.notifyCreated(endpoint);
@@ -148,7 +149,7 @@
switch (msgType) {
case REQUEST_ONEWAY: {
final int clientId = buffer.getInt();
- final Handle<RemoteClientEndpoint<?, ?>> handle =
getForwardedClient(clientId);
+ final Handle<RemoteClientEndpoint> handle =
getForwardedClient(clientId);
if (handle == null) {
log.trace("Request on invalid client ID %d",
Integer.valueOf(clientId));
return;
@@ -165,13 +166,13 @@
log.trace("Class not found in one-way request for client ID
%d", Integer.valueOf(clientId));
break;
}
- final RemoteClientEndpoint<?, ?> clientEndpoint =
handle.getResource();
- receiveRequest(clientEndpoint, payload);
+ final RemoteClientEndpoint clientEndpoint = handle.getResource();
+ clientEndpoint.receiveRequest(payload);
break;
}
case REQUEST: {
final int clientId = buffer.getInt();
- final Handle<RemoteClientEndpoint<?, ?>> handle =
getForwardedClient(clientId);
+ final Handle<RemoteClientEndpoint> handle =
getForwardedClient(clientId);
if (handle == null) {
log.trace("Request on invalid client ID %d",
Integer.valueOf(clientId));
break;
@@ -190,13 +191,13 @@
log.trace("Class not found in request ID %d for client ID
%d", Integer.valueOf(requestId), Integer.valueOf(clientId));
break;
}
- final RemoteClientEndpoint<?, ?> clientEndpoint =
handle.getResource();
- receiveRequest(clientEndpoint, new ReplyHandlerImpl(channel,
requestId, allocator), payload);
+ final RemoteClientEndpoint clientEndpoint = handle.getResource();
+ clientEndpoint.receiveRequest(payload, (ReplyHandler) new
ReplyHandlerImpl(channel, requestId, allocator));
break;
}
case REPLY: {
final int requestId = buffer.getInt();
- final ReplyHandler<?> replyHandler =
takeOutstandingReqeust(requestId);
+ final ReplyHandler replyHandler = takeOutstandingReqeust(requestId);
if (replyHandler == null) {
log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
break;
@@ -215,12 +216,12 @@
log.trace("Class not found in reply to request ID %d",
Integer.valueOf(requestId));
break;
}
- handleReply(replyHandler, payload);
+ SpiUtils.safeHandleReply(replyHandler, payload);
break;
}
case REQUEST_FAILED: {
final int requestId = buffer.getInt();
- final ReplyHandler<?> replyHandler =
takeOutstandingReqeust(requestId);
+ final ReplyHandler replyHandler = takeOutstandingReqeust(requestId);
if (replyHandler == null) {
log.trace("Got reply to unknown request %d",
Integer.valueOf(requestId));
break;
@@ -247,26 +248,40 @@
log.trace("Class not found in exception reply to request ID
%d", Integer.valueOf(requestId));
break;
}
- handleException(replyHandler, message, cause);
+ SpiUtils.safeHandleException(replyHandler, message == null ? null :
message.toString(), cause instanceof Throwable ? (Throwable) cause : null);
break;
}
case CLIENT_CLOSE: {
+ final int clientId = buffer.getInt();
+ final Handle<RemoteClientEndpoint> handle =
takeForwardedClient(clientId);
+ if (handle == null) {
+ log.warn("Got client close message for unknown client
%d", Integer.valueOf(clientId));
+ break;
+ }
+ IoUtils.safeClose(handle);
break;
}
case CLIENT_OPEN: {
final int serviceId = buffer.getInt();
final int clientId = buffer.getInt();
- final Handle<RemoteServiceEndpoint<?, ?>> handle =
getForwardedService(serviceId);
+ final Handle<RemoteServiceEndpoint> handle =
getForwardedService(serviceId);
if (handle == null) {
// todo log invalid request
break;
}
- final RemoteServiceEndpoint<?, ?> serviceEndpoint =
handle.getResource();
- final RemoteClientEndpoint<?, ?> clientEndpoint =
serviceEndpoint.createClientEndpoint();
+ final RemoteServiceEndpoint serviceEndpoint = handle.getResource();
+ final RemoteClientEndpoint clientEndpoint =
serviceEndpoint.createClientEndpoint();
break;
}
case SERVICE_CLOSE: {
+ final int serviceId = buffer.getInt();
+ final Handle<RemoteServiceEndpoint> handle =
takeForwardedService(serviceId);
+ if (handle == null) {
+ log.warn("Got client close message for unknown service
%d", Integer.valueOf(serviceId));
+ break;
+ }
+ IoUtils.safeClose(handle);
break;
}
default: {
@@ -309,15 +324,11 @@
public void handleClosed(final AllocatedMessageChannel channel) {
}
- private <I, O> ReplyHandler<O> createReplyHandler(final
AllocatedMessageChannel channel, final int requestId) {
- return new ReplyHandlerImpl<O>(channel, requestId, allocator);
- }
-
- RemoteClientEndpoint<?, ?> getRemoteClient(final int i) {
+ RemoteClientEndpoint getRemoteClient(final int i) {
return remoteClients.get(Integer.valueOf(i));
}
- private final class ReplyHandlerImpl<O> implements ReplyHandler<O> {
+ private final class ReplyHandlerImpl implements ReplyHandler {
private final AllocatedMessageChannel channel;
private final int requestId;
@@ -335,7 +346,7 @@
this.allocator = allocator;
}
- public void handleReply(final O reply) {
+ public void handleReply(final Object reply) {
ByteBuffer buffer = allocator.allocate();
buffer.put((byte) REPLY);
buffer.putInt(requestId);
@@ -400,7 +411,7 @@
// Session mgmt
- public int openRequest(ReplyHandler<?> handler) {
+ public int openRequest(ReplyHandler handler) {
int id;
do {
id = localRequestIdSeq.getAndIncrement();
@@ -412,12 +423,12 @@
int id;
do {
id = remoteClientIdSeq.getAndIncrement() << 1 | (server ? 1 : 0);
- } while (remoteClients.putIfAbsent(Integer.valueOf(id), new
RemoteClientEndpointImpl<Object, Object>(id, null, allocator)) != null);
+ } while (remoteClients.putIfAbsent(Integer.valueOf(id), new
RemoteClientEndpointImpl(id, null, allocator)) != null);
return id;
}
@SuppressWarnings({ "unchecked" })
- public void openClientForForwardedService(int id, RemoteClientEndpoint<?, ?>
clientEndpoint) {
+ public void openClientForForwardedService(int id, RemoteClientEndpoint
clientEndpoint) {
try {
forwardedClients.put(Integer.valueOf(id),
((RemoteClientEndpoint)clientEndpoint).getHandle());
} catch (RemotingException e) {
@@ -425,37 +436,26 @@
}
}
- public Handle<RemoteClientEndpoint<?, ?>> getForwardedClient(int id) {
+ public Handle<RemoteClientEndpoint> getForwardedClient(int id) {
return forwardedClients.get(Integer.valueOf(id));
}
- public ReplyHandler<?> takeOutstandingReqeust(int id) {
+ private Handle<RemoteClientEndpoint> takeForwardedClient(final int id) {
+ return forwardedClients.remove(Integer.valueOf(id));
+ }
+
+ public ReplyHandler takeOutstandingReqeust(int id) {
return outstandingRequests.remove(Integer.valueOf(id));
}
- public Handle<RemoteServiceEndpoint<?, ?>> getForwardedService(final int
id) {
+ public Handle<RemoteServiceEndpoint> getForwardedService(final int id) {
return forwardedServices.get(Integer.valueOf(id));
}
- @SuppressWarnings({ "unchecked" })
- private static <I, O> void receiveRequest(RemoteClientEndpoint<I, O>
clientEndpoint, Object request) {
- clientEndpoint.receiveRequest((I) request);
+ public Handle<RemoteServiceEndpoint> takeForwardedService(final int id) {
+ return forwardedServices.remove(Integer.valueOf(id));
}
- @SuppressWarnings({ "unchecked" })
- private static <I, O> RemoteRequestContext
receiveRequest(RemoteClientEndpoint<I, O> clientEndpoint, ReplyHandler<O>
replyHandler, Object request) {
- return clientEndpoint.receiveRequest((I) request, replyHandler);
- }
-
- @SuppressWarnings({ "unchecked" })
- private static <O> void handleReply(final ReplyHandler<O> replyHandler,
final Object reply) {
- SpiUtils.safeHandleReply(replyHandler, (O) reply);
- }
-
- private static void handleException(final ReplyHandler<?> handler, final Object
message, final Object cause) {
- SpiUtils.safeHandleException(handler, message == null ? null :
message.toString(), cause instanceof Throwable ? (Throwable) cause : null);
- }
-
// Writer members
private final BlockingQueue<WriteHandler> outputQueue =
CollectionUtil.blockingQueue(64);
@@ -470,7 +470,7 @@
// client endpoint
- private final class RemoteClientEndpointImpl<I, O> extends
AbstractAutoCloseable<RemoteClientEndpoint<I, O>> implements
RemoteClientEndpoint<I, O> {
+ private final class RemoteClientEndpointImpl extends
AbstractAutoCloseable<RemoteClientEndpoint> implements RemoteClientEndpoint {
private final int identifier;
private final MarshallerFactory<ByteBuffer> marshallerFactory;
@@ -487,9 +487,22 @@
this.identifier = identifier;
this.marshallerFactory = marshallerFactory;
this.allocator = allocator;
+ addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
+ public void handleClose(final RemoteClientEndpoint closed) {
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.CLIENT_CLOSE);
+ buffer.putInt(identifier);
+ buffer.flip();
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator,
buffer));
+ } catch (InterruptedException e) {
+ log.warn("Client close notification was interrupted before
it could be sent");
+ }
+ }
+ });
}
- public void receiveRequest(final I request) {
+ public void receiveRequest(final Object request) {
log.trace("Received one-way request of type %s", request == null ?
"null" : request.getClass());
try {
final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(null);
@@ -517,7 +530,7 @@
}
}
- public RemoteRequestContext receiveRequest(final I request, final
ReplyHandler<O> handler) {
+ public RemoteRequestContext receiveRequest(final Object request, final
ReplyHandler handler) {
log.trace("Received request of type %s", request == null ?
"null" : request.getClass());
try {
final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(null);
@@ -579,7 +592,7 @@
}
}
- public final class RemoteServiceEndpointImpl<I, O> extends
AbstractAutoCloseable<RemoteServiceEndpoint<I, O>> implements
RemoteServiceEndpoint<I, O> {
+ public final class RemoteServiceEndpointImpl extends
AbstractAutoCloseable<RemoteServiceEndpoint> implements RemoteServiceEndpoint {
private final MarshallerFactory<ByteBuffer> marshallerFactory;
private final BufferAllocator<ByteBuffer> allocator;
@@ -590,20 +603,34 @@
this.marshallerFactory = marshallerFactory;
this.allocator = allocator;
this.identifier = identifier;
+ addCloseHandler(new CloseHandler<RemoteServiceEndpoint>() {
+ public void handleClose(final RemoteServiceEndpoint closed) {
+ ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.SERVICE_CLOSE);
+ buffer.putInt(identifier);
+ buffer.flip();
+ try {
+ registerWriter(channel, new SimpleWriteHandler(allocator,
buffer));
+ } catch (InterruptedException e) {
+ log.warn("Service close notification was interrupted before
it could be sent");
+ }
+ }
+ });
}
- public RemoteClientEndpoint<I, O> createClientEndpoint() throws
RemotingException {
+ public RemoteClientEndpoint createClientEndpoint() throws RemotingException {
final int id = openClientFromService();
final ByteBuffer buffer = allocator.allocate();
buffer.putInt(identifier);
buffer.putInt(openClientFromService());
buffer.flip();
+ // todo - probably should bail out if we're interrupted?
boolean intr = false;
for (;;) {
try {
registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
try {
- return new RemoteClientEndpointImpl<I,O>(id,
marshallerFactory, allocator);
+ return new RemoteClientEndpointImpl(id, marshallerFactory,
allocator);
} finally {
if (intr) {
Thread.currentThread().interrupt();
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -26,7 +26,6 @@
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.core.marshal.JBossSerializationMarshallerFactory;
import org.jboss.cx.remoting.core.marshal.JavaSerializationMarshallerFactory;
import org.jboss.xnio.IoHandlerFactory;
import org.jboss.xnio.ChannelSource;
@@ -59,11 +58,11 @@
* @param remoteListener a listener which receives notification of the remote root
client of the incoming connection
* @return a handler factory for passing to an XNIO server
*/
- public static IoHandlerFactory<AllocatedMessageChannel> createServer(final
Executor executor, final RemoteServiceEndpoint<?, ?> localRootSource, final
BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpointListener
remoteListener) {
+ public static IoHandlerFactory<AllocatedMessageChannel> createServer(final
Executor executor, final RemoteServiceEndpoint localRootSource, final
BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpointListener
remoteListener) {
return new IoHandlerFactory<AllocatedMessageChannel>() {
public IoHandler<? super AllocatedMessageChannel> createHandler() {
try {
- final RemoteClientEndpoint<?, ?> remoteClientEndpoint =
localRootSource.createClientEndpoint();
+ final RemoteClientEndpoint remoteClientEndpoint =
localRootSource.createClientEndpoint();
try {
return new BasicHandler(true, allocator, remoteClientEndpoint,
executor, remoteListener, new JavaSerializationMarshallerFactory(executor));
} finally {
@@ -92,13 +91,13 @@
* @return the future client endpoint of the remote side's root client
* @throws IOException if an error occurs
*/
- public static <I, O> IoFuture<RemoteClientEndpoint<I, O>>
connect(final Executor executor, final RemoteClientEndpoint<?, ?> localRoot, final
ChannelSource<AllocatedMessageChannel> channelSource, final
BufferAllocator<ByteBuffer> allocator) throws IOException {
+ public static <I, O> IoFuture<RemoteClientEndpoint> connect(final
Executor executor, final RemoteClientEndpoint localRoot, final
ChannelSource<AllocatedMessageChannel> channelSource, final
BufferAllocator<ByteBuffer> allocator) throws IOException {
final BasicHandler basicHandler = new BasicHandler(false, allocator, localRoot,
executor, null, new JavaSerializationMarshallerFactory(executor));
final IoFuture<AllocatedMessageChannel> futureChannel =
channelSource.open(basicHandler);
- return new AbstractConvertingIoFuture<RemoteClientEndpoint<I, O>,
AllocatedMessageChannel>(futureChannel) {
+ return new AbstractConvertingIoFuture<RemoteClientEndpoint,
AllocatedMessageChannel>(futureChannel) {
@SuppressWarnings({ "unchecked" })
- protected RemoteClientEndpoint<I, O> convert(final
AllocatedMessageChannel channel) throws RemotingException {
- final RemoteClientEndpoint<?, ?> remoteClientEndpoint =
basicHandler.getRemoteClient(0);
+ protected RemoteClientEndpoint convert(final AllocatedMessageChannel channel)
throws RemotingException {
+ final RemoteClientEndpoint remoteClientEndpoint =
basicHandler.getRemoteClient(0);
return (RemoteClientEndpoint) remoteClientEndpoint;
}
};
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
(rev 0)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -0,0 +1,39 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.RemotingException;
+
+/**
+ *
+ */
+public interface ServiceRegistry {
+ int bind(RemoteServiceEndpoint remoteServiceEndpoint) throws RemotingException;
+
+ void bind(RemoteServiceEndpoint remoteServiceEndpoint, int id) throws
RemotingException;
+
+ void unbind(int id) throws RemotingException;
+
+ RemoteServiceEndpoint
+}
Modified:
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
---
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -50,6 +50,9 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.Collections;
+import java.util.List;
+import java.util.LinkedList;
import java.nio.ByteBuffer;
import java.net.InetSocketAddress;
import java.io.Closeable;
@@ -63,11 +66,13 @@
}
public void testConnection() throws Throwable {
+ final List<Throwable> problems = Collections.synchronizedList(new
LinkedList<Throwable>());
final AtomicBoolean clientOpened = new AtomicBoolean(false);
+ final AtomicBoolean client2Opened = new AtomicBoolean(false);
final AtomicBoolean serviceOpened = new AtomicBoolean(false);
final AtomicBoolean clientClosed = new AtomicBoolean(false);
final AtomicBoolean serviceClosed = new AtomicBoolean(false);
- final CountDownLatch clientCloseLatch = new CountDownLatch(1);
+ final CountDownLatch cleanupLatch = new CountDownLatch(2);
final ExecutorService executorService = Executors.newCachedThreadPool();
try {
final BufferAllocator<ByteBuffer> allocator = new
BufferAllocator<ByteBuffer>() {
@@ -84,13 +89,19 @@
endpoint.setExecutor(executorService);
endpoint.start();
try {
- final RemoteServiceEndpoint<Object,Object>
serverServiceEndpoint = endpoint.createServiceEndpoint(new RequestListener<Object,
Object>() {
+ final RemoteServiceEndpoint serverServiceEndpoint =
endpoint.createServiceEndpoint(new RequestListener<Object, Object>() {
public void handleClientOpen(final ClientContext context) {
- clientOpened.set(true);
+ if (clientOpened.getAndSet(true)) {
+ if (client2Opened.getAndSet(true)) {
+ problems.add(new IllegalStateException("Too many
client opens"));
+ }
+ }
}
public void handleServiceOpen(final ServiceContext context) {
- serviceOpened.set(true);
+ if (serviceOpened.getAndSet(true)) {
+ problems.add(new IllegalStateException("Multiple
service opens"));
+ }
}
public void handleRequest(final RequestContext<Object>
context, final Object request) throws RemoteExecutionException {
@@ -101,61 +112,78 @@
try {
context.sendFailure("failed", e);
} catch (RemotingException e1) {
- System.out.println("Double fault!");
+ problems.add(e1);
}
}
}
public void handleServiceClose(final ServiceContext context) {
- serviceClosed.set(true);
+ if (serviceClosed.getAndSet(true)) {
+ problems.add(new IllegalStateException("Multiple
service closes"));
+ }
+ cleanupLatch.countDown();
}
public void handleClientClose(final ClientContext context) {
- clientClosed.set(true);
- clientCloseLatch.countDown();
+ if (clientClosed.getAndSet(true)) {
+ problems.add(new IllegalStateException("Multiple
client closes"));
+ }
+ cleanupLatch.countDown();
}
});
try {
- final Handle<RemoteServiceEndpoint<Object,Object>>
handle = serverServiceEndpoint.getHandle();
+ final Handle<RemoteServiceEndpoint> serviceHandle =
serverServiceEndpoint.getHandle();
serverServiceEndpoint.autoClose();
try {
final RemoteClientEndpointListener remoteListener = new
RemoteClientEndpointListener() {
-
- public <I, O> void notifyCreated(final
RemoteClientEndpoint<I, O> endpoint) {
-
+ public <I, O> void notifyCreated(final
RemoteClientEndpoint endpoint) {
}
};
final ConfigurableFactory<Closeable> tcpServer =
xnio.createTcpServer(executorService,
Channels.convertStreamToAllocatedMessage(BasicProtocol.createServer(executorService,
serverServiceEndpoint, allocator, remoteListener), 32768, 32768), new
InetSocketAddress(12345));
final Closeable tcpServerCloseable = tcpServer.create();
try {
// now create a client to connect to it
- final RemoteClientEndpoint<?,?> localRoot =
serverServiceEndpoint.createClientEndpoint();
- final InetSocketAddress destAddr = new
InetSocketAddress("localhost", 12345);
- final TcpClient tcpClient =
xnio.createTcpConnector().create().createChannelSource(destAddr);
- final ChannelSource<AllocatedMessageChannel>
messageChannelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
- final
IoFuture<RemoteClientEndpoint<Object,Object>> futureClient =
BasicProtocol.connect(executorService, localRoot, messageChannelSource, allocator);
- final RemoteClientEndpoint<Object, Object>
clientEndpoint = futureClient.get();
+ final RemoteClientEndpoint localRoot = ;
try {
- final Client<Object,Object> client =
endpoint.createClient(clientEndpoint);
+ serverServiceEndpoint.autoClose();
+ final InetSocketAddress destAddr = new
InetSocketAddress("localhost", 12345);
+ final TcpClient tcpClient =
xnio.createTcpConnector().create().createChannelSource(destAddr);
try {
- clientEndpoint.autoClose();
- final Object result =
client.send("Test").get();
- assertEquals("response", result);
- client.close();
- tcpServerCloseable.close();
- handle.close();
+ final
ChannelSource<AllocatedMessageChannel> messageChannelSource =
Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
+ final IoFuture<RemoteClientEndpoint>
futureClient = BasicProtocol.connect(executorService, localRoot, messageChannelSource,
allocator);
+ final RemoteClientEndpoint clientEndpoint =
futureClient.get();
+ try {
+ final Client<Object,Object> client =
endpoint.createClient(clientEndpoint);
+ try {
+ clientEndpoint.autoClose();
+ final Object result =
client.send("Test").get();
+ assertEquals("response",
result);
+ client.close();
+ cleanupLatch.await(500L,
TimeUnit.MILLISECONDS);
+ tcpServerCloseable.close();
+ serviceHandle.close();
+ assertTrue(serviceOpened.get());
+ assertTrue(clientOpened.get());
+ assertTrue(client2Opened.get());
+ assertTrue(clientClosed.get());
+ assertTrue(serviceClosed.get());
+ } finally {
+ IoUtils.safeClose(client);
+ }
+ } finally {
+ IoUtils.safeClose(clientEndpoint);
+ }
} finally {
- IoUtils.safeClose(client);
- clientCloseLatch.await(500L,
TimeUnit.MILLISECONDS);
+ // todo close tcpClient
}
} finally {
- IoUtils.safeClose(clientEndpoint);
+ IoUtils.safeClose(localRoot);
}
} finally {
IoUtils.safeClose(tcpServerCloseable);
}
} finally {
- IoUtils.safeClose(handle);
+ IoUtils.safeClose(serviceHandle);
}
} finally {
IoUtils.safeClose(serverServiceEndpoint);
@@ -169,9 +197,8 @@
} finally {
executorService.shutdownNow();
}
- assertTrue(serviceOpened.get());
- assertTrue(clientOpened.get());
- assertTrue(clientClosed.get());
- assertTrue(serviceClosed.get());
+ for (Throwable t : problems) {
+ throw t;
+ }
}
}
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
---
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-07-18
23:18:15 UTC (rev 4394)
+++
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-07-18
23:18:52 UTC (rev 4395)
@@ -31,7 +31,7 @@
}
public static <I, O> Client<I, O> createLocalClient(Endpoint endpoint,
RequestListener<I, O> requestListener) throws RemotingException {
- final RemoteClientEndpoint<I, O> clientEndpoint =
endpoint.createClientEndpoint(requestListener);
+ final RemoteClientEndpoint clientEndpoint =
endpoint.createClientEndpoint(requestListener);
try {
return endpoint.createClient(clientEndpoint);
} finally {
@@ -40,7 +40,7 @@
}
public static <I, O> ClientSource<I, O> createLocalClientSource(Endpoint
endpoint, RequestListener<I, O> requestListener) throws RemotingException {
- final RemoteServiceEndpoint<I, O> serviceEndpoint =
endpoint.createServiceEndpoint(requestListener);
+ final RemoteServiceEndpoint serviceEndpoint =
endpoint.createServiceEndpoint(requestListener);
try {
return endpoint.createClientSource(serviceEndpoint);
} finally {