[jboss-remoting-commits] JBoss Remoting SVN: r4291 - in remoting3/trunk: api/src/main/java/org/jboss/cx/remoting and 13 other directories.

jboss-remoting-commits at lists.jboss.org jboss-remoting-commits at lists.jboss.org
Thu Jun 12 12:34:09 EDT 2008


Author: david.lloyd at jboss.com
Date: 2008-06-12 12:34:08 -0400 (Thu, 12 Jun 2008)
New Revision: 4291

Added:
   remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/
   remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java
   remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java
Removed:
   remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java
Modified:
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
   remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java
   remoting3/trunk/build.xml
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
   remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java
   remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java
   remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java
   remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java
   remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
   remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
   remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
   remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
   remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
   remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java
Log:
Point commit...

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -32,12 +32,13 @@
      *
      * @param remoteUri the URI of the server to connect to
      * @param attributeMap the attribute map to use to configure this session
+     * @param rootListener the root request listener for this end of the session
      * @return a new session
      *
      * @throws RemotingException if there is a problem creating the session, or if the request or reply type does not
      * match the remote service
      */
-    Session openSession(URI remoteUri, AttributeMap attributeMap) throws RemotingException;
+    Session openSession(URI remoteUri, AttributeMap attributeMap, RequestListener<?, ?> rootListener) throws RemotingException;
 
     /**
      * Open an inbound session from another endpoint.  Used by protocol handlers.
@@ -45,9 +46,10 @@
      * You must have the TODO permission to invoke this method.
      *
      * @param handler the protocol handler to use
+     * @param rootListener the root request listener for this end of the session
      * @return the protocol context
      */
-    ProtocolContext openIncomingSession(ProtocolHandler handler) throws RemotingException;
+    ProtocolContext openSession(ProtocolHandler handler, RequestListener<?, ?> rootListener) throws RemotingException;
 
     /**
      * Get the name of this endpoint.

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -9,7 +9,8 @@
 
 /**
  * A marshaller/unmarshaller for transmitting data over a wire protocol of some sort.  Each marshaller instance is
- * guaranteed to be used by only one thread.  Marshallers are not pooled or reused in any way.
+ * guaranteed to be used by only one thread.  Marshallers are not pooled or reused in any way.  Any pooling of marshallers
+ * must be done by implementations of this class and/or {@link org.jboss.cx.remoting.spi.marshal.MarshallerFactory}.
  */
 public interface Marshaller extends Serializable {
 

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -29,12 +29,12 @@
         return delegate.getAttributes();
     }
 
