Author: david.lloyd(a)jboss.com
Date: 2008-08-04 22:59:58 -0400 (Mon, 04 Aug 2008)
New Revision: 4463
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/HandleableCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteServiceRegistration.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceListener.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SimpleCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractHandleableCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractSimpleCloseable.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureClientSource.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceRegistration.java
Removed:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandler.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandlerSource.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
API changes for service registration, location
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java 2008-08-04
04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Client.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -10,7 +10,7 @@
* @param <I> the request type
* @param <O> the reply type
*/
-public interface Client<I, O> extends Closeable<Client<I, O>> {
+public interface Client<I, O> extends HandleableCloseable<Client<I, O>>
{
/**
* Send a request and block until a reply is received.
* <p/>
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientContext.java 2008-08-04
04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientContext.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -5,7 +5,7 @@
/**
* The server context for a single remote client instance.
*/
-public interface ClientContext extends Closeable<ClientContext> {
+public interface ClientContext extends HandleableCloseable<ClientContext> {
/**
* Get the attributes for this end of the channel as a map.
*
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java 2008-08-04
04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ClientSource.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -8,7 +8,7 @@
* @param <I> the request type
* @param <O> the reply type
*/
-public interface ClientSource<I, O> extends Closeable<ClientSource<I,
O>> {
+public interface ClientSource<I, O> extends
HandleableCloseable<ClientSource<I, O>> {
/**
* Close the context source. New contexts may no longer be created after this
* method is called. Subsequent calls to this method have no additional effect.
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java 2008-08-04
04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -1,26 +0,0 @@
-package org.jboss.cx.remoting;
-
-import java.io.IOException;
-
-/**
- * A Remoting resource that can be closed.
- *
- * @param <T> the type that is passed to the close handler
- */
-public interface Closeable<T> extends java.io.Closeable {
-
- /**
- * Close, waiting for any outstanding processing to finish.
- *
- * @throws IOException if the close failed
- */
- void close() throws IOException;
-
- /**
- * Add a handler that will be called upon close. The handler may be called before or
after the close acutally
- * takes place.
- *
- * @param handler the close handler
- */
- void addCloseHandler(CloseHandler<? super T> handler);
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-08-04
04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -1,10 +1,12 @@
package org.jboss.cx.remoting;
import java.util.concurrent.ConcurrentMap;
+import java.net.URI;
import java.io.IOException;
import org.jboss.cx.remoting.spi.remote.RequestHandler;
import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.xnio.IoFuture;
/**
* A potential participant in a JBoss Remoting communications relationship.
@@ -51,10 +53,12 @@
* @param <I> the request type
* @param <O> the reply type
* @param requestListener the request listener
+ * @param serviceType the type of service to advertise
+ * @param groupName the name of the group of this type to be part of
* @return a handle for the client source
* @throws IOException if an error occurs
*/
- <I, O> Handle<RequestHandlerSource>
createRequestHandlerSource(RequestListener<I, O> requestListener) throws
IOException;
+ <I, O> Handle<RequestHandlerSource>
createRequestHandlerSource(RequestListener<I, O> requestListener, String
serviceType, String groupName) throws IOException;
/**
* Create a client that uses the given request handler to handle its requests.
@@ -77,4 +81,42 @@
* @throws IOException if an error occurs
*/
<I, O> ClientSource<I, O> createClientSource(RequestHandlerSource
handlerSource) throws IOException;
+
+ /**
+ * Attempt to locate a service. The return value then be queried for the
service's {@code ClientSource}.
+ *
+ * @param <I> the request type
+ * @param <O> the reply type
+ * @param serviceUri the URI of the service
+ * @return the future service
+ * @throws IllegalArgumentException if the given URI is not a valid Remoting service
URI
+ */
+ <I, O> IoFuture<ClientSource<I, O>> locateService(URI serviceUri)
throws IllegalArgumentException;
+
+ /**
+ * Register a remotely available service.<p>
+ * The remote endpoint may not have the same name as this endpoint. The group name
and service type must be
+ * non-{@code null} and non-empty. The metric must be greater than zero.
+ *
+ * @param serviceType the service type string
+ * @param groupName the group name
+ * @param endpointName the name of the remote endpoint
+ * @param handlerSource the remote handler source
+ * @param metric the preference metric, lower is more preferred
+ * @return a handle corresponding to the registration
+ * @throws IllegalArgumentException if one of the given arguments was not valid
+ * @throws IOException if an error occurs with the registration
+ */
+ SimpleCloseable registerRemoteService(String serviceType, String groupName, String
endpointName, RequestHandlerSource handlerSource, int metric) throws
IllegalArgumentException, IOException;
+
+ /**
+ * Add a listener for observing when local and remote services are added. The caller
may specify whether the listener
+ * should be notified of the complete list of currently registered services (set
{@code onlyNew} to {@code false})
+ * or only services registered after the time of calling this method (set {@code
onlyNew} to {@code true}).
+ *
+ * @param serviceListener the listener
+ * @param onlyNew {@code true} if only new registrations should be sent to the
listener
+ * @return a handle which may be used to unregister the listener
+ */
+ SimpleCloseable addServiceListener(ServiceListener serviceListener, boolean
onlyNew);
}
Copied: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/HandleableCloseable.java
(from rev 4437, remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Closeable.java)
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/HandleableCloseable.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/HandleableCloseable.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -0,0 +1,27 @@
+package org.jboss.cx.remoting;
+
+import java.io.IOException;
+import java.io.Closeable;
+
+/**
+ * A Remoting resource that can be closed.
+ *
+ * @param <T> the type that is passed to the close handler
+ */
+public interface HandleableCloseable<T> extends Closeable {
+
+ /**
+ * Close, waiting for any outstanding processing to finish.
+ *
+ * @throws IOException if the close failed
+ */
+ void close() throws IOException;
+
+ /**
+ * Add a handler that will be called upon close. The handler may be called before or
after the close acutally
+ * takes place.
+ *
+ * @param handler the close handler
+ */
+ void addCloseHandler(CloseHandler<? super T> handler);
+}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteServiceRegistration.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteServiceRegistration.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemoteServiceRegistration.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -0,0 +1,29 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting;
+
+/**
+ *
+ */
+public interface RemoteServiceRegistration extends
HandleableCloseable<RemoteServiceRegistration> {
+}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceContext.java 2008-08-04
04:21:46 UTC (rev 4462)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceContext.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -6,7 +6,7 @@
* The server-side context of a service. Used to hold state relating to a service (known
as a {@code ContextSource} on
* the client side).
*/
-public interface ServiceContext extends Closeable<ServiceContext> {
+public interface ServiceContext extends HandleableCloseable<ServiceContext> {
/**
* Get an attribute map which can be used to cache arbitrary state on the server
side.
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceListener.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceListener.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceListener.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -0,0 +1,56 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting;
+
+import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
+
+/**
+ * A listener for watching service registrations on an endpoint.
+ */
+public interface ServiceListener {
+
+ /**
+ * Receive notification that a local service was added. To receive a notification
when it is closed, register a
+ * close handler on the provided {@code requestHandlerSource} parameter.
+ *
+ * @param listenerHandle the handle to this listener
+ * @param serviceType the service type string
+ * @param groupName the group name string
+ * @param requestHandlerSource the request handler source
+ */
+ void localServiceCreated(SimpleCloseable listenerHandle, String serviceType, String
groupName, RequestHandlerSource requestHandlerSource);
+
+ /**
+ * Receive notification that a remote service was registered. To receive a
notification when it is unregistered, register a
+ * close handler on the provided {@code handle} parameter.
+ *
+ * @param listenerHandle the handle to this listener
+ * @param endpointName the remote endpoint name
+ * @param serviceType the service type string
+ * @param groupName the group name string
+ * @param metric the metric value
+ * @param requestHandlerSource the request handler source
+ * @param handle the handle to the registration
+ */
+ void remoteServiceRegistered(SimpleCloseable listenerHandle, String endpointName,
String serviceType, String groupName, int metric, RequestHandlerSource
requestHandlerSource, SimpleCloseable handle);
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SimpleCloseable.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SimpleCloseable.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/SimpleCloseable.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -0,0 +1,29 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting;
+
+/**
+ * A simple close-only handle.
+ */
+public interface SimpleCloseable extends HandleableCloseable<SimpleCloseable> {
+}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractAutoCloseable.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -33,7 +33,7 @@
* A closeable implementation that supports reference counting. Since the initial
reference count is zero, implementors
* must be careful to ensure that the first operation invoked is a call to {@link
#getHandle()}.
*/
-public abstract class AbstractAutoCloseable<T> extends AbstractCloseable<T>
{
+public abstract class AbstractAutoCloseable<T> extends
AbstractHandleableCloseable<T> {
private final AtomicInteger refcount = new AtomicInteger(0);
private final Executor executor;
@@ -100,7 +100,7 @@
return new HandleImpl();
}
- private final class HandleImpl extends AbstractCloseable<Handle<T>>
implements Handle<T> {
+ private final class HandleImpl extends
AbstractHandleableCloseable<Handle<T>> implements Handle<T> {
private HandleImpl() throws IOException {
super(AbstractAutoCloseable.this.executor);
inc();
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -1,178 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.Closeable;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.CloseHandler;
-import org.jboss.cx.remoting.spi.SpiUtils;
-import org.jboss.xnio.IoUtils;
-import org.jboss.xnio.log.Logger;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.io.IOException;
-
-/**
- * A basic implementation of a closeable resource. Use as a convenient base class for
your closeable resources.
- * Ensures that the {@code close()} method is idempotent; implements the registry of
close handlers.
- */
-public abstract class AbstractCloseable<T> implements Closeable<T> {
-
- private static final Logger log = Logger.getLogger(AbstractCloseable.class);
-
- protected final Executor executor;
- private final Object closeLock = new Object();
- private final AtomicBoolean closed = new AtomicBoolean();
- private Set<CloseHandler<? super T>> closeHandlers;
-
- private static final boolean LEAK_DEBUGGING;
- private final StackTraceElement[] backtrace;
-
- static {
- boolean b = false;
- try {
- b = Boolean.parseBoolean(AccessController.doPrivileged(new
PrivilegedAction<String>() {
- public String run() {
- return System.getProperty("jboss.remoting.leakdebugging",
"false");
- }
- }));
- } catch (SecurityException se) {
- b = false;
- }
- LEAK_DEBUGGING = b;
- }
-
- /**
- * Basic constructor.
- *
- * @param executor the executor used to execute the close notification handlers
- */
- protected AbstractCloseable(final Executor executor) {
- if (executor == null) {
- throw new NullPointerException("executor is null");
- }
- this.executor = executor;
- backtrace = LEAK_DEBUGGING ? new Throwable().getStackTrace() : null;
- }
-
- /**
- * Read the status of this resource. This is just a snapshot in time; there is no
guarantee that the resource
- * will remain open for any amount of time, even if this method returns {@code
true}.
- *
- * @return {@code true} if the resource is still open
- */
- protected boolean isOpen() {
- return ! closed.get();
- }
-
- /**
- * Called exactly once when the {@code close()} method is invoked; the actual close
operation should take place here.
- *
- * @throws RemotingException if the close failed
- */
- protected void closeAction() throws IOException {}
-
- /**
- * {@inheritDoc}
- */
- public final void close() throws IOException {
- if (! closed.getAndSet(true)) {
- log.trace("Closed %s", this);
- synchronized (closeLock) {
- if (closeHandlers != null) {
- for (final CloseHandler<? super T> handler : closeHandlers) {
- try {
- executor.execute(new Runnable() {
- @SuppressWarnings({ "unchecked" })
- public void run() {
- SpiUtils.safeHandleClose(handler, (T)
AbstractCloseable.this);
- }
- });
- } catch (RejectedExecutionException ree) {
- log.warn("Unable to execute close handler (execution
rejected) for %s (%s)", this, ree.getMessage());
- }
- }
- closeHandlers = null;
- }
- }
- closeAction();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- public void addCloseHandler(final CloseHandler<? super T> handler) {
- synchronized (closeLock) {
- if (closeHandlers == null) {
- closeHandlers = new HashSet<CloseHandler<? super T>>();
- }
- closeHandlers.add(handler);
- }
- }
-
- /**
- * Get the executor to use for handler invocation.
- *
- * @return the executor
- */
- protected Executor getExecutor() {
- return executor;
- }
-
- /**
- * Finalize this closeable instance. If the instance hasn't been closed, it is
closed and a warning is logged.
- */
- protected void finalize() throws Throwable {
- try {
- super.finalize();
- } finally {
- if (isOpen()) {
- if (LEAK_DEBUGGING) {
- final Throwable t = new LeakThrowable();
- t.setStackTrace(backtrace);
- log.warn(t, "Leaked a %s instance: %s",
getClass().getName(), this);
- } else {
- log.warn("Leaked a %s instance: %s", getClass().getName(),
this);
- }
- IoUtils.safeClose(this);
- }
- }
- }
-
- @SuppressWarnings({ "serial" })
- private static final class LeakThrowable extends Throwable {
-
- public LeakThrowable() {
- }
-
- public String toString() {
- return "a leaked reference";
- }
- }
-}
Copied:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractHandleableCloseable.java
(from rev 4439,
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractCloseable.java)
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractHandleableCloseable.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractHandleableCloseable.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -0,0 +1,178 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi;
+
+import org.jboss.cx.remoting.HandleableCloseable;
+import org.jboss.cx.remoting.RemotingException;
+import org.jboss.cx.remoting.CloseHandler;
+import org.jboss.cx.remoting.spi.SpiUtils;
+import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.log.Logger;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.io.IOException;
+
+/**
+ * A basic implementation of a closeable resource. Use as a convenient base class for
your closeable resources.
+ * Ensures that the {@code close()} method is idempotent; implements the registry of
close handlers.
+ */
+public abstract class AbstractHandleableCloseable<T> implements
HandleableCloseable<T> {
+
+ private static final Logger log =
Logger.getLogger(AbstractHandleableCloseable.class);
+
+ protected final Executor executor;
+ private final Object closeLock = new Object();
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private Set<CloseHandler<? super T>> closeHandlers;
+
+ private static final boolean LEAK_DEBUGGING;
+ private final StackTraceElement[] backtrace;
+
+ static {
+ boolean b = false;
+ try {
+ b = Boolean.parseBoolean(AccessController.doPrivileged(new
PrivilegedAction<String>() {
+ public String run() {
+ return System.getProperty("jboss.remoting.leakdebugging",
"false");
+ }
+ }));
+ } catch (SecurityException se) {
+ b = false;
+ }
+ LEAK_DEBUGGING = b;
+ }
+
+ /**
+ * Basic constructor.
+ *
+ * @param executor the executor used to execute the close notification handlers
+ */
+ protected AbstractHandleableCloseable(final Executor executor) {
+ if (executor == null) {
+ throw new NullPointerException("executor is null");
+ }
+ this.executor = executor;
+ backtrace = LEAK_DEBUGGING ? new Throwable().getStackTrace() : null;
+ }
+
+ /**
+ * Read the status of this resource. This is just a snapshot in time; there is no
guarantee that the resource
+ * will remain open for any amount of time, even if this method returns {@code
true}.
+ *
+ * @return {@code true} if the resource is still open
+ */
+ protected boolean isOpen() {
+ return ! closed.get();
+ }
+
+ /**
+ * Called exactly once when the {@code close()} method is invoked; the actual close
operation should take place here.
+ *
+ * @throws RemotingException if the close failed
+ */
+ protected void closeAction() throws IOException {}
+
+ /**
+ * {@inheritDoc}
+ */
+ public final void close() throws IOException {
+ if (! closed.getAndSet(true)) {
+ log.trace("Closed %s", this);
+ synchronized (closeLock) {
+ if (closeHandlers != null) {
+ for (final CloseHandler<? super T> handler : closeHandlers) {
+ try {
+ executor.execute(new Runnable() {
+ @SuppressWarnings({ "unchecked" })
+ public void run() {
+ SpiUtils.safeHandleClose(handler, (T)
AbstractHandleableCloseable.this);
+ }
+ });
+ } catch (RejectedExecutionException ree) {
+ log.warn("Unable to execute close handler (execution
rejected) for %s (%s)", this, ree.getMessage());
+ }
+ }
+ closeHandlers = null;
+ }
+ }
+ closeAction();
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ public void addCloseHandler(final CloseHandler<? super T> handler) {
+ synchronized (closeLock) {
+ if (closeHandlers == null) {
+ closeHandlers = new HashSet<CloseHandler<? super T>>();
+ }
+ closeHandlers.add(handler);
+ }
+ }
+
+ /**
+ * Get the executor to use for handler invocation.
+ *
+ * @return the executor
+ */
+ protected Executor getExecutor() {
+ return executor;
+ }
+
+ /**
+ * Finalize this closeable instance. If the instance hasn't been closed, it is
closed and a warning is logged.
+ */
+ protected void finalize() throws Throwable {
+ try {
+ super.finalize();
+ } finally {
+ if (isOpen()) {
+ if (LEAK_DEBUGGING) {
+ final Throwable t = new LeakThrowable();
+ t.setStackTrace(backtrace);
+ log.warn(t, "Leaked a %s instance: %s",
getClass().getName(), this);
+ } else {
+ log.warn("Leaked a %s instance: %s", getClass().getName(),
this);
+ }
+ IoUtils.safeClose(this);
+ }
+ }
+ }
+
+ @SuppressWarnings({ "serial" })
+ private static final class LeakThrowable extends Throwable {
+
+ public LeakThrowable() {
+ }
+
+ public String toString() {
+ return "a leaked reference";
+ }
+ }
+}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractSimpleCloseable.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractSimpleCloseable.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractSimpleCloseable.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.spi;
+
+import org.jboss.cx.remoting.SimpleCloseable;
+import java.util.concurrent.Executor;
+
+/**
+ * An abstract simple closeable implementation.
+ */
+public abstract class AbstractSimpleCloseable extends
AbstractHandleableCloseable<SimpleCloseable> implements SimpleCloseable {
+
+ /**
+ * Basic constructor.
+ *
+ * @param executor the executor used to execute the close notification handlers
+ */
+ protected AbstractSimpleCloseable(final Executor executor) {
+ super(executor);
+ }
+}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/Handle.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -22,14 +22,14 @@
package org.jboss.cx.remoting.spi.remote;
-import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.HandleableCloseable;
import org.jboss.cx.remoting.CloseHandler;
import java.io.IOException;
/**
* A handle to a local resource.
*/
-public interface Handle<T> extends Closeable<Handle<T>> {
+public interface Handle<T> extends HandleableCloseable<Handle<T>> {
/**
* Get the resource.
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandler.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandler.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandler.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -22,7 +22,7 @@
package org.jboss.cx.remoting.spi.remote;
-import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.HandleableCloseable;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.CloseHandler;
import java.io.IOException;
@@ -31,7 +31,7 @@
* A request handler, which can be passed to remote endpoints. Remote systems can then
use the handler
* to make invocations, or they may forward a handler on to other remote systems.
*/
-public interface RequestHandler extends Closeable<RequestHandler> {
+public interface RequestHandler extends HandleableCloseable<RequestHandler> {
/**
* Receive a one-way request from a remote system. This method is intended to be
called by protocol handlers. No
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandlerSource.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandlerSource.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/remote/RequestHandlerSource.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -22,7 +22,7 @@
package org.jboss.cx.remoting.spi.remote;
-import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.HandleableCloseable;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.CloseHandler;
import java.io.IOException;
@@ -33,7 +33,7 @@
* has the advantage that a round trip to the remote side is not necessary; the local
side can spawn a request handler
* and simply notify the remote side of the change.
*/
-public interface RequestHandlerSource extends Closeable<RequestHandlerSource> {
+public interface RequestHandlerSource extends
HandleableCloseable<RequestHandlerSource> {
/**
* Create a request handler for the service corresponding to this request handler
source.
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -2,13 +2,17 @@
import java.util.concurrent.ConcurrentMap;
import java.io.IOException;
+import java.net.URI;
import org.jboss.cx.remoting.Endpoint;
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.Client;
import org.jboss.cx.remoting.ClientSource;
+import org.jboss.cx.remoting.SimpleCloseable;
+import org.jboss.cx.remoting.ServiceListener;
import org.jboss.cx.remoting.spi.remote.RequestHandler;
import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.xnio.IoFuture;
/**
* A simple delegating wrapper for endpoints.
@@ -49,8 +53,8 @@
/**
* {@inheritDoc} This implementation calls the same method on the delegate object.
*/
- public <I, O> Handle<RequestHandlerSource>
createRequestHandlerSource(final RequestListener<I, O> requestListener) throws
IOException {
- return delegate.createRequestHandlerSource(requestListener);
+ public <I, O> Handle<RequestHandlerSource>
createRequestHandlerSource(final RequestListener<I, O> requestListener, final String
serviceType, final String groupName) throws IOException {
+ return delegate.createRequestHandlerSource(requestListener, serviceType,
groupName);
}
/**
@@ -66,4 +70,25 @@
public <I, O> ClientSource<I, O> createClientSource(final
RequestHandlerSource handlerSource) throws IOException {
return delegate.createClientSource(handlerSource);
}
+
+ /**
+ * {@inheritDoc} This implementation calls the same method on the delegate object.
+ */
+ public <I, O> IoFuture<ClientSource<I, O>> locateService(final URI
serviceUri) throws IllegalArgumentException {
+ return delegate.locateService(serviceUri);
+ }
+
+ /**
+ * {@inheritDoc} This implementation calls the same method on the delegate object.
+ */
+ public SimpleCloseable registerRemoteService(final String serviceType, final String
groupName, final String endpointName, final RequestHandlerSource handlerSource, final int
metric) throws IllegalArgumentException, IOException {
+ return delegate.registerRemoteService(serviceType, groupName, endpointName,
handlerSource, metric);
+ }
+
+ /**
+ * {@inheritDoc} This implementation calls the same method on the delegate object.
+ */
+ public SimpleCloseable addServiceListener(final ServiceListener serviceListener,
final boolean onlyNew) {
+ return delegate.addServiceListener(serviceListener, true);
+ }
}
Modified:
remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java
===================================================================
---
remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/api/src/test/java/org/jboss/cx/remoting/spi/CloseableTestCase.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -46,7 +46,7 @@
try {
final AtomicBoolean closed = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
- final AbstractCloseable<Object> closeable = new
AbstractCloseable<Object>(executorService) {
+ final AbstractHandleableCloseable<Object> closeable = new
AbstractHandleableCloseable<Object>(executorService) {
// empty
};
try {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractContextImpl.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -25,12 +25,12 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.spi.AbstractCloseable;
+import org.jboss.cx.remoting.spi.AbstractHandleableCloseable;
/**
*
*/
-public abstract class AbstractContextImpl<T> extends AbstractCloseable<T> {
+public abstract class AbstractContextImpl<T> extends
AbstractHandleableCloseable<T> {
private final ConcurrentMap<Object, Object> attributes =
CollectionUtil.concurrentMap();
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientSourceImpl.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -29,14 +29,14 @@
import org.jboss.cx.remoting.spi.remote.RequestHandler;
import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.spi.AbstractCloseable;
+import org.jboss.cx.remoting.spi.AbstractHandleableCloseable;
import org.jboss.xnio.IoUtils;
import java.io.IOException;
/**
*
*/
-public final class ClientSourceImpl<I, O> extends
AbstractCloseable<ClientSource<I, O>> implements ClientSource<I, O> {
+public final class ClientSourceImpl<I, O> extends
AbstractHandleableCloseable<ClientSource<I, O>> implements ClientSource<I,
O> {
private final Handle<RequestHandlerSource> handle;
private final Endpoint endpoint;
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/EndpointImpl.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -6,22 +6,32 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.Set;
+import java.util.Map;
+import java.util.List;
import java.io.Closeable;
import java.io.IOException;
+import java.net.URI;
import org.jboss.cx.remoting.Endpoint;
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.CloseHandler;
import org.jboss.cx.remoting.Client;
import org.jboss.cx.remoting.ClientSource;
+import org.jboss.cx.remoting.SimpleCloseable;
+import org.jboss.cx.remoting.ServiceListener;
import org.jboss.cx.remoting.core.util.OrderedExecutorFactory;
import org.jboss.cx.remoting.spi.remote.RequestHandler;
import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
import org.jboss.cx.remoting.spi.remote.Handle;
+import org.jboss.cx.remoting.spi.AbstractSimpleCloseable;
import org.jboss.cx.remoting.util.CollectionUtil;
import org.jboss.cx.remoting.util.NamingThreadFactory;
+import org.jboss.cx.remoting.util.ServiceURI;
import org.jboss.cx.remoting.version.Version;
import org.jboss.xnio.log.Logger;
import org.jboss.xnio.IoUtils;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.FinishedIoFuture;
+import org.jboss.xnio.FailedIoFuture;
/**
*
@@ -41,6 +51,10 @@
private final Set<Closeable> resources =
CollectionUtil.synchronizedWeakHashSet();
private final ConcurrentMap<Object, Object> endpointMap =
CollectionUtil.concurrentMap();
+ private final Object serviceLock = new Object();
+ private final Map<Object, ServiceListenerRegistration> serviceListenerMap =
CollectionUtil.hashMap();
+ private final Set<ServiceRegistration> serviceRegistrations =
CollectionUtil.hashSet();
+
public EndpointImpl() {
}
@@ -128,8 +142,40 @@
return localRequestHandler.getHandle();
}
- public <I, O> Handle<RequestHandlerSource>
createRequestHandlerSource(final RequestListener<I, O> requestListener) throws
IOException {
+ public <I, O> Handle<RequestHandlerSource>
createRequestHandlerSource(final RequestListener<I, O> requestListener, final String
serviceType, final String groupName) throws IOException {
+ if (serviceType == null) {
+ throw new NullPointerException("serviceType is null");
+ }
+ if (groupName == null) {
+ throw new NullPointerException("groupName is null");
+ }
+ if (serviceType.length() == 0) {
+ throw new IllegalArgumentException("serviceType is empty");
+ }
+ if (groupName.length() == 0) {
+ throw new IllegalArgumentException("groupName is empty");
+ }
final LocalRequestHandlerSource<I, O> localRequestHandlerSource = new
LocalRequestHandlerSource<I, O>(executor, requestListener);
+ final ServiceRegistration registration = new ServiceRegistration(serviceType,
groupName, name, localRequestHandlerSource);
+ final AbstractSimpleCloseable newHandle = new AbstractSimpleCloseable(executor)
{
+ protected void closeAction() throws IOException {
+ synchronized (serviceLock) {
+ serviceRegistrations.remove(registration);
+ }
+ }
+ };
+ registration.setHandle(newHandle);
+ synchronized (serviceLock) {
+ serviceRegistrations.add(registration);
+ for (final ServiceListenerRegistration slr : serviceListenerMap.values()) {
+ final ServiceListener listener = slr.getServiceListener();
+ slr.getExecutor().execute(new Runnable() {
+ public void run() {
+ listener.localServiceCreated(slr.handle, serviceType, groupName,
localRequestHandlerSource);
+ }
+ });
+ }
+ }
localRequestHandlerSource.addCloseHandler(remover);
localRequestHandlerSource.open();
return localRequestHandlerSource.getHandle();
@@ -168,6 +214,188 @@
}
}
+ public <I, O> IoFuture<ClientSource<I, O>> locateService(final URI
serviceUri) throws IllegalArgumentException {
+ // todo - should this be typesafe?
+ if (serviceUri == null) {
+ throw new NullPointerException("serviceUri is null");
+ }
+ if (! ServiceURI.isRemotingServiceUri(serviceUri)) {
+ throw new IllegalArgumentException("Not a valid remoting service
URI");
+ }
+ final String endpointName = ServiceURI.getEndpointName(serviceUri);
+ final String groupName = ServiceURI.getGroupName(serviceUri);
+ final String serviceType = ServiceURI.getServiceType(serviceUri);
+ synchronized (serviceLock) {
+ int bestMetric = Integer.MAX_VALUE;
+ List<ServiceRegistration> candidates = CollectionUtil.arrayList();
+ for (ServiceRegistration svc : serviceRegistrations) {
+ if (svc.matches(serviceType, groupName, endpointName)) {
+ final int metric = svc.getMetric();
+ if (metric < bestMetric) {
+ candidates.clear();
+ candidates.add(svc);
+ } else if (metric == bestMetric) {
+ candidates.add(svc);
+ }
+ }
+ }
+ final int size = candidates.size();
+ if (size == 0) {
+ final FutureClientSource<I, O> futureClientSource = new
FutureClientSource<I, O>();
+ final SimpleCloseable listenerHandle = addServiceListener(new
ServiceListener() {
+
+ public void localServiceCreated(final SimpleCloseable listenerHandle,
final String addedServiceType, final String addedGroupName, final RequestHandlerSource
requestHandlerSource) {
+ remoteServiceRegistered(listenerHandle, name, addedServiceType,
addedGroupName, 0, requestHandlerSource, null);
+ }
+
+ public void remoteServiceRegistered(final SimpleCloseable
listenerHandle, final String addedEndpointName, final String addedServiceType, final
String addedGroupName, final int metric, final RequestHandlerSource requestHandlerSource,
final SimpleCloseable handle) {
+ if (endpointName != null && endpointName.length() > 0
&& !endpointName.equals(addedEndpointName)) {
+ // no match
+ return;
+ }
+ if (serviceType != null && serviceType.length() > 0
&& !serviceType.equals(addedServiceType)) {
+ // no match
+ return;
+ }
+ if (groupName != null && groupName.length() > 0
&& !groupName.equals(addedGroupName)) {
+ // no match
+ return;
+ }
+ try {
+ // match!
+ final ClientSource<I, O> clientSource =
createClientSource(requestHandlerSource);
+ futureClientSource.setResult(clientSource);
+ } catch (IOException e) {
+ futureClientSource.setException(e);
+ } finally {
+ IoUtils.safeClose(listenerHandle);
+ }
+ }
+ }, true);
+ futureClientSource.setListenerHandle(listenerHandle);
+ return futureClientSource;
+ }
+ final RequestHandlerSource handlerSource;
+ if (size == 1) {
+ handlerSource = candidates.get(0).getHandlerSource();
+ } else {
+ int idx = (int) ((double) candidates.size() * Math.random());
+ handlerSource = candidates.get(idx).getHandlerSource();
+ }
+ try {
+ return new
FinishedIoFuture<ClientSource<I,O>>(EndpointImpl.this.<I,
O>createClientSource(handlerSource));
+ } catch (IOException e) {
+ return new FailedIoFuture<ClientSource<I,O>>(e);
+ }
+ }
+ }
+
+ public SimpleCloseable registerRemoteService(final String serviceType, final String
groupName, final String endpointName, final RequestHandlerSource handlerSource, final int
metric) throws IllegalArgumentException, IOException {
+ if (serviceType == null) {
+ throw new NullPointerException("serviceType is null");
+ }
+ if (groupName == null) {
+ throw new NullPointerException("groupName is null");
+ }
+ if (endpointName == null) {
+ throw new NullPointerException("endpointName is null");
+ }
+ if (serviceType.length() == 0) {
+ throw new IllegalArgumentException("serviceType is empty");
+ }
+ if (groupName.length() == 0) {
+ throw new IllegalArgumentException("groupName is empty");
+ }
+ if (endpointName.length() == 0) {
+ throw new IllegalArgumentException("endpointName is empty");
+ }
+ if (endpointName.equals(name)) {
+ throw new IllegalArgumentException("remote endpoint has the same name as
the local endpoint");
+ }
+ if (metric < 1) {
+ throw new IllegalArgumentException("metric must be greater than
zero");
+ }
+ final ServiceRegistration registration = new ServiceRegistration(serviceType,
groupName, endpointName, metric, handlerSource);
+ final AbstractSimpleCloseable newHandle = new AbstractSimpleCloseable(executor)
{
+ protected void closeAction() throws IOException {
+ synchronized (serviceLock) {
+ serviceRegistrations.remove(registration);
+ }
+ }
+ };
+ registration.setHandle(newHandle);
+ synchronized (serviceLock) {
+ serviceRegistrations.add(registration);
+ for (final ServiceListenerRegistration slr : serviceListenerMap.values()) {
+ final ServiceListener listener = slr.getServiceListener();
+ slr.getExecutor().execute(new Runnable() {
+ public void run() {
+ listener.remoteServiceRegistered(slr.handle, endpointName,
serviceType, groupName, metric, handlerSource, newHandle);
+ }
+ });
+ }
+ }
+ return newHandle;
+ }
+
+ public SimpleCloseable addServiceListener(final ServiceListener serviceListener,
final boolean onlyNew) {
+ final Object key = new Object();
+ synchronized (serviceLock) {
+ final Executor orderedExecutor = getOrderedExecutor();
+ final ServiceListenerRegistration registration = new
ServiceListenerRegistration(serviceListener, orderedExecutor);
+ serviceListenerMap.put(key, registration);
+ final AbstractSimpleCloseable handle = new AbstractSimpleCloseable(executor)
{
+ protected void closeAction() throws IOException {
+ synchronized (serviceLock) {
+ serviceListenerMap.remove(key);
+ }
+ }
+ };
+ registration.setHandle(handle);
+ if (! onlyNew) {
+ for (final ServiceRegistration reg : serviceRegistrations) {
+ if (reg.isRemote()) { // x is remote
+ orderedExecutor.execute(new Runnable() {
+ public void run() {
+ serviceListener.remoteServiceRegistered(handle,
reg.getEndpointName(), reg.getServiceType(), reg.getGroupName(), reg.getMetric(),
reg.getHandlerSource(), reg.getHandle());
+ }
+ });
+ } else { // x is local
+ orderedExecutor.execute(new Runnable() {
+ public void run() {
+ serviceListener.localServiceCreated(handle,
reg.getServiceType(), reg.getGroupName(), reg.getHandlerSource());
+ }
+ });
+ }
+ }
+ }
+ return handle;
+ }
+ }
+
+ private static final class ServiceListenerRegistration {
+ private final ServiceListener serviceListener;
+ private final Executor executor;
+ private volatile SimpleCloseable handle;
+
+ private ServiceListenerRegistration(final ServiceListener serviceListener, final
Executor executor) {
+ this.serviceListener = serviceListener;
+ this.executor = executor;
+ }
+
+ ServiceListener getServiceListener() {
+ return serviceListener;
+ }
+
+ Executor getExecutor() {
+ return executor;
+ }
+
+ void setHandle(final SimpleCloseable handle) {
+ this.handle = handle;
+ }
+ }
+
private final ResourceRemover remover = new ResourceRemover();
private final class ResourceRemover implements CloseHandler<Closeable> {
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureClientSource.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureClientSource.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/FutureClientSource.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -0,0 +1,55 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.xnio.AbstractIoFuture;
+import org.jboss.xnio.IoFuture;
+import org.jboss.xnio.IoUtils;
+import org.jboss.cx.remoting.ClientSource;
+import org.jboss.cx.remoting.SimpleCloseable;
+import java.io.IOException;
+
+/**
+ *
+ */
+public final class FutureClientSource<I, O> extends
AbstractIoFuture<ClientSource<I, O>> {
+
+ private volatile SimpleCloseable listenerHandle;
+
+ protected boolean setException(final IOException exception) {
+ return super.setException(exception);
+ }
+
+ protected boolean setResult(final ClientSource<I, O> result) {
+ return super.setResult(result);
+ }
+
+ public IoFuture<ClientSource<I, O>> cancel() {
+ IoUtils.safeClose(listenerHandle);
+ return this;
+ }
+
+ void setListenerHandle(final SimpleCloseable listenerHandle) {
+ this.listenerHandle = listenerHandle;
+ }
+}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceRegistration.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceRegistration.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceRegistration.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -0,0 +1,95 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
+import org.jboss.cx.remoting.SimpleCloseable;
+
+/**
+ *
+ */
+public final class ServiceRegistration {
+ private final boolean remote;
+ private final String serviceType;
+ private final String groupName;
+ private final String endpointName;
+ private final int metric;
+ private final RequestHandlerSource handlerSource;
+ private volatile SimpleCloseable handle;
+
+ ServiceRegistration(final String serviceType, final String groupName, final String
endpointName, final int metric, final RequestHandlerSource handlerSource) {
+ remote = true;
+ this.serviceType = serviceType;
+ this.groupName = groupName;
+ this.endpointName = endpointName;
+ this.metric = metric;
+ this.handlerSource = handlerSource;
+ }
+
+ ServiceRegistration(final String serviceType, final String groupName, final String
endpointName, final RequestHandlerSource handlerSource) {
+ remote = false;
+ metric = 0;
+ this.serviceType = serviceType;
+ this.groupName = groupName;
+ this.endpointName = endpointName;
+ this.handlerSource = handlerSource;
+ }
+
+ public boolean matches(final String serviceType, final String groupName, final String
endpointName) {
+ return (serviceType == null || serviceType.length() == 0 ||
serviceType.equals(this.serviceType)) &&
+ (groupName == null || groupName.length() == 0 ||
groupName.equals(this.groupName)) &&
+ (endpointName == null || endpointName.length() == 0 ||
endpointName.equals(this.endpointName));
+ }
+
+ public boolean isRemote() {
+ return remote;
+ }
+
+ public String getServiceType() {
+ return serviceType;
+ }
+
+ public String getGroupName() {
+ return groupName;
+ }
+
+ public String getEndpointName() {
+ return endpointName;
+ }
+
+ public int getMetric() {
+ return metric;
+ }
+
+ public RequestHandlerSource getHandlerSource() {
+ return handlerSource;
+ }
+
+ public SimpleCloseable getHandle() {
+ return handle;
+ }
+
+ void setHandle(final SimpleCloseable handle) {
+ this.handle = handle;
+ }
+}
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/AbstractConnection.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -22,13 +22,13 @@
package org.jboss.cx.remoting.protocol.basic;
-import org.jboss.cx.remoting.spi.AbstractCloseable;
+import org.jboss.cx.remoting.spi.AbstractHandleableCloseable;
import java.util.concurrent.Executor;
/**
*
*/
-public abstract class AbstractConnection extends AbstractCloseable<Connection>
implements Connection {
+public abstract class AbstractConnection extends
AbstractHandleableCloseable<Connection> implements Connection {
/**
* Basic constructor.
*
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/Connection.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -24,12 +24,12 @@
import org.jboss.cx.remoting.spi.remote.RequestHandlerSource;
import org.jboss.cx.remoting.spi.remote.Handle;
-import org.jboss.cx.remoting.Closeable;
+import org.jboss.cx.remoting.HandleableCloseable;
import org.jboss.cx.remoting.RemotingException;
/**
*
*/
-public interface Connection extends Closeable<Connection> {
+public interface Connection extends HandleableCloseable<Connection> {
Handle<RequestHandlerSource> getServiceForId(int id) throws RemotingException;
}
Modified:
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java
===================================================================
---
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/protocol/basic/src/main/java/org/jboss/cx/remoting/protocol/basic/MessageType.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -33,11 +33,16 @@
public static final int CANCEL_REQUEST = 4;
public static final int CANCEL_ACK = 5;
public static final int REQUEST_FAILED = 6;
- // Remote side called .close() on a forwarded RemoteClientEndpoint
+ // Remote side called .close() on a forwarded RequestHandler
public static final int CLIENT_CLOSE = 7;
- // Remote side called .close() on a forwarded RemoteClientEndpoint
+ // Remote side pulled a new RequestHandler off of a forwarded RequestHandlerSource
public static final int CLIENT_OPEN = 8;
+ // Remote side called .close() on a forwarded RequestHandlerSource
public static final int SERVICE_CLOSE = 9;
+ // Remote side brought a new service online
+ public static final int SERVICE_ADVERTISE = 10;
+ // Remote side's service is no longer available
+ public static final int SERVICE_UNADVERTISE= 11;
private MessageType() {
}
Modified:
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java
===================================================================
---
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/protocol/basic/src/test/java/org/jboss/cx/remoting/protocol/basic/ConnectionTestCase.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -93,7 +93,7 @@
problems.add(e);
}
}
- });
+ }, INIT_ME, INIT_ME);
try {
serviceRegistry.bind(requestHandlerSourceHandle.getResource(), 13);
final IoHandlerFactory<AllocatedMessageChannel>
handlerFactory = BasicProtocol.createServer(closeableExecutor, allocator,
serviceRegistry);
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
---
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-08-04
04:21:46 UTC (rev 4462)
+++
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-08-05
02:59:58 UTC (rev 4463)
@@ -42,7 +42,7 @@
}
public static <I, O> ClientSource<I, O> createLocalClientSource(Endpoint
endpoint, RequestListener<I, O> requestListener) throws IOException {
- final Handle<RequestHandlerSource> handle =
endpoint.createRequestHandlerSource(requestListener);
+ final Handle<RequestHandlerSource> handle =
endpoint.createRequestHandlerSource(requestListener, INIT_ME, INIT_ME);
try {
return endpoint.createClientSource(handle.getResource());
} finally {