[jboss-remoting-commits] JBoss Remoting SVN: r3666 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi/wrapper and 3 other directories.
jboss-remoting-commits at lists.jboss.org
jboss-remoting-commits at lists.jboss.org
Wed Mar 19 17:10:28 EDT 2008
Author: david.lloyd at jboss.com
Date: 2008-03-19 17:10:28 -0400 (Wed, 19 Mar 2008)
New Revision: 3666
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandler.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
Get sessions to connect again
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-03-19 21:10:28 UTC (rev 3666)
@@ -32,13 +32,12 @@
*
* @param remoteUri the URI of the server to connect to
* @param attributeMap the attribute map to use to configure this session
- * @param rootContext the (local side of the) root context for the new session
* @return a new session
*
* @throws RemotingException if there is a problem creating the session, or if the request or reply type does not
* match the remote service
*/
- <I, O> Session openSession(URI remoteUri, AttributeMap attributeMap, Context<I, O> rootContext) throws RemotingException;
+ Session openSession(URI remoteUri, AttributeMap attributeMap) throws RemotingException;
/**
* Open an inbound session from another endpoint. Used by protocol handlers.
@@ -46,10 +45,9 @@
* You must have the TODO permission to invoke this method.
*
* @param handler the protocol handler to use
- * @param rootContext the (local side of the) root context for this session
* @return the protocol context
*/
- <I, O> ProtocolContext openIncomingSession(ProtocolHandler handler, Context<I, O> rootContext) throws RemotingException;
+ ProtocolContext openIncomingSession(ProtocolHandler handler) throws RemotingException;
/**
* Get the name of this endpoint.
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-03-19 21:10:28 UTC (rev 3666)
@@ -29,12 +29,12 @@
return delegate.getAttributes();
}
- public <I, O> Session openSession(final URI remoteUri, final AttributeMap attributeMap, final Context<I, O> rootContext) throws RemotingException {
- return delegate.openSession(remoteUri, attributeMap, rootContext);
+ public Session openSession(final URI remoteUri, final AttributeMap attributeMap) throws RemotingException {
+ return delegate.openSession(remoteUri, attributeMap);
}
- public <I, O> ProtocolContext openIncomingSession(final ProtocolHandler handler, final Context<I, O> rootContext) throws RemotingException {
- return delegate.openIncomingSession(handler, rootContext);
+ public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
+ return delegate.openIncomingSession(handler);
}
public String getName() {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java 2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java 2008-03-19 21:10:28 UTC (rev 3666)
@@ -12,6 +12,9 @@
private ContextServer<I,O> contextServer;
protected AbstractRealContext(final ContextServer<I, O> contextServer) {
+ if (contextServer == null) {
+ throw new NullPointerException("contextServer is null");
+ }
this.contextServer = contextServer;
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-03-19 21:10:28 UTC (rev 3666)
@@ -38,6 +38,7 @@
public final class CoreEndpoint {
private final String name;
+ private final RequestListener<?, ?> rootListener;
private final Endpoint userEndpoint = new UserEndpoint();
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
@@ -54,8 +55,9 @@
DOWN,
}
- public CoreEndpoint(final String name) {
+ public CoreEndpoint(final String name, final RequestListener<?, ?> rootListener) {
this.name = name;
+ this.rootListener = rootListener;
}
private final ConcurrentMap<Object, Object> endpointMap = CollectionUtil.concurrentMap();
@@ -155,7 +157,7 @@
return endpointMap;
}
- public <I, O> Session openSession(final URI uri, final AttributeMap attributeMap, final Context<I, O> rootContext) throws RemotingException {
+ public Session openSession(final URI uri, final AttributeMap attributeMap) throws RemotingException {
final String scheme = uri.getScheme();
if (scheme == null) {
throw new RemotingException("No scheme on remote endpoint URI");
@@ -169,7 +171,7 @@
final ProtocolHandlerFactory factory = registration.getProtocolHandlerFactory();
try {
final CoreSession session = new CoreSession(CoreEndpoint.this);
- session.initializeClient(factory, uri, attributeMap, rootContext);
+ session.initializeClient(factory, uri, attributeMap, createContext(rootListener));
sessions.add(session);
return session.getUserSession();
} catch (IOException e) {
@@ -182,8 +184,16 @@
}
}
- public <I, O> ProtocolContext openIncomingSession(final ProtocolHandler handler, final Context<I, O> rootContext) throws RemotingException {
- return null;
+ public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
+ state.requireHold(State.UP);
+ try {
+ final CoreSession session = new CoreSession(CoreEndpoint.this);
+ session.initializeServer(handler, createContext(rootListener));
+ sessions.add(session);
+ return session.getProtocolContext();
+ } finally {
+ state.release();
+ }
}
public String getName() {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java 2008-03-19 21:10:28 UTC (rev 3666)
@@ -22,10 +22,10 @@
private final ConcurrentMap<Object, Object> contextMap = CollectionUtil.concurrentMap();
private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
- private final Context<I, O> userContext = new UserContext();
private final ContextClient contextClient = new ContextClientImpl();
private final Executor executor;
+ private Context<I, O> userContext;
private ContextServer<I, O> contextServer;
public CoreOutboundContext(final Executor executor) {
@@ -35,6 +35,7 @@
public void initialize(final ContextServer<I, O> contextServer) {
state.requireTransitionExclusive(State.INITIAL, State.UP);
this.contextServer = contextServer;
+ userContext = new UserContext();
state.releaseExclusive();
}
@@ -67,10 +68,11 @@
}
@SuppressWarnings ({"SerializableInnerClassWithNonSerializableOuterClass"})
- public final class UserContext implements Context<I, O>, Serializable {
+ public final class UserContext extends AbstractRealContext<I, O> implements Serializable {
private static final long serialVersionUID = 1L;
private UserContext() {
+ super(contextServer);
}
private Object writeReplace() {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-03-19 21:10:28 UTC (rev 3666)
@@ -103,21 +103,32 @@
// Initializers
private <I, O> void doInitialize(final ProtocolHandler protocolHandler, final Context<I, O> rootContext) {
+ if (protocolHandler == null) {
+ throw new NullPointerException("protocolHandler is null");
+ }
this.protocolHandler = protocolHandler;
if (rootContext instanceof AbstractRealContext) {
final AbstractRealContext<I, O> abstractRealContext = (AbstractRealContext<I, O>) rootContext;
// Forward local context
final ContextIdentifier localIdentifier = protocolHandler.getLocalRootContextIdentifier();
+ if (localIdentifier == null) {
+ throw new NullPointerException("localIdentifier is null");
+ }
final ProtocolContextClientImpl<I, O> contextClient = new ProtocolContextClientImpl<I, O>(localIdentifier);
serverContexts.put(localIdentifier, new ClientContextPair<I, O>(contextClient, abstractRealContext.getContextServer()));
+ log.trace("Initialized session with local context %s", localIdentifier);
}
// Forward remote context
final ContextIdentifier remoteIdentifier = protocolHandler.getRemoteRootContextIdentifier();
+ if (remoteIdentifier == null) {
+ throw new NullPointerException("remoteIdentifier is null");
+ }
final ProtocolContextServerImpl<I, O> contextServer = new ProtocolContextServerImpl<I,O>(remoteIdentifier);
clientContexts.put(remoteIdentifier, new WeakReference<ServerContextPair>(new ServerContextPair<I, O>(new BaseContextClient(), contextServer)));
final CoreOutboundContext<I, O> coreOutboundContext = new CoreOutboundContext<I, O>(executor);
coreOutboundContext.initialize(contextServer);
this.rootContext = coreOutboundContext.getUserContext();
+ log.trace("Initialized session with remote context %s", remoteIdentifier);
}
<I, O> void initializeServer(final ProtocolHandler protocolHandler, final Context<I, O> rootContext) {
@@ -172,12 +183,7 @@
private final ConcurrentMap<Object, Object> sessionMap = CollectionUtil.concurrentMap();
public void close() throws RemotingException {
- // todo -
- try {
- protocolHandler.closeSession();
- } catch (IOException e) {
- throw new RemotingException("Unable to close session: " + e.toString());
- }
+ shutdown();
// todo - should this be non-blocking?
state.waitFor(State.DOWN);
}
@@ -215,21 +221,49 @@
requestClient.handleReply((O)data);
}
+ // Lifecycle
+
+ private void shutdown() {
+ if (state.transition(State.UP, State.STOPPING)) {
+ try {
+ log.trace("Initiating session shutdown");
+ protocolHandler.closeSession();
+ } catch (IOException e) {
+ log.trace(e, "Protocol handler session close failed");
+ }
+ }
+ }
+
public final class ProtocolContextImpl implements ProtocolContext {
public void closeSession() {
- // todo ...
+ shutdown();
+ if (state.transition(State.STOPPING, State.DOWN)) {
+ log.trace("Session shut down");
+ }
}
public ObjectMessageOutput getMessageOutput(ByteMessageOutput target) throws IOException {
+ if (target == null) {
+ throw new NullPointerException("target is null");
+ }
return new ObjectMessageOutputImpl(target, streamDetectors, endpoint.getOrderedExecutor());
}
public ObjectMessageOutput getMessageOutput(ByteMessageOutput target, Executor streamExecutor) throws IOException {
+ if (target == null) {
+ throw new NullPointerException("target is null");
+ }
+ if (streamExecutor == null) {
+ throw new NullPointerException("streamExecutor is null");
+ }
return new ObjectMessageOutputImpl(target, streamDetectors, streamExecutor);
}
public ObjectMessageInput getMessageInput(ByteMessageInput source) throws IOException {
+ if (source == null) {
+ throw new NullPointerException("source is null");
+ }
return new ObjectMessageInputImpl(source);
}
@@ -238,6 +272,9 @@
}
public void receiveContextClose(ContextIdentifier remoteContextIdentifier, final boolean immediate, final boolean cancel, final boolean interrupt) {
+ if (remoteContextIdentifier == null) {
+ throw new NullPointerException("remoteContextIdentifier is null");
+ }
final ClientContextPair contextPair = serverContexts.remove(remoteContextIdentifier);
// todo - do the whole close operation
try {
@@ -248,11 +285,17 @@
}
public void closeStream(StreamIdentifier streamIdentifier) {
+ if (streamIdentifier == null) {
+ throw new NullPointerException("streamIdentifier is null");
+ }
final CoreStream coreStream = streams.remove(streamIdentifier);
// todo - shut down stream
}
public void receiveServiceClose(ServiceIdentifier serviceIdentifier) {
+ if (serviceIdentifier == null) {
+ throw new NullPointerException("serviceIdentifier is null");
+ }
final ClientServicePair servicePair = serverServices.remove(serviceIdentifier);
try {
servicePair.serviceServer.handleClose();
@@ -263,6 +306,12 @@
@SuppressWarnings ({"unchecked"})
public void receiveOpenedContext(ServiceIdentifier remoteServiceIdentifier, ContextIdentifier remoteContextIdentifier) {
+ if (remoteServiceIdentifier == null) {
+ throw new NullPointerException("remoteServiceIdentifier is null");
+ }
+ if (remoteContextIdentifier == null) {
+ throw new NullPointerException("remoteContextIdentifier is null");
+ }
try {
final ClientServicePair servicePair = serverServices.get(remoteServiceIdentifier);
final ProtocolContextClientImpl contextClient = new ProtocolContextClientImpl(remoteContextIdentifier);
@@ -275,6 +324,9 @@
}
public void receiveServiceClosing(ServiceIdentifier serviceIdentifier) {
+ if (serviceIdentifier == null) {
+ throw new NullPointerException("serviceIdentifier is null");
+ }
final WeakReference<ServerServicePair> ref = clientServices.get(serviceIdentifier);
final ServerServicePair servicePair = ref.get();
try {
@@ -285,6 +337,9 @@
}
public void receiveContextClosing(ContextIdentifier contextIdentifier, boolean done) {
+ if (contextIdentifier == null) {
+ throw new NullPointerException("contextIdentifier is null");
+ }
final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
final ServerContextPair contextPair = ref.get();
try {
@@ -295,6 +350,12 @@
}
public void receiveReply(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, Object reply) {
+ if (contextIdentifier == null) {
+ throw new NullPointerException("contextIdentifier is null");
+ }
+ if (requestIdentifier == null) {
+ throw new NullPointerException("requestIdentifier is null");
+ }
final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
final ServerContextPair contextPair = ref.get();
final RequestClient<?> requestClient = (RequestClient<?>) contextPair.contextServer.requests.get(requestIdentifier);
@@ -306,6 +367,15 @@
}
public void receiveException(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, RemoteExecutionException exception) {
+ if (contextIdentifier == null) {
+ throw new NullPointerException("contextIdentifier is null");
+ }
+ if (requestIdentifier == null) {
+ throw new NullPointerException("requestIdentifier is null");
+ }
+ if (exception == null) {
+ throw new NullPointerException("exception is null");
+ }
final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
final ServerContextPair contextPair = ref.get();
final RequestClient<?> requestClient = (RequestClient<?>) contextPair.contextServer.requests.get(requestIdentifier);
@@ -317,6 +387,12 @@
}
public void receiveCancelAcknowledge(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier) {
+ if (contextIdentifier == null) {
+ throw new NullPointerException("contextIdentifier is null");
+ }
+ if (requestIdentifier == null) {
+ throw new NullPointerException("requestIdentifier is null");
+ }
final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
final ServerContextPair contextPair = ref.get();
final RequestClient<?> requestClient = (RequestClient<?>) contextPair.contextServer.requests.get(requestIdentifier);
@@ -328,6 +404,12 @@
}
public void receiveCancelRequest(ContextIdentifier remoteContextIdentifier, RequestIdentifier requestIdentifier, boolean mayInterrupt) {
+ if (remoteContextIdentifier == null) {
+ throw new NullPointerException("remoteContextIdentifier is null");
+ }
+ if (requestIdentifier == null) {
+ throw new NullPointerException("requestIdentifier is null");
+ }
final ClientContextPair contextPair = serverContexts.get(remoteContextIdentifier);
final RequestServer<?> requestServer = (RequestServer<?>) contextPair.contextClient.requests.get(requestIdentifier);
try {
@@ -338,6 +420,12 @@
}
public void receiveStreamData(StreamIdentifier streamIdentifier, ObjectMessageInput data) {
+ if (streamIdentifier == null) {
+ throw new NullPointerException("streamIdentifier is null");
+ }
+ if (data == null) {
+ throw new NullPointerException("data is null");
+ }
final CoreStream coreStream = streams.get(streamIdentifier);
coreStream.receiveStreamData(data);
}
@@ -355,19 +443,21 @@
@SuppressWarnings ({"unchecked"})
public void receiveRequest(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier, final Object request) {
+ if (remoteContextIdentifier == null) {
+ throw new NullPointerException("remoteContextIdentifier is null");
+ }
+ if (requestIdentifier == null) {
+ throw new NullPointerException("requestIdentifier is null");
+ }
final ClientContextPair contextPair = serverContexts.get(remoteContextIdentifier);
- final RequestServer requestServer = (RequestServer) contextPair.contextClient.requests.get(requestIdentifier);
+ if (contextPair == null) {
+ log.trace("Received a request on an unknown context %s", remoteContextIdentifier);
+ return;
+ }
try {
- if (requestServer != null) {
- requestServer.handleRequest(request, executor);
- } else {
- log.trace("Got a request on an unknown context identifier (%s)", remoteContextIdentifier);
- try {
- protocolHandler.sendException(remoteContextIdentifier, requestIdentifier, new RemoteExecutionException("Received a request on an invalid context"));
- } catch (IOException e) {
- log.trace("Failed to send exception: %s", e.getMessage());
- }
- }
+ final RequestClient client = contextPair.contextClient.addClient(requestIdentifier);
+ final RequestServer requestServer = contextPair.contextServer.createNewRequest(client);
+ requestServer.handleRequest(request, executor);
} catch (RemotingException e) {
e.printStackTrace();
}
@@ -447,6 +537,9 @@
final StreamSerializerFactory factory = detector.detectStream(testObject);
if (factory != null) {
final StreamIdentifier streamIdentifier = protocolHandler.openStream();
+ if (streamIdentifier == null) {
+ throw new NullPointerException("streamIdentifier is null");
+ }
final CoreStream stream = new CoreStream(CoreSession.this, streamExecutor, streamIdentifier, factory, testObject);
if (streams.putIfAbsent(streamIdentifier, stream) != null) {
throw new IOException("Duplicate stream identifier encountered: " + streamIdentifier);
@@ -587,6 +680,12 @@
private final ContextServer<I, O> contextServer;
private ClientContextPair(final ProtocolContextClientImpl<I, O> contextClient, final ContextServer<I, O> contextServer) {
+ if (contextClient == null) {
+ throw new NullPointerException("contextClient is null");
+ }
+ if (contextServer == null) {
+ throw new NullPointerException("contextServer is null");
+ }
this.contextClient = contextClient;
this.contextServer = contextServer;
}
@@ -597,6 +696,12 @@
private final ProtocolServiceServerImpl<I, O> serviceServer;
private ServerServicePair(final ServiceClient serviceClient, final ProtocolServiceServerImpl<I, O> serviceServer) {
+ if (serviceClient == null) {
+ throw new NullPointerException("serviceClient is null");
+ }
+ if (serviceServer == null) {
+ throw new NullPointerException("serviceServer is null");
+ }
this.serviceClient = serviceClient;
this.serviceServer = serviceServer;
}
@@ -607,6 +712,12 @@
private final ServiceServer<I, O> serviceServer;
private ClientServicePair(final ProtocolServiceClientImpl serviceClient, final ServiceServer<I, O> serviceServer) {
+ if (serviceClient == null) {
+ throw new NullPointerException("serviceClient is null");
+ }
+ if (serviceServer == null) {
+ throw new NullPointerException("serviceServer is null");
+ }
this.serviceClient = serviceClient;
this.serviceServer = serviceServer;
}
@@ -616,6 +727,9 @@
private final ServiceIdentifier serviceIdentifier;
public ProtocolServiceClientImpl(final ServiceIdentifier serviceIdentifier) {
+ if (serviceIdentifier == null) {
+ throw new NullPointerException("serviceIdentifier is null");
+ }
this.serviceIdentifier = serviceIdentifier;
}
@@ -650,6 +764,9 @@
public ContextServer<I, O> createNewContext(final ContextClient client) throws RemotingException {
try {
final ContextIdentifier contextIdentifier = protocolHandler.openContext(serviceIdentifier);
+ if (contextIdentifier == null) {
+ throw new NullPointerException("contextIdentifier is null");
+ }
clientContexts.put(contextIdentifier, new WeakReference<ServerContextPair>(new ServerContextPair<I, O>(client, new ProtocolContextServerImpl<I, O>(contextIdentifier))));
return new ProtocolContextServerImpl<I, O>(contextIdentifier);
} catch (RemotingException e) {
@@ -742,6 +859,9 @@
public RequestServer<I> createNewRequest(final RequestClient<O> requestClient) throws RemotingException {
try {
final RequestIdentifier requestIdentifier = protocolHandler.openRequest(contextIdentifier);
+ if (requestIdentifier == null) {
+ throw new NullPointerException("requestIdentifier is null");
+ }
requests.put(requestIdentifier, requestClient);
return new ProtocolRequestServerImpl(requestIdentifier);
} catch (RemotingException e) {
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandler.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandler.java 2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandler.java 2008-03-19 21:10:28 UTC (rev 3666)
@@ -6,6 +6,8 @@
import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
import org.jboss.cx.remoting.spi.protocol.StreamIdentifier;
+import org.jboss.cx.remoting.spi.protocol.SimpleContextIdentifier;
+import org.jboss.cx.remoting.spi.protocol.SimpleRequestIdentifier;
import org.jboss.cx.remoting.spi.ObjectMessageOutput;
import org.jboss.cx.remoting.util.AttributeMap;
import org.jboss.cx.remoting.RemoteExecutionException;
@@ -17,31 +19,41 @@
*
*/
public final class LocalProtocolHandler implements ProtocolHandler {
- public LocalProtocolHandler(final ProtocolContext context, final URI remoteUri, final AttributeMap attributeMap) {
+ private final ProtocolContext target;
+ private String remoteEndpointName;
+ private static final ContextIdentifier ROOT_IDENTIFIER = new SimpleContextIdentifier();
+ public LocalProtocolHandler(final ProtocolContext target, final String remoteEndpointName) {
+ this.target = target;
+ this.remoteEndpointName = remoteEndpointName;
}
public void sendReply(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier, final Object reply) throws IOException {
+ target.receiveReply(remoteContextIdentifier, requestIdentifier, reply);
}
public void sendException(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier, final RemoteExecutionException exception) throws IOException {
+ target.receiveException(remoteContextIdentifier, requestIdentifier, exception);
}
public void sendCancelAcknowledge(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier) throws IOException {
+ target.receiveCancelAcknowledge(remoteContextIdentifier, requestIdentifier);
}
public void sendServiceClosing(final ServiceIdentifier remoteServiceIdentifier) throws IOException {
+ target.receiveServiceClosing(remoteServiceIdentifier);
}
public void sendContextClosing(final ContextIdentifier remoteContextIdentifier, final boolean done) throws IOException {
+ target.receiveContextClosing(remoteContextIdentifier, done);
}
public ContextIdentifier getLocalRootContextIdentifier() {
- return null;
+ return ROOT_IDENTIFIER;
}
public ContextIdentifier getRemoteRootContextIdentifier() {
- return null;
+ return ROOT_IDENTIFIER;
}
public ContextIdentifier openContext(final ServiceIdentifier serviceIdentifier) throws IOException {
@@ -49,19 +61,23 @@
}
public void sendContextClose(final ContextIdentifier contextIdentifier, final boolean immediate, final boolean cancel, final boolean interrupt) throws IOException {
+ target.receiveContextClose(contextIdentifier, immediate, cancel, interrupt);
}
public RequestIdentifier openRequest(final ContextIdentifier contextIdentifier) throws IOException {
- return null;
+ return new SimpleRequestIdentifier();
}
public void sendServiceClose(final ServiceIdentifier serviceIdentifier) throws IOException {
+ target.receiveServiceClose(serviceIdentifier);
}
public void sendRequest(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier, final Object request, final Executor streamExecutor) throws IOException {
+ target.receiveRequest(contextIdentifier, requestIdentifier, request);
}
public void sendCancelRequest(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier, final boolean mayInterrupt) throws IOException {
+ target.receiveCancelRequest(contextIdentifier, requestIdentifier, mayInterrupt);
}
public ContextIdentifier openContext() throws IOException {
@@ -77,16 +93,19 @@
}
public void closeStream(final StreamIdentifier streamIdentifier) throws IOException {
+ // N/A
}
public ObjectMessageOutput sendStreamData(final StreamIdentifier streamIdentifier, final Executor streamExecutor) throws IOException {
+ // N/A
return null;
}
public void closeSession() throws IOException {
+ target.closeSession();
}
public String getRemoteEndpointName() {
- return null;
+ return remoteEndpointName;
}
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java 2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java 2008-03-19 21:10:28 UTC (rev 3666)
@@ -18,20 +18,38 @@
public final class LocalProtocolHandlerFactory implements ProtocolHandlerFactory {
@SuppressWarnings ({"UnusedDeclaration"})
private final Endpoint endpoint;
+ private final String endpointName;
public LocalProtocolHandlerFactory(final Endpoint endpoint) {
this.endpoint = endpoint;
+ endpointName = endpoint.getName();
}
- private static final ConcurrentMap<String, LocalProtocolHandlerFactory> endpoints = CollectionUtil.concurrentMap();
+ private static final ConcurrentMap<String, Endpoint> endpoints = CollectionUtil.concurrentMap();
public boolean isLocal(final URI uri) {
return true;
}
- public ProtocolHandler createHandler(final ProtocolContext context, final URI remoteUri, final AttributeMap attributeMap) throws IOException {
-
- return new LocalProtocolHandler(context, remoteUri, attributeMap);
+ public ProtocolHandler createHandler(final ProtocolContext ourProtocolContext, final URI remoteUri, final AttributeMap attributeMap) throws IOException {
+ final String part = remoteUri.getSchemeSpecificPart();
+ final int index = part.indexOf(':');
+ final String otherEndpointName;
+ if (index == -1) {
+ otherEndpointName = part;
+ } else {
+ otherEndpointName = part.substring(0, index);
+ }
+ final Endpoint otherEndpoint = endpoints.get(otherEndpointName);
+ if (otherEndpoint == null) {
+ throw new RemotingException("No such local endpoint '" + otherEndpoint + "'");
+ }
+ final LocalProtocolHandler otherProtocolHandler = new LocalProtocolHandler(ourProtocolContext, otherEndpointName);
+ final ProtocolContext otherProtocolContext = otherEndpoint.openIncomingSession(otherProtocolHandler);
+ final LocalProtocolHandler ourProtocolHandler = new LocalProtocolHandler(otherProtocolContext, endpointName);
+ otherProtocolContext.openSession(endpointName);
+ ourProtocolContext.openSession(otherEndpointName);
+ return ourProtocolHandler;
}
public void close() {
@@ -43,6 +61,6 @@
final LocalProtocolHandlerFactory handlerFactory = new LocalProtocolHandlerFactory(endpoint);
final Registration registration = endpoint.registerProtocol("local", handlerFactory);
registration.start();
- endpoints.putIfAbsent(name, handlerFactory);
+ endpoints.putIfAbsent(name, endpoint);
}
}
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-03-19 21:10:28 UTC (rev 3666)
@@ -13,8 +13,8 @@
public final class Remoting {
private static final Logger log = Logger.getLogger(Remoting.class);
- public static Endpoint createEndpoint(String name) throws RemotingException {
- final CoreEndpoint coreEndpoint = new CoreEndpoint(name);
+ public static <I, O> Endpoint createEndpoint(String name, RequestListener<I, O> listener) throws RemotingException {
+ final CoreEndpoint coreEndpoint = new CoreEndpoint(name, listener);
final ExecutorService executorService = Executors.newCachedThreadPool();
coreEndpoint.setExecutor(executorService);
coreEndpoint.start();
More information about the jboss-remoting-commits
mailing list