-    public Session openSession(final URI remoteUri, final AttributeMap attributeMap) throws RemotingException {
-        return delegate.openSession(remoteUri, attributeMap);
+    public Session openSession(final URI remoteUri, final AttributeMap attributeMap, final RequestListener<?, ?> rootListener) throws RemotingException {
+        return delegate.openSession(remoteUri, attributeMap, rootListener);
     }
 
-    public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
-        return delegate.openIncomingSession(handler);
+    public ProtocolContext openSession(final ProtocolHandler handler, final RequestListener<?, ?> rootListener) throws RemotingException {
+        return delegate.openSession(handler, rootListener);
     }
 
     public String getName() {

Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -2,11 +2,12 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.Flushable;
 
 /**
  *
  */
-public interface ObjectSink<T> extends Closeable {
+public interface ObjectSink<T> extends Flushable, Closeable {
     void accept(T instance) throws IOException;
 
     /**

Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/build.xml	2008-06-12 16:34:08 UTC (rev 4291)
@@ -1131,7 +1131,7 @@
         <path id="version.classpath">
             <pathelement location="version/target/main/classes"/>
         </path>
-        <java classpathref="version.classpath" classname="org.jboss.cx.remoting.version.Version" outputproperty="version"/>
+        <java classpathref="version.classpath" classname="org.jboss.cx.remoting.version.Verssion" outputproperty="version"/>
         <property name="version" value="UNKNOWN"/>
     </target>
 

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -18,7 +18,7 @@
         this.clientIdentifer = clientIdentifer;
     }
 
-    public ClientIdentifier getContextIdentifer() {
+    public ClientIdentifier getClientIdentifer() {
         return clientIdentifer;
     }
 

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -54,7 +54,6 @@
     }
 
     private String name;
-    private RequestListener<?, ?> rootListener;
 
     private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.INITIAL);
     private final Set<SessionListener> sessionListeners = CollectionUtil.synchronizedSet(new LinkedHashSet<SessionListener>());
@@ -96,21 +95,10 @@
         return name;
     }
 
-    public void setRootListener(final RequestListener<?, ?> rootListener) {
-        this.rootListener = rootListener;
-    }
-
-    public RequestListener<?, ?> getRootListener() {
-        return rootListener;
-    }
-
     // Lifecycle
 
     public void create() {
         // todo security check
-        if (rootListener == null) {
-            throw new NullPointerException("rootListener is null");
-        }
     }
 
     public void start() {
@@ -132,7 +120,6 @@
     }
 
     public void destroy() {
-        rootListener = null;
         executor = null;
     }
 
@@ -143,7 +130,7 @@
         return endpointMap;
     }
 
-    public Session openSession(final URI uri, final AttributeMap attributeMap) throws RemotingException {
+    public Session openSession(final URI uri, final AttributeMap attributeMap, final RequestListener<?, ?> rootListener) throws RemotingException {
         if (uri == null) {
             throw new NullPointerException("uri is null");
         }
@@ -184,11 +171,11 @@
         }
     }
 
-    public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws RemotingException {
+    public ProtocolContext openSession(final ProtocolHandler handler, final RequestListener<?, ?> rootListener) throws RemotingException {
         state.requireHold(State.UP);
         try {
             final CoreSession session = new CoreSession(CoreEndpoint.this);
-            session.initializeServer(handler, createClient(rootListener));
+            session.initializeServer(handler, rootListener == null ? null : createClient(rootListener));
             sessions.add(session);
             return session.getProtocolContext();
         } finally {
@@ -216,7 +203,7 @@
     public <I, O> Client<I, O> createClient(RequestListener<I, O> requestListener) {
         final CoreInboundClient<I, O> inbound = new CoreInboundClient<I, O>(requestListener, executor);
         final CoreOutboundClient<I, O> outbound = new CoreOutboundClient<I, O>(executor);
-        inbound.initialize(outbound.getContextClient());
+        inbound.initialize(outbound.getClientInitiator());
         outbound.initialize(inbound.getClientResponder());
         return outbound.getUserContext();
     }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -62,7 +62,9 @@
         this.clientInitiator = clientInitiator;
         state.releaseDowngrade();
         try {
-            requestListener.handleClientOpen(clientContext);
+            if (requestListener != null) {
+                requestListener.handleClientOpen(clientContext);
+            }
         } finally {
             state.release();
         }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -58,7 +58,7 @@
         return userClient;
     }
 
-    ClientInitiator getContextClient() {
+    ClientInitiator getClientInitiator() {
         return clientInitiator;
     }
 
@@ -73,6 +73,13 @@
         }
     }
 
+    void invalidate() {
+        // An extra instance was mistakenly created; we'll kill this one and quietly step away
+        state.transition(State.DOWN);
+        userClient = null;
+        clientResponder = null;
+    }
+
     public final class UserClient extends AbstractRealClient<I, O> {
 
         private UserClient() {
@@ -128,9 +135,9 @@
             try {
                 final QueueExecutor queueExecutor = new QueueExecutor();
                 final CoreOutboundRequest<I, O> outboundRequest = new CoreOutboundRequest<I, O>();
-                final RequestResponder<I> requestTerminus = clientResponder.createNewRequest(outboundRequest.getReplier());
-                outboundRequest.setRequester(requestTerminus);
-                requestTerminus.handleRequest(request, queueExecutor);
+                final RequestResponder<I> requestResponder = clientResponder.createNewRequest(outboundRequest.getReplier());
+                outboundRequest.setRequestResponder(requestResponder);
+                requestResponder.handleRequest(request, queueExecutor);
                 final FutureReply<O> futureReply = outboundRequest.getFutureReply();
                 futureReply.addCompletionNotifier(new RequestCompletionHandler<O>() {
                     public void notifyComplete(final FutureReply<O> futureReply) {
@@ -149,7 +156,7 @@
             try {
                 final CoreOutboundRequest<I, O> outboundRequest = new CoreOutboundRequest<I, O>();
                 final RequestResponder<I> requestTerminus = clientResponder.createNewRequest(outboundRequest.getReplier());
-                outboundRequest.setRequester(requestTerminus);
+                outboundRequest.setRequestResponder(requestTerminus);
                 requestTerminus.handleRequest(request, executor);
                 return outboundRequest.getFutureReply();
             } finally {

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -41,7 +41,7 @@
         return requestResponder;
     }
 
-    public void setRequester(final RequestResponder<I> requestResponder) {
+    public void setRequestResponder(final RequestResponder<I> requestResponder) {
         this.requestResponder = requestResponder;
     }
 

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -107,7 +107,7 @@
 
         public Client<I, O> createContext() throws RemotingException {
             final CoreOutboundClient<I, O> client = new CoreOutboundClient<I, O>(executor);
-            final ClientResponder<I, O> clientResponder = serviceResponder.createNewClient(client.getContextClient());
+            final ClientResponder<I, O> clientResponder = serviceResponder.createNewClient(client.getClientInitiator());
             client.initialize(clientResponder);
             return client.getUserContext();
         }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -36,6 +36,8 @@
 import org.jboss.cx.remoting.spi.stream.StreamDetector;
 import org.jboss.cx.remoting.spi.stream.StreamSerializer;
 import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
 import org.jboss.cx.remoting.util.AtomicStateMachine;
 import org.jboss.cx.remoting.util.AttributeMap;
 import org.jboss.cx.remoting.util.CollectionUtil;
@@ -58,6 +60,8 @@
 
     // stream serialization detectors - immutable (for now?)
     private final List<StreamDetector> streamDetectors;
+    private List<ObjectResolver> resolvers = new ArrayList<ObjectResolver>();
+    private MarshallerFactory marshallerFactory;
 
     // Contexts and services that are available on the remote end of this session
     // In these paris, the Server points to the ProtocolHandler, and the Client points to...whatever
@@ -85,6 +89,7 @@
     private Client<?, ?> rootClient;
 
     private final AtomicStateMachine<State> state = AtomicStateMachine.start(State.NEW);
+    private ObjectResolver resolver; // todo - initialize to a composite resolver
 
     // Constructors
 
@@ -107,6 +112,23 @@
         }
     }
 
+    public MarshallerFactory getMarshallerFactory() {
+        return marshallerFactory;
+    }
+
+    public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
+        this.marshallerFactory = marshallerFactory;
+    }
+
+    public void addFirstResolver(ObjectResolver resolver) {
+        resolvers.add(0, resolver);
+    }
+
+    public void addLastResolver(ObjectResolver resolver) {
+        resolvers.add(resolver);
+    }
+
+
     // Initializers
 
     private <I, O> void doInitialize(final ProtocolHandler protocolHandler, final Client<I, O> rootClient) {
@@ -132,7 +154,7 @@
         }
         final ProtocolClientResponderImpl<I, O> contextServer = new ProtocolClientResponderImpl<I,O>(remoteIdentifier);
         final CoreOutboundClient<I, O> coreOutboundClient = new CoreOutboundClient<I, O>(executor);
-        clientContexts.put(remoteIdentifier, new ClientContextPair<I, O>(coreOutboundClient.getContextClient(), contextServer, remoteIdentifier));
+        clientContexts.put(remoteIdentifier, new ClientContextPair<I, O>(coreOutboundClient.getClientInitiator(), contextServer, remoteIdentifier));
         coreOutboundClient.initialize(contextServer);
         this.rootClient = coreOutboundClient.getUserContext();
         log.trace("Initialized session with remote context %s", remoteIdentifier);
@@ -291,7 +313,7 @@
             if (target == null) {
                 throw new NullPointerException("target is null");
             }
-            return new ObjectMessageOutputImpl(target, streamDetectors, endpoint.getOrderedExecutor());
+            return marshallerFactory.createRootMarshaller(resolver, getClass().getClassLoader() /* todo this is WRONG */).getMessageOutput(target);
         }
 
         public ObjectMessageOutput getMessageOutput(ByteMessageOutput target, Executor streamExecutor) throws IOException {
@@ -301,14 +323,14 @@
             if (streamExecutor == null) {
                 throw new NullPointerException("streamExecutor is null");
             }
-            return new ObjectMessageOutputImpl(target, streamDetectors, streamExecutor);
+            return marshallerFactory.createRootMarshaller(resolver, getClass().getClassLoader() /* todo this is WRONG */).getMessageOutput(target);
         }
 
         public ObjectMessageInput getMessageInput(ByteMessageInput source) throws IOException {
             if (source == null) {
                 throw new NullPointerException("source is null");
             }
-            return new ObjectMessageInputImpl(source);
+            return marshallerFactory.createRootMarshaller(resolver, getClass().getClassLoader() /* todo this is WRONG */).getMessageInput(source);
         }
 
         public String getLocalEndpointName() {
@@ -548,223 +570,6 @@
         }
     }
 
-    // message output
-
-    private final class ObjectMessageOutputImpl extends JBossObjectOutputStream implements ObjectMessageOutput {
-        private final ByteMessageOutput target;
-        private final List<StreamDetector> streamDetectors;
-        private final List<StreamSerializer> streamSerializers = new ArrayList<StreamSerializer>();
-        private final Executor streamExecutor;
-
-        private ObjectMessageOutputImpl(final ByteMessageOutput target, final List<StreamDetector> streamDetectors, final Executor streamExecutor) throws IOException {
-            super(new OutputStream() {
-                public void write(int b) throws IOException {
-                    target.write(b);
-                }
-
-                public void write(byte b[]) throws IOException {
-                    target.write(b);
-                }
-
-                public void write(byte b[], int off, int len) throws IOException {
-                    target.write(b, off, len);
-                }
-
-                public void flush() throws IOException {
-                    target.flush();
-                }
-
-                public void close() throws IOException {
-                    target.close();
-                }
-            }, true);
-            if (target == null) {
-                throw new NullPointerException("target is null");
-            }
-            if (streamDetectors == null) {
-                throw new NullPointerException("streamDetectors is null");
-            }
-            if (streamExecutor == null) {
-                throw new NullPointerException("streamExecutor is null");
-            }
-            enableReplaceObject(true);
-            this.target = target;
-            this.streamDetectors = streamDetectors;
-            this.streamExecutor = streamExecutor;
-        }
-
-        public void commit() throws IOException {
-            close();
-            target.commit();
-            for (StreamSerializer serializer : streamSerializers) {
-                try {
-                    serializer.handleOpen();
-                } catch (Exception ex) {
-                    // todo - log
-                }
-            }
-            streamSerializers.clear();
-        }
-
-        public int getBytesWritten() throws IOException {
-            flush();
-            return target.getBytesWritten();
-        }
-
-        private final <I, O> ClientMarker doContextReplace(ClientResponder<I, O> clientResponder) throws IOException {
-            final ClientIdentifier clientIdentifier = protocolHandler.openClient();
-            final ProtocolClientInitiatorImpl<I, O> contextClient = new ProtocolClientInitiatorImpl<I, O>(clientIdentifier);
-            new ServerContextPair<I, O>(contextClient, clientResponder);
-            return new ClientMarker(clientIdentifier);
-        }
-
-        private final <I, O> ClientSourceMarker doContextSourceReplace(ServiceResponder<I, O> serviceResponder) throws IOException {
-            final ServiceIdentifier serviceIdentifier = protocolHandler.openService();
-            final ProtocolServiceInitiatorImpl serviceClient = new ProtocolServiceInitiatorImpl(serviceIdentifier);
-            new ServerServicePair<I, O>(serviceClient, serviceResponder);
-            return new ClientSourceMarker(serviceIdentifier);
-        }
-
-        protected Object replaceObject(Object obj) throws IOException {
-            final Object testObject = super.replaceObject(obj);
-            if (testObject instanceof AbstractRealClient) {
-                return doContextReplace(((AbstractRealClient<?, ?>) obj).getContextServer());
-            } else if (testObject instanceof AbstractRealClientSource) {
-                return doContextSourceReplace(((AbstractRealClientSource<?, ?>) obj).getServiceServer());
-            }
-            for (StreamDetector detector : streamDetectors) {
-                final StreamSerializerFactory factory = detector.detectStream(testObject);
-                if (factory != null) {
-                    final StreamIdentifier streamIdentifier = protocolHandler.openStream();
-                    if (streamIdentifier == null) {
-                        throw new NullPointerException("streamIdentifier is null");
-                    }
-                    final CoreStream stream = new CoreStream(CoreSession.this, streamExecutor, streamIdentifier, factory, testObject);
-                    if (streams.putIfAbsent(streamIdentifier, stream) != null) {
-                        throw new IOException("Duplicate stream identifier encountered: " + streamIdentifier);
-                    }
-                    streamSerializers.add(stream.getStreamSerializer());
-                    log.trace("Writing stream marker for object: %s", testObject);
-                    return new StreamMarker(factory.getClass(), streamIdentifier);
-                }
-            }
-            return testObject;
-        }
-    }
-
-    // message input
-
-    private final class ObjectInputImpl extends JBossObjectInputStream {
-
-        private ClassLoader classLoader;
-
-        public ObjectInputImpl(final InputStream is) throws IOException {
-            super(is);
-            enableResolveObject(true);
-        }
-
-        public Object resolveObject(Object obj) throws IOException {
-            final Object testObject = super.resolveObject(obj);
-            if (testObject instanceof StreamMarker) {
-                StreamMarker marker = (StreamMarker) testObject;
-                final StreamIdentifier streamIdentifier = marker.getStreamIdentifier();
-                if (streamIdentifier == null) {
-                    throw new NullPointerException("streamIdentifier is null");
-                }
-                final StreamSerializerFactory streamSerializerFactory;
-                try {
-                    streamSerializerFactory = marker.getFactoryClass().newInstance();
-                } catch (InstantiationException e) {
-                    throw new IOException("Failed to instantiate a stream: " + e);
-                } catch (IllegalAccessException e) {
-                    throw new IOException("Failed to instantiate a stream: " + e);
-                }
-                final CoreStream stream = new CoreStream(CoreSession.this, endpoint.getOrderedExecutor(), streamIdentifier, streamSerializerFactory);
-                if (streams.putIfAbsent(streamIdentifier, stream) != null) {
-                    throw new IOException("Duplicate stream received");
-                }
-                return stream.getRemoteSerializer().getRemoteInstance();
-            } else {
-                return testObject;
-            }
-        }
-
-        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
-            final String name = desc.getName();
-            if (classLoader != null) {
-                if (primitiveTypes.containsKey(name)) {
-                    return primitiveTypes.get(name);
-                } else {
-                    return Class.forName(name, false, classLoader);
-                }
-            } else {
-                return super.resolveClass(desc);
-            }
-        }
-
-        protected Class<?> resolveProxyClass(String[] interfaces) throws IOException, ClassNotFoundException {
-            return super.resolveProxyClass(interfaces);
-        }
-
-        public Object readObject(final ClassLoader loader) throws ClassNotFoundException, IOException {
-            classLoader = loader;
-            try {
-                return readObject();
-            } finally {
-                classLoader = null;
-            }
-        }
-    }
-
-    private final class ObjectMessageInputImpl extends DelegatingObjectInput implements ObjectMessageInput {
-        private CoreSession.ObjectInputImpl objectInput;
-
-        private ObjectMessageInputImpl(final ObjectInputImpl objectInput) throws IOException {
-            super(objectInput);
-            this.objectInput = objectInput;
-        }
-
-        private ObjectMessageInputImpl(final ByteMessageInput source) throws IOException {
-            this(new ObjectInputImpl(new InputStream() {
-                public int read(byte b[]) throws IOException {
-                    return source.read(b);
-                }
-
-                public int read(byte b[], int off, int len) throws IOException {
-                    return source.read(b, off, len);
-                }
-
-                public int read() throws IOException {
-                    return source.read();
-                }
-
-                public void close() throws IOException {
-                    source.close();
-                }
-
-                public int available() throws IOException {
-                    return source.remaining();
-                }
-            }));
-        }
-
-        public Object readObject() throws ClassNotFoundException, IOException {
-            return objectInput.readObject();
-        }
-
-        public Object readObject(ClassLoader loader) throws ClassNotFoundException, IOException {
-            return objectInput.readObject(loader);
-        }
-
-        public int remaining() {
-            try {
-                return objectInput.available();
-            } catch (IOException e) {
-                throw new IllegalStateException("Available failed", e);
-            }
-        }
-    }
-
     private final class WeakProtocolContextServerReference<I, O> extends WeakReference<ProtocolClientResponderImpl<I, O>> {
         private final ClientContextPair<I, O> contextPair;
 
@@ -1034,22 +839,4 @@
             }
         }
     }
