Author: david.lloyd(a)jboss.com
Date: 2008-07-19 01:25:46 -0400 (Sat, 19 Jul 2008)
New Revision: 4409
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConnectionListener.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistryImpl.java
Removed:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
Log:
Replace root client concept with a service-by-id registry, not unlike TCP ports really
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-07-19 05:12:10 UTC (rev 4408)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -56,7 +56,6 @@
throw new NullPointerException("executor is null");
}
this.executor = executor;
- log.trace("Opened %s", this);
}
/**
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java 2008-07-19 05:12:10 UTC (rev 4408)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RemoteClientEndpointListener.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -1,38 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.cx.remoting.spi.remote;
-
-/**
- * A listener that watches for creation of remote client endpoints.
- */
-public interface RemoteClientEndpointListener {
-
- /**
- * Receive notification of the creation of a new endpoint.
- *
- * @param <I> the request type
- * @param <O> the reply type
- * @param endpoint the endpoint that was created
- */
- <I, O> void notifyCreated(RemoteClientEndpoint endpoint);
-}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java 2008-07-19 05:12:10 UTC (rev 4408)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientImpl.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -31,6 +31,7 @@
import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
import org.jboss.cx.remoting.spi.remote.ReplyHandler;
import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
+import org.jboss.cx.remoting.spi.remote.Handle;
import java.util.concurrent.Executor;
/**
@@ -38,13 +39,17 @@
*/
public final class ClientImpl<I, O> extends AbstractContextImpl<Client<I,
O>> implements Client<I, O> {
- private final RemoteClientEndpoint remoteClientEndpoint;
+ private final Handle<RemoteClientEndpoint> handle;
- ClientImpl(final RemoteClientEndpoint remoteClientEndpoint, final Executor executor)
{
+ ClientImpl(final Handle<RemoteClientEndpoint> handle, final Executor executor)
{
super(executor);
- this.remoteClientEndpoint = remoteClientEndpoint;
+ this.handle = handle;
}
+ protected void closeAction() throws RemotingException {
+ handle.close();
+ }
+
public O invoke(final I request) throws RemotingException, RemoteExecutionException
{
if (! isOpen()) {
throw new RemotingException("Client is not open");
@@ -52,7 +57,7 @@
final QueueExecutor executor = new QueueExecutor();
final FutureReplyImpl<O> futureReply = new
FutureReplyImpl<O>(executor);
final ReplyHandler replyHandler = futureReply.getReplyHandler();
- final RemoteRequestContext requestContext =
remoteClientEndpoint.receiveRequest(request, replyHandler);
+ final RemoteRequestContext requestContext =
handle.getResource().receiveRequest(request, replyHandler);
futureReply.setRemoteRequestContext(requestContext);
futureReply.addCompletionHandler(new RequestCompletionHandler<O>() {
public void notifyComplete(final FutureReply<O> reply) {
@@ -69,7 +74,7 @@
}
final FutureReplyImpl<O> futureReply = new
FutureReplyImpl<O>(executor);
final ReplyHandler replyHandler = futureReply.getReplyHandler();
- final RemoteRequestContext requestContext =
remoteClientEndpoint.receiveRequest(request, replyHandler);
+ final RemoteRequestContext requestContext =
handle.getResource().receiveRequest(request, replyHandler);
futureReply.setRemoteRequestContext(requestContext);
return futureReply;
}
@@ -78,7 +83,7 @@
if (! isOpen()) {
throw new RemotingException("Client is not open");
}
- remoteClientEndpoint.receiveRequest(request);
+ handle.getResource().receiveRequest(request);
}
public String toString() {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-19 05:12:10 UTC (rev 4408)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -37,25 +37,28 @@
*/
public final class ClientSourceImpl<I, O> extends
AbstractCloseable<ClientSource<I, O>> implements ClientSource<I, O> {
- private final RemoteServiceEndpoint serviceEndpoint;
+ private final Handle<RemoteServiceEndpoint> handle;
private final Endpoint endpoint;
- ClientSourceImpl(final RemoteServiceEndpoint serviceEndpoint, final EndpointImpl
endpoint) {
+ ClientSourceImpl(final Handle<RemoteServiceEndpoint> handle, final EndpointImpl
endpoint) {
super(endpoint.getExecutor());
- this.serviceEndpoint = serviceEndpoint;
+ this.handle = handle;
this.endpoint = endpoint;
}
+ protected void closeAction() throws RemotingException {
+ handle.close();
+ }
+
public Client<I, O> createClient() throws RemotingException {
if (! isOpen()) {
throw new RemotingException("Client source is not open");
}
- final Handle<RemoteClientEndpoint> handle =
serviceEndpoint.createClientEndpoint();
+ final Handle<RemoteClientEndpoint> clientHandle =
handle.getResource().createClientEndpoint();
try {
- final Client<I, O> client =
endpoint.createClient(handle.getResource());
- return client;
+ return endpoint.createClient(clientHandle.getResource());
} finally {
- IoUtils.safeClose(handle);
+ IoUtils.safeClose(clientHandle);
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-19 05:12:10 UTC (rev 4408)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -139,7 +139,7 @@
boolean ok = false;
final Handle<RemoteClientEndpoint> handle = endpoint.getHandle();
try {
- final ClientImpl<I, O> client = new ClientImpl<I, O>(endpoint,
executor);
+ final ClientImpl<I, O> client = new ClientImpl<I, O>(handle,
executor);
client.addCloseHandler(new CloseHandler<Client<I, O>>() {
public void handleClose(final Client<I, O> closed) {
IoUtils.safeClose(handle);
@@ -154,16 +154,11 @@
}
}
- public <I, O> ClientSource<I, O> createClientSource(final
RemoteServiceEndpoint endpoint) throws RemotingException {
+ public <I, O> ClientSource<I, O> createClientSource(final
RemoteServiceEndpoint remoteServiceEndpoint) throws RemotingException {
boolean ok = false;
- final Handle<RemoteServiceEndpoint> handle = endpoint.getHandle();
+ final Handle<RemoteServiceEndpoint> handle =
remoteServiceEndpoint.getHandle();
try {
- final ClientSourceImpl<I, O> client = new ClientSourceImpl<I,
O>(endpoint, this);
- client.addCloseHandler(new CloseHandler<ClientSource<I, O>>() {
- public void handleClose(final ClientSource<I, O> closed) {
- IoUtils.safeClose(handle);
- }
- });
+ final ClientSourceImpl<I, O> client = new ClientSourceImpl<I,
O>(handle, this);
ok = true;
return client;
} finally {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java 2008-07-19 05:12:10 UTC (rev 4408)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureReplyImpl.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -195,6 +195,9 @@
}
while (state == State.WAITING || state == State.NEW) {
try {
+ if (now >= deadline) {
+ return null;
+ }
lock.wait(deadline - now);
} catch (InterruptedException e) {
intr = true;
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -0,0 +1,44 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.cx.remoting.spi.AbstractCloseable;
+import java.util.concurrent.Executor;
+
+/**
+ *
+ */
+public abstract class AbstractConnection extends AbstractCloseable<Connection> implements Connection {
+ /**
+ * Basic constructor.
+ *
+ * @param executor the executor used to execute the close notification handlers
+ */
+ protected AbstractConnection(final Executor executor) {
+ super(executor);
+ }
+
+ public String toString() {
+ return "connection <" + Integer.toString(hashCode()) + ">";
+ }
+}
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-19 05:12:10 UTC (rev 4408)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicHandler.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -33,7 +33,6 @@
import org.jboss.cx.remoting.spi.remote.ReplyHandler;
import org.jboss.cx.remoting.spi.remote.RemoteRequestContext;
import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
import org.jboss.cx.remoting.spi.marshal.Unmarshaller;
import org.jboss.cx.remoting.spi.marshal.Marshaller;
import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
@@ -75,11 +74,11 @@
private final ConcurrentMap<Integer, RemoteClientEndpoint> remoteClients =
concurrentMap();
// running on remote node
private final ConcurrentMap<Integer, ReplyHandler> outstandingRequests =
concurrentMap();
- // forwarded to remote side
+ // forwarded to remote side (handled on this side)
private final ConcurrentMap<Integer, Handle<RemoteClientEndpoint>>
forwardedClients = concurrentMap();
- // forwarded to remote side
- private final ConcurrentMap<Integer, Handle<RemoteServiceEndpoint>>
forwardedServices = concurrentMap();
+ private final ServiceRegistry registry;
+
private final boolean server;
private final BufferAllocator<ByteBuffer> allocator;
@@ -90,16 +89,13 @@
private final ObjectResolver resolver;
private final ClassLoader classLoader;
- public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer>
allocator, final RemoteClientEndpoint root, final Executor executor, final
RemoteClientEndpointListener remoteListener, final MarshallerFactory<ByteBuffer>
marshallerFactory) throws RemotingException {
+ public BasicHandler(final boolean server, final BufferAllocator<ByteBuffer>
allocator, final Executor executor, final MarshallerFactory<ByteBuffer>
marshallerFactory, final ServiceRegistry registry) {
this.server = server;
this.allocator = allocator;
this.executor = executor;
- forwardedClients.put(Integer.valueOf(0),
((RemoteClientEndpoint)root).getHandle());
- final RemoteClientEndpointImpl endpoint = new RemoteClientEndpointImpl(0,
marshallerFactory, allocator);
+ this.registry = registry;
+ final RemoteClientEndpointImpl endpoint = new RemoteClientEndpointImpl(0,
allocator);
remoteClients.put(Integer.valueOf(0), endpoint);
- if (remoteListener != null) {
- remoteListener.notifyCreated(endpoint);
- }
this.marshallerFactory = marshallerFactory;
// todo
resolver = IdentityResolver.getInstance();
@@ -118,10 +114,6 @@
* Sequence number of remote clients opened locally from services from the remote
side.
*/
private final AtomicInteger remoteClientIdSeq = new AtomicInteger(1);
- /**
- * Sequence number of services forwarded to the remote side.
- */
- private final AtomicInteger localServiceIdSeq = new AtomicInteger();
public void handleOpened(final AllocatedMessageChannel channel) {
if (isnew.getAndSet(false)) {
@@ -263,23 +255,21 @@
case CLIENT_OPEN: {
final int serviceId = buffer.getInt();
final int clientId = buffer.getInt();
- final Handle<RemoteServiceEndpoint> handle =
getForwardedService(serviceId);
+ final Handle<RemoteServiceEndpoint> handle =
registry.lookup(serviceId);
if (handle == null) {
- // todo log invalid request
+ log.warn("Received client open message for unknown service
%d", Integer.valueOf(serviceId));
break;
}
final RemoteServiceEndpoint serviceEndpoint = handle.getResource();
-
+ final Handle<RemoteClientEndpoint> clientHandle =
serviceEndpoint.createClientEndpoint();
+ // todo check for duplicate
+ // todo validate the client ID
+ log.trace("Opening client %d from service %d",
Integer.valueOf(clientId), Integer.valueOf(serviceId));
+ forwardedClients.put(Integer.valueOf(clientId), clientHandle);
break;
}
case SERVICE_CLOSE: {
- final int serviceId = buffer.getInt();
- final Handle<RemoteServiceEndpoint> handle =
takeForwardedService(serviceId);
- if (handle == null) {
- log.warn("Got client close message for unknown service
%d", Integer.valueOf(serviceId));
- break;
- }
- IoUtils.safeClose(handle);
+ registry.unbind(buffer.getInt());
break;
}
default: {
@@ -326,6 +316,10 @@
return remoteClients.get(Integer.valueOf(i));
}
+ RemoteServiceEndpoint getRemoteService(final int id) {
+ return new RemoteServiceEndpointImpl(allocator, id);
+ }
+
private final class ReplyHandlerImpl implements ReplyHandler {
private final AllocatedMessageChannel channel;
@@ -421,7 +415,7 @@
int id;
do {
id = remoteClientIdSeq.getAndIncrement() << 1 | (server ? 1 : 0);
- } while (remoteClients.putIfAbsent(Integer.valueOf(id), new
RemoteClientEndpointImpl(id, null, allocator)) != null);
+ } while (remoteClients.putIfAbsent(Integer.valueOf(id), new
RemoteClientEndpointImpl(id, allocator)) != null);
return id;
}
@@ -446,14 +440,6 @@
return outstandingRequests.remove(Integer.valueOf(id));
}
- public Handle<RemoteServiceEndpoint> getForwardedService(final int id) {
- return forwardedServices.get(Integer.valueOf(id));
- }
-
- public Handle<RemoteServiceEndpoint> takeForwardedService(final int id) {
- return forwardedServices.remove(Integer.valueOf(id));
- }
-
// Writer members
private final BlockingQueue<WriteHandler> outputQueue =
CollectionUtil.blockingQueue(64);
@@ -471,19 +457,14 @@
private final class RemoteClientEndpointImpl extends
AbstractAutoCloseable<RemoteClientEndpoint> implements RemoteClientEndpoint {
private final int identifier;
- private final MarshallerFactory<ByteBuffer> marshallerFactory;
private final BufferAllocator<ByteBuffer> allocator;
- public RemoteClientEndpointImpl(final int identifier, final
MarshallerFactory<ByteBuffer> marshallerFactory, final
BufferAllocator<ByteBuffer> allocator) {
+ public RemoteClientEndpointImpl(final int identifier, final
BufferAllocator<ByteBuffer> allocator) {
super(executor);
- if (marshallerFactory == null) {
- throw new NullPointerException("marshallerFactory is null");
- }
if (allocator == null) {
throw new NullPointerException("allocator is null");
}
this.identifier = identifier;
- this.marshallerFactory = marshallerFactory;
this.allocator = allocator;
addCloseHandler(new CloseHandler<RemoteClientEndpoint>() {
public void handleClose(final RemoteClientEndpoint closed) {
@@ -501,7 +482,7 @@
}
public void receiveRequest(final Object request) {
- log.trace("Received one-way request of type %s", request == null ?
"null" : request.getClass());
+ log.trace("Sending outbound one-way request of type %s", request ==
null ? "null" : request.getClass());
try {
final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(null);
final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
@@ -529,7 +510,7 @@
}
public RemoteRequestContext receiveRequest(final Object request, final
ReplyHandler handler) {
- log.trace("Received request of type %s", request == null ?
"null" : request.getClass());
+ log.trace("Sending outbound request of type %s", request == null ?
"null" : request.getClass());
try {
final Marshaller<ByteBuffer> marshaller =
marshallerFactory.createMarshaller(null);
final List<ByteBuffer> bufferList = new
ArrayList<ByteBuffer>();
@@ -596,13 +577,11 @@
public final class RemoteServiceEndpointImpl extends
AbstractAutoCloseable<RemoteServiceEndpoint> implements RemoteServiceEndpoint {
- private final MarshallerFactory<ByteBuffer> marshallerFactory;
private final BufferAllocator<ByteBuffer> allocator;
private final int identifier;
- protected RemoteServiceEndpointImpl(final MarshallerFactory<ByteBuffer>
marshallerFactory, final BufferAllocator<ByteBuffer> allocator, final int
identifier) {
+ protected RemoteServiceEndpointImpl(final BufferAllocator<ByteBuffer>
allocator, final int identifier) {
super(executor);
- this.marshallerFactory = marshallerFactory;
this.allocator = allocator;
this.identifier = identifier;
addCloseHandler(new CloseHandler<RemoteServiceEndpoint>() {
@@ -621,10 +600,11 @@
}
public Handle<RemoteClientEndpoint> createClientEndpoint() throws
RemotingException {
- final int id = openClientFromService();
+ final int clientId = openClientFromService();
final ByteBuffer buffer = allocator.allocate();
+ buffer.put((byte) MessageType.CLIENT_OPEN);
buffer.putInt(identifier);
- buffer.putInt(openClientFromService());
+ buffer.putInt(clientId);
buffer.flip();
// todo - probably should bail out if we're interrupted?
boolean intr = false;
@@ -632,7 +612,7 @@
try {
registerWriter(channel, new SimpleWriteHandler(allocator, buffer));
try {
- return new RemoteClientEndpointImpl(id, marshallerFactory,
allocator).getHandle();
+ return new RemoteClientEndpointImpl(clientId,
allocator).getHandle();
} finally {
if (intr) {
Thread.currentThread().interrupt();
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-19 05:12:10 UTC (rev 4408)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/BasicProtocol.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -22,10 +22,9 @@
package org.jboss.cx.remoting.protocol.basic;
-import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
-import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
-import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.Handle;
import org.jboss.cx.remoting.core.marshal.JavaSerializationMarshallerFactory;
import org.jboss.xnio.IoHandlerFactory;
import org.jboss.xnio.ChannelSource;
@@ -53,20 +52,13 @@
* Create a request server for the basic protocol.
*
* @param executor the executor to use for invocations
- * @param localRootSource the service to draw client endpoints from for root clients
on inbound connections
* @param allocator the buffer allocator to use
- * @param remoteListener a listener which receives notification of the remote root
client of the incoming connection
* @return a handler factory for passing to an XNIO server
*/
- public static IoHandlerFactory<AllocatedMessageChannel> createServer(final
Executor executor, final RemoteServiceEndpoint localRootSource, final
BufferAllocator<ByteBuffer> allocator, final RemoteClientEndpointListener
remoteListener) {
+ public static IoHandlerFactory<AllocatedMessageChannel> createServer(final
Executor executor, final BufferAllocator<ByteBuffer> allocator, final
ServiceRegistry serviceRegistry) {
return new IoHandlerFactory<AllocatedMessageChannel>() {
public IoHandler<? super AllocatedMessageChannel> createHandler() {
- try {
- final RemoteClientEndpoint remoteClientEndpoint =
localRootSource.createClientEndpoint().getResource();
- return new BasicHandler(true, allocator, remoteClientEndpoint,
executor, remoteListener, new JavaSerializationMarshallerFactory(executor));
- } catch (RemotingException e) {
- throw new IllegalStateException("The local root endpoint is
unusable", e);
- }
+ return new BasicHandler(true, allocator, executor, new
JavaSerializationMarshallerFactory(executor), serviceRegistry);
}
};
}
@@ -75,19 +67,22 @@
* Create a request client for the basic protocol.
*
* @param executor the executor to use for invocations
- * @param localRoot the client endpoint to use as the local root client
* @param channelSource the XNIO channel source to use to establish the connection
* @param allocator the buffer allocator to use
- * @return the future client endpoint of the remote side's root client
+ * @param serviceRegistry the service registry
+ * @return a handle which may be used to close the connection
* @throws IOException if an error occurs
*/
- public static IoFuture<RemoteClientEndpoint> connect(final Executor executor,
final RemoteClientEndpoint localRoot, final ChannelSource<AllocatedMessageChannel>
channelSource, final BufferAllocator<ByteBuffer> allocator) throws IOException {
- final BasicHandler basicHandler = new BasicHandler(false, allocator, localRoot,
executor, null, new JavaSerializationMarshallerFactory(executor));
+ public static IoFuture<Connection> connect(final Executor executor, final
ChannelSource<AllocatedMessageChannel> channelSource, final
BufferAllocator<ByteBuffer> allocator, final ServiceRegistry serviceRegistry) throws
IOException {
+ final BasicHandler basicHandler = new BasicHandler(false, allocator, executor,
new JavaSerializationMarshallerFactory(executor), serviceRegistry);
final IoFuture<AllocatedMessageChannel> futureChannel =
channelSource.open(basicHandler);
- return new AbstractConvertingIoFuture<RemoteClientEndpoint,
AllocatedMessageChannel>(futureChannel) {
- protected RemoteClientEndpoint convert(final AllocatedMessageChannel channel)
throws RemotingException {
- final RemoteClientEndpoint remoteClientEndpoint =
basicHandler.getRemoteClient(0);
- return remoteClientEndpoint;
+ return new AbstractConvertingIoFuture<Connection,
AllocatedMessageChannel>(futureChannel) {
+ protected Connection convert(final AllocatedMessageChannel channel) throws
RemotingException {
+ return new AbstractConnection(executor) {
+ public Handle<RemoteServiceEndpoint> getServiceForId(final int
id) throws RemotingException {
+ return basicHandler.getRemoteService(id).getHandle();
+ }
+ };
}
};
}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -0,0 +1,35 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.RemotingException;
+
+/**
+ *
+ */
+public interface Connection extends Closeable<Connection> {
+ Handle<RemoteServiceEndpoint> getServiceForId(int id) throws RemotingException;
+}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConnectionListener.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConnectionListener.java (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ConnectionListener.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -0,0 +1,30 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+/**
+ *
+ */
+public interface ConnectionListener {
+ void handleOpened(Connection connection);
+}
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java 2008-07-19 05:12:10 UTC (rev 4408)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistry.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -36,5 +36,7 @@
void unbind(int id) throws RemotingException;
+ void clear();
+
Handle<RemoteServiceEndpoint> lookup(int id) throws RemotingException;
}
Added:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistryImpl.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistryImpl.java (rev 0)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/ServiceRegistryImpl.java 2008-07-19 05:25:46 UTC (rev 4409)
@@ -0,0 +1,111 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.protocol.basic;
+
+import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
+import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.xnio.IoUtils;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Iterator;
+
+/**
+ *
+ */
+public final class ServiceRegistryImpl implements ServiceRegistry {
+
+ private static final int START = 32768;
+
+ private final ConcurrentMap<Integer, Handle<RemoteServiceEndpoint>> map =
CollectionUtil.concurrentMap();
+ private final AtomicInteger dynamicSequence = new AtomicInteger(START);
+ private final ServiceRegistry parent;
+
+ public ServiceRegistryImpl(final ServiceRegistry parent) {
+ this.parent = parent;
+ }
+
+ public ServiceRegistryImpl() {
+ parent = null;
+ }
+
+ public int bind(final RemoteServiceEndpoint remoteServiceEndpoint) throws
RemotingException {
+ final Handle<RemoteServiceEndpoint> handle =
remoteServiceEndpoint.getHandle();
+ boolean ok = false;
+ try {
+ for (;;) {
+ final int seqVal = dynamicSequence.getAndIncrement();
+ if (seqVal < 0) {
+ dynamicSequence.compareAndSet(seqVal, START);
+ continue;
+ }
+ if (map.putIfAbsent(Integer.valueOf(seqVal), handle) != null) {
+ ok = true;
+ return seqVal;
+ }
+ }
+ } finally {
+ if (! ok) {
+ IoUtils.safeClose(handle);
+ }
+ }
+ }
+
+ public void bind(final RemoteServiceEndpoint remoteServiceEndpoint, final int id)
throws RemotingException {
+ final Handle<RemoteServiceEndpoint> handle =
remoteServiceEndpoint.getHandle();
+ boolean ok = false;
+ try {
+ if (map.putIfAbsent(Integer.valueOf(id), handle) != null) {
+ throw new RemotingException("Service already bound to that
ID");
+ }
+ ok = true;
+ } finally {
+ if (! ok) {
+ IoUtils.safeClose(handle);
+ }
+ }
+ }
+
+ public void unbind(final int id) throws RemotingException {
+ map.remove(Integer.valueOf(id));
+ }
+
+ public void clear() {
+ Iterator<Handle<RemoteServiceEndpoint>> it =
map.values().iterator();
+ while (it.hasNext()) {
+ IoUtils.safeClose(it.next());
+ it.remove();
+ }
+ }
+
+ public Handle<RemoteServiceEndpoint> lookup(final int id) throws
RemotingException {
+ final Handle<RemoteServiceEndpoint> handle = map.get(Integer.valueOf(id));
+ return handle != null || parent == null ? handle.getResource().getHandle() :
parent.lookup(id);
+ }
+
+ protected void finalize() throws Throwable {
+ clear();
+ super.finalize();
+ }
+}
Modified:
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
---
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-19
05:12:10 UTC (rev 4408)
+++
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-07-19
05:25:46 UTC (rev 4409)
@@ -22,40 +22,42 @@
package org.jboss.cx.remoting.protocol.basic;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.net.InetSocketAddress;
+import java.io.Closeable;
import junit.framework.TestCase;
-import org.jboss.xnio.Xnio;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.BufferAllocator;
-import org.jboss.xnio.ConfigurableFactory;
-import org.jboss.xnio.ChannelSource;
-import org.jboss.xnio.TcpClient;
-import org.jboss.xnio.IoFuture;
-import org.jboss.xnio.channels.Channels;
-import org.jboss.xnio.channels.AllocatedMessageChannel;
import org.jboss.cx.remoting.core.EndpointImpl;
-import org.jboss.cx.remoting.RequestContext;
-import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.test.support.LoggingHelper;
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.ClientContext;
import org.jboss.cx.remoting.ServiceContext;
+import org.jboss.cx.remoting.RequestContext;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.ClientSource;
import org.jboss.cx.remoting.Client;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.test.support.LoggingHelper;
+import org.jboss.cx.remoting.FutureReply;
+import org.jboss.cx.remoting.AbstractRequestListener;
import org.jboss.cx.remoting.spi.remote.RemoteServiceEndpoint;
-import org.jboss.cx.remoting.spi.remote.RemoteClientEndpointListener;
import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.spi.remote.RemoteClientEndpoint;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.Collections;
-import java.util.List;
-import java.util.LinkedList;
-import java.nio.ByteBuffer;
-import java.net.InetSocketAddress;
-import java.io.Closeable;
+import org.jboss.xnio.BufferAllocator;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.Xnio;
+import org.jboss.xnio.IoHandlerFactory;
+import org.jboss.xnio.ConfigurableFactory;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.TcpConnector;
+import org.jboss.xnio.TcpClient;
+import org.jboss.xnio.ChannelSource;
+import org.jboss.xnio.channels.AllocatedMessageChannel;
+import org.jboss.xnio.channels.Channels;
+import org.jboss.xnio.channels.StreamChannel;
/**
*
@@ -66,13 +68,9 @@
}
public void testConnection() throws Throwable {
+ final String REQUEST = "request";
+ final String REPLY = "reply";
final List<Throwable> problems = Collections.synchronizedList(new
LinkedList<Throwable>());
- final AtomicBoolean clientOpened = new AtomicBoolean(false);
- final AtomicBoolean client2Opened = new AtomicBoolean(false);
- final AtomicBoolean serviceOpened = new AtomicBoolean(false);
- final AtomicBoolean clientClosed = new AtomicBoolean(false);
- final AtomicBoolean serviceClosed = new AtomicBoolean(false);
- final CountDownLatch cleanupLatch = new CountDownLatch(2);
final ExecutorService executorService = Executors.newCachedThreadPool();
try {
final BufferAllocator<ByteBuffer> allocator = new
BufferAllocator<ByteBuffer>() {
@@ -88,6 +86,69 @@
final EndpointImpl endpoint = new EndpointImpl();
endpoint.setExecutor(executorService);
endpoint.start();
+ try {
+ final ServiceRegistry serviceRegistry = new ServiceRegistryImpl();
+ try {
+ final Handle<RemoteServiceEndpoint> serviceEndpointHandle =
endpoint.createServiceEndpoint(new AbstractRequestListener<Object, Object>() {
+ public void handleRequest(final RequestContext<Object>
context, final Object request) throws RemoteExecutionException {
+ try {
+ context.sendReply(REPLY);
+ } catch (RemotingException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ try {
+ serviceRegistry.bind(serviceEndpointHandle.getResource(),
13);
+ final IoHandlerFactory<AllocatedMessageChannel>
handlerFactory = BasicProtocol.createServer(executorService, allocator, serviceRegistry);
+ final IoHandlerFactory<StreamChannel> newHandlerFactory
= Channels.convertStreamToAllocatedMessage(handlerFactory, 32768, 32768);
+ final ConfigurableFactory<Closeable> tcpServerFactory =
xnio.createTcpServer(newHandlerFactory, new InetSocketAddress(12345));
+ final Closeable tcpServerCloseable =
tcpServerFactory.create();
+ try {
+ final ConfigurableFactory<TcpConnector>
connectorFactory = xnio.createTcpConnector();
+ final TcpConnector connector =
connectorFactory.create();
+ try {
+ final TcpClient tcpClient =
connector.createChannelSource(new InetSocketAddress("localhost", 12345));
+ final ChannelSource<AllocatedMessageChannel>
channelSource = Channels.convertStreamToAllocatedMessage(tcpClient, 32768, 32768);
+ final IoFuture<Connection> futureCloseable =
BasicProtocol.connect(executorService, channelSource, allocator, serviceRegistry);
+ final Connection connection = futureCloseable.get();
+ try {
+ final Handle<RemoteServiceEndpoint>
handleThirteen = connection.getServiceForId(13);
+ try {
+ final RemoteServiceEndpoint serviceThirteen =
handleThirteen.getResource();
+ final ClientSource<Object,Object>
clientSource = endpoint.createClientSource(serviceThirteen);
+ try {
+ final Client<Object,Object> client
= clientSource.createClient();
+ try {
+ final FutureReply<Object>
future = client.send(REQUEST);
+ assertEquals(REPLY, future.get(500L,
TimeUnit.MILLISECONDS));
+ } finally {
+ IoUtils.safeClose(client);
+ }
+ } finally {
+ IoUtils.safeClose(clientSource);
+ }
+ } finally {
+ IoUtils.safeClose(handleThirteen);
+ }
+ } finally {
+ IoUtils.safeClose(connection);
+ }
+ } finally {
+ // todo close connector
+ }
+ } finally {
+ IoUtils.safeClose(tcpServerCloseable);
+ }
+ } finally {
+ IoUtils.safeClose(serviceEndpointHandle);
+ }
+ } finally {
+ serviceRegistry.clear();
+ }
+ } finally {
+ endpoint.stop();
+ }
} finally {
IoUtils.safeClose(xnio);
}