[jboss-remoting-commits] JBoss Remoting SVN: r4463 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi and 7 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Mon Aug 4 22:59:58 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-08-04 22:59:58 -0400 (Mon, 04 Aug 2008)
New Revision: 4463

Added:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/HandleableCloseable.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteServiceRegistration.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceListener.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SimpleCloseable.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractHandleableCloseable.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractSimpleCloseable.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureClientSource.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceRegistration.java
Removed:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientContext.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceContext.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandler.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandlerSource.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
   remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java
   remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
   remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
   remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
API changes for service registration, location

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -10,7 +10,7 @@
  * @param <I> the request type
  * @param <O> the reply type
  */
-public interface Client<I, O> extends Closeable<Client<I, O>> {
+public interface Client<I, O> extends HandleableCloseable<Client<I, O>> {
     /**
      * Send a request and block until a reply is received.
      * <p/>

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientContext.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientContext.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -5,7 +5,7 @@
 /**
  * The server context for a single remote client instance.
  */
-public interface ClientContext extends Closeable<ClientContext> {
+public interface ClientContext extends HandleableCloseable<ClientContext> {
     /**
      * Get the attributes for this end of the channel as a map.
      *

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -8,7 +8,7 @@
  * @param <I> the request type
  * @param <O> the reply type
  */
-public interface ClientSource<I, O> extends Closeable<ClientSource<I, O>> {
+public interface ClientSource<I, O> extends HandleableCloseable<ClientSource<I, O>> {
     /**
      * Close the context source.  New contexts may no longer be created after this
      * method is called.  Subsequent calls to this method have no additional effect.

Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -1,26 +0,0 @@
-package org.jboss.cx.remoting;
-
-import java.io.IOException;
-
-/**
- * A Remoting resource that can be closed.
- *
- * @param <T> the type that is passed to the close handler
- */
-public interface Closeable<T> extends java.io.Closeable {
-
-    /**
-     * Close, waiting for any outstanding processing to finish.
-     *
-     * @throws IOException if the close failed
-     */
-    void close() throws IOException;
-
-    /**
-     * Add a handler that will be called upon close.  The handler may be called before or after the close acutally
-     * takes place.
-     *
-     * @param handler the close handler
-     */
-    void addCloseHandler(CloseHandler<? super T> handler);
-}

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -1,10 +1,12 @@
 package org.jboss.cx.remoting;
 
 import java.util.concurrent.ConcurrentMap;
+import java.net.URI;
 import java.io.IOException;
 import org.jboss.cx.remoting.spi.remote.RequestHandler;
 import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
 import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.xnio.IoFuture;
 
 /**
  * A potential participant in a JBoss Remoting communications relationship.
@@ -51,10 +53,12 @@
      * @param <I> the request type
      * @param <O> the reply type
      * @param requestListener the request listener
+     * @param serviceType the type of service to advertise
+     * @param groupName the name of the group of this type to be part of
      * @return a handle for the client source
      * @throws IOException if an error occurs
      */
-    <I, O> Handle<RequestHandlerSource> createRequestHandlerSource(RequestListener<I, O> requestListener) throws IOException;
+    <I, O> Handle<RequestHandlerSource> createRequestHandlerSource(RequestListener<I, O> requestListener, String serviceType, String groupName) throws IOException;
 
     /**
      * Create a client that uses the given request handler to handle its requests.
@@ -77,4 +81,42 @@
      * @throws IOException if an error occurs
      */
     <I, O> ClientSource<I, O> createClientSource(RequestHandlerSource handlerSource) throws IOException;
+
+    /**
+     * Attempt to locate a service.  The return value then be queried for the service's {@code ClientSource}.
+     *
+     * @param <I> the request type
+     * @param <O> the reply type
+     * @param serviceUri the URI of the service
+     * @return the future service
+     * @throws IllegalArgumentException if the given URI is not a valid Remoting service URI
+     */
+    <I, O> IoFuture<ClientSource<I, O>> locateService(URI serviceUri) throws IllegalArgumentException;
+
+    /**
+     * Register a remotely available service.<p>
+     * The remote endpoint may not have the same name as this endpoint.  The group name and service type must be
+     * non-{@code null} and non-empty.  The metric must be greater than zero.
+     *
+     * @param serviceType the service type string
+     * @param groupName the group name
+     * @param endpointName the name of the remote endpoint
+     * @param handlerSource the remote handler source
+     * @param metric the preference metric, lower is more preferred
+     * @return a handle corresponding to the registration
+     * @throws IllegalArgumentException if one of the given arguments was not valid
+     * @throws IOException if an error occurs with the registration
+     */
+    SimpleCloseable registerRemoteService(String serviceType, String groupName, String endpointName, RequestHandlerSource handlerSource, int metric) throws IllegalArgumentException, IOException;
+
+    /**
+     * Add a listener for observing when local and remote services are added.  The caller may specify whether the listener
+     * should be notified of the complete list of currently registered services (set {@code onlyNew} to {@code false})
+     * or only services registered after the time of calling this method (set {@code onlyNew} to {@code true}).
+     *
+     * @param serviceListener the listener
+     * @param onlyNew {@code true} if only new registrations should be sent to the listener
+     * @return a handle which may be used to unregister the listener
+     */
+    SimpleCloseable addServiceListener(ServiceListener serviceListener, boolean onlyNew);
 }

Copied: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/HandleableCloseable.java (from rev 4437, remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java)
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/HandleableCloseable.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/HandleableCloseable.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -0,0 +1,27 @@
+package org.jboss.cx.remoting;
+
+import java.io.IOException;
+import java.io.Closeable;
+
+/**
+ * A Remoting resource that can be closed.
+ *
+ * @param <T> the type that is passed to the close handler
+ */
+public interface HandleableCloseable<T> extends Closeable {
+
+    /**
+     * Close, waiting for any outstanding processing to finish.
+     *
+     * @throws IOException if the close failed
+     */
+    void close() throws IOException;
+
+    /**
+     * Add a handler that will be called upon close.  The handler may be called before or after the close acutally
+     * takes place.
+     *
+     * @param handler the close handler
+     */
+    void addCloseHandler(CloseHandler<? super T> handler);
+}

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteServiceRegistration.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteServiceRegistration.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteServiceRegistration.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -0,0 +1,29 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting;
+
+/**
+ *
+ */
+public interface RemoteServiceRegistration extends HandleableCloseable<RemoteServiceRegistration> {
+}

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceContext.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceContext.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -6,7 +6,7 @@
  * The server-side context of a service.  Used to hold state relating to a service (known as a {@code ContextSource} on
  * the client side).
  */
-public interface ServiceContext extends Closeable<ServiceContext> {
+public interface ServiceContext extends HandleableCloseable<ServiceContext> {
 
     /**
      * Get an attribute map which can be used to cache arbitrary state on the server side.

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceListener.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceListener.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceListener.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting;
+
+import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
+
+/**
+ * A listener for watching service registrations on an endpoint.
+ */
+public interface ServiceListener {
+
+    /**
+     * Receive notification that a local service was added.  To receive a notification when it is closed, register a
+     * close handler on the provided {@code requestHandlerSource} parameter.
+     *
+     * @param listenerHandle the handle to this listener
+     * @param serviceType the service type string
+     * @param groupName the group name string
+     * @param requestHandlerSource the request handler source
+     */
+    void localServiceCreated(SimpleCloseable listenerHandle, String serviceType, String groupName, RequestHandlerSource requestHandlerSource);
+
+    /**
+     * Receive notification that a remote service was registered.  To receive a notification when it is unregistered, register a
+     * close handler on the provided {@code handle} parameter.
+     *
+     * @param listenerHandle the handle to this listener
+     * @param endpointName the remote endpoint name
+     * @param serviceType the service type string
+     * @param groupName the group name string
+     * @param metric the metric value
+     * @param requestHandlerSource the request handler source
+     * @param handle the handle to the registration
+     */
+    void remoteServiceRegistered(SimpleCloseable listenerHandle, String endpointName, String serviceType, String groupName, int metric, RequestHandlerSource requestHandlerSource, SimpleCloseable handle);
+}

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SimpleCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SimpleCloseable.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SimpleCloseable.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -0,0 +1,29 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting;
+
+/**
+ * A simple close-only handle.
+ */
+public interface SimpleCloseable extends HandleableCloseable<SimpleCloseable> {
+}

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -33,7 +33,7 @@
  * A closeable implementation that supports reference counting.  Since the initial reference count is zero, implementors
  * must be careful to ensure that the first operation invoked is a call to {@link #getHandle()}.
  */
-public abstract class AbstractAutoCloseable<T> extends AbstractCloseable<T> {
+public abstract class AbstractAutoCloseable<T> extends AbstractHandleableCloseable<T> {
 
     private final AtomicInteger refcount = new AtomicInteger(0);
     private final Executor executor;
@@ -100,7 +100,7 @@
         return new HandleImpl();
     }
 
-    private final class HandleImpl extends AbstractCloseable<Handle<T>> implements Handle<T> {
+    private final class HandleImpl extends AbstractHandleableCloseable<Handle<T>> implements Handle<T> {
         private HandleImpl() throws IOException {
             super(AbstractAutoCloseable.this.executor);
             inc();

Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -1,178 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.Closeable;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.spi.SpiUtils;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.log.Logger;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.io.IOException;
-
-/**
- * A basic implementation of a closeable resource.  Use as a convenient base class for your closeable resources.
- * Ensures that the {@code close()} method is idempotent; implements the registry of close handlers.
- */
-public abstract class AbstractCloseable<T> implements Closeable<T> {
-
-    private static final Logger log = Logger.getLogger(AbstractCloseable.class);
-
-    protected final Executor executor;
-    private final Object closeLock = new Object();
-    private final AtomicBoolean closed = new AtomicBoolean();
-    private Set<CloseHandler<? super T>> closeHandlers;
-
-    private static final boolean LEAK_DEBUGGING;
-    private final StackTraceElement[] backtrace;
-
-    static {
-        boolean b = false;
-        try {
-            b = Boolean.parseBoolean(AccessController.doPrivileged(new PrivilegedAction<String>() {
-                public String run() {
-                    return System.getProperty("jboss.remoting.leakdebugging", "false");
-                }
-            }));
-        } catch (SecurityException se) {
-            b = false;
-        }
-        LEAK_DEBUGGING = b;
-    }
-
-    /**
-     * Basic constructor.
-     *
-     * @param executor the executor used to execute the close notification handlers
-     */
-    protected AbstractCloseable(final Executor executor) {
-        if (executor == null) {
-            throw new NullPointerException("executor is null");
-        }
-        this.executor = executor;
-        backtrace = LEAK_DEBUGGING ? new Throwable().getStackTrace() : null;
-    }
-
-    /**
-     * Read the status of this resource.  This is just a snapshot in time; there is no guarantee that the resource
-     * will remain open for any amount of time, even if this method returns {@code true}.
-     *
-     * @return {@code true} if the resource is still open
-     */
-    protected boolean isOpen() {
-        return ! closed.get();
-    }
-
-    /**
-     * Called exactly once when the {@code close()} method is invoked; the actual close operation should take place here.
-     *
-     * @throws RemotingException if the close failed
-     */
-    protected void closeAction() throws IOException {}
-
-    /**
-     * {@inheritDoc}
-     */
-    public final void close() throws IOException {
-        if (! closed.getAndSet(true)) {
-            log.trace("Closed %s", this);
-            synchronized (closeLock) {
-                if (closeHandlers != null) {
-                    for (final CloseHandler<? super T> handler : closeHandlers) {
-                        try {
-                            executor.execute(new Runnable() {
-                                @SuppressWarnings({ "unchecked" })
-                                public void run() {
-                                    SpiUtils.safeHandleClose(handler, (T) AbstractCloseable.this);
-                                }
-                            });
-                        } catch (RejectedExecutionException ree) {
-                            log.warn("Unable to execute close handler (execution rejected) for %s (%s)", this, ree.getMessage());
-                        }
-                    }
-                    closeHandlers = null;
-                }
-            }
-            closeAction();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void addCloseHandler(final CloseHandler<? super T> handler) {
-        synchronized (closeLock) {
-            if (closeHandlers == null) {
-                closeHandlers = new HashSet<CloseHandler<? super T>>();
-            }
-            closeHandlers.add(handler);
-        }
-    }
-
-    /**
-     * Get the executor to use for handler invocation.
-     *
-     * @return the executor
-     */
-    protected Executor getExecutor() {
-        return executor;
-    }
-
-    /**
-     * Finalize this closeable instance.  If the instance hasn't been closed, it is closed and a warning is logged.
-     */
-    protected void finalize() throws Throwable {
-        try {
-            super.finalize();
-        } finally {
-            if (isOpen()) {
-                if (LEAK_DEBUGGING) {
-                    final Throwable t = new LeakThrowable();
-                    t.setStackTrace(backtrace);
-                    log.warn(t, "Leaked a %s instance: %s", getClass().getName(), this);
-                } else {
-                    log.warn("Leaked a %s instance: %s", getClass().getName(), this);
-                }
-                IoUtils.safeClose(this);
-            }
-        }
-    }
-
-    @SuppressWarnings({ "serial" })
-    private static final class LeakThrowable extends Throwable {
-
-        public LeakThrowable() {
-        }
-
-        public String toString() {
-            return "a leaked reference";
-        }
-    }
-}

Copied: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractHandleableCloseable.java (from rev 4439, remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java)
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractHandleableCloseable.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractHandleableCloseable.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -0,0 +1,178 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi;
+
+import org.jboss.cx.remoting.HandleableCloseable;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.spi.SpiUtils;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.io.IOException;
+
+/**
+ * A basic implementation of a closeable resource.  Use as a convenient base class for your closeable resources.
+ * Ensures that the {@code close()} method is idempotent; implements the registry of close handlers.
+ */
+public abstract class AbstractHandleableCloseable<T> implements HandleableCloseable<T> {
+
+    private static final Logger log = Logger.getLogger(AbstractHandleableCloseable.class);
+
+    protected final Executor executor;
+    private final Object closeLock = new Object();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private Set<CloseHandler<? super T>> closeHandlers;
+
+    private static final boolean LEAK_DEBUGGING;
+    private final StackTraceElement[] backtrace;
+
+    static {
+        boolean b = false;
+        try {
+            b = Boolean.parseBoolean(AccessController.doPrivileged(new PrivilegedAction<String>() {
+                public String run() {
+                    return System.getProperty("jboss.remoting.leakdebugging", "false");
+                }
+            }));
+        } catch (SecurityException se) {
+            b = false;
+        }
+        LEAK_DEBUGGING = b;
+    }
+
+    /**
+     * Basic constructor.
+     *
+     * @param executor the executor used to execute the close notification handlers
+     */
+    protected AbstractHandleableCloseable(final Executor executor) {
+        if (executor == null) {
+            throw new NullPointerException("executor is null");
+        }
+        this.executor = executor;
+        backtrace = LEAK_DEBUGGING ? new Throwable().getStackTrace() : null;
+    }
+
+    /**
+     * Read the status of this resource.  This is just a snapshot in time; there is no guarantee that the resource
+     * will remain open for any amount of time, even if this method returns {@code true}.
+     *
+     * @return {@code true} if the resource is still open
+     */
+    protected boolean isOpen() {
+        return ! closed.get();
+    }
+
+    /**
+     * Called exactly once when the {@code close()} method is invoked; the actual close operation should take place here.
+     *
+     * @throws RemotingException if the close failed
+     */
+    protected void closeAction() throws IOException {}
+
+    /**
+     * {@inheritDoc}
+     */
+    public final void close() throws IOException {
+        if (! closed.getAndSet(true)) {
+            log.trace("Closed %s", this);
+            synchronized (closeLock) {
+                if (closeHandlers != null) {
+                    for (final CloseHandler<? super T> handler : closeHandlers) {
+                        try {
+                            executor.execute(new Runnable() {
+                                @SuppressWarnings({ "unchecked" })
+                                public void run() {
+                                    SpiUtils.safeHandleClose(handler, (T) AbstractHandleableCloseable.this);
+                                }
+                            });
+                        } catch (RejectedExecutionException ree) {
+                            log.warn("Unable to execute close handler (execution rejected) for %s (%s)", this, ree.getMessage());
+                        }
+                    }
+                    closeHandlers = null;
+                }
+            }
+            closeAction();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public void addCloseHandler(final CloseHandler<? super T> handler) {
+        synchronized (closeLock) {
+            if (closeHandlers == null) {
+                closeHandlers = new HashSet<CloseHandler<? super T>>();
+            }
+            closeHandlers.add(handler);
+        }
+    }
+
+    /**
+     * Get the executor to use for handler invocation.
+     *
+     * @return the executor
+     */
+    protected Executor getExecutor() {
+        return executor;
+    }
+
+    /**
+     * Finalize this closeable instance.  If the instance hasn't been closed, it is closed and a warning is logged.
+     */
+    protected void finalize() throws Throwable {
+        try {
+            super.finalize();
+        } finally {
+            if (isOpen()) {
+                if (LEAK_DEBUGGING) {
+                    final Throwable t = new LeakThrowable();
+                    t.setStackTrace(backtrace);
+                    log.warn(t, "Leaked a %s instance: %s", getClass().getName(), this);
+                } else {
+                    log.warn("Leaked a %s instance: %s", getClass().getName(), this);
+                }
+                IoUtils.safeClose(this);
+            }
+        }
+    }
+
+    @SuppressWarnings({ "serial" })
+    private static final class LeakThrowable extends Throwable {
+
+        public LeakThrowable() {
+        }
+
+        public String toString() {
+            return "a leaked reference";
+        }
+    }
+}

Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractSimpleCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractSimpleCloseable.java	                        (rev 0)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractSimpleCloseable.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi;
+
+import org.jboss.cx.remoting.SimpleCloseable;
+import java.util.concurrent.Executor;
+
+/**
+ * An abstract simple closeable implementation.
+ */
+public abstract class AbstractSimpleCloseable extends AbstractHandleableCloseable<SimpleCloseable> implements SimpleCloseable {
+
+    /**
+     * Basic constructor.
+     *
+     * @param executor the executor used to execute the close notification handlers
+     */
+    protected AbstractSimpleCloseable(final Executor executor) {
+        super(executor);
+    }
+}

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -22,14 +22,14 @@
 
 package org.jboss.cx.remoting.spi.remote;
 
-import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.HandleableCloseable;
 import org.jboss.cx.remoting.CloseHandler;
 import java.io.IOException;
 
 /**
  * A handle to a local resource.
  */
-public interface Handle<T> extends Closeable<Handle<T>> {
+public interface Handle<T> extends HandleableCloseable<Handle<T>> {
 
     /**
      * Get the resource.

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandler.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandler.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandler.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -22,7 +22,7 @@
 
 package org.jboss.cx.remoting.spi.remote;
 
-import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.HandleableCloseable;
 import org.jboss.cx.remoting.RemotingException;
 import org.jboss.cx.remoting.CloseHandler;
 import java.io.IOException;
@@ -31,7 +31,7 @@
  * A request handler, which can be passed to remote endpoints.  Remote systems can then use the handler
  * to make invocations, or they may forward a handler on to other remote systems.
  */
-public interface RequestHandler extends Closeable<RequestHandler> {
+public interface RequestHandler extends HandleableCloseable<RequestHandler> {
 
     /**
      * Receive a one-way request from a remote system.  This method is intended to be called by protocol handlers.  No

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandlerSource.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandlerSource.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandlerSource.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -22,7 +22,7 @@
 
 package org.jboss.cx.remoting.spi.remote;
 
-import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.HandleableCloseable;
 import org.jboss.cx.remoting.RemotingException;
 import org.jboss.cx.remoting.CloseHandler;
 import java.io.IOException;
@@ -33,7 +33,7 @@
  * has the advantage that a round trip to the remote side is not necessary; the local side can spawn a request handler
  * and simply notify the remote side of the change.
  */
-public interface RequestHandlerSource extends Closeable<RequestHandlerSource> {
+public interface RequestHandlerSource extends HandleableCloseable<RequestHandlerSource> {
 
     /**
      * Create a request handler for the service corresponding to this request handler source.

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -2,13 +2,17 @@
 
 import java.util.concurrent.ConcurrentMap;
 import java.io.IOException;
+import java.net.URI;
 import org.jboss.cx.remoting.Endpoint;
 import org.jboss.cx.remoting.RequestListener;
 import org.jboss.cx.remoting.Client;
 import org.jboss.cx.remoting.ClientSource;
+import org.jboss.cx.remoting.SimpleCloseable;
+import org.jboss.cx.remoting.ServiceListener;
 import org.jboss.cx.remoting.spi.remote.RequestHandler;
 import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
 import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.xnio.IoFuture;
 
 /**
  * A simple delegating wrapper for endpoints.
@@ -49,8 +53,8 @@
     /**
      * {@inheritDoc}  This implementation calls the same method on the delegate object.
      */
-    public <I, O> Handle<RequestHandlerSource> createRequestHandlerSource(final RequestListener<I, O> requestListener) throws IOException {
-        return delegate.createRequestHandlerSource(requestListener);
+    public <I, O> Handle<RequestHandlerSource> createRequestHandlerSource(final RequestListener<I, O> requestListener, final String serviceType, final String groupName) throws IOException {
+        return delegate.createRequestHandlerSource(requestListener, serviceType, groupName);
     }
 
     /**
@@ -66,4 +70,25 @@
     public <I, O> ClientSource<I, O> createClientSource(final RequestHandlerSource handlerSource) throws IOException {
         return delegate.createClientSource(handlerSource);
     }
+
+    /**
+     * {@inheritDoc}  This implementation calls the same method on the delegate object.
+     */
+    public <I, O> IoFuture<ClientSource<I, O>> locateService(final URI serviceUri) throws IllegalArgumentException {
+        return delegate.locateService(serviceUri);
+    }
+
+    /**
+     * {@inheritDoc}  This implementation calls the same method on the delegate object.
+     */
+    public SimpleCloseable registerRemoteService(final String serviceType, final String groupName, final String endpointName, final RequestHandlerSource handlerSource, final int metric) throws IllegalArgumentException, IOException {
+        return delegate.registerRemoteService(serviceType, groupName, endpointName, handlerSource, metric);
+    }
+
+    /**
+     * {@inheritDoc}  This implementation calls the same method on the delegate object.
+     */
+    public SimpleCloseable addServiceListener(final ServiceListener serviceListener, final boolean onlyNew) {
+        return delegate.addServiceListener(serviceListener, true);
+    }
 }

Modified: remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java
===================================================================
--- remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -46,7 +46,7 @@
         try {
             final AtomicBoolean closed = new AtomicBoolean();
             final CountDownLatch latch = new CountDownLatch(1);
-            final AbstractCloseable<Object> closeable = new AbstractCloseable<Object>(executorService) {
+            final AbstractHandleableCloseable<Object> closeable = new AbstractHandleableCloseable<Object>(executorService) {
                 // empty
             };
             try {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -25,12 +25,12 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.spi.AbstractCloseable;
+import org.jboss.cx.remoting.spi.AbstractHandleableCloseable;
 
 /**
  *
  */
-public abstract class AbstractContextImpl<T> extends AbstractCloseable<T> {
+public abstract class AbstractContextImpl<T> extends AbstractHandleableCloseable<T> {
 
     private final ConcurrentMap<Object, Object> attributes = CollectionUtil.concurrentMap();
 

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -29,14 +29,14 @@
 import org.jboss.cx.remoting.spi.remote.RequestHandler;
 import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
 import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.spi.AbstractCloseable;
+import org.jboss.cx.remoting.spi.AbstractHandleableCloseable;
 import org.jboss.xnio.IoUtils;
 import java.io.IOException;
 
 /**
  *
  */
-public final class ClientSourceImpl<I, O> extends AbstractCloseable<ClientSource<I, O>> implements ClientSource<I, O> {
+public final class ClientSourceImpl<I, O> extends AbstractHandleableCloseable<ClientSource<I, O>> implements ClientSource<I, O> {
 
     private final Handle<RequestHandlerSource> handle;
     private final Endpoint endpoint;

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -6,22 +6,32 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.Set;
+import java.util.Map;
+import java.util.List;
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.URI;
 import org.jboss.cx.remoting.Endpoint;
 import org.jboss.cx.remoting.RequestListener;
 import org.jboss.cx.remoting.CloseHandler;
 import org.jboss.cx.remoting.Client;
 import org.jboss.cx.remoting.ClientSource;
+import org.jboss.cx.remoting.SimpleCloseable;
+import org.jboss.cx.remoting.ServiceListener;
 import org.jboss.cx.remoting.core.util.OrderedExecutorFactory;
 import org.jboss.cx.remoting.spi.remote.RequestHandler;
 import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
 import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.spi.AbstractSimpleCloseable;
 import org.jboss.cx.remoting.util.CollectionUtil;
 import org.jboss.cx.remoting.util.NamingThreadFactory;
+import org.jboss.cx.remoting.util.ServiceURI;
 import org.jboss.cx.remoting.version.Version;
 import org.jboss.xnio.log.Logger;
 import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.FinishedIoFuture;
+import org.jboss.xnio.FailedIoFuture;
 
 /**
  *
@@ -41,6 +51,10 @@
     private final Set<Closeable> resources = CollectionUtil.synchronizedWeakHashSet();
     private final ConcurrentMap<Object, Object> endpointMap = CollectionUtil.concurrentMap();
 
+    private final Object serviceLock = new Object();
+    private final Map<Object, ServiceListenerRegistration> serviceListenerMap = CollectionUtil.hashMap();
+    private final Set<ServiceRegistration> serviceRegistrations = CollectionUtil.hashSet();
+
     public EndpointImpl() {
     }
 
@@ -128,8 +142,40 @@
         return localRequestHandler.getHandle();
     }
 
-    public <I, O> Handle<RequestHandlerSource> createRequestHandlerSource(final RequestListener<I, O> requestListener) throws IOException {
+    public <I, O> Handle<RequestHandlerSource> createRequestHandlerSource(final RequestListener<I, O> requestListener, final String serviceType, final String groupName) throws IOException {
+        if (serviceType == null) {
+            throw new NullPointerException("serviceType is null");
+        }
+        if (groupName == null) {
+            throw new NullPointerException("groupName is null");
+        }
+        if (serviceType.length() == 0) {
+            throw new IllegalArgumentException("serviceType is empty");
+        }
+        if (groupName.length() == 0) {
+            throw new IllegalArgumentException("groupName is empty");
+        }
         final LocalRequestHandlerSource<I, O> localRequestHandlerSource = new LocalRequestHandlerSource<I, O>(executor, requestListener);
+        final ServiceRegistration registration = new ServiceRegistration(serviceType, groupName, name, localRequestHandlerSource);
+        final AbstractSimpleCloseable newHandle = new AbstractSimpleCloseable(executor) {
+            protected void closeAction() throws IOException {
+                synchronized (serviceLock) {
+                    serviceRegistrations.remove(registration);
+                }
+            }
+        };
+        registration.setHandle(newHandle);
+        synchronized (serviceLock) {
+            serviceRegistrations.add(registration);
+            for (final ServiceListenerRegistration slr : serviceListenerMap.values()) {
+                final ServiceListener listener = slr.getServiceListener();
+                slr.getExecutor().execute(new Runnable() {
+                    public void run() {
+                        listener.localServiceCreated(slr.handle, serviceType, groupName, localRequestHandlerSource);
+                    }
+                });
+            }
+        }
         localRequestHandlerSource.addCloseHandler(remover);
         localRequestHandlerSource.open();
         return localRequestHandlerSource.getHandle();
@@ -168,6 +214,188 @@
         }
     }
 
+    public <I, O> IoFuture<ClientSource<I, O>> locateService(final URI serviceUri) throws IllegalArgumentException {
+        // todo - should this be typesafe?
+        if (serviceUri == null) {
+            throw new NullPointerException("serviceUri is null");
+        }
+        if (! ServiceURI.isRemotingServiceUri(serviceUri)) {
+            throw new IllegalArgumentException("Not a valid remoting service URI");
+        }
+        final String endpointName = ServiceURI.getEndpointName(serviceUri);
+        final String groupName = ServiceURI.getGroupName(serviceUri);
+        final String serviceType = ServiceURI.getServiceType(serviceUri);
+        synchronized (serviceLock) {
+            int bestMetric = Integer.MAX_VALUE;
+            List<ServiceRegistration> candidates = CollectionUtil.arrayList();
+            for (ServiceRegistration svc : serviceRegistrations) {
+                if (svc.matches(serviceType, groupName, endpointName)) {
+                    final int metric = svc.getMetric();
+                    if (metric < bestMetric) {
+                        candidates.clear();
+                        candidates.add(svc);
+                    } else if (metric == bestMetric) {
+                        candidates.add(svc);
+                    }
+                }
+            }
+            final int size = candidates.size();
+            if (size == 0) {
+                final FutureClientSource<I, O> futureClientSource = new FutureClientSource<I, O>();
+                final SimpleCloseable listenerHandle = addServiceListener(new ServiceListener() {
+
+                    public void localServiceCreated(final SimpleCloseable listenerHandle, final String addedServiceType, final String addedGroupName, final RequestHandlerSource requestHandlerSource) {
+                        remoteServiceRegistered(listenerHandle, name, addedServiceType, addedGroupName, 0, requestHandlerSource, null);
+                    }
+
+                    public void remoteServiceRegistered(final SimpleCloseable listenerHandle, final String addedEndpointName, final String addedServiceType, final String addedGroupName, final int metric, final RequestHandlerSource requestHandlerSource, final SimpleCloseable handle) {
+                        if (endpointName != null && endpointName.length() > 0 && !endpointName.equals(addedEndpointName)) {
+                            // no match
+                            return;
+                        }
+                        if (serviceType != null && serviceType.length() > 0 && !serviceType.equals(addedServiceType)) {
+                            // no match
+                            return;
+                        }
+                        if (groupName != null && groupName.length() > 0 && !groupName.equals(addedGroupName)) {
+                            // no match
+                            return;
+                        }
+                        try {
+                            // match!
+                            final ClientSource<I, O> clientSource = createClientSource(requestHandlerSource);
+                            futureClientSource.setResult(clientSource);
+                        } catch (IOException e) {
+                            futureClientSource.setException(e);
+                        } finally {
+                            IoUtils.safeClose(listenerHandle);
+                        }
+                    }
+                }, true);
+                futureClientSource.setListenerHandle(listenerHandle);
+                return futureClientSource;
+            }
+            final RequestHandlerSource handlerSource;
+            if (size == 1) {
+                handlerSource = candidates.get(0).getHandlerSource();
+            } else {
+                int idx = (int) ((double) candidates.size() * Math.random());
+                handlerSource = candidates.get(idx).getHandlerSource();
+            }
+            try {
+                return new FinishedIoFuture<ClientSource<I,O>>(EndpointImpl.this.<I, O>createClientSource(handlerSource));
+            } catch (IOException e) {
+                return new FailedIoFuture<ClientSource<I,O>>(e);
+            }
+        }
+    }
+
+    public SimpleCloseable registerRemoteService(final String serviceType, final String groupName, final String endpointName, final RequestHandlerSource handlerSource, final int metric) throws IllegalArgumentException, IOException {
+        if (serviceType == null) {
+            throw new NullPointerException("serviceType is null");
+        }
+        if (groupName == null) {
+            throw new NullPointerException("groupName is null");
+        }
+        if (endpointName == null) {
+            throw new NullPointerException("endpointName is null");
+        }
+        if (serviceType.length() == 0) {
+            throw new IllegalArgumentException("serviceType is empty");
+        }
+        if (groupName.length() == 0) {
+            throw new IllegalArgumentException("groupName is empty");
+        }
+        if (endpointName.length() == 0) {
+            throw new IllegalArgumentException("endpointName is empty");
+        }
+        if (endpointName.equals(name)) {
+            throw new IllegalArgumentException("remote endpoint has the same name as the local endpoint");
+        }
+        if (metric < 1) {
+            throw new IllegalArgumentException("metric must be greater than zero");
+        }
+        final ServiceRegistration registration = new ServiceRegistration(serviceType, groupName, endpointName, metric, handlerSource);
+        final AbstractSimpleCloseable newHandle = new AbstractSimpleCloseable(executor) {
+            protected void closeAction() throws IOException {
+                synchronized (serviceLock) {
+                    serviceRegistrations.remove(registration);
+                }
+            }
+        };
+        registration.setHandle(newHandle);
+        synchronized (serviceLock) {
+            serviceRegistrations.add(registration);
+            for (final ServiceListenerRegistration slr : serviceListenerMap.values()) {
+                final ServiceListener listener = slr.getServiceListener();
+                slr.getExecutor().execute(new Runnable() {
+                    public void run() {
+                        listener.remoteServiceRegistered(slr.handle, endpointName, serviceType, groupName, metric, handlerSource, newHandle);
+                    }
+                });
+            }
+        }
+        return newHandle;
+    }
+
+    public SimpleCloseable addServiceListener(final ServiceListener serviceListener, final boolean onlyNew) {
+        final Object key = new Object();
+        synchronized (serviceLock) {
+            final Executor orderedExecutor = getOrderedExecutor();
+            final ServiceListenerRegistration registration = new ServiceListenerRegistration(serviceListener, orderedExecutor);
+            serviceListenerMap.put(key, registration);
+            final AbstractSimpleCloseable handle = new AbstractSimpleCloseable(executor) {
+                protected void closeAction() throws IOException {
+                    synchronized (serviceLock) {
+                        serviceListenerMap.remove(key);
+                    }
+                }
+            };
+            registration.setHandle(handle);
+            if (! onlyNew) {
+                for (final ServiceRegistration reg : serviceRegistrations) {
+                    if (reg.isRemote()) { // x is remote
+                        orderedExecutor.execute(new Runnable() {
+                            public void run() {
+                                serviceListener.remoteServiceRegistered(handle, reg.getEndpointName(), reg.getServiceType(), reg.getGroupName(), reg.getMetric(), reg.getHandlerSource(), reg.getHandle());
+                            }
+                        });
+                    } else { // x is local
+                        orderedExecutor.execute(new Runnable() {
+                            public void run() {
+                                serviceListener.localServiceCreated(handle, reg.getServiceType(), reg.getGroupName(), reg.getHandlerSource());
+                            }
+                        });
+                    }
+                }
+            }
+            return handle;
+        }
+    }
+
+    private static final class ServiceListenerRegistration {
+        private final ServiceListener serviceListener;
+        private final Executor executor;
+        private volatile SimpleCloseable handle;
+
+        private ServiceListenerRegistration(final ServiceListener serviceListener, final Executor executor) {
+            this.serviceListener = serviceListener;
+            this.executor = executor;
+        }
+
+        ServiceListener getServiceListener() {
+            return serviceListener;
+        }
+
+        Executor getExecutor() {
+            return executor;
+        }
+
+        void setHandle(final SimpleCloseable handle) {
+            this.handle = handle;
+        }
+    }
+
     private final ResourceRemover remover = new ResourceRemover();
 
     private final class ResourceRemover implements CloseHandler<Closeable> {

Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureClientSource.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureClientSource.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureClientSource.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.cx.remoting.ClientSource;
+import org.jboss.cx.remoting.SimpleCloseable;
+import java.io.IOException;
+
+/**
+ *
+ */
+public final class FutureClientSource<I, O> extends AbstractIoFuture<ClientSource<I, O>> {
+
+    private volatile SimpleCloseable listenerHandle;
+
+    protected boolean setException(final IOException exception) {
+        return super.setException(exception);
+    }
+
+    protected boolean setResult(final ClientSource<I, O> result) {
+        return super.setResult(result);
+    }
+
+    public IoFuture<ClientSource<I, O>> cancel() {
+        IoUtils.safeClose(listenerHandle);
+        return this;
+    }
+
+    void setListenerHandle(final SimpleCloseable listenerHandle) {
+        this.listenerHandle = listenerHandle;
+    }
+}

Added: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceRegistration.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceRegistration.java	                        (rev 0)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceRegistration.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -0,0 +1,95 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
+import org.jboss.cx.remoting.SimpleCloseable;
+
+/**
+ *
+ */
+public final class ServiceRegistration {
+    private final boolean remote;
+    private final String serviceType;
+    private final String groupName;
+    private final String endpointName;
+    private final int metric;
+    private final RequestHandlerSource handlerSource;
+    private volatile SimpleCloseable handle;
+
+    ServiceRegistration(final String serviceType, final String groupName, final String endpointName, final int metric, final RequestHandlerSource handlerSource) {
+        remote = true;
+        this.serviceType = serviceType;
+        this.groupName = groupName;
+        this.endpointName = endpointName;
+        this.metric = metric;
+        this.handlerSource = handlerSource;
+    }
+
+    ServiceRegistration(final String serviceType, final String groupName, final String endpointName, final RequestHandlerSource handlerSource) {
+        remote = false;
+        metric = 0;
+        this.serviceType = serviceType;
+        this.groupName = groupName;
+        this.endpointName = endpointName;
+        this.handlerSource = handlerSource;
+    }
+
+    public boolean matches(final String serviceType, final String groupName, final String endpointName) {
+        return  (serviceType == null || serviceType.length() == 0 || serviceType.equals(this.serviceType)) &&
+                (groupName == null || groupName.length() == 0 || groupName.equals(this.groupName)) &&
+                (endpointName == null || endpointName.length() == 0 || endpointName.equals(this.endpointName));
+    }
+
+    public boolean isRemote() {
+        return remote;
+    }
+
+    public String getServiceType() {
+        return serviceType;
+    }
+
+    public String getGroupName() {
+        return groupName;
+    }
+
+    public String getEndpointName() {
+        return endpointName;
+    }
+
+    public int getMetric() {
+        return metric;
+    }
+
+    public RequestHandlerSource getHandlerSource() {
+        return handlerSource;
+    }
+
+    public SimpleCloseable getHandle() {
+        return handle;
+    }
+
+    void setHandle(final SimpleCloseable handle) {
+        this.handle = handle;
+    }
+}

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -22,13 +22,13 @@
 
 package org.jboss.cx.remoting.protocol.basic;
 
-import org.jboss.cx.remoting.spi.AbstractCloseable;
+import org.jboss.cx.remoting.spi.AbstractHandleableCloseable;
 import java.util.concurrent.Executor;
 
 /**
  *
  */
-public abstract class AbstractConnection extends AbstractCloseable<Connection> implements Connection {
+public abstract class AbstractConnection extends AbstractHandleableCloseable<Connection> implements Connection {
     /**
      * Basic constructor.
      *

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -24,12 +24,12 @@
 
 import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
 import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.HandleableCloseable;
 import org.jboss.cx.remoting.RemotingException;
 
 /**
  *
  */
-public interface Connection extends Closeable<Connection> {
+public interface Connection extends HandleableCloseable<Connection> {
     Handle<RequestHandlerSource> getServiceForId(int id) throws RemotingException;
 }

Modified: remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
===================================================================
--- remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -33,11 +33,16 @@
     public static final int CANCEL_REQUEST     = 4;
     public static final int CANCEL_ACK         = 5;
     public static final int REQUEST_FAILED     = 6;
-    // Remote side called .close() on a forwarded RemoteClientEndpoint
+    // Remote side called .close() on a forwarded RequestHandler
     public static final int CLIENT_CLOSE       = 7;
-    // Remote side called .close() on a forwarded RemoteClientEndpoint
+    // Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
     public static final int CLIENT_OPEN        = 8;
+    // Remote side called .close() on a forwarded RequestHandlerSource
     public static final int SERVICE_CLOSE      = 9;
+    // Remote side brought a new service online
+    public static final int SERVICE_ADVERTISE  = 10;
+    // Remote side's service is no longer available
+    public static final int SERVICE_UNADVERTISE= 11;
 
     private MessageType() {
     }

Modified: remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
--- remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -93,7 +93,7 @@
                                     problems.add(e);
                                 }
                             }
-                        });
+                        }, INIT_ME, INIT_ME);
                         try {
                             serviceRegistry.bind(requestHandlerSourceHandle.getResource(), 13);
                             final IoHandlerFactory<AllocatedMessageChannel> handlerFactory = BasicProtocol.createServer(closeableExecutor, allocator, serviceRegistry);

Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-08-04 04:21:46 UTC (rev 4462)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-08-05 02:59:58 UTC (rev 4463)
@@ -42,7 +42,7 @@
     }
 
     public static <I, O> ClientSource<I, O> createLocalClientSource(Endpoint endpoint, RequestListener<I, O> requestListener) throws IOException {
-        final Handle<RequestHandlerSource> handle = endpoint.createRequestHandlerSource(requestListener);
+        final Handle<RequestHandlerSource> handle = endpoint.createRequestHandlerSource(requestListener, INIT_ME, INIT_ME);
         try {
             return endpoint.createClientSource(handle.getResource());
         } finally {




More information about the jboss-remoting-commits mailing list