-
-    private static final Map<String, Class<?>> primitiveTypes = new HashMap<String, Class<?>>();
-
-    private static <T> void add(Class<T> type) {
-        primitiveTypes.put(type.getName(), type);
-    }
-
-    static {
-        add(void.class);
-        add(boolean.class);
-        add(byte.class);
-        add(short.class);
-        add(int.class);
-        add(long.class);
-        add(float.class);
-        add(double.class);
-        add(char.class);
-    }
 }

Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java
===================================================================
--- remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -45,7 +45,7 @@
             throw new RemotingException("No such local endpoint '" + otherEndpoint + "'");
         }
         final LocalProtocolHandler otherProtocolHandler = new LocalProtocolHandler(ourProtocolContext, otherEndpointName);
-        final ProtocolContext otherProtocolContext = otherEndpoint.openIncomingSession(otherProtocolHandler);
+        final ProtocolContext otherProtocolContext = otherEndpoint.openSession(otherProtocolHandler, null);
         final LocalProtocolHandler ourProtocolHandler = new LocalProtocolHandler(otherProtocolContext, endpointName);
         otherProtocolContext.receiveRemoteSideReady(endpointName);
         ourProtocolContext.receiveRemoteSideReady(otherEndpointName);

Deleted: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -1,14 +0,0 @@
-package org.jboss.cx.remoting.http;
-
-/**
- *
- */
-public interface RemotingHttpSessionContext {
-
-    /**
-     * Get a channel context that can be used to transport HTTP messages for this session.
-     *
-     * @return the channel context
-     */
-    RemotingHttpChannelContext getChannelContext();
-}

