[jboss-remoting-commits] JBoss Remoting SVN: r3666 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting/spi/wrapper and 3 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Wed Mar 19 17:10:28 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-03-19 17:10:28 -0400 (Wed, 19 Mar 2008)
New Revision: 3666

Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandler.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java
   remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
Log:
Get sessions to connect again

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-03-19 21:10:28 UTC (rev 3666)
@@ -32,13 +32,12 @@
      *
      * @param remoteUri the URI of the server to connect to
      * @param attributeMap the attribute map to use to configure this session
-     * @param rootContext the (local side of the) root context for the new session
      * @return a new session
      *
      * @throws RemotingException if there is a problem creating the session, or if the request or reply type does not
      * match the remote service
      */
-    <I, O> Session openSession(URI remoteUri, AttributeMap attributeMap, Context<I, O> rootContext) throws RemotingException;
+    Session openSession(URI remoteUri, AttributeMap attributeMap) throws RemotingException;
 
     /**
      * Open an inbound session from another endpoint.  Used by protocol handlers.
@@ -46,10 +45,9 @@
      * You must have the TODO permission to invoke this method.
      *
      * @param handler the protocol handler to use
-     * @param rootContext the (local side of the) root context for this session
      * @return the protocol context
      */
-    <I, O> ProtocolContext openIncomingSession(ProtocolHandler handler, Context<I, O> rootContext) throws RemotingException;
+    ProtocolContext openIncomingSession(ProtocolHandler handler) throws RemotingException;
 
     /**
      * Get the name of this endpoint.

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java	2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java	2008-03-19 21:10:28 UTC (rev 3666)
@@ -29,12 +29,12 @@
         return delegate.getAttributes();
     }
 
-    public <I, O> Session openSession(final URI remoteUri, final AttributeMap attributeMap, final Context<I, O> rootContext) throws RemotingException {
-        return delegate.openSession(remoteUri, attributeMap, rootContext);
+    public Session openSession(final URI remoteUri, final AttributeMap attributeMap) throws RemotingException {
+        return delegate.openSession(remoteUri, attributeMap);
     }
 
-    public <I, O> ProtocolContext openIncomingSession(final ProtocolHandler handler, final Context<I, O> rootContext) throws RemotingException {
-        return delegate.openIncomingSession(handler, rootContext);
+    public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
+        return delegate.openIncomingSession(handler);
     }
 
     public String getName() {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java	2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/AbstractRealContext.java	2008-03-19 21:10:28 UTC (rev 3666)
@@ -12,6 +12,9 @@
     private ContextServer<I,O> contextServer;
 
     protected AbstractRealContext(final ContextServer<I, O> contextServer) {
+        if (contextServer == null) {
+            throw new NullPointerException("contextServer is null");
+        }
         this.contextServer = contextServer;
     }
 

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java	2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java	2008-03-19 21:10:28 UTC (rev 3666)
@@ -38,6 +38,7 @@
 public final class CoreEndpoint {
 
     private final String name;
+    private final RequestListener<?, ?> rootListener;
     private final Endpoint userEndpoint = new UserEndpoint();
     private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
 
@@ -54,8 +55,9 @@
         DOWN,
     }
 
-    public CoreEndpoint(final String name) {
+    public CoreEndpoint(final String name, final RequestListener<?, ?> rootListener) {
         this.name = name;
+        this.rootListener = rootListener;
     }
 
     private final ConcurrentMap<Object, Object> endpointMap = CollectionUtil.concurrentMap();
@@ -155,7 +157,7 @@
             return endpointMap;
         }
 
-        public <I, O> Session openSession(final URI uri, final AttributeMap attributeMap, final Context<I, O> rootContext) throws RemotingException {
+        public Session openSession(final URI uri, final AttributeMap attributeMap) throws RemotingException {
             final String scheme = uri.getScheme();
             if (scheme == null) {
                 throw new RemotingException("No scheme on remote endpoint URI");
@@ -169,7 +171,7 @@
                 final ProtocolHandlerFactory factory = registration.getProtocolHandlerFactory();
                 try {
                     final CoreSession session = new CoreSession(CoreEndpoint.this);
-                    session.initializeClient(factory, uri, attributeMap, rootContext);
+                    session.initializeClient(factory, uri, attributeMap, createContext(rootListener));
                     sessions.add(session);
                     return session.getUserSession();
                 } catch (IOException e) {
@@ -182,8 +184,16 @@
             }
         }
 
-        public <I, O> ProtocolContext openIncomingSession(final ProtocolHandler handler, final Context<I, O> rootContext) throws RemotingException {
-            return null;
+        public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
+            state.requireHold(State.UP);
+            try {
+                final CoreSession session = new CoreSession(CoreEndpoint.this);
+                session.initializeServer(handler, createContext(rootListener));
+                sessions.add(session);
+                return session.getProtocolContext();
+            } finally {
+                state.release();
+            }
         }
 
         public String getName() {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java	2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundContext.java	2008-03-19 21:10:28 UTC (rev 3666)
@@ -22,10 +22,10 @@
 
     private final ConcurrentMap<Object, Object> contextMap = CollectionUtil.concurrentMap();
     private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
-    private final Context<I, O> userContext = new UserContext();
     private final ContextClient contextClient = new ContextClientImpl();
     private final Executor executor;
 
+    private Context<I, O> userContext;
     private ContextServer<I, O> contextServer;
     
     public CoreOutboundContext(final Executor executor) {
@@ -35,6 +35,7 @@
     public void initialize(final ContextServer<I, O> contextServer) {
         state.requireTransitionExclusive(State.INITIAL, State.UP);
         this.contextServer = contextServer;
+        userContext = new UserContext();
         state.releaseExclusive();
     }
 
@@ -67,10 +68,11 @@
     }
 
     @SuppressWarnings ({"SerializableInnerClassWithNonSerializableOuterClass"})
-    public final class UserContext implements Context<I, O>, Serializable {
+    public final class UserContext extends AbstractRealContext<I, O> implements Serializable {
         private static final long serialVersionUID = 1L;
 
         private UserContext() {
+            super(contextServer);
         }
 
         private Object writeReplace() {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java	2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java	2008-03-19 21:10:28 UTC (rev 3666)
@@ -103,21 +103,32 @@
     // Initializers
 
     private <I, O> void doInitialize(final ProtocolHandler protocolHandler, final Context<I, O> rootContext) {
+        if (protocolHandler == null) {
+            throw new NullPointerException("protocolHandler is null");
+        }
         this.protocolHandler = protocolHandler;
         if (rootContext instanceof AbstractRealContext) {
             final AbstractRealContext<I, O> abstractRealContext = (AbstractRealContext<I, O>) rootContext;
             // Forward local context
             final ContextIdentifier localIdentifier = protocolHandler.getLocalRootContextIdentifier();
+            if (localIdentifier == null) {
+                throw new NullPointerException("localIdentifier is null");
+            }
             final ProtocolContextClientImpl<I, O> contextClient = new ProtocolContextClientImpl<I, O>(localIdentifier);
             serverContexts.put(localIdentifier, new ClientContextPair<I, O>(contextClient, abstractRealContext.getContextServer()));
+            log.trace("Initialized session with local context %s", localIdentifier);
         }
         // Forward remote context
         final ContextIdentifier remoteIdentifier = protocolHandler.getRemoteRootContextIdentifier();
+        if (remoteIdentifier == null) {
+            throw new NullPointerException("remoteIdentifier is null");
+        }
         final ProtocolContextServerImpl<I, O> contextServer = new ProtocolContextServerImpl<I,O>(remoteIdentifier);
         clientContexts.put(remoteIdentifier, new WeakReference<ServerContextPair>(new ServerContextPair<I, O>(new BaseContextClient(), contextServer)));
         final CoreOutboundContext<I, O> coreOutboundContext = new CoreOutboundContext<I, O>(executor);
         coreOutboundContext.initialize(contextServer);
         this.rootContext = coreOutboundContext.getUserContext();
+        log.trace("Initialized session with remote context %s", remoteIdentifier);
     }
 
     <I, O> void initializeServer(final ProtocolHandler protocolHandler, final Context<I, O> rootContext) {
@@ -172,12 +183,7 @@
         private final ConcurrentMap<Object, Object> sessionMap = CollectionUtil.concurrentMap();
 
         public void close() throws RemotingException {
-            // todo -
-            try {
-                protocolHandler.closeSession();
-            } catch (IOException e) {
-                throw new RemotingException("Unable to close session: " + e.toString());
-            }
+            shutdown();
             // todo - should this be non-blocking?
             state.waitFor(State.DOWN);
         }
@@ -215,21 +221,49 @@
         requestClient.handleReply((O)data);
     }
 
+    // Lifecycle
+
+    private void shutdown() {
+        if (state.transition(State.UP, State.STOPPING)) {
+            try {
+                log.trace("Initiating session shutdown");
+                protocolHandler.closeSession();
+            } catch (IOException e) {
+                log.trace(e, "Protocol handler session close failed");
+            }
+        }
+    }
+
     public final class ProtocolContextImpl implements ProtocolContext {
 
         public void closeSession() {
-            // todo ...
+            shutdown();
+            if (state.transition(State.STOPPING, State.DOWN)) {
+                log.trace("Session shut down");
+            }
         }
 
         public ObjectMessageOutput getMessageOutput(ByteMessageOutput target) throws IOException {
+            if (target == null) {
+                throw new NullPointerException("target is null");
+            }
             return new ObjectMessageOutputImpl(target, streamDetectors, endpoint.getOrderedExecutor());
         }
 
         public ObjectMessageOutput getMessageOutput(ByteMessageOutput target, Executor streamExecutor) throws IOException {
+            if (target == null) {
+                throw new NullPointerException("target is null");
+            }
+            if (streamExecutor == null) {
+                throw new NullPointerException("streamExecutor is null");
+            }
             return new ObjectMessageOutputImpl(target, streamDetectors, streamExecutor);
         }
 
         public ObjectMessageInput getMessageInput(ByteMessageInput source) throws IOException {
+            if (source == null) {
+                throw new NullPointerException("source is null");
+            }
             return new ObjectMessageInputImpl(source);
         }
 
@@ -238,6 +272,9 @@
         }
 
         public void receiveContextClose(ContextIdentifier remoteContextIdentifier, final boolean immediate, final boolean cancel, final boolean interrupt) {
+            if (remoteContextIdentifier == null) {
+                throw new NullPointerException("remoteContextIdentifier is null");
+            }
             final ClientContextPair contextPair = serverContexts.remove(remoteContextIdentifier);
             // todo - do the whole close operation
             try {
@@ -248,11 +285,17 @@
         }
 
         public void closeStream(StreamIdentifier streamIdentifier) {
+            if (streamIdentifier == null) {
+                throw new NullPointerException("streamIdentifier is null");
+            }
             final CoreStream coreStream = streams.remove(streamIdentifier);
             // todo - shut down stream
         }
 
         public void receiveServiceClose(ServiceIdentifier serviceIdentifier) {
+            if (serviceIdentifier == null) {
+                throw new NullPointerException("serviceIdentifier is null");
+            }
             final ClientServicePair servicePair = serverServices.remove(serviceIdentifier);
             try {
                 servicePair.serviceServer.handleClose();
@@ -263,6 +306,12 @@
 
         @SuppressWarnings ({"unchecked"})
         public void receiveOpenedContext(ServiceIdentifier remoteServiceIdentifier, ContextIdentifier remoteContextIdentifier) {
+            if (remoteServiceIdentifier == null) {
+                throw new NullPointerException("remoteServiceIdentifier is null");
+            }
+            if (remoteContextIdentifier == null) {
+                throw new NullPointerException("remoteContextIdentifier is null");
+            }
             try {
                 final ClientServicePair servicePair = serverServices.get(remoteServiceIdentifier);
                 final ProtocolContextClientImpl contextClient = new ProtocolContextClientImpl(remoteContextIdentifier);
@@ -275,6 +324,9 @@
         }
 
         public void receiveServiceClosing(ServiceIdentifier serviceIdentifier) {
+            if (serviceIdentifier == null) {
+                throw new NullPointerException("serviceIdentifier is null");
+            }
             final WeakReference<ServerServicePair> ref = clientServices.get(serviceIdentifier);
             final ServerServicePair servicePair = ref.get();
             try {
@@ -285,6 +337,9 @@
         }
 
         public void receiveContextClosing(ContextIdentifier contextIdentifier, boolean done) {
+            if (contextIdentifier == null) {
+                throw new NullPointerException("contextIdentifier is null");
+            }
             final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
             final ServerContextPair contextPair = ref.get();
             try {
@@ -295,6 +350,12 @@
         }
 
         public void receiveReply(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, Object reply) {
+            if (contextIdentifier == null) {
+                throw new NullPointerException("contextIdentifier is null");
+            }
+            if (requestIdentifier == null) {
+                throw new NullPointerException("requestIdentifier is null");
+            }
             final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
             final ServerContextPair contextPair = ref.get();
             final RequestClient<?> requestClient = (RequestClient<?>) contextPair.contextServer.requests.get(requestIdentifier);
@@ -306,6 +367,15 @@
         }
 
         public void receiveException(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier, RemoteExecutionException exception) {
+            if (contextIdentifier == null) {
+                throw new NullPointerException("contextIdentifier is null");
+            }
+            if (requestIdentifier == null) {
+                throw new NullPointerException("requestIdentifier is null");
+            }
+            if (exception == null) {
+                throw new NullPointerException("exception is null");
+            }
             final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
             final ServerContextPair contextPair = ref.get();
             final RequestClient<?> requestClient = (RequestClient<?>) contextPair.contextServer.requests.get(requestIdentifier);
@@ -317,6 +387,12 @@
         }
 
         public void receiveCancelAcknowledge(ContextIdentifier contextIdentifier, RequestIdentifier requestIdentifier) {
+            if (contextIdentifier == null) {
+                throw new NullPointerException("contextIdentifier is null");
+            }
+            if (requestIdentifier == null) {
+                throw new NullPointerException("requestIdentifier is null");
+            }
             final WeakReference<ServerContextPair> ref = clientContexts.get(contextIdentifier);
             final ServerContextPair contextPair = ref.get();
             final RequestClient<?> requestClient = (RequestClient<?>) contextPair.contextServer.requests.get(requestIdentifier);
@@ -328,6 +404,12 @@
         }
 
         public void receiveCancelRequest(ContextIdentifier remoteContextIdentifier, RequestIdentifier requestIdentifier, boolean mayInterrupt) {
+            if (remoteContextIdentifier == null) {
+                throw new NullPointerException("remoteContextIdentifier is null");
+            }
+            if (requestIdentifier == null) {
+                throw new NullPointerException("requestIdentifier is null");
+            }
             final ClientContextPair contextPair = serverContexts.get(remoteContextIdentifier);
             final RequestServer<?> requestServer = (RequestServer<?>) contextPair.contextClient.requests.get(requestIdentifier);
             try {
@@ -338,6 +420,12 @@
         }
 
         public void receiveStreamData(StreamIdentifier streamIdentifier, ObjectMessageInput data) {
+            if (streamIdentifier == null) {
+                throw new NullPointerException("streamIdentifier is null");
+            }
+            if (data == null) {
+                throw new NullPointerException("data is null");
+            }
             final CoreStream coreStream = streams.get(streamIdentifier);
             coreStream.receiveStreamData(data);
         }
@@ -355,19 +443,21 @@
 
         @SuppressWarnings ({"unchecked"})
         public void receiveRequest(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier, final Object request) {
+            if (remoteContextIdentifier == null) {
+                throw new NullPointerException("remoteContextIdentifier is null");
+            }
+            if (requestIdentifier == null) {
+                throw new NullPointerException("requestIdentifier is null");
+            }
             final ClientContextPair contextPair = serverContexts.get(remoteContextIdentifier);
-            final RequestServer requestServer = (RequestServer) contextPair.contextClient.requests.get(requestIdentifier);
+            if (contextPair == null) {
+                log.trace("Received a request on an unknown context %s", remoteContextIdentifier);
+                return;
+            }
             try {
-                if (requestServer != null) {
-                    requestServer.handleRequest(request, executor);
-                } else {
-                    log.trace("Got a request on an unknown context identifier (%s)", remoteContextIdentifier);
-                    try {
-                        protocolHandler.sendException(remoteContextIdentifier, requestIdentifier, new RemoteExecutionException("Received a request on an invalid context"));
-                    } catch (IOException e) {
-                        log.trace("Failed to send exception: %s", e.getMessage());
-                    }
-                }
+                final RequestClient client = contextPair.contextClient.addClient(requestIdentifier);
+                final RequestServer requestServer = contextPair.contextServer.createNewRequest(client);
+                requestServer.handleRequest(request, executor);
             } catch (RemotingException e) {
                 e.printStackTrace();
             }
@@ -447,6 +537,9 @@
                 final StreamSerializerFactory factory = detector.detectStream(testObject);
                 if (factory != null) {
                     final StreamIdentifier streamIdentifier = protocolHandler.openStream();
+                    if (streamIdentifier == null) {
+                        throw new NullPointerException("streamIdentifier is null");
+                    }
                     final CoreStream stream = new CoreStream(CoreSession.this, streamExecutor, streamIdentifier, factory, testObject);
                     if (streams.putIfAbsent(streamIdentifier, stream) != null) {
                         throw new IOException("Duplicate stream identifier encountered: " + streamIdentifier);
@@ -587,6 +680,12 @@
         private final ContextServer<I, O> contextServer;
 
         private ClientContextPair(final ProtocolContextClientImpl<I, O> contextClient, final ContextServer<I, O> contextServer) {
+            if (contextClient == null) {
+                throw new NullPointerException("contextClient is null");
+            }
+            if (contextServer == null) {
+                throw new NullPointerException("contextServer is null");
+            }
             this.contextClient = contextClient;
             this.contextServer = contextServer;
         }
@@ -597,6 +696,12 @@
         private final ProtocolServiceServerImpl<I, O> serviceServer;
 
         private ServerServicePair(final ServiceClient serviceClient, final ProtocolServiceServerImpl<I, O> serviceServer) {
+            if (serviceClient == null) {
+                throw new NullPointerException("serviceClient is null");
+            }
+            if (serviceServer == null) {
+                throw new NullPointerException("serviceServer is null");
+            }
             this.serviceClient = serviceClient;
             this.serviceServer = serviceServer;
         }
@@ -607,6 +712,12 @@
         private final ServiceServer<I, O> serviceServer;
 
         private ClientServicePair(final ProtocolServiceClientImpl serviceClient, final ServiceServer<I, O> serviceServer) {
+            if (serviceClient == null) {
+                throw new NullPointerException("serviceClient is null");
+            }
+            if (serviceServer == null) {
+                throw new NullPointerException("serviceServer is null");
+            }
             this.serviceClient = serviceClient;
             this.serviceServer = serviceServer;
         }
@@ -616,6 +727,9 @@
         private final ServiceIdentifier serviceIdentifier;
 
         public ProtocolServiceClientImpl(final ServiceIdentifier serviceIdentifier) {
+            if (serviceIdentifier == null) {
+                throw new NullPointerException("serviceIdentifier is null");
+            }
             this.serviceIdentifier = serviceIdentifier;
         }
 
@@ -650,6 +764,9 @@
         public ContextServer<I, O> createNewContext(final ContextClient client) throws RemotingException {
             try {
                 final ContextIdentifier contextIdentifier = protocolHandler.openContext(serviceIdentifier);
+                if (contextIdentifier == null) {
+                    throw new NullPointerException("contextIdentifier is null");
+                }
                 clientContexts.put(contextIdentifier, new WeakReference<ServerContextPair>(new ServerContextPair<I, O>(client, new ProtocolContextServerImpl<I, O>(contextIdentifier))));
                 return new ProtocolContextServerImpl<I, O>(contextIdentifier);
             } catch (RemotingException e) {
@@ -742,6 +859,9 @@
         public RequestServer<I> createNewRequest(final RequestClient<O> requestClient) throws RemotingException {
             try {
                 final RequestIdentifier requestIdentifier = protocolHandler.openRequest(contextIdentifier);
+                if (requestIdentifier == null) {
+                    throw new NullPointerException("requestIdentifier is null");
+                }
                 requests.put(requestIdentifier, requestClient);
                 return new ProtocolRequestServerImpl(requestIdentifier);
             } catch (RemotingException e) {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandler.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandler.java	2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandler.java	2008-03-19 21:10:28 UTC (rev 3666)
@@ -6,6 +6,8 @@
 import org.jboss.cx.remoting.spi.protocol.RequestIdentifier;
 import org.jboss.cx.remoting.spi.protocol.ServiceIdentifier;
 import org.jboss.cx.remoting.spi.protocol.StreamIdentifier;
+import org.jboss.cx.remoting.spi.protocol.SimpleContextIdentifier;
+import org.jboss.cx.remoting.spi.protocol.SimpleRequestIdentifier;
 import org.jboss.cx.remoting.spi.ObjectMessageOutput;
 import org.jboss.cx.remoting.util.AttributeMap;
 import org.jboss.cx.remoting.RemoteExecutionException;
@@ -17,31 +19,41 @@
  *
  */
 public final class LocalProtocolHandler implements ProtocolHandler {
-    public LocalProtocolHandler(final ProtocolContext context, final URI remoteUri, final AttributeMap attributeMap) {
+    private final ProtocolContext target;
+    private String remoteEndpointName;
+    private static final ContextIdentifier ROOT_IDENTIFIER = new SimpleContextIdentifier();
 
+    public LocalProtocolHandler(final ProtocolContext target, final String remoteEndpointName) {
+        this.target = target;
+        this.remoteEndpointName = remoteEndpointName;
     }
 
     public void sendReply(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier, final Object reply) throws IOException {
+        target.receiveReply(remoteContextIdentifier, requestIdentifier, reply);
     }
 
     public void sendException(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier, final RemoteExecutionException exception) throws IOException {
+        target.receiveException(remoteContextIdentifier, requestIdentifier, exception);
     }
 
     public void sendCancelAcknowledge(final ContextIdentifier remoteContextIdentifier, final RequestIdentifier requestIdentifier) throws IOException {
+        target.receiveCancelAcknowledge(remoteContextIdentifier, requestIdentifier);
     }
 
     public void sendServiceClosing(final ServiceIdentifier remoteServiceIdentifier) throws IOException {
+        target.receiveServiceClosing(remoteServiceIdentifier);
     }
 
     public void sendContextClosing(final ContextIdentifier remoteContextIdentifier, final boolean done) throws IOException {
+        target.receiveContextClosing(remoteContextIdentifier, done);
     }
 
     public ContextIdentifier getLocalRootContextIdentifier() {
-        return null;
+        return ROOT_IDENTIFIER;
     }
 
     public ContextIdentifier getRemoteRootContextIdentifier() {
-        return null;
+        return ROOT_IDENTIFIER;
     }
 
     public ContextIdentifier openContext(final ServiceIdentifier serviceIdentifier) throws IOException {
@@ -49,19 +61,23 @@
     }
 
     public void sendContextClose(final ContextIdentifier contextIdentifier, final boolean immediate, final boolean cancel, final boolean interrupt) throws IOException {
+        target.receiveContextClose(contextIdentifier, immediate, cancel, interrupt);
     }
 
     public RequestIdentifier openRequest(final ContextIdentifier contextIdentifier) throws IOException {
-        return null;
+        return new SimpleRequestIdentifier();
     }
 
     public void sendServiceClose(final ServiceIdentifier serviceIdentifier) throws IOException {
+        target.receiveServiceClose(serviceIdentifier);
     }
 
     public void sendRequest(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier, final Object request, final Executor streamExecutor) throws IOException {
+        target.receiveRequest(contextIdentifier, requestIdentifier, request);
     }
 
     public void sendCancelRequest(final ContextIdentifier contextIdentifier, final RequestIdentifier requestIdentifier, final boolean mayInterrupt) throws IOException {
+        target.receiveCancelRequest(contextIdentifier, requestIdentifier, mayInterrupt);
     }
 
     public ContextIdentifier openContext() throws IOException {
@@ -77,16 +93,19 @@
     }
 
     public void closeStream(final StreamIdentifier streamIdentifier) throws IOException {
+        // N/A
     }
 
     public ObjectMessageOutput sendStreamData(final StreamIdentifier streamIdentifier, final Executor streamExecutor) throws IOException {
+        // N/A
         return null;
     }
 
     public void closeSession() throws IOException {
+        target.closeSession();
     }
 
     public String getRemoteEndpointName() {
-        return null;
+        return remoteEndpointName;
     }
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java	2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java	2008-03-19 21:10:28 UTC (rev 3666)
@@ -18,20 +18,38 @@
 public final class LocalProtocolHandlerFactory implements ProtocolHandlerFactory {
     @SuppressWarnings ({"UnusedDeclaration"})
     private final Endpoint endpoint;
+    private final String endpointName;
 
     public LocalProtocolHandlerFactory(final Endpoint endpoint) {
         this.endpoint = endpoint;
+        endpointName = endpoint.getName();
     }
 
-    private static final ConcurrentMap<String, LocalProtocolHandlerFactory> endpoints = CollectionUtil.concurrentMap();
+    private static final ConcurrentMap<String, Endpoint> endpoints = CollectionUtil.concurrentMap();
 
     public boolean isLocal(final URI uri) {
         return true;
     }
 
-    public ProtocolHandler createHandler(final ProtocolContext context, final URI remoteUri, final AttributeMap attributeMap) throws IOException {
-
-        return new LocalProtocolHandler(context, remoteUri, attributeMap);
+    public ProtocolHandler createHandler(final ProtocolContext ourProtocolContext, final URI remoteUri, final AttributeMap attributeMap) throws IOException {
+        final String part = remoteUri.getSchemeSpecificPart();
+        final int index = part.indexOf(':');
+        final String otherEndpointName;
+        if (index == -1) {
+            otherEndpointName = part;
+        } else {
+            otherEndpointName = part.substring(0, index);
+        }
+        final Endpoint otherEndpoint = endpoints.get(otherEndpointName);
+        if (otherEndpoint == null) {
+            throw new RemotingException("No such local endpoint '" + otherEndpoint + "'");
+        }
+        final LocalProtocolHandler otherProtocolHandler = new LocalProtocolHandler(ourProtocolContext, otherEndpointName);
+        final ProtocolContext otherProtocolContext = otherEndpoint.openIncomingSession(otherProtocolHandler);
+        final LocalProtocolHandler ourProtocolHandler = new LocalProtocolHandler(otherProtocolContext, endpointName);
+        otherProtocolContext.openSession(endpointName);
+        ourProtocolContext.openSession(otherEndpointName);
+        return ourProtocolHandler;
     }
 
     public void close() {
@@ -43,6 +61,6 @@
         final LocalProtocolHandlerFactory handlerFactory = new LocalProtocolHandlerFactory(endpoint);
         final Registration registration = endpoint.registerProtocol("local", handlerFactory);
         registration.start();
-        endpoints.putIfAbsent(name, handlerFactory);
+        endpoints.putIfAbsent(name, endpoint);
     }
 }

Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-03-19 07:40:40 UTC (rev 3665)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-03-19 21:10:28 UTC (rev 3666)
@@ -13,8 +13,8 @@
 public final class Remoting {
     private static final Logger log = Logger.getLogger(Remoting.class);
 
-    public static Endpoint createEndpoint(String name) throws RemotingException {
-        final CoreEndpoint coreEndpoint = new CoreEndpoint(name);
+    public static <I, O> Endpoint createEndpoint(String name, RequestListener<I, O> listener) throws RemotingException {
+        final CoreEndpoint coreEndpoint = new CoreEndpoint(name, listener);
         final ExecutorService executorService = Executors.newCachedThreadPool();
         coreEndpoint.setExecutor(executorService);
         coreEndpoint.start();




More information about the jboss-remoting-commits mailing list