Author: david.lloyd(a)jboss.com
Date: 2008-02-25 19:40:52 -0500 (Mon, 25 Feb 2008)
New Revision: 3502
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassReply.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassRequest.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceReply.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java
Removed:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/InterceptorDeploymentSpec.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceDeploymentSpec.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceLocator.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptorFactory.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Discovery.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/EndpointProvider.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorSpec.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ListenerFactoryContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptorFactory.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpointProvider.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemotingException.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Session.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandlerFactory.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/RequestContextWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppContextIdentifier.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppStreamIdentifier.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppSubChannelIdentifier.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
Major overhaul of the service location system. Now Context and ContextSource instances
can be sent in requests and replies to facilitate service location using the standard
request mechanism - with the added benefit of providing secondary services such as
classloading, etc., using the exact same mechanism.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java 2008-02-25
13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Context.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,12 +1,13 @@
package org.jboss.cx.remoting;
import java.util.concurrent.ConcurrentMap;
+import java.io.Closeable;
/**
* A communications context. The context may be associated with a
security/authentication state and a transactional
* state, as well as other state maintained by the remote side.
*/
-public interface Context<I, O> {
+public interface Context<I, O> extends Closeable {
void close() throws RemotingException;
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-02-25
13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -3,8 +3,6 @@
import java.net.URI;
import java.util.concurrent.ConcurrentMap;
import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.cx.remoting.spi.Discovery;
-import org.jboss.cx.remoting.spi.Registration;
import org.jboss.cx.remoting.spi.protocol.ProtocolRegistration;
import org.jboss.cx.remoting.spi.protocol.ProtocolRegistrationSpec;
@@ -21,29 +19,10 @@
ConcurrentMap<Object, Object> getAttributes();
/**
- * Shut down this endpoint. Cancel any outstanding requests, tear down thread
pools.
- */
- void shutdown();
-
- /**
- * Add a shutdown listener. This listener will be called after shutdown has been
initiated.
- *
- * @param listener the listener
- */
- void addShutdownListener(EndpointShutdownListener listener);
-
- /**
- * Remove a previously added shutdown listener.
- *
- * @param listener the listener
- */
- void removeShutdownListener(EndpointShutdownListener listener);
-
- /**
* Open a session with another endpoint. The protocol used is determined by the URI
scheme. The URI user-info part
* must be {@code null} unless the specific protocol has an additional authentication
scheme (e.g. HTTP BASIC). The
- * authority is used to locate the server (the exact interpretation is dependent upon
the protocol). The URI path is
- * the service to connect to. The path may be relative to a protocol-specific
deployment path.
+ * authority is used to locate the server (the exact interpretation is dependent upon
the protocol). The path may be
+ * relative to a protocol-specific deployment path.
*
* @param remoteUri the URI of the server to connect to
* @param attributeMap the attribute map to use to configure this session
@@ -62,18 +41,6 @@
String getName();
/**
- * Deploy a service into this endpoint.
- *
- * @param spec the specification for this service deployment
- *
- * @return a registration that may be used to control this deployment
- *
- * @throws RemotingException if the registration failed
- * @throws IllegalArgumentException if the specification failed validation
- */
- <I, O> Registration deployService(ServiceDeploymentSpec<I, O> spec)
throws RemotingException, IllegalArgumentException;
-
- /**
* Register a protocol specification for this endpoint.
*
* @param spec the protocol specification
@@ -86,32 +53,21 @@
ProtocolRegistration registerProtocol(ProtocolRegistrationSpec spec) throws
RemotingException, IllegalArgumentException;
/**
- * Deploy a context interceptor type into this endpoint. Subsequent sessions may
negotiate to use this context
- * service.
+ * Create a context that can be used to invoke a request listener on this endpoint.
The context may be passed to a
+ * remote endpoint as part of a request or a reply, or it may be used locally.
*
- * @param spec the deployment specification
- *
- * @return a registration that may be used to control this deployment
- *
- * @throws RemotingException if the registration failed
- * @throws IllegalArgumentException if the specification failed validation
+ * @param requestListener the request listener
+ * @return the context
*/
- Registration deployInterceptorType(InterceptorDeploymentSpec spec) throws
RemotingException, IllegalArgumentException;
+ <I, O> Context<I, O> createContext(RequestListener<I, O>
requestListener);
/**
- * Discover a remote endpoint. Adds the host to the internal routing table of the
endpoint. Higher cost indicates
- * a less desirable route.
- * <p/>
- * The next hop URI should also include a path component, if the target endpoint is
deployed relative to a base path
- * (e.g. a servlet).
+ * Create a context source that can be used to acquire contexts associated with a
request listener on this endpoint.
+ * The context source may be passed to a remote endpoint as part of a request or a
reply, or it may be used locally.
+ * The objects that are produced by this method may be used to mass-produce {@code
Context} instances.
*
- * @param endpointName the name of the discovered endpoint
- * @param nextHop the URI of the means to connect to the next "hop" towards
the named endpoint
- * @param cost the "cost" associated with traversing this route
- *
- * @return an obejct representing the discovery
- *
- * @throws RemotingException if there is a problem with the discovery parameters
+ * @param requestListener the request listener
+ * @return the context source
*/
- Discovery discover(String endpointName, URI nextHop, int cost) throws
RemotingException;
+ <I, O> ContextSource<I, O> createService(RequestListener<I, O>
requestListener);
}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/InterceptorDeploymentSpec.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/InterceptorDeploymentSpec.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/InterceptorDeploymentSpec.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,40 +0,0 @@
-package org.jboss.cx.remoting;
-
-import org.jboss.cx.remoting.spi.ClientInterceptorFactory;
-
-/**
- *
- */
-public final class InterceptorDeploymentSpec {
- private final String identifier;
- private final int preference;
- private final ClientInterceptorFactory contextInterceptorFactory;
-
- public InterceptorDeploymentSpec(final String identifier, final int preference, final
ClientInterceptorFactory contextInterceptorFactory) {
- this.identifier = identifier;
- this.preference = preference;
- this.contextInterceptorFactory = contextInterceptorFactory;
- }
-
- public static final InterceptorDeploymentSpec DEFAULT = new
InterceptorDeploymentSpec(null, 1000, null);
-
- public String getIdentifier() {
- return identifier;
- }
-
- public ClientInterceptorFactory getContextInterceptorFactory() {
- return contextInterceptorFactory;
- }
-
- public InterceptorDeploymentSpec setIdentifier(String identifier) {
- return new InterceptorDeploymentSpec(identifier, preference,
contextInterceptorFactory);
- }
-
- public InterceptorDeploymentSpec setPreference(int preference) {
- return new InterceptorDeploymentSpec(identifier, preference,
contextInterceptorFactory);
- }
-
- public InterceptorDeploymentSpec
setContextInterceptorFactory(ClientInterceptorFactory contextInterceptorFactory) {
- return new InterceptorDeploymentSpec(identifier, preference,
contextInterceptorFactory);
- }
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemotingException.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemotingException.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RemotingException.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,9 +1,11 @@
package org.jboss.cx.remoting;
+import java.io.IOException;
+
/**
*
*/
-public class RemotingException extends Exception {
+public class RemotingException extends IOException {
public RemotingException() {
}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestContext.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestContext.java 2008-02-25
13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/RequestContext.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,9 +1,11 @@
package org.jboss.cx.remoting;
+import java.util.concurrent.Executor;
+
/**
* The context of a single request.
*/
-public interface RequestContext<O> {
+public interface RequestContext<O> extends Executor {
/**
* Determine whether the current request was cancelled.
*
@@ -46,4 +48,12 @@
* @param handler
*/
void addCancelHandler(RequestCancelHandler<O> handler);
+
+ /**
+ * Execute a task in the context of this request. This method can be used to
continue execution of a request. Any
+ * tasks submitted to this executor will be interruptible in the event of
cancellation.
+ *
+ * @param command the task to execute
+ */
+ void execute(Runnable command);
}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceDeploymentSpec.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceDeploymentSpec.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceDeploymentSpec.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,123 +0,0 @@
-package org.jboss.cx.remoting;
-
-import java.util.Collections;
-import java.util.List;
-import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.spi.InterceptorSpec;
-
-/**
- *
- */
-public final class ServiceDeploymentSpec<I, O> {
- private final List<InterceptorSpec> interceptorSpecs;
- private final String serviceName;
- private final String serviceType;
- private final Class<I> requestType;
- private final Class<O> replyType;
- private final RequestListener<I, O> requestListener;
-
- public static final ServiceDeploymentSpec<Void, Void> DEFAULT = new
ServiceDeploymentSpec<Void, Void>(Collections.<InterceptorSpec>emptyList(),
null, null, Void.class, Void.class, null);
-
- private ServiceDeploymentSpec(final List<InterceptorSpec> interceptorSpecs,
final String serviceName, final String serviceType, final Class<I> requestType,
final Class<O> replyType, final RequestListener<I, O> requestListener) {
- this.interceptorSpecs = interceptorSpecs;
- this.serviceName = serviceName;
- this.serviceType = serviceType;
- this.requestType = requestType;
- this.replyType = replyType;
- this.requestListener = requestListener;
- }
-
- public List<InterceptorSpec> getInterceptorSpecs() {
- return interceptorSpecs;
- }
-
- public String getServiceName() {
- return serviceName;
- }
-
- public String getServiceType() {
- return serviceType;
- }
-
- public Class<I> getRequestType() {
- return requestType;
- }
-
- public Class<O> getReplyType() {
- return replyType;
- }
-
- public RequestListener<I, O> getRequestListener() {
- return requestListener;
- }
-
- public ServiceDeploymentSpec<I, O> setInterceptorSpecs(InterceptorSpec...
specs) {
- if (specs == null) {
- throw new NullPointerException("specs is null");
- }
- return new ServiceDeploymentSpec<I,
O>(CollectionUtil.unmodifiableList(specs.clone()), serviceName, serviceType,
requestType, replyType, requestListener);
- }
-
- public ServiceDeploymentSpec<I, O> setServiceGroupName(String serviceName) {
- if (serviceName == null) {
- throw new NullPointerException("serviceName is null");
- }
- return new ServiceDeploymentSpec<I, O>(interceptorSpecs, serviceName,
serviceType, requestType, replyType, requestListener);
- }
-
- public ServiceDeploymentSpec<I, O> setServiceType(final String serviceType) {
- if (serviceType == null) {
- throw new NullPointerException("serviceType is null");
- }
- return new ServiceDeploymentSpec<I, O>(interceptorSpecs, serviceName,
serviceType, requestType, replyType, requestListener);
- }
-
- public <T> ServiceDeploymentSpec<T, O> setRequestType(Class<T>
requestType) {
- if (requestType == null) {
- throw new NullPointerException("requestType is null");
- }
- return new ServiceDeploymentSpec<T, O>(interceptorSpecs, serviceName,
serviceType, requestType, replyType, null);
- }
-
- public <T> ServiceDeploymentSpec<I, T> setReplyType(Class<T>
replyType) {
- if (replyType == null) {
- throw new NullPointerException("replyType is null");
- }
- return new ServiceDeploymentSpec<I, T>(interceptorSpecs, serviceName,
serviceType, requestType, replyType, null);
- }
-
- public ServiceDeploymentSpec<I, O> setRequestListener(RequestListener<I,
O> requestListener) {
- if (requestListener == null) {
- throw new NullPointerException("requestListener is null");
- }
- return new ServiceDeploymentSpec<I, O>(interceptorSpecs, serviceName,
serviceType, requestType, replyType, requestListener);
- }
-
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("Service specification for ");
- if (serviceType == null) {
- builder.append("untyped ");
- } else {
- builder.append("typed (\"");
- builder.append(serviceType);
- builder.append("\") ");
- }
- if (serviceName == null) {
- builder.append(", unnamed service ");
- } else {
- builder.append("service named \"");
- builder.append(serviceName);
- builder.append("\" ");
- }
- builder.append(": Request type is ");
- builder.append(requestType.getName());
- builder.append(", reply type is ");
- builder.append(replyType.getName());
- builder.append(", interceptors are ");
- builder.append(interceptorSpecs.toString());
- builder.append(", request listener is ");
- builder.append(requestListener.toString());
- return builder.toString();
- }
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceLocator.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceLocator.java 2008-02-25
13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/ServiceLocator.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,189 +0,0 @@
-package org.jboss.cx.remoting;
-
-import java.util.Collections;
-import java.util.Set;
-
-/**
- *
- */
-public final class ServiceLocator<I, O> {
-
- /**
- * A basic service locator. Use this instance to create more specific locators.
- */
- public static final ServiceLocator<Void, Void> DEFAULT = new
ServiceLocator<Void, Void>(Void.class, Void.class, null, "*",
"*", Collections.<String>emptySet());
-
- private final Class<I> requestType;
- private final Class<O> replyType;
- private final String serviceType;
- private final String serviceGroupName;
- private final String endpointName;
- private final Set<String> availableInterceptors;
-
- private ServiceLocator(final Class<I> requestType, final Class<O>
replyType, final String serviceType, final String serviceGroupName, final String
endpointName, final Set<String> availableInterceptors) {
- if (requestType == null) {
- throw new NullPointerException("requestType is null");
- }
- if (replyType == null) {
- throw new NullPointerException("replyType is null");
- }
- if (availableInterceptors == null) {
- throw new NullPointerException("availableInterceptors is null");
- }
- this.requestType = requestType;
- this.replyType = replyType;
- this.serviceType = serviceType;
- this.serviceGroupName = serviceGroupName;
- this.endpointName = endpointName;
- this.availableInterceptors = availableInterceptors;
- }
-
- /**
- * Get the request type for this service locator. The remote service will match this
request type if the actual
- * service accepts this type, or a superclass or superinterface thereof.
- *
- * @return the request type
- */
- public Class<I> getRequestType() {
- return requestType;
- }
-
- /**
- * Get the reply type for this service locator. The remote service will match this
reply type if the actual
- * service returns this type, or a subtype thereof.
- *
- * @return the reply type
- */
- public Class<O> getReplyType() {
- return replyType;
- }
-
- /**
- * Get the name of the service group for this service locator.
- *
- * @return the service group name
- */
- public String getServiceGroupName() {
- return serviceGroupName;
- }
-
- /**
- * Get the type of the service for this service locator.
- *
- * @return the service type
- */
- public String getServiceType() {
- return serviceType;
- }
-
- /**
- * Get the name of the endpoitn for this service locator.
- *
- * @return the endpoint name
- */
- public String getEndpointName() {
- return endpointName;
- }
-
- /**
- * Get the names of the interceptors that the client has available.
- *
- * @return the names
- */
- public Set<String> getAvailableInterceptors() {
- return availableInterceptors;
- }
-
- /**
- * Change the request type. This method does not modify this object; instead, it
returns a new modified instance.
- *
- * @param requestType the new request type
- *
- * @return an updated service locator
- */
- public <T> ServiceLocator<T, O> setRequestType(Class<T>
requestType) {
- return new ServiceLocator<T, O>(requestType, replyType, serviceType,
serviceGroupName, endpointName, availableInterceptors);
- }
-
- /**
- * Change the request type. This method does not modify this object; instead, it
returns a new modified instance.
- *
- * @param replyType the new request type
- *
- * @return an updated service locator
- */
- public <T> ServiceLocator<I, T> setReplyType(Class<T> replyType) {
- return new ServiceLocator<I, T>(requestType, replyType, serviceType,
serviceGroupName, endpointName, availableInterceptors);
- }
-
- /**
- * Change the service type. The service type is a string that identifies the
"kind" of service provided. All
- * services of a given type should accept the same request and reply types as well.
- * <p/>
- * The service type should be a dot-separated name (like an Internet host name).
- * <p/>
- * This method does not modify this object; instead, it returns a new modified
instance.
- *
- * @param serviceType the new service type; may not be {@code null}
- *
- * @return an updated service locator
- */
- public ServiceLocator<I, O> setServiceType(String serviceType) {
- if (serviceType == null) {
- throw new NullPointerException("serviceType is null");
- }
- return new ServiceLocator<I, O>(requestType, replyType, serviceType,
serviceGroupName, endpointName, availableInterceptors);
- }
-
- /**
- * Change the service group name. The service group name is a string that identifies
a group of endpoints that are
- * all providing the same service, for load-balancing or clustering purposes.
- *
- * @param serviceGroupName
- *
- * @return an updated service locator
- */
- public ServiceLocator<I, O> setServiceGroupName(String serviceGroupName) {
- if (serviceGroupName == null) {
- throw new NullPointerException("serviceGroupName is null");
- }
- return new ServiceLocator<I, O>(requestType, replyType, serviceType,
serviceGroupName, endpointName, availableInterceptors);
- }
-
- /**
- * Change the endpoint name.
- * <p/>
- * The endpoint name should be a dot-separated name (like an Internet host name). A
{@code "*"}
- * character can be used as a wildcard to match any name. So, the name {@code
"foo.*"} would match {@code
- * "foo.bar"} and {@code "foo.bar.two"} but not {@code
"foobar"}.
- * <p/>
- * If no endpoint name is specified, then this value defaults to {@code
"*"} (match all endpoints).
- * <p/>
- * This method does not modify this object; instead, it returns a new modified
instance.
- *
- * @param endpointName the new endpoint name; may not be {@code null}
- *
- * @return an updated service locator
- */
- public ServiceLocator<I, O> setEndpointName(String endpointName) {
- if (endpointName == null) {
- throw new NullPointerException("endpointName is null");
- }
- return new ServiceLocator<I, O>(requestType, replyType, serviceType,
serviceGroupName, endpointName, availableInterceptors);
- }
-
- /**
- * Change the set of locally available interceptors.
- *
- * @param availableInterceptors the set of interceptors
- *
- * @return an updated service locator
- */
- public ServiceLocator<I, O> setAvailableInterceptors(Set<String>
availableInterceptors) {
- if (availableInterceptors == null) {
- throw new NullPointerException("availableInterceptors is null");
- }
- return new ServiceLocator<I, O>(requestType, replyType, serviceType,
serviceGroupName, endpointName, availableInterceptors);
- }
-
-}
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Session.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Session.java 2008-02-25
13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Session.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,6 +1,7 @@
package org.jboss.cx.remoting;
import java.util.concurrent.ConcurrentMap;
+import java.io.Closeable;
/**
* Represents a point-to-point relationship with another endpoint.
@@ -9,7 +10,7 @@
* <p/>
* A session may be shared safely among multiple threads.
*/
-public interface Session {
+public interface Session extends Closeable {
/**
* Close this session. Any associated connection(s) will be closed. Calling this
method multiple times has no
* effect.
@@ -38,10 +39,9 @@
String getRemoteEndpointName();
/**
- * Establish an agreement to communicate with a service on the remote side.
+ * Get the root context for this session.
*
- * @param locator the locator for the service
- * @return a context source which may be used to create communication contexts
+ * @return the root context
*/
- <I, O> ContextSource<I, O> openService(ServiceLocator<I, O>
locator) throws RemotingException;
+ <I, O> Context<I, O> getRootContext();
}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassReply.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassReply.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassReply.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -0,0 +1,23 @@
+package org.jboss.cx.remoting.service;
+
+import java.io.Serializable;
+
+/**
+ * Serializable reply holding raw class bytes; the array is stored and returned without copying.
+ */
+public final class ClassReply implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private byte[] classBytes;
+
+ public ClassReply() {
+ }
+
+ public byte[] getClassBytes() {
+ return classBytes;
+ }
+
+ public void setClassBytes(final byte[] classBytes) {
+ this.classBytes = classBytes;
+ }
+}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassRequest.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassRequest.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ClassRequest.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -0,0 +1,23 @@
+package org.jboss.cx.remoting.service;
+
+import java.io.Serializable;
+
+/**
+ * Serializable request carrying a single name string (presumably the name of the class being requested - confirm).
+ */
+public final class ClassRequest implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private String name;
+
+ public ClassRequest() {
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(final String name) {
+ this.name = name;
+ }
+}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/SecurityService.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,34 +0,0 @@
-package org.jboss.cx.remoting.service;
-
-import org.jboss.cx.remoting.RemotingException;
-
-import javax.security.auth.callback.CallbackHandler;
-
-/**
- *
- */
-public interface SecurityService {
-
- /**
- * @param userName
- *
- * @throws RemotingException
- */
- void changeUser(String userName) throws RemotingException;
-
- /**
- * @param userName
- * @param password
- *
- * @throws RemotingException
- */
- void changeUser(String userName, char[] password) throws RemotingException;
-
- /**
- * @param clientCallbackHandler
- *
- * @throws RemotingException
- */
- void changeUser(CallbackHandler clientCallbackHandler) throws RemotingException;
-
-}
Added: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceReply.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceReply.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceReply.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -0,0 +1,34 @@
+package org.jboss.cx.remoting.service;
+
+import org.jboss.cx.remoting.ContextSource;
+import org.jboss.cx.remoting.Context;
+import java.io.Serializable;
+
+/**
+ * Serializable reply holding a service {@code ContextSource} and a class-loading {@code Context} (ClassRequest to ClassReply).
+ */
+public final class ServiceReply<I, O> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private ContextSource<I, O> serviceContextSource;
+ private Context<ClassRequest, ClassReply> classLoadingContext;
+
+ public ServiceReply() {
+ }
+
+ public ContextSource<I, O> getServiceContextSource() {
+ return serviceContextSource;
+ }
+
+ public void setServiceContextSource(final ContextSource<I, O>
serviceContextSource) {
+ this.serviceContextSource = serviceContextSource;
+ }
+
+ public Context<ClassRequest, ClassReply> getClassLoadingContext() {
+ return classLoadingContext;
+ }
+
+ public void setClassLoadingContext(final Context<ClassRequest, ClassReply>
classLoadingContext) {
+ this.classLoadingContext = classLoadingContext;
+ }
+}
Added:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceRequest.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceRequest.java
(rev 0)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/ServiceRequest.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -0,0 +1,50 @@
+package org.jboss.cx.remoting.service;
+
+import java.net.URI;
+import java.io.Serializable;
+
+/**
+ * Serializable request holding a URI plus request and reply types; {@code create} returns a populated instance.
+ */
+public final class ServiceRequest<I, O> implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private URI uri;
+ private Class<I> requestType;
+ private Class<O> replyType;
+
+ public ServiceRequest() {
+ }
+
+ public static <I, O> ServiceRequest<I, O> create(Class<I>
requestType, Class<O> replyType, URI uri) {
+ ServiceRequest<I, O> serviceRequest = new ServiceRequest<I, O>();
+ serviceRequest.setRequestType(requestType);
+ serviceRequest.setReplyType(replyType);
+ serviceRequest.setUri(uri);
+ return serviceRequest;
+ }
+
+ public URI getUri() {
+ return uri;
+ }
+
+ public void setUri(final URI uri) {
+ this.uri = uri;
+ }
+
+ public Class<I> getRequestType() {
+ return requestType;
+ }
+
+ public void setRequestType(final Class<I> requestType) {
+ this.requestType = requestType;
+ }
+
+ public Class<O> getReplyType() {
+ return replyType;
+ }
+
+ public void setReplyType(final Class<O> replyType) {
+ this.replyType = replyType;
+ }
+}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/service/TxnService.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,40 +0,0 @@
-package org.jboss.cx.remoting.service;
-
-import org.jboss.cx.remoting.RemotingException;
-
-import javax.transaction.xa.XAResource;
-
-/**
- *
- */
-public interface TxnService {
- /**
- * Begin a transaction on the remote side.
- *
- * @throws org.jboss.cx.remoting.RemotingException if the transaction could not be
started
- */
- void begin() throws RemotingException;
-
- /**
- * Commit the current transaction on the remote side.
- *
- * @throws RemotingException if the transaction could not be committed.
- */
- void commit() throws RemotingException;
-
- /**
- * Roll back the current transaction on the remote side.
- *
- * @throws RemotingException if the transaction could not be rolled back
- */
- void rollback() throws RemotingException;
-
- /**
- * Get an XA resource to control transactions on the remote side for this context.
- *
- * @return the XA resource
- *
- * @throws RemotingException if the XA resource could not be acquired
- */
- XAResource getXAResource() throws RemotingException;
-}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractClientInterceptor.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,14 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-/**
- * A simple base implementation of {@code ContextServiceInterceptor}. Use this class as
a base for simple
- * implementations of that interface.
- */
-public abstract class AbstractClientInterceptor<T> extends AbstractInterceptor
implements ClientInterceptor<T> {
- protected AbstractClientInterceptor() {
- }
-
- public T getContextService(InterceptorContext context) {
- return null;
- }
-}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractInterceptor.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,34 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.RemoteExecutionException;
-
-/**
- *
- */
-public abstract class AbstractInterceptor implements Interceptor {
- protected AbstractInterceptor() {
- }
-
- public void processRequest(final InterceptorContext context, final Object request) {
- context.nextRequest(request);
- }
-
- public void processReply(final InterceptorContext context, final Object reply) {
- context.nextReply(reply);
- }
-
- public void processCancelRequest(final InterceptorContext context, final boolean
mayInterrupt) {
- context.nextCancelRequest(mayInterrupt);
- }
-
- public void processCancelAcknowledge(final InterceptorContext context) {
- context.nextCancelAcknowledge();
- }
-
- public void processException(final InterceptorContext context, final
RemoteExecutionException exception) {
- context.nextException(exception);
- }
-
- public void close() {
- }
-}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/AbstractServerInterceptor.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,10 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-/**
- * A simple base implementation of {@code ContextServiceInterceptor}. Use this class as
a base for simple
- * implementations of that interface.
- */
-public abstract class AbstractServerInterceptor extends AbstractInterceptor implements
ServerInterceptor {
- protected AbstractServerInterceptor() {
- }
-}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptor.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,16 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-/**
- * An interceptor that provides an additional service to a {@code Context}. A context
service interceptor is created
- * for every context service for each context.
- */
-public interface ClientInterceptor<T> extends Interceptor {
-
- /**
- * Get the context service object associated with this handler. This instance is the
end-user's interface into this
- * service. If no interface is available for this context service, return {@code
null}.
- *
- * @return the context service object
- */
- T getContextService(InterceptorContext context);
-}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptorFactory.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptorFactory.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ClientInterceptorFactory.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,11 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.Context;
-import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
-
-/**
- *
- */
-public interface ClientInterceptorFactory {
- ClientInterceptor createInstance(Context<?, ?> context, ContextIdentifier
contextIdentifier);
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Discovery.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Discovery.java 2008-02-25
13:35:25 UTC (rev 3501)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Discovery.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,18 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-/**
- *
- */
-public interface Discovery {
- /**
- * Signal that the discovered route has gone offline.
- */
- void remove();
-
- /**
- * Change the cost of this route.
- *
- * @param cost the new cost
- */
- void updateCost(int cost);
-}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/EndpointProvider.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/EndpointProvider.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/EndpointProvider.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,10 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.Endpoint;
-
-/**
- *
- */
-public interface EndpointProvider {
- Endpoint createEndpoint(String name);
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/Interceptor.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,52 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.RemoteExecutionException;
-
-/**
- *
- */
-public interface Interceptor {
- /**
- * Process a request.
- *
- * @param context the context service interceptor context
- * @param request the outbound request
- */
- void processRequest(InterceptorContext context, Object request);
-
- /**
- * Process a request reply.
- *
- * @param context the context service interceptor context
- * @param reply the inbound reply
- */
- void processReply(InterceptorContext context, Object reply);
-
- /**
- * Process a request exception.
- *
- * @param context the context service interceptor context
- * @param exception the inbound exception
- */
- void processException(InterceptorContext context, RemoteExecutionException
exception);
-
- /**
- * Process a cancellation request.
- *
- * @param context the context service interceptor context
- * @param mayInterrupt {@code true} if the operation can be interrupted
- */
- void processCancelRequest(InterceptorContext context, boolean mayInterrupt);
-
- /**
- * Process a cancellation acknowledgement.
- *
- * @param context the context service interceptor context
- */
- void processCancelAcknowledge(InterceptorContext context);
-
- /**
- * Close this interceptor.
- */
- void close();
-}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorContext.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,18 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.RemoteExecutionException;
-
-/**
- *
- */
-public interface InterceptorContext {
- void nextRequest(Object request);
-
- void nextReply(Object reply);
-
- void nextException(RemoteExecutionException exception);
-
- void nextCancelRequest(boolean mayInterrupt);
-
- void nextCancelAcknowledge();
-}
Deleted: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorSpec.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorSpec.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/InterceptorSpec.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,78 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import java.io.Serializable;
-import java.util.Comparator;
-
-/**
- *
- */
-public final class InterceptorSpec implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final int slot;
- private final Class<?> interceptorClass;
- private final Type type;
-
- public enum Type {
- PRIVATE,
- OPTIONAL,
- REQUIRED
- }
-
- public InterceptorSpec(final int slot, final Class<?> interceptorClass, final
Type type) {
- if (type == null) throw new NullPointerException("'type' parameter
is null");
- if (interceptorClass == null) throw new
NullPointerException("'interceptorClass' parameter is null");
- if (slot < 0) throw new IllegalArgumentException("'slot'
parameter must not be negative");
- this.slot = slot;
- this.interceptorClass = interceptorClass;
- this.type = type;
- }
-
- public int getSlot() {
- return slot;
- }
-
- public Class<?> getInterceptorClass() {
- return interceptorClass;
- }
-
- public Type getType() {
- return type;
- }
-
- private transient int hashCode;
- private transient boolean hashCodeDone;
-
- public int hashCode() {
- if (! hashCodeDone) {
- hashCode = slot ^ 37 * interceptorClass.hashCode() ^ 97 * type.hashCode();
- hashCodeDone = true;
- }
- return hashCode;
- }
-
- public boolean equals(Object obj) {
- if (! (obj instanceof InterceptorSpec)) {
- return false;
- }
- InterceptorSpec other = (InterceptorSpec) obj;
- return other.slot == slot &&
other.interceptorClass.equals(interceptorClass) && other.type == type;
- }
-
- public static final class ComparatorImpl implements
Comparator<InterceptorSpec>, Serializable {
- private static final long serialVersionUID = 1L;
-
- private ComparatorImpl() {
- }
-
- public int compare(final InterceptorSpec o1, final InterceptorSpec o2) {
- return o2.slot - o1.slot;
- }
- }
-
- private static final Comparator<InterceptorSpec> comparator = new
ComparatorImpl();
-
- public static Comparator<InterceptorSpec> getComparator() {
- return comparator;
- }
-}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ListenerFactoryContext.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ListenerFactoryContext.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ListenerFactoryContext.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,31 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.RemotingException;
-
-/**
- *
- */
-public interface ListenerFactoryContext {
- /**
- * Add an interceptor that the remote side is required to acknowledge.
- *
- * @param name
- */
- <T> T requireInterceptor(String name, Class<T> interceptorType) throws
RemotingException;
-
- /**
- * Add an interceptor if the remote side agrees.
- *
- * @param name
- */
- <T> T offerInterceptor(String name, Class<T> interceptorType) throws
RemotingException;
-
- /**
- * Add an interceptor to the local side. The remote side is not notified.
- *
- * @param name
- */
- <T> T addPrivateInterceptor(String name, Class<T> interceptorType) throws
RemotingException;
-
-
-}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptor.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,8 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-/**
- * An interceptor that provides an additional service to a {@code Context}. A context
service interceptor is created
- * for every context service for each context.
- */
-public interface ServerInterceptor extends Interceptor {
-}
Deleted:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptorFactory.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptorFactory.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/ServerInterceptorFactory.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,10 +0,0 @@
-package org.jboss.cx.remoting.spi;
-
-import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
-
-/**
- *
- */
-public interface ServerInterceptorFactory {
- ServerInterceptor createInstance(ContextIdentifier contextIdentifier);
-}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolContext.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -3,7 +3,6 @@
import java.io.IOException;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.util.ByteInput;
import org.jboss.cx.remoting.util.ByteOutput;
import org.jboss.cx.remoting.util.MessageInput;
@@ -16,8 +15,6 @@
/* CLIENT methods */
- void receiveServiceActivate(ServiceIdentifier serviceIdentifier);
-
void receiveReply(ContextIdentifier contextIdentifier, RequestIdentifier
requestIdentifier, Object reply);
void receiveException(ContextIdentifier contextIdentifier, RequestIdentifier
requestIdentifier, RemoteExecutionException exception);
@@ -30,8 +27,6 @@
void closeContext(ContextIdentifier remoteContextIdentifier);
- void receiveServiceRequest(ServiceIdentifier remoteServiceIdentifier,
ServiceLocator<?, ?> locator);
-
void closeService(ServiceIdentifier remoteServiceIdentifier);
void receiveOpenedContext(ServiceIdentifier remoteServiceIdentifier,
ContextIdentifier remoteContextIdentifier);
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandler.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,11 +1,8 @@
package org.jboss.cx.remoting.spi.protocol;
import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
import java.util.concurrent.Executor;
import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.util.MessageOutput;
/**
@@ -33,15 +30,6 @@
/* SERVER methods */
/**
- * Send a service activation response to the remote side. The service identifier
will have been produced by
- * the protocol handler on the remote side.
- *
- * @param remoteServiceIdentifier the remote service identifier
- * @throws IOException if an I/O error occurs
- */
- void sendServiceActivate(ServiceIdentifier remoteServiceIdentifier) throws
IOException;
-
- /**
* Send the reply to a request.
*
* @param remoteContextIdentifier the context that the request was received under
@@ -82,9 +70,27 @@
/* CLIENT methods */
/**
+ * Get the identifier for the root context for this session. The root context lives as long as the session is up.
+ * This identifier is used to invoke the root context listener from the local side to the remote side.
+ *
+ * @return the identifier for the root context
+ * @throws IOException if an I/O error occurs
+ */
+ ContextIdentifier getLocalRootContextIdentifier();
+
+ /**
+ * Get the identifier for the root context for this session. The root context lives as long as the session is up.
+ * This identifier is used to invoke the root context listener from the remote side to the local side.
+ *
+ * @return the identifier for the root context
+ * @throws IOException if an I/O error occurs
+ */
+ ContextIdentifier getRemoteRootContextIdentifier();
+
+ /**
* Get a new context identifier that will be used to send requests to the remote
side. The service identifier
- * was previously acquired from the {@link #openService()} method. Should send a
message to the remote side such
- * that the {@link ProtocolContext#receiveOpenedContext(ServiceIdentifier,
ContextIdentifier)} method is called with
+ * was received from the remote side. Should send a message to the remote side such that the
+ * {@link ProtocolContext#receiveOpenedContext(ServiceIdentifier, ContextIdentifier)} method is called with
* the new service and context identifiers.
*
* @param serviceIdentifier the service identifier
@@ -113,7 +119,7 @@
RequestIdentifier openRequest(ContextIdentifier contextIdentifier) throws
IOException;
/**
- * Get a new service identifier that will be used to request a service from the
remote side.
+ * Get a new service identifier that may be transmitted to the remote side.
*
* @return the new service identifier
* @throws IOException if an I/O error occurs
@@ -121,16 +127,6 @@
ServiceIdentifier openService() throws IOException;
/**
- * Send a service activation request to the remote side. The service identifier will
have been obtained from
- * the {@link #openService()} method on this {@code ProtocolHandler}.
- *
- * @param serviceIdentifier the service identifier
- * @param locator the locator for the new service
- * @throws IOException if an I/O error occurs
- */
- void sendServiceRequest(ServiceIdentifier serviceIdentifier, ServiceLocator<?,
?> locator) throws IOException;
-
- /**
* Send a notification that the client is no longer using the given service.
*
* @param serviceIdentifier the service identifier
@@ -162,6 +158,14 @@
/* SESSION methods */
/**
+ * Open a serviceless context. The context identifier may be transmitted to the remote side.
+ *
+ * @return a context identifier
+ * @throws IOException if an I/O error occurs
+ */
+ ContextIdentifier openContext() throws IOException;
+
+ /**
* Open a stream on this session. Since either side may open a stream, it is
important that the client and
* server side take precautions to ensure that both the client and server will not
initiate the same stream at
* the same time.
@@ -182,27 +186,9 @@
void closeStream(StreamIdentifier streamIdentifier) throws IOException;
/**
- * Read a stream identifier from a message.
- *
- * @param input
- * @return the new stream identifier
- * @throws IOException if an I/O error occurs
- */
- StreamIdentifier readStreamIdentifier(ObjectInput input) throws IOException;
-
- /**
- * Write a stream identifier to an object output stream.
- *
- * @param output the output to write to
- * @param identifier the identifier to write
- * @throws IOException if an I/O error occurs
- */
- void writeStreamIdentifier(ObjectOutput output, StreamIdentifier identifier) throws
IOException;
-
- /**
* Send data over a stream. Returns a message output buffer that the message is
written into. When the message
- * is fully written, the {@link org.jboss.cx.remoting.util.MessageOutput#commit()}
method will be called to perform the transmission. The
- * supplied executor should be passed in to
+ * is fully written, the {@link org.jboss.cx.remoting.util.MessageOutput#commit()} method will be called to perform
+ * the transmission. The supplied executor should be passed in to
* {@link
org.jboss.cx.remoting.spi.protocol.ProtocolContext#getMessageOutput(org.jboss.cx.remoting.util.ByteOutput,
java.util.concurrent.Executor)},
* if that method is used for serialization.
*
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandlerFactory.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandlerFactory.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/protocol/ProtocolHandlerFactory.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -25,7 +25,7 @@
*
* @param context the protocol context to use for inbound data
* @param remoteUri the URI of the remote side
- * @param attributeMap
+ * @param attributeMap the attributes for the underlying protocol to apply
* @return the protocol handler for outbound data
*
* @throws IOException if the handler could not be created
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -3,14 +3,12 @@
import java.net.URI;
import java.util.concurrent.ConcurrentMap;
import org.jboss.cx.remoting.Endpoint;
-import org.jboss.cx.remoting.EndpointShutdownListener;
-import org.jboss.cx.remoting.InterceptorDeploymentSpec;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.ServiceDeploymentSpec;
import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.Context;
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.ContextSource;
import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.cx.remoting.spi.Discovery;
-import org.jboss.cx.remoting.spi.Registration;
import org.jboss.cx.remoting.spi.protocol.ProtocolRegistration;
import org.jboss.cx.remoting.spi.protocol.ProtocolRegistrationSpec;
@@ -28,39 +26,23 @@
return delegate.getAttributes();
}
- public void shutdown() {
- delegate.shutdown();
- }
-
- public Session openSession(final URI remoteUri, AttributeMap attributeMap) throws
RemotingException {
+ public Session openSession(final URI remoteUri, final AttributeMap attributeMap)
throws RemotingException {
return delegate.openSession(remoteUri, attributeMap);
}
- public Discovery discover(final String endpointName, final URI nextHop, final int
cost) throws RemotingException {
- return delegate.discover(endpointName, nextHop, cost);
- }
-
- public Registration deployInterceptorType(final InterceptorDeploymentSpec spec)
throws RemotingException {
- return delegate.deployInterceptorType(spec);
- }
-
public String getName() {
return delegate.getName();
}
- public <I, O> Registration deployService(ServiceDeploymentSpec<I, O>
spec) throws RemotingException, IllegalArgumentException {
- return delegate.deployService(spec);
- }
-
public ProtocolRegistration registerProtocol(final ProtocolRegistrationSpec spec)
throws RemotingException, IllegalArgumentException {
return delegate.registerProtocol(spec);
}
- public void addShutdownListener(final EndpointShutdownListener listener) {
- delegate.addShutdownListener(listener);
+ public <I, O> Context<I, O> createContext(final RequestListener<I,
O> requestListener) {
+ return delegate.createContext(requestListener);
}
- public void removeShutdownListener(final EndpointShutdownListener listener) {
- delegate.removeShutdownListener(listener);
+ public <I, O> ContextSource<I, O> createService(final
RequestListener<I, O> requestListener) {
+ return delegate.createService(requestListener);
}
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/RequestContextWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/RequestContextWrapper.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/RequestContextWrapper.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -34,4 +34,7 @@
delegate.addCancelHandler(requestCancelHandler);
}
+ public void execute(final Runnable command) {
+ delegate.execute(command);
+ }
}
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/SessionWrapper.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,10 +1,9 @@
package org.jboss.cx.remoting.spi.wrapper;
import java.util.concurrent.ConcurrentMap;
-import org.jboss.cx.remoting.ContextSource;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.Context;
/**
*
@@ -32,7 +31,7 @@
return delegate.getRemoteEndpointName();
}
- public <I, O> ContextSource<I, O> openService(final ServiceLocator<I,
O> locator) throws RemotingException {
- return delegate.openService(locator);
+ public <I, O> Context<I, O> getRootContext() {
+ return delegate.getRootContext();
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -2,30 +2,23 @@
import java.io.IOException;
import java.net.URI;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.jboss.cx.remoting.Endpoint;
import org.jboss.cx.remoting.EndpointShutdownListener;
-import org.jboss.cx.remoting.InterceptorDeploymentSpec;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.ServiceDeploymentSpec;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.Context;
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.ContextSource;
import org.jboss.cx.remoting.util.CollectionUtil;
import org.jboss.cx.remoting.version.Version;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.AttributeMap;
-import org.jboss.cx.remoting.spi.Discovery;
-import org.jboss.cx.remoting.spi.Registration;
import org.jboss.cx.remoting.spi.protocol.ProtocolContext;
import org.jboss.cx.remoting.spi.protocol.ProtocolHandler;
import org.jboss.cx.remoting.spi.protocol.ProtocolHandlerFactory;
@@ -48,7 +41,8 @@
private final Endpoint userEndpoint = new UserEndpoint();
private final OrderedExecutorFactory orderedExecutorFactory;
private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.UP);
- private final ExecutorService executor;
+ private final Executor executor;
+ private final RequestListener<?, ?> rootRequestListener;
static {
Logger.getLogger("org.jboss.cx.remoting").info("JBoss Remoting
version %s", Version.VERSION);
@@ -59,16 +53,16 @@
DOWN,
}
- protected CoreEndpoint(final String name) {
+ protected CoreEndpoint(final String name, final RequestListener<?, ?>
rootRequestListener) {
this.name = name;
// todo - make this configurable
executor = Executors.newCachedThreadPool();
orderedExecutorFactory = new OrderedExecutorFactory(executor);
+ this.rootRequestListener = rootRequestListener;
}
private final ConcurrentMap<Object, Object> endpointMap =
CollectionUtil.concurrentMap();
private final ConcurrentMap<String, CoreProtocolRegistration> protocolMap =
CollectionUtil.concurrentMap();
- private final ConcurrentMap<ServiceKey, CoreDeployedService<?, ?>>
services = CollectionUtil.concurrentMap();
private final Set<CoreSession> sessions =
CollectionUtil.synchronizedSet(CollectionUtil.<CoreSession>hashSet());
// accesses protected by {@code shutdownListeners} - always lock AFTER {@code state}
private final List<EndpointShutdownListener> shutdownListeners =
CollectionUtil.arrayList();
@@ -94,89 +88,10 @@
sessions.notifyAll();
}
- @SuppressWarnings ({"unchecked"})
- <I, O> CoreDeployedService<I, O>
locateDeployedService(ServiceLocator<I, O> locator) {
- state.requireHold(State.UP);
- try {
- final String name = locator.getServiceGroupName();
- final String type = locator.getServiceType();
- // first try the quick (exact) lookup
- if (name.indexOf('*') == -1) {
- final CoreDeployedService<I, O> service =
(CoreDeployedService<I, O>) services.get(new ServiceKey(name, type));
- if (service != null) {
- return service;
- } else {
- return null;
- }
- }
- final Pattern pattern = createWildcardPattern(name);
- for (Map.Entry<ServiceKey,CoreDeployedService<?,?>> entry :
services.entrySet()) {
- final CoreEndpoint.ServiceKey key = entry.getKey();
- final String entryName = key.getName();
- final String entryType = key.getType();
- if (entryType.equals(type) &&
pattern.matcher(entryName).matches()) {
- return (CoreDeployedService<I, O>) entry.getValue();
- }
- }
- return null;
- } finally {
- state.release();
- }
+ RequestListener<?, ?> getRootRequestListener() {
+ return rootRequestListener;
}
- private static final Pattern wildcardPattern =
Pattern.compile("^([^*]+|\\*)+$");
-
- private static Pattern createWildcardPattern(final String string) {
- final Matcher matcher = wildcardPattern.matcher(string);
- final StringBuilder target = new StringBuilder(string.length() * 2);
- while (matcher.find()) {
- final String val = matcher.group(1);
- if ("*".equals(val)) {
- target.append(".*");
- } else {
- target.append(Pattern.quote(val));
- }
- }
- return Pattern.compile(target.toString());
- }
-
- private final class ServiceKey {
- private final String name;
- private final String type;
-
- private ServiceKey(final String name, final String type) {
- this.name = name;
- this.type = type;
- }
-
- private String getName() {
- return name;
- }
-
- private String getType() {
- return type;
- }
-
- public boolean equals(final Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- final ServiceKey that = (ServiceKey) o;
-
- if (name != null ? !name.equals(that.name) : that.name != null) return
false;
- if (type != null ? !type.equals(that.type) : that.type != null) return
false;
-
- return true;
- }
-
- public int hashCode() {
- int result;
- result = (name != null ? name.hashCode() : 0);
- result = 31 * result + (type != null ? type.hashCode() : 0);
- return result;
- }
- }
-
public final class CoreProtocolServerContext implements ProtocolServerContext {
private CoreProtocolServerContext() {
}
@@ -184,8 +99,6 @@
public ProtocolContext establishSession(ProtocolHandler handler) {
final CoreSession session = new CoreSession(CoreEndpoint.this);
session.initializeServer(handler);
-
- //, handler);
return session.getProtocolContext();
}
}
@@ -262,38 +175,8 @@
coreSession.shutdown();
}
sessions.clear();
- executor.shutdown();
}
- public void addShutdownListener(EndpointShutdownListener listener) {
- final State currentState = state.getStateHold();
- try {
- switch (currentState) {
- case UP:
- synchronized(shutdownListeners) {
- shutdownListeners.add(listener);
- return;
- }
- default:
- // must be shut down!
- listener.handleShutdown(this);
- }
- } finally {
- state.release();
- }
- }
-
- public void removeShutdownListener(EndpointShutdownListener listener) {
- synchronized(shutdownListeners) {
- final Iterator<EndpointShutdownListener> i =
shutdownListeners.iterator();
- while (i.hasNext()) {
- if (i.next() == listener) {
- i.remove();
- }
- }
- }
- }
-
public Session openSession(final URI uri, final AttributeMap attributeMap) throws
RemotingException {
final String scheme = uri.getScheme();
if (scheme == null) {
@@ -325,29 +208,6 @@
return name;
}
- public <I, O> Registration deployService(final ServiceDeploymentSpec<I,
O> spec) throws RemotingException {
- if (spec.getServiceName() == null) {
- throw new NullPointerException("spec.getServiceName() is
null");
- }
- if (spec.getServiceType() == null) {
- throw new NullPointerException("spec.getServiceType() is
null");
- }
- if (spec.getRequestListener() == null) {
- throw new NullPointerException("spec.getRequestListener() is
null");
- }
- state.requireHold(State.UP);
- try {
- final CoreDeployedService<I, O> service = new
CoreDeployedService<I, O>(spec.getServiceName(), spec.getServiceType(),
spec.getRequestListener());
- if (services.putIfAbsent(new ServiceKey(spec.getServiceName(),
spec.getServiceType()), service) != null) {
- throw new RemotingException("A service with the same name is
already deployed");
- }
- // todo - return a registration instance
- return null;
- } finally {
- state.release();
- }
- }
-
public ProtocolRegistration registerProtocol(ProtocolRegistrationSpec spec)
throws RemotingException, IllegalArgumentException {
if (spec.getScheme() == null) {
throw new NullPointerException("spec.getScheme() is null");
@@ -365,14 +225,13 @@
}
}
- public Registration deployInterceptorType(final InterceptorDeploymentSpec spec)
throws RemotingException {
- // todo - interceptors
+ public <I, O> Context<I, O> createContext(RequestListener<I, O>
requestListener) {
return null;
}
- public Discovery discover(String endpointName, URI nextHop, int cost) throws
RemotingException {
- // todo - implement
+ public <I, O> ContextSource<I, O> createService(RequestListener<I,
O> requestListener) {
return null;
}
+
}
}
Deleted:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpointProvider.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpointProvider.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpointProvider.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,30 +0,0 @@
-package org.jboss.cx.remoting.core;
-
-import java.util.HashSet;
-import java.util.Set;
-import org.jboss.cx.remoting.Endpoint;
-import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.spi.EndpointProvider;
-
-/**
- *
- */
-public final class CoreEndpointProvider implements EndpointProvider {
- private final LocalProtocol localProtocol = new LocalProtocol();
- private final Set<String> endpointNames = CollectionUtil.synchronizedSet(new
HashSet<String>());
-
- public Endpoint createEndpoint(String name) {
- // todo - need a way to signal the removal of an endpoint
- if (! endpointNames.add(name)) {
- throw new IllegalArgumentException("Failed to create endpoint (endpoint
with the same name already exists");
- }
- final Endpoint userEndpoint = new CoreEndpoint(name).getUserEndpoint();
- try {
- localProtocol.addToEndpoint(userEndpoint);
- } catch (RemotingException e) {
- throw new IllegalStateException("Cannot create endpoint", e);
- }
- return userEndpoint;
- }
-}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundContext.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,12 +1,10 @@
package org.jboss.cx.remoting.core;
-import java.util.List;
import java.util.concurrent.ConcurrentMap;
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.spi.ServerInterceptorFactory;
import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
@@ -21,7 +19,7 @@
private final ConcurrentMap<RequestIdentifier,CoreInboundRequest<I, O>>
requests = CollectionUtil.concurrentMap();
- public CoreInboundContext(final ContextIdentifier contextIdentifier, final
CoreSession coreSession, final RequestListener<I, O> requestListener, final
List<ServerInterceptorFactory> factoryList) {
+ public CoreInboundContext(final ContextIdentifier contextIdentifier, final
CoreSession coreSession, final RequestListener<I, O> requestListener) {
this.contextIdentifier = contextIdentifier;
this.coreSession = coreSession;
this.requestListener = requestListener;
@@ -64,7 +62,7 @@
// Request mgmt
CoreInboundRequest<I, O> createInboundRequest(final RequestIdentifier
requestIdentifier, final I request) {
- return new CoreInboundRequest<I, O>(requestIdentifier, request, this,
requestListener);
+ return new CoreInboundRequest<I, O>(requestIdentifier, this,
requestListener, coreSession.getExecutor());
}
CoreInboundRequest<I, O> getInboundRequest(RequestIdentifier requestIdentifier)
{
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundRequest.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -8,6 +8,12 @@
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
+import java.util.concurrent.Executor;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Iterator;
+import java.util.LinkedList;
/**
*
@@ -16,18 +22,23 @@
private static final Logger log = Logger.getLogger(CoreInboundRequest.class);
private final RequestIdentifier requestIdentifier;
- private final I request;
private final CoreInboundContext<I, O> context;
private final RequestListener<I,O> requestListener;
+ private final Executor executor;
private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.INITIAL);
private final UserRequestContext userRequestContext = new UserRequestContext();
- public CoreInboundRequest(final RequestIdentifier requestIdentifier, final I request,
final CoreInboundContext<I, O> context, final RequestListener<I, O>
requestListener) {
+ private boolean mayInterrupt;
+ private boolean cancel;
+ private Set<Thread> tasks;
+ private List<RequestCancelHandler<O>> cancelHandlers;
+
+ public CoreInboundRequest(final RequestIdentifier requestIdentifier, final
CoreInboundContext<I, O> context, final RequestListener<I, O> requestListener,
final Executor executor) {
this.requestIdentifier = requestIdentifier;
- this.request = request;
this.context = context;
this.requestListener = requestListener;
+ this.executor = executor;
}
private enum State {
@@ -61,6 +72,29 @@
}
public void receiveCancelRequest(final boolean mayInterrupt) {
+ synchronized(this) {
+ if (! cancel) {
+ cancel = true;
+ this.mayInterrupt = mayInterrupt;
+ if (mayInterrupt) {
+ for (Thread t : tasks) {
+ t.interrupt();
+ }
+ }
+ if (cancelHandlers != null) {
+ final Iterator<RequestCancelHandler<O>> i =
cancelHandlers.iterator();
+ while (i.hasNext()) {
+ final RequestCancelHandler<O> handler = i.next();
+ i.remove();
+ executor.execute(new Runnable() {
+ public void run() {
+ handler.notifyCancel(userRequestContext, mayInterrupt);
+ }
+ });
+ }
+ }
+ }
+ }
}
public final class UserRequestContext implements RequestContext<O> {
@@ -88,7 +122,40 @@
}
public void addCancelHandler(final RequestCancelHandler<O>
requestCancelHandler) {
- // todo - should be a list
+ final boolean mayInterrupt;
+ synchronized(CoreInboundRequest.this) {
+ if (!cancel) {
+ if (cancelHandlers == null) {
+ cancelHandlers = new
LinkedList<RequestCancelHandler<O>>();
+ }
+ cancelHandlers.add(requestCancelHandler);
+ return;
+ }
+ // otherwise, unlock and notify now
+ mayInterrupt = CoreInboundRequest.this.mayInterrupt;
+ }
+ requestCancelHandler.notifyCancel(this, mayInterrupt);
}
+
+ public void execute(final Runnable command) {
+ executor.execute(new Runnable() {
+ public void run() {
+ final Thread thread = Thread.currentThread();
+ synchronized(CoreInboundRequest.this) {
+ if (tasks == null) {
+ tasks = new HashSet<Thread>();
+ }
+ tasks.add(thread);
+ }
+ try {
+ command.run();
+ } finally {
+ synchronized(CoreInboundRequest.this) {
+ tasks.remove(thread);
+ }
+ }
+ }
+ });
+ }
}
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundService.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -2,7 +2,6 @@
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.RequestListener;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.log.Logger;
import org.jboss.cx.remoting.spi.protocol.ContextIdentifier;
import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
@@ -17,14 +16,13 @@
private final ServiceIdentifier serviceIdentifier;
private final RequestListener<I, O> requestListener;
- public CoreInboundService(final CoreEndpoint coreEndpoint, final CoreSession
coreSession, final ServiceIdentifier serviceIdentifier, final ServiceLocator<I, O>
locator) throws RemotingException {
+ public CoreInboundService(final CoreSession coreSession, final ServiceIdentifier
serviceIdentifier, final RequestListener<I, O> requestListener) throws
RemotingException {
this.coreSession = coreSession;
this.serviceIdentifier = serviceIdentifier;
- final CoreDeployedService<I, O> service =
coreEndpoint.locateDeployedService(locator);
- requestListener = service.getRequestListener();
+ this.requestListener = requestListener;
}
- public void receivedOpenedContext(final ContextIdentifier remoteContextIdentifier) {
+ void receivedOpenedContext(final ContextIdentifier remoteContextIdentifier) {
coreSession.createServerContext(serviceIdentifier, remoteContextIdentifier,
requestListener);
}
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,14 +1,10 @@
package org.jboss.cx.remoting.core;
-import java.util.List;
import org.jboss.cx.remoting.Context;
import org.jboss.cx.remoting.ContextSource;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.util.AtomicStateMachine;
-import org.jboss.cx.remoting.util.CollectionUtil;
import org.jboss.cx.remoting.log.Logger;
-import org.jboss.cx.remoting.spi.ClientInterceptorFactory;
import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
/**
@@ -21,8 +17,6 @@
private CoreSession coreSession;
private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.WAITING_FOR_REPLY);
private final ContextSource<I, O> userContextSource = new UserContextSource();
- private final List<ClientInterceptorFactory> interceptorFactories =
CollectionUtil.arrayList();
- private final ServiceLocator<I,O> locator;
private enum State {
WAITING_FOR_REPLY,
@@ -31,10 +25,9 @@
DOWN
}
- protected CoreOutboundService(final CoreSession coreSession, final ServiceIdentifier
serviceIdentifier, final ServiceLocator<I, O> locator) {
+ protected CoreOutboundService(final CoreSession coreSession, final ServiceIdentifier
serviceIdentifier) {
this.coreSession = coreSession;
this.serviceIdentifier = serviceIdentifier;
- this.locator = locator;
}
// State mgmt
@@ -48,7 +41,6 @@
// Outbound protocol messages
void sendServiceRequest() throws RemotingException {
- coreSession.sendServiceRequest(serviceIdentifier, locator);
}
// Inbound protocol messages
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -14,12 +14,11 @@
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
-import org.jboss.cx.remoting.ContextSource;
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.RemotingException;
import org.jboss.cx.remoting.RequestListener;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.Session;
+import org.jboss.cx.remoting.Context;
import org.jboss.cx.remoting.core.stream.DefaultStreamDetector;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.AttributeMap;
@@ -39,6 +38,7 @@
import org.jboss.cx.remoting.spi.stream.StreamDetector;
import org.jboss.cx.remoting.spi.stream.StreamSerializer;
import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
+import org.jboss.cx.remoting.spi.wrapper.ContextWrapper;
/**
@@ -75,6 +75,8 @@
private ProtocolHandler protocolHandler;
/** The remote endpoint name. Set on CONNECTING -> UP */
private String remoteEndpointName;
+ /** The root client context. Set on CONNECTING -> UP */
+ private Context<?, ?> rootContext;
private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.NEW);
@@ -91,15 +93,25 @@
// Initializers
+ @SuppressWarnings ({"unchecked"})
void initializeServer(final ProtocolHandler protocolHandler) {
if (protocolHandler == null) {
throw new NullPointerException("protocolHandler is null");
}
state.requireTransitionExclusive(State.NEW, State.CONNECTING);
- this.protocolHandler = protocolHandler;
- state.releaseExclusive();
+ try {
+ this.protocolHandler = protocolHandler;
+ final RequestListener<?, ?> listener =
endpoint.getRootRequestListener();
+ if (listener != null) {
+ final ContextIdentifier contextIdentifier =
protocolHandler.getRemoteRootContextIdentifier();
+ serverContexts.put(contextIdentifier, new
CoreInboundContext(contextIdentifier, this, listener));
+ }
+ } finally {
+ state.releaseExclusive();
+ }
}
+ @SuppressWarnings ({"unchecked"})
void initializeClient(final ProtocolHandlerFactory protocolHandlerFactory, final URI
remoteUri, final AttributeMap attributeMap) throws IOException {
if (protocolHandlerFactory == null) {
throw new NullPointerException("protocolHandlerFactory is null");
@@ -107,6 +119,11 @@
state.requireTransitionExclusive(State.NEW, State.CONNECTING);
try {
protocolHandler = protocolHandlerFactory.createHandler(protocolContext,
remoteUri, attributeMap);
+ final RequestListener<?, ?> listener =
endpoint.getRootRequestListener();
+ if (listener != null) {
+ final ContextIdentifier contextIdentifier =
protocolHandler.getRemoteRootContextIdentifier();
+ serverContexts.put(contextIdentifier, new
CoreInboundContext(contextIdentifier, this, listener));
+ }
} finally {
state.releaseExclusive();
}
@@ -132,22 +149,6 @@
return true;
}
- void sendServiceRequest(final ServiceIdentifier serviceIdentifier, final
ServiceLocator<?,?> locator) throws RemotingException {
- try {
- protocolHandler.sendServiceRequest(serviceIdentifier, locator);
- } catch (IOException e) {
- throw new RemotingException("Failed to send a service request: " +
e);
- }
- }
-
- void sendServiceActivate(final ServiceIdentifier serviceIdentifier) throws
RemotingException {
- try {
- protocolHandler.sendServiceActivate(serviceIdentifier);
- } catch (IOException e) {
- throw new RemotingException("Failed to send a service activate: " +
e);
- }
- }
-
void sendReply(final ContextIdentifier contextIdentifier, final RequestIdentifier
requestIdentifier, final Object reply) throws RemotingException {
try {
protocolHandler.sendReply(contextIdentifier, requestIdentifier, reply);
@@ -210,6 +211,10 @@
return protocolHandler;
}
+ Executor getExecutor() {
+ return endpoint.getExecutor();
+ }
+
// Thread-local instance
private static final ThreadLocal<CoreSession> instance = new
ThreadLocal<CoreSession>();
@@ -285,7 +290,7 @@
}
state.requireHold(State.UP);
try {
- final CoreInboundContext<I, O> context = new CoreInboundContext<I,
O>(remoteContextIdentifier, this, requestListener, null);
+ final CoreInboundContext<I, O> context = new CoreInboundContext<I,
O>(remoteContextIdentifier, this, requestListener);
log.trace("Adding new server (inbound) context, ID = %s",
remoteContextIdentifier);
serverContexts.put(remoteContextIdentifier, context);
return context;
@@ -326,69 +331,6 @@
// Service mgmt
- <I, O> CoreOutboundService<I, O> createService(final ServiceLocator<I,
O> locator) throws RemotingException {
- if (locator == null) {
- throw new NullPointerException("locator is null");
- }
- state.requireHold(State.UP);
- try {
- final ServiceIdentifier serviceIdentifier;
- try {
- serviceIdentifier = protocolHandler.openService();
- } catch (IOException e) {
- throw new RemotingException("Failed to open service: " +
e.toString());
- }
- final CoreOutboundService<I, O> service = new CoreOutboundService<I,
O>(this, serviceIdentifier, locator);
- log.trace("Adding new client service, ID = %s",
serviceIdentifier);
- services.put(serviceIdentifier, new
WeakReference<CoreOutboundService>(service));
- return service;
- } finally {
- state.release();
- }
- }
-
- <I, O> CoreInboundService<I, O> createServerService(final
ServiceIdentifier serviceIdentifier, final ServiceLocator<I, O> locator) {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- if (locator == null) {
- throw new NullPointerException("locator is null");
- }
- state.requireHold(State.UP);
- try {
- final CoreInboundService<I, O> service;
- try {
- service = new CoreInboundService<I, O>(endpoint, this,
serviceIdentifier, locator);
- } catch (RemotingException e) {
- try {
- sendServiceTerminate(serviceIdentifier);
- } catch (RemotingException e1) {
- log.trace("Failed to notify client of service termination:
%s", e);
- }
- return null;
- }
- try {
- sendServiceActivate(serviceIdentifier);
- } catch (RemotingException e) {
- log.trace("Failed to notify client of service activation: %s",
e);
- return null;
- }
- log.trace("Adding new server service, ID = %s",
serviceIdentifier);
- serverServices.put(serviceIdentifier, service);
- return service;
- } finally {
- state.release();
- }
- }
-
- private void sendServiceTerminate(final ServiceIdentifier serviceIdentifier) throws
RemotingException {
- try {
- protocolHandler.sendServiceTerminate(serviceIdentifier);
- } catch (IOException e) {
- throw new RemotingException("Failed to send service terminate: " +
e.toString());
- }
- }
-
CoreOutboundService getService(final ServiceIdentifier serviceIdentifier) {
if (serviceIdentifier == null) {
throw new NullPointerException("serviceIdentifier is null");
@@ -445,23 +387,9 @@
return remoteEndpointName;
}
- public <I, O> ContextSource<I, O> openService(ServiceLocator<I,
O> locator) throws RemotingException {
- if (locator == null) {
- throw new NullPointerException("locator is null");
- }
- if (locator.getServiceType() == null) {
- throw new NullPointerException("locator.getServiceType() is
null");
- }
- state.waitForNotHold(State.CONNECTING);
- try {
- state.require(State.UP);
- final CoreOutboundService<I, O> service = createService(locator);
- service.sendServiceRequest();
- service.await();
- return service.getUserContextSource();
- } finally {
- state.release();
- }
+ @SuppressWarnings ({"unchecked"})
+ public <I, O> Context<I, O> getRootContext() {
+ return (Context<I, O>) rootContext;
}
}
@@ -511,20 +439,6 @@
}
}
- @SuppressWarnings({"unchecked"})
- public void receiveServiceRequest(ServiceIdentifier serviceIdentifier,
ServiceLocator<?, ?> locator) {
- createServerService(serviceIdentifier, locator);
- }
-
- public void receiveServiceActivate(ServiceIdentifier serviceIdentifier) {
- final CoreOutboundService service = getService(serviceIdentifier);
- if (service != null) {
- service.receiveServiceActivate();
- } else {
- log.trace("Got service activate for an unknown service (%s)",
serviceIdentifier);
- }
- }
-
public void receiveServiceTerminate(ServiceIdentifier serviceIdentifier) {
final CoreOutboundService service = getService(serviceIdentifier);
if (service != null) {
@@ -572,11 +486,20 @@
coreStream.receiveStreamData(data);
}
+ @SuppressWarnings ({"unchecked"})
public void openSession(String remoteEndpointName) {
state.waitForNotExclusive(State.NEW);
try {
state.requireTransition(State.CONNECTING, State.UP);
CoreSession.this.remoteEndpointName = remoteEndpointName;
+ final ContextIdentifier rootContextIdentifier =
protocolHandler.getLocalRootContextIdentifier();
+ final CoreOutboundContext outboundContext = new
CoreOutboundContext(CoreSession.this, rootContextIdentifier);
+ rootContext = new ContextWrapper(outboundContext.getUserContext()) {
+ public void close() throws RemotingException {
+ throw new RemotingException("close() not allowed on root
context");
+ }
+ };
+ contexts.put(rootContextIdentifier, new
WeakReference<CoreOutboundContext>(outboundContext));
} finally {
state.releaseExclusive();
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/LocalProtocol.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -9,7 +9,6 @@
import org.jboss.cx.remoting.Endpoint;
import org.jboss.cx.remoting.RemoteExecutionException;
import org.jboss.cx.remoting.RemotingException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.util.MessageOutput;
import org.jboss.cx.remoting.util.AttributeMap;
import org.jboss.cx.remoting.util.CollectionUtil;
@@ -121,32 +120,6 @@
log.trace("Closing stream for local protocol");
}
- public StreamIdentifier readStreamIdentifier(ObjectInput input) throws
IOException {
- throw new UnsupportedOperationException("streams");
- }
-
- public void writeStreamIdentifier(ObjectOutput output, StreamIdentifier
identifier) throws IOException {
- throw new UnsupportedOperationException("streams");
- }
-
- public StreamIdentifier readStreamIdentifier(MessageInput input) throws
IOException {
- throw new UnsupportedOperationException("streams");
- }
-
- public void writeStreamIdentifier(MessageOutput output, StreamIdentifier
identifier) throws IOException {
- throw new UnsupportedOperationException("streams");
- }
-
- public void sendServiceRequest(ServiceIdentifier serviceIdentifier,
ServiceLocator<?, ?> locator) throws IOException {
- log.trace("Sending service request for local protocol");
- remoteContext.receiveServiceRequest(serviceIdentifier, locator);
- }
-
- public void sendServiceActivate(ServiceIdentifier serviceIdentifier) throws
IOException {
- log.trace("Sending service activation for local protocol");
- remoteContext.receiveServiceActivate(serviceIdentifier);
- }
-
public void sendReply(ContextIdentifier remoteContextIdentifier,
RequestIdentifier requestIdentifier, Object reply) throws IOException {
log.trace("Sending stream for local protocol");
remoteContext.receiveReply(remoteContextIdentifier, requestIdentifier,
reply);
@@ -170,11 +143,23 @@
public void sendServiceTerminate(ServiceIdentifier remoteServiceIdentifier)
throws IOException {
}
+ public ContextIdentifier getLocalRootContextIdentifier() {
+ return null;
+ }
+
+ public ContextIdentifier getRemoteRootContextIdentifier() {
+ return null;
+ }
+
public void sendCancelRequest(ContextIdentifier contextIdentifier,
RequestIdentifier requestIdentifier, boolean mayInterrupt) throws IOException {
log.trace("Sending cancel request for local protocol");
remoteContext.receiveCancelRequest(contextIdentifier, requestIdentifier,
mayInterrupt);
}
+ public ContextIdentifier openContext() throws IOException {
+ return null;
+ }
+
public MessageOutput sendStreamData(StreamIdentifier streamIdentifier, final
Executor streamExeceutor) throws IOException {
throw new UnsupportedOperationException("streams");
}
Added:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java
(rev 0)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ServiceLocatorListener.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -0,0 +1,25 @@
+package org.jboss.cx.remoting.core;
+
+import org.jboss.cx.remoting.RequestListener;
+import org.jboss.cx.remoting.RequestContext;
+import org.jboss.cx.remoting.RemoteExecutionException;
+import org.jboss.cx.remoting.util.ServiceURI;
+import org.jboss.cx.remoting.service.ServiceRequest;
+import org.jboss.cx.remoting.service.ServiceReply;
+import java.net.URI;
+
+/**
+ *
+ */
+public final class ServiceLocatorListener<I, O> implements
RequestListener<ServiceRequest<I, O>, ServiceReply<I, O>> {
+
+
+ public void handleRequest(final RequestContext<ServiceReply<I, O>>
requestContext, final ServiceRequest<I, O> request) throws RemoteExecutionException,
InterruptedException {
+ final URI uri = request.getUri();
+ final ServiceURI serviceURI = new ServiceURI(uri);
+ final String endpointName = serviceURI.getEndpointName();
+ final String groupName = serviceURI.getGroupName();
+ final String serviceType = serviceURI.getServiceType();
+
+ }
+}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/StreamMarker.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -44,13 +44,13 @@
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(factoryClass);
- coreSession.getProtocolHandler().writeStreamIdentifier(out, streamIdentifier);
+ out.writeObject(streamIdentifier);
}
@SuppressWarnings ({"unchecked"})
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException
{
coreSession = CoreSession.getInstance();
factoryClass = (Class<? extends StreamSerializerFactory>) in.readObject();
- streamIdentifier = coreSession.getProtocolHandler().readStreamIdentifier(in);
+ streamIdentifier = (StreamIdentifier) in.readObject();
}
}
Modified:
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java
===================================================================
---
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionImpl.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -15,7 +15,6 @@
import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
import org.jboss.cx.remoting.spi.protocol.StreamIdentifier;
import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.ServiceLocator;
import java.util.LinkedList;
import java.util.Set;
@@ -185,6 +184,14 @@
});
}
+ public ContextIdentifier getLocalRootContextIdentifier() {
+ return null;
+ }
+
+ public ContextIdentifier getRemoteRootContextIdentifier() {
+ return null;
+ }
+
public ContextIdentifier openContext(final ServiceIdentifier serviceIdentifier)
throws IOException {
final ContextIdentifier contextIdentifier = null;
outgoingQueue.add(new OutputAction() {
@@ -218,26 +225,6 @@
return null;
}
- public void sendServiceRequest(final ServiceIdentifier serviceIdentifier, final
ServiceLocator<?, ?> locator) throws IOException {
- outgoingQueue.add(new OutputAction() {
- public void run(ByteOutput target) throws IOException {
- final MessageOutput msgOutput =
protocolContext.getMessageOutput(target);
- write(msgOutput, MsgType.SERVICE_REQUEST);
- write(msgOutput, serviceIdentifier);
- msgOutput.writeObject(locator.getRequestType());
- msgOutput.writeObject(locator.getReplyType());
- msgOutput.writeUTF(locator.getServiceType());
- msgOutput.writeUTF(locator.getServiceGroupName());
- final Set<String> interceptors =
locator.getAvailableInterceptors();
- msgOutput.writeInt(interceptors.size());
- for (String name : interceptors) {
- msgOutput.writeUTF(name);
- }
- msgOutput.commit();
- }
- });
- }
-
public void closeService(final ServiceIdentifier serviceIdentifier) throws
IOException {
outgoingQueue.add(new OutputAction() {
public void run(ByteOutput target) throws IOException {
@@ -273,6 +260,10 @@
});
}
+ public ContextIdentifier openContext() throws IOException {
+ return null;
+ }
+
public StreamIdentifier openStream() throws IOException {
return null;
}
Modified:
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java
===================================================================
---
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppConnection.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,7 +1,6 @@
package org.jboss.cx.remoting.jrpp;
import java.io.IOException;
-import java.io.ObjectInput;
import java.io.ObjectOutput;
import static java.lang.Math.min;
import java.util.Enumeration;
@@ -11,6 +10,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.common.AttributeKey;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoBuffer;
@@ -20,13 +20,12 @@
import org.apache.mina.handler.multiton.SingleSessionIoHandler;
import org.jboss.cx.remoting.CommonKeys;
import org.jboss.cx.remoting.RemoteExecutionException;
-import org.jboss.cx.remoting.ServiceLocator;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.AttributeMap;
import org.jboss.cx.remoting.util.MessageOutput;
import org.jboss.cx.remoting.util.MessageInput;
import org.jboss.cx.remoting.util.CollectionUtil;
-import org.jboss.cx.remoting.jrpp.id.IdentifierManager;
+import org.jboss.cx.remoting.util.WeakHashSet;
import org.jboss.cx.remoting.jrpp.id.JrppContextIdentifier;
import org.jboss.cx.remoting.jrpp.id.JrppRequestIdentifier;
import org.jboss.cx.remoting.jrpp.id.JrppServiceIdentifier;
@@ -71,7 +70,6 @@
private final ProtocolHandler protocolHandler;
private final SingleSessionIoHandler ioHandler;
- private final IdentifierManager identifierManager;
private final AttributeMap attributeMap;
private IoSession ioSession;
@@ -79,6 +77,18 @@
private ProtocolContext protocolContext;
private IOException failureReason;
+ private boolean client;
+
+ private final AtomicInteger streamIdSequence = new AtomicInteger(0);
+ private final AtomicInteger contextIdSequence = new AtomicInteger(1);
+ private final AtomicInteger serviceIdSequence = new AtomicInteger(0);
+ private final AtomicInteger requestIdSequence = new AtomicInteger(0);
+
+ private final Set<StreamIdentifier> liveStreamSet =
CollectionUtil.synchronizedSet(new WeakHashSet<StreamIdentifier>());
+ private final Set<ContextIdentifier> liveContextSet =
CollectionUtil.synchronizedSet(new WeakHashSet<ContextIdentifier>());
+ private final Set<RequestIdentifier> liveRequestSet =
CollectionUtil.synchronizedSet(new WeakHashSet<RequestIdentifier>());
+ private final Set<ServiceIdentifier> liveServiceSet =
CollectionUtil.synchronizedSet(new WeakHashSet<ServiceIdentifier>());
+
/**
* The negotiated protocol version. Value is set to {@code min(PROTOCOL_VERSION,
remote PROTOCOL_VERSION)}.
*/
@@ -118,7 +128,6 @@
public JrppConnection(final AttributeMap attributeMap) {
this.attributeMap = attributeMap;
ioHandler = new IoHandlerImpl();
- identifierManager = new IdentifierManager();
protocolHandler = new RemotingProtocolHandler();
}
@@ -128,6 +137,7 @@
ioSession.setAttribute(JRPP_CONNECTION, this);
this.ioSession = ioSession;
this.protocolContext = protocolContext;
+ client = true;
} finally {
state.releaseExclusive();
}
@@ -140,6 +150,7 @@
this.ioSession = ioSession;
final ProtocolContext protocolContext =
protocolServerContext.establishSession(protocolHandler);
this.protocolContext = protocolContext;
+ client = false;
} finally {
state.releaseExclusive();
}
@@ -361,6 +372,42 @@
}
}
+ private JrppContextIdentifier getNewContextIdentifier() {
+ for (;;) {
+ final JrppContextIdentifier contextIdentifier = new
JrppContextIdentifier(client, contextIdSequence.getAndIncrement());
+ if (liveContextSet.add(contextIdentifier)) {
+ return contextIdentifier;
+ }
+ }
+ }
+
+ private JrppRequestIdentifier getNewRequestIdentifier() {
+ for (;;) {
+ final JrppRequestIdentifier requestIdentifier = new
JrppRequestIdentifier(client, requestIdSequence.getAndIncrement());
+ if (liveRequestSet.add(requestIdentifier)) {
+ return requestIdentifier;
+ }
+ }
+ }
+
+ private JrppStreamIdentifier getNewStreamIdentifier() {
+ for (;;) {
+ final JrppStreamIdentifier streamIdentifier = new
JrppStreamIdentifier(client, streamIdSequence.getAndIncrement());
+ if (liveStreamSet.add(streamIdentifier)) {
+ return streamIdentifier;
+ }
+ }
+ }
+
+ private JrppServiceIdentifier getNewServiceIdentifier() {
+ for (;;) {
+ final JrppServiceIdentifier serviceIdentifier = new
JrppServiceIdentifier(client, serviceIdSequence.getAndIncrement());
+ if (liveServiceSet.add(serviceIdentifier)) {
+ return serviceIdentifier;
+ }
+ }
+ }
+
private static IoBuffer newBuffer(final int initialSize, final boolean autoexpand) {
return IoBuffer.allocate(initialSize + 4).setAutoExpand(autoexpand).skip(4);
}
@@ -368,7 +415,7 @@
public final class RemotingProtocolHandler implements ProtocolHandler {
public ContextIdentifier openContext(ServiceIdentifier serviceIdentifier) throws
IOException {
- final ContextIdentifier contextIdentifier = new
JrppContextIdentifier(identifierManager.getIdentifier());
+ final ContextIdentifier contextIdentifier = getNewContextIdentifier();
final IoBuffer buffer = newBuffer(60, false);
final MessageOutput output = protocolContext.getMessageOutput(new
IoBufferByteOutput(buffer, ioSession));
write(output, MessageType.OPEN_CONTEXT);
@@ -379,15 +426,15 @@
}
public RequestIdentifier openRequest(ContextIdentifier contextIdentifier) throws
IOException {
- return new JrppRequestIdentifier(identifierManager.getIdentifier());
+ return getNewRequestIdentifier();
}
public StreamIdentifier openStream() throws IOException {
- return new JrppStreamIdentifier(identifierManager.getIdentifier());
+ return getNewStreamIdentifier();
}
public ServiceIdentifier openService() throws IOException {
- return new JrppServiceIdentifier(identifierManager.getIdentifier());
+ return getNewServiceIdentifier();
}
public void closeSession() throws IOException {
@@ -437,52 +484,6 @@
}
}
- public StreamIdentifier readStreamIdentifier(ObjectInput input) throws
IOException {
- return new JrppStreamIdentifier(input);
- }
-
- public void writeStreamIdentifier(ObjectOutput output, StreamIdentifier
identifier) throws IOException {
- write(output, identifier);
- }
-
- public void sendServiceRequest(ServiceIdentifier serviceIdentifier,
ServiceLocator<?, ?> locator) throws IOException {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- if (locator == null) {
- throw new NullPointerException("locator is null");
- }
- if (! state.in(State.UP)) {
- throw new IllegalStateException("JrppConnection is not in the UP
state!");
- }
- final IoBuffer buffer = newBuffer(500, true);
- final MessageOutput output = protocolContext.getMessageOutput(new
IoBufferByteOutput(buffer, ioSession));
- write(output, MessageType.SERVICE_REQUEST);
- write(output, serviceIdentifier);
- output.writeObject(locator.getRequestType());
- output.writeObject(locator.getReplyType());
- output.writeUTF(locator.getServiceType());
- output.writeUTF(locator.getServiceGroupName());
- final Set<String> interceptors = locator.getAvailableInterceptors();
- final int cnt = interceptors.size();
- output.writeInt(cnt);
- for (String name : interceptors) {
- output.writeUTF(name);
- }
- output.commit();
- }
-
- public void sendServiceActivate(ServiceIdentifier serviceIdentifier) throws
IOException {
- if (serviceIdentifier == null) {
- throw new NullPointerException("serviceIdentifier is null");
- }
- final IoBuffer buffer = newBuffer(60, false);
- final MessageOutput output = protocolContext.getMessageOutput(new
IoBufferByteOutput(buffer, ioSession));
- write(output, MessageType.SERVICE_ACTIVATE);
- write(output, serviceIdentifier);
- output.commit();
- }
-
public void sendReply(ContextIdentifier remoteContextIdentifier,
RequestIdentifier requestIdentifier, Object reply) throws IOException {
if (remoteContextIdentifier == null) {
throw new NullPointerException("remoteContextIdentifier is
null");
@@ -560,6 +561,14 @@
output.commit();
}
+ public ContextIdentifier getLocalRootContextIdentifier() {
+ return null;
+ }
+
+ public ContextIdentifier getRemoteRootContextIdentifier() {
+ return null;
+ }
+
public void sendCancelRequest(ContextIdentifier contextIdentifier,
RequestIdentifier requestIdentifier, boolean mayInterrupt) throws IOException {
if (contextIdentifier == null) {
throw new NullPointerException("contextIdentifier is null");
@@ -576,6 +585,10 @@
output.commit();
}
+ public ContextIdentifier openContext() throws IOException {
+ return null;
+ }
+
public MessageOutput sendStreamData(StreamIdentifier streamIdentifier, Executor
streamExecutor) throws IOException {
if (streamIdentifier == null) {
throw new NullPointerException("streamIdentifier is null");
@@ -633,22 +646,6 @@
}
}
- private ContextIdentifier readCtxtId(MessageInput input) throws IOException {
- return new JrppContextIdentifier(input.readShort());
- }
-
- private ServiceIdentifier readSvcId(MessageInput input) throws IOException {
- return new JrppServiceIdentifier(input.readShort());
- }
-
- private StreamIdentifier readStrId(MessageInput input) throws IOException {
- return new JrppStreamIdentifier(input.readShort());
- }
-
- private RequestIdentifier readReqId(MessageInput input) throws IOException {
- return new JrppRequestIdentifier(input.readShort());
- }
-
public void messageReceived(Object message) throws Exception {
final boolean trace = log.isTrace();
final MessageInput input = protocolContext.getMessageInput(new
IoBufferByteInput((IoBuffer) message));
@@ -809,56 +806,56 @@
case UP: {
switch (type) {
case OPEN_CONTEXT: {
- final ServiceIdentifier serviceIdentifier =
readSvcId(input);
- final ContextIdentifier contextIdentifier =
readCtxtId(input);
+ final ServiceIdentifier serviceIdentifier =
(ServiceIdentifier) input.readObject();
+ final ContextIdentifier contextIdentifier =
(ContextIdentifier) input.readObject();
protocolContext.receiveOpenedContext(serviceIdentifier,
contextIdentifier);
return;
}
case CANCEL_ACK: {
- final ContextIdentifier contextIdentifier =
readCtxtId(input);
- final RequestIdentifier requestIdentifier =
readReqId(input);
+ final ContextIdentifier contextIdentifier =
(ContextIdentifier) input.readObject();
+ final RequestIdentifier requestIdentifier =
(RequestIdentifier) input.readObject();
protocolContext.receiveCancelAcknowledge(contextIdentifier,
requestIdentifier);
return;
}
case CANCEL_REQ: {
- final ContextIdentifier contextIdentifier =
readCtxtId(input);
- final RequestIdentifier requestIdentifier =
readReqId(input);
+ final ContextIdentifier contextIdentifier =
(ContextIdentifier) input.readObject();
+ final RequestIdentifier requestIdentifier =
(RequestIdentifier) input.readObject();
final boolean mayInterrupt = input.readBoolean();
protocolContext.receiveCancelRequest(contextIdentifier,
requestIdentifier, mayInterrupt);
return;
}
case CLOSE_CONTEXT: {
- final ContextIdentifier contextIdentifier =
readCtxtId(input);
+ final ContextIdentifier contextIdentifier =
(ContextIdentifier) input.readObject();
protocolContext.closeContext(contextIdentifier);
return;
}
case CLOSE_SERVICE: {
- final ServiceIdentifier serviceIdentifier =
readSvcId(input);
+ final ServiceIdentifier serviceIdentifier =
(ServiceIdentifier) input.readObject();
protocolContext.closeService(serviceIdentifier);
return;
}
case CLOSE_STREAM: {
- final StreamIdentifier streamIdentifier = readStrId(input);
+ final StreamIdentifier streamIdentifier = (StreamIdentifier)
input.readObject();
protocolContext.closeStream(streamIdentifier);
return;
}
case EXCEPTION: {
- final ContextIdentifier contextIdentifier =
readCtxtId(input);
- final RequestIdentifier requestIdentifier =
readReqId(input);
+ final ContextIdentifier contextIdentifier =
(ContextIdentifier) input.readObject();
+ final RequestIdentifier requestIdentifier =
(RequestIdentifier) input.readObject();
final RemoteExecutionException exception =
(RemoteExecutionException) input.readObject();
protocolContext.receiveException(contextIdentifier,
requestIdentifier, exception);
return;
}
case REPLY: {
- final ContextIdentifier contextIdentifier =
readCtxtId(input);
- final RequestIdentifier requestIdentifier =
readReqId(input);
+ final ContextIdentifier contextIdentifier =
(ContextIdentifier) input.readObject();
+ final RequestIdentifier requestIdentifier =
(RequestIdentifier) input.readObject();
final Object reply = input.readObject();
protocolContext.receiveReply(contextIdentifier,
requestIdentifier, reply);
return;
}
case REQUEST: {
- final ContextIdentifier contextIdentifier =
readCtxtId(input);
- final RequestIdentifier requestIdentifier =
readReqId(input);
+ final ContextIdentifier contextIdentifier =
(ContextIdentifier) input.readObject();
+ final RequestIdentifier requestIdentifier =
(RequestIdentifier) input.readObject();
final Object request = input.readObject();
if (trace) {
log.trace("Received request - body is %s",
request);
@@ -866,38 +863,13 @@
protocolContext.receiveRequest(contextIdentifier,
requestIdentifier, request);
return;
}
- case SERVICE_ACTIVATE: {
- final ServiceIdentifier serviceIdentifier =
readSvcId(input);
- protocolContext.receiveServiceActivate(serviceIdentifier);
- return;
- }
- case SERVICE_REQUEST: {
- final ServiceIdentifier serviceIdentifier =
readSvcId(input);
- final Class<?> requestType = (Class<?>)
input.readObject();
- final Class<?> replyType = (Class<?>)
input.readObject();
- final String serviceType = input.readUTF();
- final String serviceGroupName = input.readUTF();
- final Set<String> interceptors =
CollectionUtil.hashSet();
- int c = input.readInt();
- for (int i = 0; i < c; i ++) {
- interceptors.add(input.readUTF());
- }
- final ServiceLocator<?, ?> locator =
ServiceLocator.DEFAULT
- .setRequestType(requestType)
- .setReplyType(replyType)
- .setServiceType(serviceType)
- .setServiceGroupName(serviceGroupName)
- .setAvailableInterceptors(interceptors);
- protocolContext.receiveServiceRequest(serviceIdentifier,
locator);
- return;
- }
- case SERVICE_TERMINATE: {
- final ServiceIdentifier serviceIdentifier =
readSvcId(input);
+ case SERVICE_TERMINATE: {
+ final ServiceIdentifier serviceIdentifier =
(ServiceIdentifier) input.readObject();
protocolContext.receiveServiceTerminate(serviceIdentifier);
return;
}
case STREAM_DATA: {
- final StreamIdentifier streamIdentifier = readStrId(input);
+ final StreamIdentifier streamIdentifier = (StreamIdentifier)
input.readObject();
protocolContext.receiveStreamData(streamIdentifier, input);
return;
}
@@ -931,8 +903,6 @@
EXCEPTION,
REPLY,
REQUEST,
- SERVICE_ACTIVATE,
- SERVICE_REQUEST,
SERVICE_TERMINATE,
STREAM_DATA,
}
Deleted:
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java
===================================================================
---
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/IdentifierManager.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -1,27 +0,0 @@
-package org.jboss.cx.remoting.jrpp.id;
-
-import java.util.BitSet;
-
-/**
- *
- */
-public final class IdentifierManager {
- private final BitSet bitSet = new BitSet(64);
-
- public synchronized short getIdentifier() {
- final int id = bitSet.nextClearBit(1);
- if (id > 0xffff) {
- return 0;
- } else {
- return (short) id;
- }
- }
-
- public synchronized void freeIdentifier(short id) {
- bitSet.clear(id & 0xffff);
- }
-
- public void getIdentifier(final short id) {
-
- }
-}
Modified:
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppContextIdentifier.java
===================================================================
---
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppContextIdentifier.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppContextIdentifier.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -9,12 +9,11 @@
*/
@SuppressWarnings ({"EqualsAndHashcode"})
public final class JrppContextIdentifier extends JrppSubChannelIdentifier implements
ContextIdentifier {
- public JrppContextIdentifier(short id) throws IOException {
- super(id);
+ public JrppContextIdentifier() {
}
- public JrppContextIdentifier(ObjectInputStream ois) throws IOException {
- super(ois);
+ public JrppContextIdentifier(final boolean client, final int id) {
+ super(client, id);
}
public boolean equals(Object obj) {
Modified:
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java
===================================================================
---
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppRequestIdentifier.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -9,12 +9,11 @@
*/
@SuppressWarnings ({"EqualsAndHashcode"})
public final class JrppRequestIdentifier extends JrppSubChannelIdentifier implements
RequestIdentifier {
- public JrppRequestIdentifier(short id) throws IOException {
- super(id);
+ public JrppRequestIdentifier() {
}
- public JrppRequestIdentifier(MessageInput input) throws IOException {
- super(input);
+ public JrppRequestIdentifier(final boolean client, final int id) {
+ super(client, id);
}
public boolean equals(Object obj) {
Modified:
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java
===================================================================
---
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppServiceIdentifier.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -9,12 +9,11 @@
*/
@SuppressWarnings ({"EqualsAndHashcode"})
public final class JrppServiceIdentifier extends JrppSubChannelIdentifier implements
ServiceIdentifier {
- public JrppServiceIdentifier(short id) throws IOException {
- super(id);
+ public JrppServiceIdentifier() {
}
- public JrppServiceIdentifier(MessageInput input) throws IOException {
- super(input);
+ public JrppServiceIdentifier(final boolean client, final int id) {
+ super(client, id);
}
public boolean equals(Object obj) {
Modified:
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppStreamIdentifier.java
===================================================================
---
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppStreamIdentifier.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppStreamIdentifier.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -9,12 +9,11 @@
*/
@SuppressWarnings ({"EqualsAndHashcode"})
public final class JrppStreamIdentifier extends JrppSubChannelIdentifier implements
StreamIdentifier {
- public JrppStreamIdentifier(short id) throws IOException {
- super(id);
+ public JrppStreamIdentifier() {
}
- public JrppStreamIdentifier(ObjectInput input) throws IOException {
- super(input);
+ public JrppStreamIdentifier(final boolean client, final int id) {
+ super(client, id);
}
public boolean equals(Object obj) {
Modified:
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppSubChannelIdentifier.java
===================================================================
---
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppSubChannelIdentifier.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/id/JrppSubChannelIdentifier.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -2,43 +2,48 @@
import java.io.IOException;
import java.io.ObjectInput;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.io.Externalizable;
+import java.io.ObjectOutput;
/**
*
*/
-public abstract class JrppSubChannelIdentifier {
- private final short id;
- private final AtomicBoolean dead = new AtomicBoolean(false);
+public abstract class JrppSubChannelIdentifier implements Externalizable {
+ private /*final*/ boolean client;
+ private /*final*/ int id;
- public JrppSubChannelIdentifier(short id) throws IOException {
+ protected JrppSubChannelIdentifier() {
+ }
+
+ protected JrppSubChannelIdentifier(final boolean client, final int id) {
+ if (id < 0) {
+ throw new IllegalArgumentException("id must be >= 0");
+ }
+ this.client = client;
this.id = id;
}
- public JrppSubChannelIdentifier(ObjectInput input) throws IOException {
- id = input.readShort();
+ public final void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(id << 1 | (client ? 0 : 1));
}
- public void release(IdentifierManager manager) {
- if (!dead.getAndSet(true)) {
- manager.freeIdentifier(id);
- }
+ public final void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ int i = in.readInt();
+ id = i >>> 1;
+ client = (i & 1) == 0;
}
- public short getId() {
- if (dead.get()) {
- throw new IllegalStateException("Read channel ID after close");
- }
+ public int getId() {
return id;
}
public boolean equals(Object obj) {
if (!(obj instanceof JrppSubChannelIdentifier)) return false;
JrppSubChannelIdentifier other = (JrppSubChannelIdentifier) obj;
- return !(dead.get() || other.dead.get()) && other.id == id;
+ return other.id == id && other.client == client;
}
public int hashCode() {
- return id;
+ return id << 1 | (client ? 0 : 1);
}
}
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
---
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-02-25
13:35:25 UTC (rev 3501)
+++
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-02-26
00:40:52 UTC (rev 3502)
@@ -2,13 +2,12 @@
import java.io.IOException;
import java.net.URI;
-import org.jboss.cx.remoting.core.CoreEndpointProvider;
import org.jboss.cx.remoting.util.AttributeHashMap;
import org.jboss.cx.remoting.util.AttributeMap;
import org.jboss.cx.remoting.log.Logger;
-import org.jboss.cx.remoting.spi.EndpointProvider;
import org.jboss.cx.remoting.spi.wrapper.ContextSourceWrapper;
import org.jboss.cx.remoting.spi.wrapper.SessionWrapper;
+import org.jboss.cx.remoting.core.CoreEndpoint;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
@@ -22,75 +21,16 @@
public final class Remoting {
private static final Logger log = Logger.getLogger(Remoting.class);
- private static final class EndpointProviderHolder {
- private static final EndpointProvider provider = new CoreEndpointProvider();
- }
-
public static Endpoint createEndpoint(String name) {
- return EndpointProviderHolder.provider.createEndpoint(name);
+ return null;
}
public static Session createEndpointAndSession(String endpointName, URI remoteUri,
final String userName, final char[] password) throws RemotingException {
- final Endpoint endpoint = createEndpoint(endpointName);
- boolean ok = false;
- final CallbackHandler callbackHandler = new CallbackHandler() {
- public void handle(Callback[] callbacks) throws IOException,
UnsupportedCallbackException {
- for (Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- ((NameCallback)callback).setName(userName);
- } else if (callback instanceof PasswordCallback) {
- ((PasswordCallback)callback).setPassword(password);
- } else {
- throw new UnsupportedCallbackException(callback);
- }
- }
- }
- };
- final AttributeMap attributeMap = new AttributeHashMap();
- attributeMap.put(CommonKeys.AUTH_CALLBACK_HANDLER, callbackHandler);
- try {
- final Session session = new SessionWrapper(endpoint.openSession(remoteUri,
attributeMap)) {
- public void close() throws RemotingException {
- try {
- super.close();
- } finally {
- endpoint.shutdown();
- }
- }
- };
- ok = true;
- return session;
- } finally {
- if (! ok) {
- endpoint.shutdown();
- }
- }
+ return null;
}
public static <I, O> ContextSource<I, O>
createEndpointAndOpenService(String endpointName, URI remoteUri, String userName, char[]
password, Class<I> requestType, Class<O> replyType, String serviceType, String
serviceGroupName) throws RemotingException {
- final Session session = createEndpointAndSession(endpointName, remoteUri,
userName, password);
- boolean ok = false;
- try {
- final ContextSource<I, O> service = new ContextSourceWrapper<I,
O>(session.openService(ServiceLocator.DEFAULT.setRequestType(requestType).setReplyType(replyType).setServiceGroupName(serviceGroupName).setServiceType(serviceType)))
{
- public void close() {
- try {
- super.close();
- } finally {
- try {
- session.close();
- } catch (RemotingException e) {
- log.error(e, "Failed to close Remoting session");
- }
- }
- }
- };
- ok = true;
- return service;
- } finally {
- if (! ok) {
- session.close();
- }
- }
+ return null;
}
// privates