Added: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java	                        (rev 0)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -0,0 +1,8 @@
+package org.jboss.cx.remoting.http.impl;
+
+/**
+ *
+ */
+public interface Action {
+
+}

Added: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java	                        (rev 0)
+++ remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -0,0 +1,31 @@
+package org.jboss.cx.remoting.http.impl;
+
+import java.util.Queue;
+import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.cx.remoting.util.ByteMessageInput;
+import org.jboss.cx.remoting.http.RemotingHttpChannelContext;
+import org.jboss.cx.remoting.http.HttpMessageWriter;
+
+/**
+ *
+ */
+public final class RemotingHttpSession {
+    private final Queue<Action> outboundQueue = CollectionUtil.linkedList();
+
+    private final class ChannelContext implements RemotingHttpChannelContext {
+
+        public void processInboundMessage(final ByteMessageInput input) {
+        }
+
+        public HttpMessageWriter waitForOutgoingHttpMessage(final int millis) {
+            synchronized (outboundQueue) {
+                if (outboundQueue.element() != null) {
+                    while (! outboundQueue.isEmpty()) {
+                        Action action = outboundQueue.remove();
+                    }
+                }
+            }
+            return null;
+        }
+    }
+}

Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java
===================================================================
--- remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -15,6 +15,7 @@
 import org.apache.mina.handler.multiton.SingleSessionIoHandlerFactory;
 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
 import org.jboss.cx.remoting.Endpoint;
