Author: david.lloyd(a)jboss.com
Date: 2008-06-12 12:34:08 -0400 (Thu, 12 Jun 2008)
New Revision: 4291
Added:
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java
Removed:
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java
remoting3/trunk/build.xml
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java
remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java
remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java
Log:
Point commit...
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java
===================================================================
--- remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-06-11
22:09:16 UTC (rev 4290)
+++ remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/Endpoint.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -32,12 +32,13 @@
*
* @param remoteUri the URI of the server to connect to
* @param attributeMap the attribute map to use to configure this session
+ * @param rootListener the root request listener for this end of the session
* @return a new session
*
* @throws RemotingException if there is a problem creating the session, or if the
request or reply type does not
* match the remote service
*/
- Session openSession(URI remoteUri, AttributeMap attributeMap) throws
RemotingException;
+ Session openSession(URI remoteUri, AttributeMap attributeMap, RequestListener<?,
?> rootListener) throws RemotingException;
/**
* Open an inbound session from another endpoint. Used by protocol handlers.
@@ -45,9 +46,10 @@
* You must have the TODO permission to invoke this method.
*
* @param handler the protocol handler to use
+ * @param rootListener the root request listener for this end of the session
* @return the protocol context
*/
- ProtocolContext openIncomingSession(ProtocolHandler handler) throws
RemotingException;
+ ProtocolContext openSession(ProtocolHandler handler, RequestListener<?, ?>
rootListener) throws RemotingException;
/**
* Get the name of this endpoint.
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/marshal/Marshaller.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -9,7 +9,8 @@
/**
* A marshaller/unmarshaller for transmitting data over a wire protocol of some sort.
Each marshaller instance is
- * guaranteed to be used by only one thread. Marshallers are not pooled or reused in any
way.
+ * guaranteed to be used by only one thread. Marshallers are not pooled or reused in any
way. Any pooling of marshallers
+ * must be done by implementations of this class and/or {@link
org.jboss.cx.remoting.spi.marshal.MarshallerFactory}.
*/
public interface Marshaller extends Serializable {
Modified:
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/spi/wrapper/EndpointWrapper.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -29,12 +29,12 @@
return delegate.getAttributes();
}
- public Session openSession(final URI remoteUri, final AttributeMap attributeMap)
throws RemotingException {
- return delegate.openSession(remoteUri, attributeMap);
+ public Session openSession(final URI remoteUri, final AttributeMap attributeMap,
final RequestListener<?, ?> rootListener) throws RemotingException {
+ return delegate.openSession(remoteUri, attributeMap, rootListener);
}
- public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws
RemotingException {
- return delegate.openIncomingSession(handler);
+ public ProtocolContext openSession(final ProtocolHandler handler, final
RequestListener<?, ?> rootListener) throws RemotingException {
+ return delegate.openSession(handler, rootListener);
}
public String getName() {
Modified: remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java
===================================================================
---
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/api/src/main/java/org/jboss/cx/remoting/stream/ObjectSink.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -2,11 +2,12 @@
import java.io.Closeable;
import java.io.IOException;
+import java.io.Flushable;
/**
*
*/
-public interface ObjectSink<T> extends Closeable {
+public interface ObjectSink<T> extends Flushable, Closeable {
void accept(T instance) throws IOException;
/**
Modified: remoting3/trunk/build.xml
===================================================================
--- remoting3/trunk/build.xml 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/build.xml 2008-06-12 16:34:08 UTC (rev 4291)
@@ -1131,7 +1131,7 @@
<path id="version.classpath">
<pathelement location="version/target/main/classes"/>
</path>
- <java classpathref="version.classpath"
classname="org.jboss.cx.remoting.version.Version"
outputproperty="version"/>
+ <java classpathref="version.classpath"
classname="org.jboss.cx.remoting.version.Version"
outputproperty="version"/>
<property name="version" value="UNKNOWN"/>
</target>
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/ClientMarker.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -18,7 +18,7 @@
this.clientIdentifer = clientIdentifer;
}
- public ClientIdentifier getContextIdentifer() {
+ public ClientIdentifier getClientIdentifer() {
return clientIdentifer;
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreEndpoint.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -54,7 +54,6 @@
}
private String name;
- private RequestListener<?, ?> rootListener;
private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.INITIAL);
private final Set<SessionListener> sessionListeners =
CollectionUtil.synchronizedSet(new LinkedHashSet<SessionListener>());
@@ -96,21 +95,10 @@
return name;
}
- public void setRootListener(final RequestListener<?, ?> rootListener) {
- this.rootListener = rootListener;
- }
-
- public RequestListener<?, ?> getRootListener() {
- return rootListener;
- }
-
// Lifecycle
public void create() {
// todo security check
- if (rootListener == null) {
- throw new NullPointerException("rootListener is null");
- }
}
public void start() {
@@ -132,7 +120,6 @@
}
public void destroy() {
- rootListener = null;
executor = null;
}
@@ -143,7 +130,7 @@
return endpointMap;
}
- public Session openSession(final URI uri, final AttributeMap attributeMap) throws
RemotingException {
+ public Session openSession(final URI uri, final AttributeMap attributeMap, final
RequestListener<?, ?> rootListener) throws RemotingException {
if (uri == null) {
throw new NullPointerException("uri is null");
}
@@ -184,11 +171,11 @@
}
}
- public ProtocolContext openIncomingSession(final ProtocolHandler handler) throws
RemotingException {
+ public ProtocolContext openSession(final ProtocolHandler handler, final
RequestListener<?, ?> rootListener) throws RemotingException {
state.requireHold(State.UP);
try {
final CoreSession session = new CoreSession(CoreEndpoint.this);
- session.initializeServer(handler, createClient(rootListener));
+ session.initializeServer(handler, rootListener == null ? null :
createClient(rootListener));
sessions.add(session);
return session.getProtocolContext();
} finally {
@@ -216,7 +203,7 @@
public <I, O> Client<I, O> createClient(RequestListener<I, O>
requestListener) {
final CoreInboundClient<I, O> inbound = new CoreInboundClient<I,
O>(requestListener, executor);
final CoreOutboundClient<I, O> outbound = new CoreOutboundClient<I,
O>(executor);
- inbound.initialize(outbound.getContextClient());
+ inbound.initialize(outbound.getClientInitiator());
outbound.initialize(inbound.getClientResponder());
return outbound.getUserContext();
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreInboundClient.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -62,7 +62,9 @@
this.clientInitiator = clientInitiator;
state.releaseDowngrade();
try {
- requestListener.handleClientOpen(clientContext);
+ if (requestListener != null) {
+ requestListener.handleClientOpen(clientContext);
+ }
} finally {
state.release();
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundClient.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -58,7 +58,7 @@
return userClient;
}
- ClientInitiator getContextClient() {
+ ClientInitiator getClientInitiator() {
return clientInitiator;
}
@@ -73,6 +73,13 @@
}
}
+ void invalidate() {
+ // An extra instance was mistakenly created; we'll kill this one and quietly
step away
+ state.transition(State.DOWN);
+ userClient = null;
+ clientResponder = null;
+ }
+
public final class UserClient extends AbstractRealClient<I, O> {
private UserClient() {
@@ -128,9 +135,9 @@
try {
final QueueExecutor queueExecutor = new QueueExecutor();
final CoreOutboundRequest<I, O> outboundRequest = new
CoreOutboundRequest<I, O>();
- final RequestResponder<I> requestTerminus =
clientResponder.createNewRequest(outboundRequest.getReplier());
- outboundRequest.setRequester(requestTerminus);
- requestTerminus.handleRequest(request, queueExecutor);
+ final RequestResponder<I> requestResponder =
clientResponder.createNewRequest(outboundRequest.getReplier());
+ outboundRequest.setRequestResponder(requestResponder);
+ requestResponder.handleRequest(request, queueExecutor);
final FutureReply<O> futureReply =
outboundRequest.getFutureReply();
futureReply.addCompletionNotifier(new RequestCompletionHandler<O>()
{
public void notifyComplete(final FutureReply<O> futureReply) {
@@ -149,7 +156,7 @@
try {
final CoreOutboundRequest<I, O> outboundRequest = new
CoreOutboundRequest<I, O>();
final RequestResponder<I> requestTerminus =
clientResponder.createNewRequest(outboundRequest.getReplier());
- outboundRequest.setRequester(requestTerminus);
+ outboundRequest.setRequestResponder(requestTerminus);
requestTerminus.handleRequest(request, executor);
return outboundRequest.getFutureReply();
} finally {
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundRequest.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -41,7 +41,7 @@
return requestResponder;
}
- public void setRequester(final RequestResponder<I> requestResponder) {
+ public void setRequestResponder(final RequestResponder<I> requestResponder) {
this.requestResponder = requestResponder;
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreOutboundService.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -107,7 +107,7 @@
public Client<I, O> createContext() throws RemotingException {
final CoreOutboundClient<I, O> client = new CoreOutboundClient<I,
O>(executor);
- final ClientResponder<I, O> clientResponder =
serviceResponder.createNewClient(client.getContextClient());
+ final ClientResponder<I, O> clientResponder =
serviceResponder.createNewClient(client.getClientInitiator());
client.initialize(clientResponder);
return client.getUserContext();
}
Modified: remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/CoreSession.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -36,6 +36,8 @@
import org.jboss.cx.remoting.spi.stream.StreamDetector;
import org.jboss.cx.remoting.spi.stream.StreamSerializer;
import org.jboss.cx.remoting.spi.stream.StreamSerializerFactory;
+import org.jboss.cx.remoting.spi.marshal.ObjectResolver;
+import org.jboss.cx.remoting.spi.marshal.MarshallerFactory;
import org.jboss.cx.remoting.util.AtomicStateMachine;
import org.jboss.cx.remoting.util.AttributeMap;
import org.jboss.cx.remoting.util.CollectionUtil;
@@ -58,6 +60,8 @@
// stream serialization detectors - immutable (for now?)
private final List<StreamDetector> streamDetectors;
+ private List<ObjectResolver> resolvers = new
ArrayList<ObjectResolver>();
+ private MarshallerFactory marshallerFactory;
// Contexts and services that are available on the remote end of this session
// In these paris, the Server points to the ProtocolHandler, and the Client points
to...whatever
@@ -85,6 +89,7 @@
private Client<?, ?> rootClient;
private final AtomicStateMachine<State> state =
AtomicStateMachine.start(State.NEW);
+ private ObjectResolver resolver; // todo - initialize to a composite resolver
// Constructors
@@ -107,6 +112,23 @@
}
}
+ public MarshallerFactory getMarshallerFactory() {
+ return marshallerFactory;
+ }
+
+ public void setMarshallerFactory(final MarshallerFactory marshallerFactory) {
+ this.marshallerFactory = marshallerFactory;
+ }
+
+ public void addFirstResolver(ObjectResolver resolver) {
+ resolvers.add(0, resolver);
+ }
+
+ public void addLastResolver(ObjectResolver resolver) {
+ resolvers.add(resolver);
+ }
+
+
// Initializers
private <I, O> void doInitialize(final ProtocolHandler protocolHandler, final
Client<I, O> rootClient) {
@@ -132,7 +154,7 @@
}
final ProtocolClientResponderImpl<I, O> contextServer = new
ProtocolClientResponderImpl<I,O>(remoteIdentifier);
final CoreOutboundClient<I, O> coreOutboundClient = new
CoreOutboundClient<I, O>(executor);
- clientContexts.put(remoteIdentifier, new ClientContextPair<I,
O>(coreOutboundClient.getContextClient(), contextServer, remoteIdentifier));
+ clientContexts.put(remoteIdentifier, new ClientContextPair<I,
O>(coreOutboundClient.getClientInitiator(), contextServer, remoteIdentifier));
coreOutboundClient.initialize(contextServer);
this.rootClient = coreOutboundClient.getUserContext();
log.trace("Initialized session with remote context %s",
remoteIdentifier);
@@ -291,7 +313,7 @@
if (target == null) {
throw new NullPointerException("target is null");
}
- return new ObjectMessageOutputImpl(target, streamDetectors,
endpoint.getOrderedExecutor());
+ return marshallerFactory.createRootMarshaller(resolver,
getClass().getClassLoader() /* todo this is WRONG */).getMessageOutput(target);
}
public ObjectMessageOutput getMessageOutput(ByteMessageOutput target, Executor
streamExecutor) throws IOException {
@@ -301,14 +323,14 @@
if (streamExecutor == null) {
throw new NullPointerException("streamExecutor is null");
}
- return new ObjectMessageOutputImpl(target, streamDetectors, streamExecutor);
+ return marshallerFactory.createRootMarshaller(resolver,
getClass().getClassLoader() /* todo this is WRONG */).getMessageOutput(target);
}
public ObjectMessageInput getMessageInput(ByteMessageInput source) throws
IOException {
if (source == null) {
throw new NullPointerException("source is null");
}
- return new ObjectMessageInputImpl(source);
+ return marshallerFactory.createRootMarshaller(resolver,
getClass().getClassLoader() /* todo this is WRONG */).getMessageInput(source);
}
public String getLocalEndpointName() {
@@ -548,223 +570,6 @@
}
}
- // message output
-
- private final class ObjectMessageOutputImpl extends JBossObjectOutputStream
implements ObjectMessageOutput {
- private final ByteMessageOutput target;
- private final List<StreamDetector> streamDetectors;
- private final List<StreamSerializer> streamSerializers = new
ArrayList<StreamSerializer>();
- private final Executor streamExecutor;
-
- private ObjectMessageOutputImpl(final ByteMessageOutput target, final
List<StreamDetector> streamDetectors, final Executor streamExecutor) throws
IOException {
- super(new OutputStream() {
- public void write(int b) throws IOException {
- target.write(b);
- }
-
- public void write(byte b[]) throws IOException {
- target.write(b);
- }
-
- public void write(byte b[], int off, int len) throws IOException {
- target.write(b, off, len);
- }
-
- public void flush() throws IOException {
- target.flush();
- }
-
- public void close() throws IOException {
- target.close();
- }
- }, true);
- if (target == null) {
- throw new NullPointerException("target is null");
- }
- if (streamDetectors == null) {
- throw new NullPointerException("streamDetectors is null");
- }
- if (streamExecutor == null) {
- throw new NullPointerException("streamExecutor is null");
- }
- enableReplaceObject(true);
- this.target = target;
- this.streamDetectors = streamDetectors;
- this.streamExecutor = streamExecutor;
- }
-
- public void commit() throws IOException {
- close();
- target.commit();
- for (StreamSerializer serializer : streamSerializers) {
- try {
- serializer.handleOpen();
- } catch (Exception ex) {
- // todo - log
- }
- }
- streamSerializers.clear();
- }
-
- public int getBytesWritten() throws IOException {
- flush();
- return target.getBytesWritten();
- }
-
- private final <I, O> ClientMarker doContextReplace(ClientResponder<I,
O> clientResponder) throws IOException {
- final ClientIdentifier clientIdentifier = protocolHandler.openClient();
- final ProtocolClientInitiatorImpl<I, O> contextClient = new
ProtocolClientInitiatorImpl<I, O>(clientIdentifier);
- new ServerContextPair<I, O>(contextClient, clientResponder);
- return new ClientMarker(clientIdentifier);
- }
-
- private final <I, O> ClientSourceMarker
doContextSourceReplace(ServiceResponder<I, O> serviceResponder) throws IOException
{
- final ServiceIdentifier serviceIdentifier = protocolHandler.openService();
- final ProtocolServiceInitiatorImpl serviceClient = new
ProtocolServiceInitiatorImpl(serviceIdentifier);
- new ServerServicePair<I, O>(serviceClient, serviceResponder);
- return new ClientSourceMarker(serviceIdentifier);
- }
-
- protected Object replaceObject(Object obj) throws IOException {
- final Object testObject = super.replaceObject(obj);
- if (testObject instanceof AbstractRealClient) {
- return doContextReplace(((AbstractRealClient<?, ?>)
obj).getContextServer());
- } else if (testObject instanceof AbstractRealClientSource) {
- return doContextSourceReplace(((AbstractRealClientSource<?, ?>)
obj).getServiceServer());
- }
- for (StreamDetector detector : streamDetectors) {
- final StreamSerializerFactory factory =
detector.detectStream(testObject);
- if (factory != null) {
- final StreamIdentifier streamIdentifier =
protocolHandler.openStream();
- if (streamIdentifier == null) {
- throw new NullPointerException("streamIdentifier is
null");
- }
- final CoreStream stream = new CoreStream(CoreSession.this,
streamExecutor, streamIdentifier, factory, testObject);
- if (streams.putIfAbsent(streamIdentifier, stream) != null) {
- throw new IOException("Duplicate stream identifier
encountered: " + streamIdentifier);
- }
- streamSerializers.add(stream.getStreamSerializer());
- log.trace("Writing stream marker for object: %s",
testObject);
- return new StreamMarker(factory.getClass(), streamIdentifier);
- }
- }
- return testObject;
- }
- }
-
- // message input
-
- private final class ObjectInputImpl extends JBossObjectInputStream {
-
- private ClassLoader classLoader;
-
- public ObjectInputImpl(final InputStream is) throws IOException {
- super(is);
- enableResolveObject(true);
- }
-
- public Object resolveObject(Object obj) throws IOException {
- final Object testObject = super.resolveObject(obj);
- if (testObject instanceof StreamMarker) {
- StreamMarker marker = (StreamMarker) testObject;
- final StreamIdentifier streamIdentifier = marker.getStreamIdentifier();
- if (streamIdentifier == null) {
- throw new NullPointerException("streamIdentifier is
null");
- }
- final StreamSerializerFactory streamSerializerFactory;
- try {
- streamSerializerFactory = marker.getFactoryClass().newInstance();
- } catch (InstantiationException e) {
- throw new IOException("Failed to instantiate a stream: " +
e);
- } catch (IllegalAccessException e) {
- throw new IOException("Failed to instantiate a stream: " +
e);
- }
- final CoreStream stream = new CoreStream(CoreSession.this,
endpoint.getOrderedExecutor(), streamIdentifier, streamSerializerFactory);
- if (streams.putIfAbsent(streamIdentifier, stream) != null) {
- throw new IOException("Duplicate stream received");
- }
- return stream.getRemoteSerializer().getRemoteInstance();
- } else {
- return testObject;
- }
- }
-
- protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException,
ClassNotFoundException {
- final String name = desc.getName();
- if (classLoader != null) {
- if (primitiveTypes.containsKey(name)) {
- return primitiveTypes.get(name);
- } else {
- return Class.forName(name, false, classLoader);
- }
- } else {
- return super.resolveClass(desc);
- }
- }
-
- protected Class<?> resolveProxyClass(String[] interfaces) throws
IOException, ClassNotFoundException {
- return super.resolveProxyClass(interfaces);
- }
-
- public Object readObject(final ClassLoader loader) throws ClassNotFoundException,
IOException {
- classLoader = loader;
- try {
- return readObject();
- } finally {
- classLoader = null;
- }
- }
- }
-
- private final class ObjectMessageInputImpl extends DelegatingObjectInput implements
ObjectMessageInput {
- private CoreSession.ObjectInputImpl objectInput;
-
- private ObjectMessageInputImpl(final ObjectInputImpl objectInput) throws
IOException {
- super(objectInput);
- this.objectInput = objectInput;
- }
-
- private ObjectMessageInputImpl(final ByteMessageInput source) throws IOException
{
- this(new ObjectInputImpl(new InputStream() {
- public int read(byte b[]) throws IOException {
- return source.read(b);
- }
-
- public int read(byte b[], int off, int len) throws IOException {
- return source.read(b, off, len);
- }
-
- public int read() throws IOException {
- return source.read();
- }
-
- public void close() throws IOException {
- source.close();
- }
-
- public int available() throws IOException {
- return source.remaining();
- }
- }));
- }
-
- public Object readObject() throws ClassNotFoundException, IOException {
- return objectInput.readObject();
- }
-
- public Object readObject(ClassLoader loader) throws ClassNotFoundException,
IOException {
- return objectInput.readObject(loader);
- }
-
- public int remaining() {
- try {
- return objectInput.available();
- } catch (IOException e) {
- throw new IllegalStateException("Available failed", e);
- }
- }
- }
-
private final class WeakProtocolContextServerReference<I, O> extends
WeakReference<ProtocolClientResponderImpl<I, O>> {
private final ClientContextPair<I, O> contextPair;
@@ -1034,22 +839,4 @@
}
}
}
-
- private static final Map<String, Class<?>> primitiveTypes = new
HashMap<String, Class<?>>();
-
- private static <T> void add(Class<T> type) {
- primitiveTypes.put(type.getName(), type);
- }
-
- static {
- add(void.class);
- add(boolean.class);
- add(byte.class);
- add(short.class);
- add(int.class);
- add(long.class);
- add(float.class);
- add(double.class);
- add(char.class);
- }
}
Modified:
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java
===================================================================
---
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/core/src/main/java/org/jboss/cx/remoting/core/protocol/LocalProtocolHandlerFactory.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -45,7 +45,7 @@
throw new RemotingException("No such local endpoint '" +
otherEndpoint + "'");
}
final LocalProtocolHandler otherProtocolHandler = new
LocalProtocolHandler(ourProtocolContext, otherEndpointName);
- final ProtocolContext otherProtocolContext =
otherEndpoint.openIncomingSession(otherProtocolHandler);
+ final ProtocolContext otherProtocolContext =
otherEndpoint.openSession(otherProtocolHandler, null);
final LocalProtocolHandler ourProtocolHandler = new
LocalProtocolHandler(otherProtocolContext, endpointName);
otherProtocolContext.receiveRemoteSideReady(endpointName);
ourProtocolContext.receiveRemoteSideReady(otherEndpointName);
Deleted:
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java
===================================================================
---
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/RemotingHttpSessionContext.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -1,14 +0,0 @@
-package org.jboss.cx.remoting.http;
-
-/**
- *
- */
-public interface RemotingHttpSessionContext {
-
- /**
- * Get a channel context that can be used to transport HTTP messages for this
session.
- *
- * @return the channel context
- */
- RemotingHttpChannelContext getChannelContext();
-}
Added: remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java
===================================================================
--- remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java
(rev 0)
+++
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/Action.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -0,0 +1,8 @@
+package org.jboss.cx.remoting.http.impl;
+
+/**
+ *
+ */
+public interface Action {
+
+}
Added:
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java
===================================================================
---
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java
(rev 0)
+++
remoting3/trunk/http/src/main/java/org/jboss/cx/remoting/http/impl/RemotingHttpSession.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -0,0 +1,31 @@
+package org.jboss.cx.remoting.http.impl;
+
+import java.util.Queue;
+import org.jboss.cx.remoting.util.CollectionUtil;
+import org.jboss.cx.remoting.util.ByteMessageInput;
+import org.jboss.cx.remoting.http.RemotingHttpChannelContext;
+import org.jboss.cx.remoting.http.HttpMessageWriter;
+
+/**
+ *
+ */
+public final class RemotingHttpSession {
+ private final Queue<Action> outboundQueue = CollectionUtil.linkedList();
+
+ private final class ChannelContext implements RemotingHttpChannelContext {
+
+ public void processInboundMessage(final ByteMessageInput input) {
+ }
+
+ public HttpMessageWriter waitForOutgoingHttpMessage(final int millis) {
+ synchronized (outboundQueue) {
+ if (outboundQueue.peek() != null) { // peek() is null when empty; element() would throw NoSuchElementException
+ while (! outboundQueue.isEmpty()) {
+ Action action = outboundQueue.remove();
+ }
+ }
+ }
+ return null;
+ }
+ }
+}
Modified: remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java
===================================================================
---
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java 2008-06-11
22:09:16 UTC (rev 4290)
+++
remoting3/trunk/jrpp/src/main/java/org/jboss/cx/remoting/jrpp/JrppServer.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -15,6 +15,7 @@
import org.apache.mina.handler.multiton.SingleSessionIoHandlerFactory;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.jboss.cx.remoting.Endpoint;
+import org.jboss.cx.remoting.RequestListener;
import org.jboss.cx.remoting.jrpp.mina.FramingIoFilter;
import org.jboss.cx.remoting.spi.protocol.ProtocolContext;
import org.jboss.cx.remoting.util.AttributeMap;
@@ -31,6 +32,8 @@
private SocketAddress socketAddress;
/** Protocol support object. Set before {@code create}. */
private JrppProtocolSupport protocolSupport;
+ /** Root request listener. Set before {@code create}. */
+ private RequestListener<?, ?> rootListener;
// calculated properties
@@ -79,6 +82,14 @@
this.endpoint = endpoint;
}
+ public RequestListener<?, ?> getRootListener() {
+ return rootListener;
+ }
+
+ public void setRootListener(final RequestListener<?, ?> rootListener) {
+ this.rootListener = rootListener;
+ }
+
// Lifecycle
@SuppressWarnings ({"unchecked"})
@@ -118,7 +129,7 @@
public SingleSessionIoHandler getHandler(IoSession ioSession) throws IOException
{
final JrppConnection connection = new JrppConnection(attributeMap);
connection.initializeServer(ioSession);
- final ProtocolContext protocolContext =
endpoint.openIncomingSession(connection.getProtocolHandler());
+ final ProtocolContext protocolContext =
endpoint.openSession(connection.getProtocolHandler(), rootListener);
connection.start(protocolContext);
return connection.getIoHandler();
}
Modified: remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java
===================================================================
--- remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java 2008-06-11
22:09:16 UTC (rev 4290)
+++ remoting3/trunk/log-jul/src/main/java/org/jboss/cx/remoting/log/Logger.java 2008-06-12
16:34:08 UTC (rev 4291)
@@ -8,6 +8,9 @@
*/
public final class Logger {
public static final class Level extends java.util.logging.Level {
+
+ private static final long serialVersionUID = 9150446594030531854L;
+
protected Level(final String name, final int value) {
super(name, value);
}
Modified: remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java
===================================================================
--- remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/mc-deployers/src/main/java/org/jboss/cx/remoting/beans/SessionBean.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -47,7 +47,7 @@
}
public void start() throws RemotingException {
- session = endpoint.openSession(destination, attributeMap);
+ session = endpoint.openSession(destination, attributeMap, null);
}
public void stop() throws RemotingException {
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppBasicExampleMain.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -23,11 +23,11 @@
public static void main(String[] args) throws IOException, RemoteExecutionException, URISyntaxException {
Security.addProvider(new Provider());
final StringRot13RequestListener listener = new StringRot13RequestListener();
- final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+ final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
- final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
+ final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), listener, AttributeMap.EMPTY);
try {
- Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
+ Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY, null);
try {
final Client<String,String> client = session.getRootClient();
try {
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/JrppStreamExampleMain.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -26,11 +26,11 @@
public static void main(String[] args) throws IOException, RemoteExecutionException, URISyntaxException {
Security.addProvider(new Provider());
final StreamingRot13RequestListener listener = new StreamingRot13RequestListener();
- final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+ final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
- final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), AttributeMap.EMPTY);
+ final JrppServer jrppServer = Remoting.addJrppServer(endpoint, new InetSocketAddress(12345), listener, AttributeMap.EMPTY);
try {
- Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY);
+ Session session = endpoint.openSession(new URI("jrpp://localhost:12345"), AttributeMap.EMPTY, listener);
try {
final Client<Reader,Reader> client = session.getRootClient();
try {
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalBasicExampleMain.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -17,7 +17,7 @@
public static void main(String[] args) throws IOException, RemoteExecutionException {
Security.addProvider(new Provider());
final StringRot13RequestListener listener = new StringRot13RequestListener();
- final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+ final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
final Client<String,String> client = endpoint.createClient(listener);
try {
Modified: remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java
===================================================================
--- remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/samples/src/main/java/org/jboss/cx/remoting/samples/simple/LocalStreamExampleMain.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -20,7 +20,7 @@
public static void main(String[] args) throws IOException, RemoteExecutionException {
Security.addProvider(new Provider());
final StreamingRot13RequestListener listener = new StreamingRot13RequestListener();
- final Endpoint endpoint = Remoting.createEndpoint("simple", listener);
+ final Endpoint endpoint = Remoting.createEndpoint("simple");
try {
final Client<Reader,Reader> client = endpoint.createClient(listener);
try {
Modified: remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java
===================================================================
--- remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/standalone/src/main/java/org/jboss/cx/remoting/Remoting.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -21,12 +21,11 @@
// lifecycle lock
private static final Object lifecycle = new Object();
- public static <I, O> Endpoint createEndpoint(String name, RequestListener<I, O> listener) throws IOException {
+ public static <I, O> Endpoint createEndpoint(String name) throws IOException {
synchronized (lifecycle) {
boolean ok = false;
final CoreEndpoint coreEndpoint = new CoreEndpoint();
coreEndpoint.setName(name);
- coreEndpoint.setRootListener(listener);
coreEndpoint.create();
try {
coreEndpoint.start();
@@ -81,7 +80,7 @@
}
}
- public static JrppServer addJrppServer(Endpoint endpoint, SocketAddress address, AttributeMap attributeMap) throws IOException {
+ public static JrppServer addJrppServer(Endpoint endpoint, SocketAddress address, RequestListener<?, ?> rootRequestListener, AttributeMap attributeMap) throws IOException {
synchronized (lifecycle) {
boolean ok = false;
final JrppServer jrppServer = new JrppServer();
@@ -89,6 +88,7 @@
jrppServer.setSocketAddress(address);
jrppServer.setAttributeMap(attributeMap);
jrppServer.setEndpoint(endpoint);
+ jrppServer.setRootListener(rootRequestListener);
jrppServer.create();
try {
jrppServer.start();
Modified: remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java
===================================================================
--- remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java 2008-06-11 22:09:16 UTC (rev 4290)
+++ remoting3/trunk/util/src/main/java/org/jboss/cx/remoting/util/ObjectMessageInput.java 2008-06-12 16:34:08 UTC (rev 4291)
@@ -16,14 +16,4 @@
* @throws IOException if an I/O error occurs
*/
Object readObject() throws ClassNotFoundException, IOException;
-
- /**
- * Read an object using the given classloader.
- *
- * @param loader the classloader to use
- * @return the object from the message
- * @throws ClassNotFoundException if the class of the object could not be resolved by the classloader
- * @throws IOException if an I/O error occurs
- */
- Object readObject(ClassLoader loader) throws ClassNotFoundException, IOException;
}