+import org.jboss.cx.remoting.RequestListener;
 import org.jboss.cx.remoting.jrpp.mina.FramingIoFilter;
 import org.jboss.cx.remoting.spi.protocol.ProtocolContext;
 import org.jboss.cx.remoting.util.AttributeMap;
@@ -31,6 +32,8 @@
     private SocketAddress socketAddress;
     /** Protocol support object.  Set before {@code create}. */
     private JrppProtocolSupport protocolSupport;
+    /** Root request listener.  Set before {@code create}. */
+    private RequestListener<?, ?> rootListener;
 
     // calculated properties
 
@@ -79,6 +82,14 @@
         this.endpoint = endpoint;
     }
 
+    public RequestListener<?, ?> getRootListener() {
+        return rootListener;
+    }
+
+    public void setRootListener(final RequestListener<?, ?> rootListener) {
+        this.rootListener = rootListener;
+    }
+
     // Lifecycle
 
     @SuppressWarnings ({"unchecked"})
@@ -118,7 +129,7 @@
         public SingleSessionIoHandler getHandler(IoSession ioSession) throws IOException {
             final JrppConnection connection = new JrppConnection(attributeMap);
             connection.initializeServer(ioSession);
-            final ProtocolContext protocolContext = endpoint.openIncomingSession(connection.getProtocolHandler());
+            final ProtocolContext protocolContext = endpoint.openSession(connection.getProtocolHandler(), rootListener);
             connection.start(protocolContext);
             return connection.getIoHandler();
         }

Modified: remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java
===================================================================
--- remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -8,6 +8,9 @@
  */
 public final class Logger {
     public static final class Level extends java.util.logging.Level {
+
+        private static final long serialVersionUID = 9150446594030531854L;
+
         protected Level(final String name, final int value) {
             super(name, value);
         }

Modified: remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java
===================================================================
--- remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -47,7 +47,7 @@
     }
 
     public void start() throws RemotingException {
-        session = endpoint.openSession(destination, attributeMap);
+        session = endpoint.openSession(destination, attributeMap, null);
     }
 
     public void stop() throws RemotingException {

Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -23,11 +23,11 @@
     public static void main(String[] args) throws IOException, RemoteExecutionException, URISyntaxException {
         Security.addProvider(new Provider());
         final StringRot13RequestListener listener = new StringRot13RequestListener();
-        final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+        final Endpoint endpoint = Remoting.createEndpoint("simple");
         try {
-            final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
+            final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), listener, AttributeMap.EMPTY);
             try {
-                Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
+                Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY, null);
                 try {
                     final Client<String,String> client = session.getRootClient();
                     try {

Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -26,11 +26,11 @@
     public static void main(String[] args) throws IOException, RemoteExecutionException, URISyntaxException {
         Security.addProvider(new Provider());
         final StreamingRot13RequestListener listener = new StreamingRot13RequestListener();
-        final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+        final Endpoint endpoint = Remoting.createEndpoint("simple");
         try {
-            final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
+            final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), listener, AttributeMap.EMPTY);
             try {
-                Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
+                Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY, listener);
                 try {
                     final Client<Reader,Reader> client = session.getRootClient();
                     try {

Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -17,7 +17,7 @@
     public static void main(String[] args) throws IOException, RemoteExecutionException {
         Security.addProvider(new Provider());
         final StringRot13RequestListener listener = new StringRot13RequestListener();
-        final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+        final Endpoint endpoint = Remoting.createEndpoint("simple");
         try {
             final Client<String,String> client = endpoint.createClient(listener);
             try {

Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -20,7 +20,7 @@
     public static void main(String[] args) throws IOException, RemoteExecutionException {
         Security.addProvider(new Provider());
         final StreamingRot13RequestListener listener = new StreamingRot13RequestListener();
-        final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+        final Endpoint endpoint = Remoting.createEndpoint("simple");
         try {
             final Client<Reader,Reader> client = endpoint.createClient(listener);
             try {

Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -21,12 +21,11 @@
     // lifecycle lock
     private static final Object lifecycle = new Object();
 
-    public static <I, O> Endpoint createEndpoint(String name, RequestListener<I, O> listener) throws IOException {
+    public static <I, O> Endpoint createEndpoint(String name) throws IOException {
         synchronized (lifecycle) {
             boolean ok = false;
             final CoreEndpoint coreEndpoint = new CoreEndpoint();
             coreEndpoint.setName(name);
-            coreEndpoint.setRootListener(listener);
             coreEndpoint.create();
             try {
                 coreEndpoint.start();
@@ -81,7 +80,7 @@
         }
     }
 
-    public static JrppServer addJrppServer(Endpoint endpoint, SocketAddress address, AttributeMap attributeMap) throws IOException {
+    public static JrppServer addJrppServer(Endpoint endpoint, SocketAddress address, RequestListener<?, ?> rootRequestListener, AttributeMap attributeMap) throws IOException {
         synchronized (lifecycle) {
             boolean ok = false;
             final JrppServer jrppServer = new JrppServer();
@@ -89,6 +88,7 @@
             jrppServer.setSocketAddress(address);
             jrppServer.setAttributeMap(attributeMap);
             jrppServer.setEndpoint(endpoint);
+            jrppServer.setRootListener(rootRequestListener);
             jrppServer.create();
             try {
                 jrppServer.start();

Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java	2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java	2008-06-12 16:34:08 UTC (rev 4291)
@@ -16,14 +16,4 @@
      * @throws IOException if an I/O error occurs
      */
     Object readObject() throws ClassNotFoundException, IOException;
-
-    /**
-     * Read an object using the given classloader.
-     *
-     * @param loader the classloader to use
-     * @return the object from the message
-     * @throws ClassNotFoundException if the class of the object could not be resolved by the classloader
-     * @throws IOException if an I/O error occurs
-     */
-    Object readObject(ClassLoader loader) throws ClassNotFoundException, IOException;
 }




More information about the jboss-remoting-commits mailing list